RLPark 1.0.0
Reinforcement Learning Framework in Java
|
00001 package rlpark.plugin.rltoys.experiments.scheduling.internal.network; 00002 00003 import java.net.Socket; 00004 import java.util.ArrayList; 00005 import java.util.Collection; 00006 import java.util.HashMap; 00007 import java.util.List; 00008 import java.util.Map; 00009 00010 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.JobQueue; 00011 import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.ClientInfo; 00012 import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.Message; 00013 import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.MessageClassData; 00014 import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.MessageJob; 00015 import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.MessageRequestClass; 00016 import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.MessageSendClientInfo; 00017 import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.Messages.MessageType; 00018 import zephyr.plugin.core.api.signals.Signal; 00019 00020 public class SocketClient { 00021 static private volatile int nbJobSendPerRequest = 1; 00022 static public final Signal<String> onClassRequested = new Signal<String>(); 00023 public final Signal<SocketClient> onClosed = new Signal<SocketClient>(); 00024 private final Runnable clientRunnable = new Runnable() { 00025 @Override 00026 public void run() { 00027 try { 00028 clientReadMainLoop(); 00029 } catch (Throwable t) { 00030 t.printStackTrace(); 00031 } 00032 } 00033 }; 00034 private final SyncSocket clientSocket; 00035 private final Thread clientThread = new Thread(clientRunnable, "ServerScheduler-ClientThread"); 00036 private final Map<Integer, Runnable> idtoJob = new HashMap<Integer, Runnable>(); 00037 private int id; 00038 private ClientInfo clientInfo; 00039 private final JobQueue jobQueue; 00040 private int nbJobDone = 0; 00041 00042 public SocketClient(JobQueue jobQueue, Socket clientSocket) { 00043 this.jobQueue = jobQueue; 00044 this.clientSocket = new SyncSocket(clientSocket); 00045 clientThread.setPriority(Thread.MAX_PRIORITY); 00046 clientThread.setDaemon(true); 00047 } 00048 00049 public void start() { 00050 clientThread.start(); 00051 } 00052 00053 public boolean readName() { 00054 Message message = SyncSocket.readNextMessage(clientSocket); 00055 if (message == null || message.type() != MessageType.SendClientName) { 00056 System.err.println("error: client did not declare its name, it is rejected."); 00057 return false; 00058 } 00059 clientInfo = ((MessageSendClientInfo) message).clientInfo(); 00060 return true; 00061 } 00062 00063 @SuppressWarnings("incomplete-switch") 00064 protected void clientReadMainLoop() { 00065 while (!clientSocket.isClosed()) { 00066 Message message = SyncSocket.readNextMessage(clientSocket); 00067 if (message == null) 00068 break; 00069 switch (message.type()) { 00070 case RequestJob: 00071 sendJob(); 00072 break; 00073 case Job: 00074 jobDone((MessageJob) message); 00075 break; 00076 case RequestClass: 00077 requestClassData(((MessageRequestClass) message).className()); 00078 break; 00079 case SendClientName: 00080 System.err.println("error: a client is trying to change its name"); 00081 break; 00082 } 00083 } 00084 close(); 00085 } 00086 00087 private void requestClassData(String className) { 00088 clientSocket.write(new MessageClassData(className)); 00089 onClassRequested.fire(className); 00090 } 00091 00092 synchronized private void jobDone(MessageJob message) { 00093 for (int i = 0; i < message.nbJobs(); i++) { 00094 Runnable todo = idtoJob.remove(message.jobIds()[i]); 00095 if (todo == null) 00096 continue; 00097 jobQueue.done(todo, message.jobs()[i]); 00098 nbJobDone++; 00099 } 00100 } 00101 00102 synchronized private void sendJob() { 00103 List<Runnable> jobs = new ArrayList<Runnable>(); 00104 List<Integer> jobIds = new ArrayList<Integer>(); 00105 for (int i = 0; i < nbJobSendPerRequest; i++) { 00106 Runnable todo = jobQueue.request(); 00107 if (todo == null) 00108 break; 00109 int jobId = newId(); 00110 idtoJob.put(jobId, todo); 00111 jobs.add(todo); 00112 jobIds.add(jobId); 00113 } 00114 clientSocket.write(new MessageJob(jobs, jobIds)); 00115 } 00116 00117 private int newId() { 00118 id++; 00119 if (id < 0) 00120 id = 0; 00121 return id; 00122 } 00123 00124 public void close() { 00125 clientSocket.close(); 00126 onClosed.fire(this); 00127 } 00128 00129 public ClientInfo clientInfo() { 00130 return clientInfo; 00131 } 00132 00133 public Collection<Runnable> pendingJobs() { 00134 return idtoJob.values(); 00135 } 00136 00137 public static void nbJobSendPerRequest(int nbJobSendPerRequest) { 00138 SocketClient.nbJobSendPerRequest = Math.min(Math.max(1, nbJobSendPerRequest), 20); 00139 } 00140 00141 public int nbJobDone() { 00142 return nbJobDone; 00143 } 00144 }