RLPark 1.0.0
Reinforcement Learning Framework in Java
|
00001 package rlpark.plugin.rltoys.experiments.scheduling.network; 00002 00003 import java.io.IOException; 00004 import java.net.ServerSocket; 00005 import java.net.Socket; 00006 import java.util.ArrayList; 00007 import java.util.Collection; 00008 import java.util.Collections; 00009 import java.util.HashSet; 00010 import java.util.Set; 00011 00012 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.JobDoneEvent; 00013 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.Scheduler; 00014 import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.Messages; 00015 import rlpark.plugin.rltoys.experiments.scheduling.internal.network.SocketClient; 00016 import rlpark.plugin.rltoys.experiments.scheduling.internal.serverlog.ServerLog; 00017 import rlpark.plugin.rltoys.experiments.scheduling.queue.LocalQueue; 00018 import rlpark.plugin.rltoys.experiments.scheduling.schedulers.LocalScheduler; 00019 import zephyr.plugin.core.api.signals.Listener; 00020 import zephyr.plugin.core.api.signals.Signal; 00021 00022 public class ServerScheduler implements Scheduler { 00023 static final public double StatPeriod = 3600; 00024 00025 public final class AcceptClientsRunnable implements Runnable { 00026 private ServerSocket serverSocket = null; 00027 private boolean terminate = false; 00028 00029 AcceptClientsRunnable(ServerSocket serverSocket) { 00030 this.serverSocket = serverSocket; 00031 } 00032 00033 @Override 00034 public void run() { 00035 if (serverSocket == null) 00036 return; 00037 Messages.println("Listening on port " + serverSocket.getLocalPort() + "..."); 00038 while (!terminate) { 00039 try { 00040 Socket clientSocket = serverSocket.accept(); 00041 SocketClient socketClient = new SocketClient(localQueue, clientSocket); 00042 if (!socketClient.readName()) { 00043 socketClient.close(); 00044 continue; 00045 } 00046 addClient(socketClient); 00047 socketClient.start(); 00048 } catch (IOException e) { 00049 } 00050 } 00051 terminate(); 00052 } 00053 00054 void terminate() { 00055 terminate = true; 00056 if (serverSocket == null) 00057 return; 00058 Messages.println("Closing port " + serverSocket.getLocalPort()); 00059 try { 00060 serverSocket.close(); 00061 } catch (IOException e) { 00062 e.printStackTrace(); 00063 } 00064 } 00065 } 00066 00067 class JobStatListener implements Listener<JobDoneEvent> { 00068 @Override 00069 public void listen(JobDoneEvent eventInfo) { 00070 serverLog.jobEvent(eventInfo.done); 00071 } 00072 } 00073 00074 static final public int DefaultPort = 5000; 00075 static public boolean serverVerbose = true; 00076 public Signal<ServerScheduler> onClientDisconnected = new Signal<ServerScheduler>(); 00077 private final Listener<SocketClient> clientClosedListener = new Listener<SocketClient>() { 00078 @Override 00079 public void listen(SocketClient client) { 00080 removeClient(client); 00081 onClientDisconnected.fire(ServerScheduler.this); 00082 } 00083 }; 00084 private final AcceptClientsRunnable acceptClientsRunnable; 00085 protected final LocalQueue localQueue = new LocalQueue(); 00086 final ServerLog serverLog = new ServerLog(); 00087 00088 private final LocalScheduler localScheduler; 00089 private final Thread serverThread; 00090 private final Set<SocketClient> clients = Collections.synchronizedSet(new HashSet<SocketClient>()); 00091 00092 public ServerScheduler() throws IOException { 00093 this(DefaultPort, LocalScheduler.getDefaultNbThreads()); 00094 } 00095 00096 public ServerScheduler(int port, int nbLocalThread) throws IOException { 00097 ServerSocket serverSocket = new ServerSocket(port); 00098 acceptClientsRunnable = new AcceptClientsRunnable(serverSocket); 00099 serverThread = new Thread(acceptClientsRunnable, "AcceptThread"); 00100 serverThread.setDaemon(true); 00101 localScheduler = nbLocalThread > 0 ? new LocalScheduler(nbLocalThread, localQueue) : null; 00102 localQueue.onJobDone().connect(new JobStatListener()); 00103 localQueue.enablePoolFromPending(); 00104 } 00105 00106 synchronized protected void addClient(SocketClient client) { 00107 clients.add(client); 00108 client.onClosed.connect(clientClosedListener); 00109 serverLog.clientEvent(clients, client.clientInfo().hostName + " connected"); 00110 SocketClient.nbJobSendPerRequest(clients.size()); 00111 } 00112 00113 @Override 00114 public void waitAll() { 00115 LocalQueue.waitAllDone(localQueue); 00116 if (localScheduler != null) { 00117 Throwable exceptionOccured = localScheduler.exceptionOccured(); 00118 if (exceptionOccured != null) 00119 throw new RuntimeException(exceptionOccured); 00120 } 00121 } 00122 00123 @Override 00124 synchronized public void start() { 00125 serverThread.start(); 00126 if (localScheduler != null) 00127 localScheduler.start(); 00128 } 00129 00130 synchronized void removeClient(SocketClient client) { 00131 boolean removed = clients.remove(client); 00132 if (!removed) 00133 return; 00134 client.onClosed.disconnect(clientClosedListener); 00135 client.close(); 00136 Collection<Runnable> pendingJobs = new ArrayList<Runnable>(client.pendingJobs()); 00137 for (Runnable pendingJob : pendingJobs) 00138 localQueue.requestCancel(pendingJob); 00139 String message = String.format("%s disconnected. Canceling %d job(s). Did %d job(s).", 00140 client.clientInfo().hostName, pendingJobs.size(), client.nbJobDone()); 00141 serverLog.clientEvent(clients, message); 00142 client.close(); 00143 } 00144 00145 @Override 00146 synchronized public void dispose() { 00147 for (SocketClient client : new ArrayList<SocketClient>(clients)) 00148 removeClient(client); 00149 acceptClientsRunnable.terminate(); 00150 if (localScheduler != null) 00151 localScheduler.dispose(); 00152 localQueue.dispose(); 00153 } 00154 00155 public boolean isLocalSchedulingEnabled() { 00156 return localScheduler != null; 00157 } 00158 00159 @Override 00160 public LocalQueue queue() { 00161 return localQueue; 00162 } 00163 00164 synchronized public void waitClients() { 00165 System.out.println("All jobs done. Answering to new clients only."); 00166 try { 00167 wait(); 00168 } catch (InterruptedException e) { 00169 e.printStackTrace(); 00170 } 00171 } 00172 }