RLPark 1.0.0
Reinforcement Learning Framework in Java

SocketClient.java

Go to the documentation of this file.
00001 package rlpark.plugin.rltoys.experiments.scheduling.internal.network;
00002 
00003 import java.net.Socket;
00004 import java.util.ArrayList;
00005 import java.util.Collection;
00006 import java.util.HashMap;
00007 import java.util.List;
00008 import java.util.Map;
00009 
00010 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.JobQueue;
00011 import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.ClientInfo;
00012 import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.Message;
00013 import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.MessageClassData;
00014 import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.MessageJob;
00015 import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.MessageRequestClass;
00016 import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.MessageSendClientInfo;
00017 import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.Messages.MessageType;
00018 import zephyr.plugin.core.api.signals.Signal;
00019 
00020 public class SocketClient {
00021   static private volatile int nbJobSendPerRequest = 1;
00022   static public final Signal<String> onClassRequested = new Signal<String>();
00023   public final Signal<SocketClient> onClosed = new Signal<SocketClient>();
00024   private final Runnable clientRunnable = new Runnable() {
00025     @Override
00026     public void run() {
00027       try {
00028         clientReadMainLoop();
00029       } catch (Throwable t) {
00030         t.printStackTrace();
00031       }
00032     }
00033   };
00034   private final SyncSocket clientSocket;
00035   private final Thread clientThread = new Thread(clientRunnable, "ServerScheduler-ClientThread");
00036   private final Map<Integer, Runnable> idtoJob = new HashMap<Integer, Runnable>();
00037   private int id;
00038   private ClientInfo clientInfo;
00039   private final JobQueue jobQueue;
00040   private int nbJobDone = 0;
00041 
00042   public SocketClient(JobQueue jobQueue, Socket clientSocket) {
00043     this.jobQueue = jobQueue;
00044     this.clientSocket = new SyncSocket(clientSocket);
00045     clientThread.setPriority(Thread.MAX_PRIORITY);
00046     clientThread.setDaemon(true);
00047   }
00048 
00049   public void start() {
00050     clientThread.start();
00051   }
00052 
00053   public boolean readName() {
00054     Message message = SyncSocket.readNextMessage(clientSocket);
00055     if (message == null || message.type() != MessageType.SendClientName) {
00056       System.err.println("error: client did not declare its name, it is rejected.");
00057       return false;
00058     }
00059     clientInfo = ((MessageSendClientInfo) message).clientInfo();
00060     return true;
00061   }
00062 
00063   @SuppressWarnings("incomplete-switch")
00064   protected void clientReadMainLoop() {
00065     while (!clientSocket.isClosed()) {
00066       Message message = SyncSocket.readNextMessage(clientSocket);
00067       if (message == null)
00068         break;
00069       switch (message.type()) {
00070       case RequestJob:
00071         sendJob();
00072         break;
00073       case Job:
00074         jobDone((MessageJob) message);
00075         break;
00076       case RequestClass:
00077         requestClassData(((MessageRequestClass) message).className());
00078         break;
00079       case SendClientName:
00080         System.err.println("error: a client is trying to change its name");
00081         break;
00082       }
00083     }
00084     close();
00085   }
00086 
00087   private void requestClassData(String className) {
00088     clientSocket.write(new MessageClassData(className));
00089     onClassRequested.fire(className);
00090   }
00091 
00092   synchronized private void jobDone(MessageJob message) {
00093     for (int i = 0; i < message.nbJobs(); i++) {
00094       Runnable todo = idtoJob.remove(message.jobIds()[i]);
00095       if (todo == null)
00096         continue;
00097       jobQueue.done(todo, message.jobs()[i]);
00098       nbJobDone++;
00099     }
00100   }
00101 
00102   synchronized private void sendJob() {
00103     List<Runnable> jobs = new ArrayList<Runnable>();
00104     List<Integer> jobIds = new ArrayList<Integer>();
00105     for (int i = 0; i < nbJobSendPerRequest; i++) {
00106       Runnable todo = jobQueue.request();
00107       if (todo == null)
00108         break;
00109       int jobId = newId();
00110       idtoJob.put(jobId, todo);
00111       jobs.add(todo);
00112       jobIds.add(jobId);
00113     }
00114     clientSocket.write(new MessageJob(jobs, jobIds));
00115   }
00116 
00117   private int newId() {
00118     id++;
00119     if (id < 0)
00120       id = 0;
00121     return id;
00122   }
00123 
00124   public void close() {
00125     clientSocket.close();
00126     onClosed.fire(this);
00127   }
00128 
00129   public ClientInfo clientInfo() {
00130     return clientInfo;
00131   }
00132 
00133   public Collection<Runnable> pendingJobs() {
00134     return idtoJob.values();
00135   }
00136 
00137   public static void nbJobSendPerRequest(int nbJobSendPerRequest) {
00138     SocketClient.nbJobSendPerRequest = Math.min(Math.max(1, nbJobSendPerRequest), 20);
00139   }
00140 
00141   public int nbJobDone() {
00142     return nbJobDone;
00143   }
00144 }
 All Classes Namespaces Files Functions Variables Enumerations
Zephyr
RLPark