RLPark 1.0.0
Reinforcement Learning Framework in Java

ServerScheduler.java

Go to the documentation of this file.
00001 package rlpark.plugin.rltoys.experiments.scheduling.network;
00002 
00003 import java.io.IOException;
00004 import java.net.ServerSocket;
00005 import java.net.Socket;
00006 import java.util.ArrayList;
00007 import java.util.Collection;
00008 import java.util.Collections;
00009 import java.util.HashSet;
00010 import java.util.Set;
00011 
00012 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.JobDoneEvent;
00013 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.Scheduler;
00014 import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.Messages;
00015 import rlpark.plugin.rltoys.experiments.scheduling.internal.network.SocketClient;
00016 import rlpark.plugin.rltoys.experiments.scheduling.internal.serverlog.ServerLog;
00017 import rlpark.plugin.rltoys.experiments.scheduling.queue.LocalQueue;
00018 import rlpark.plugin.rltoys.experiments.scheduling.schedulers.LocalScheduler;
00019 import zephyr.plugin.core.api.signals.Listener;
00020 import zephyr.plugin.core.api.signals.Signal;
00021 
00022 public class ServerScheduler implements Scheduler {
00023   static final public double StatPeriod = 3600;
00024 
00025   public final class AcceptClientsRunnable implements Runnable {
00026     private ServerSocket serverSocket = null;
00027     private boolean terminate = false;
00028 
00029     AcceptClientsRunnable(ServerSocket serverSocket) {
00030       this.serverSocket = serverSocket;
00031     }
00032 
00033     @Override
00034     public void run() {
00035       if (serverSocket == null)
00036         return;
00037       Messages.println("Listening on port " + serverSocket.getLocalPort() + "...");
00038       while (!terminate) {
00039         try {
00040           Socket clientSocket = serverSocket.accept();
00041           SocketClient socketClient = new SocketClient(localQueue, clientSocket);
00042           if (!socketClient.readName()) {
00043             socketClient.close();
00044             continue;
00045           }
00046           addClient(socketClient);
00047           socketClient.start();
00048         } catch (IOException e) {
00049         }
00050       }
00051       terminate();
00052     }
00053 
00054     void terminate() {
00055       terminate = true;
00056       if (serverSocket == null)
00057         return;
00058       Messages.println("Closing port " + serverSocket.getLocalPort());
00059       try {
00060         serverSocket.close();
00061       } catch (IOException e) {
00062         e.printStackTrace();
00063       }
00064     }
00065   }
00066 
00067   class JobStatListener implements Listener<JobDoneEvent> {
00068     @Override
00069     public void listen(JobDoneEvent eventInfo) {
00070       serverLog.jobEvent(eventInfo.done);
00071     }
00072   }
00073 
00074   static final public int DefaultPort = 5000;
00075   static public boolean serverVerbose = true;
00076   public Signal<ServerScheduler> onClientDisconnected = new Signal<ServerScheduler>();
00077   private final Listener<SocketClient> clientClosedListener = new Listener<SocketClient>() {
00078     @Override
00079     public void listen(SocketClient client) {
00080       removeClient(client);
00081       onClientDisconnected.fire(ServerScheduler.this);
00082     }
00083   };
00084   private final AcceptClientsRunnable acceptClientsRunnable;
00085   protected final LocalQueue localQueue = new LocalQueue();
00086   final ServerLog serverLog = new ServerLog();
00087 
00088   private final LocalScheduler localScheduler;
00089   private final Thread serverThread;
00090   private final Set<SocketClient> clients = Collections.synchronizedSet(new HashSet<SocketClient>());
00091 
00092   public ServerScheduler() throws IOException {
00093     this(DefaultPort, LocalScheduler.getDefaultNbThreads());
00094   }
00095 
00096   public ServerScheduler(int port, int nbLocalThread) throws IOException {
00097     ServerSocket serverSocket = new ServerSocket(port);
00098     acceptClientsRunnable = new AcceptClientsRunnable(serverSocket);
00099     serverThread = new Thread(acceptClientsRunnable, "AcceptThread");
00100     serverThread.setDaemon(true);
00101     localScheduler = nbLocalThread > 0 ? new LocalScheduler(nbLocalThread, localQueue) : null;
00102     localQueue.onJobDone().connect(new JobStatListener());
00103     localQueue.enablePoolFromPending();
00104   }
00105 
00106   synchronized protected void addClient(SocketClient client) {
00107     clients.add(client);
00108     client.onClosed.connect(clientClosedListener);
00109     serverLog.clientEvent(clients, client.clientInfo().hostName + " connected");
00110     SocketClient.nbJobSendPerRequest(clients.size());
00111   }
00112 
00113   @Override
00114   public void waitAll() {
00115     LocalQueue.waitAllDone(localQueue);
00116     if (localScheduler != null) {
00117       Throwable exceptionOccured = localScheduler.exceptionOccured();
00118       if (exceptionOccured != null)
00119         throw new RuntimeException(exceptionOccured);
00120     }
00121   }
00122 
00123   @Override
00124   synchronized public void start() {
00125     serverThread.start();
00126     if (localScheduler != null)
00127       localScheduler.start();
00128   }
00129 
00130   synchronized void removeClient(SocketClient client) {
00131     boolean removed = clients.remove(client);
00132     if (!removed)
00133       return;
00134     client.onClosed.disconnect(clientClosedListener);
00135     client.close();
00136     Collection<Runnable> pendingJobs = new ArrayList<Runnable>(client.pendingJobs());
00137     for (Runnable pendingJob : pendingJobs)
00138       localQueue.requestCancel(pendingJob);
00139     String message = String.format("%s disconnected. Canceling %d job(s). Did %d job(s).",
00140                                    client.clientInfo().hostName, pendingJobs.size(), client.nbJobDone());
00141     serverLog.clientEvent(clients, message);
00142     client.close();
00143   }
00144 
00145   @Override
00146   synchronized public void dispose() {
00147     for (SocketClient client : new ArrayList<SocketClient>(clients))
00148       removeClient(client);
00149     acceptClientsRunnable.terminate();
00150     if (localScheduler != null)
00151       localScheduler.dispose();
00152     localQueue.dispose();
00153   }
00154 
00155   public boolean isLocalSchedulingEnabled() {
00156     return localScheduler != null;
00157   }
00158 
00159   @Override
00160   public LocalQueue queue() {
00161     return localQueue;
00162   }
00163 
00164   synchronized public void waitClients() {
00165     System.out.println("All jobs done. Answering to new clients only.");
00166     try {
00167       wait();
00168     } catch (InterruptedException e) {
00169       e.printStackTrace();
00170     }
00171   }
00172 }
 All Classes Namespaces Files Functions Variables Enumerations
Zephyr
RLPark