RLPark 1.0.0
Reinforcement Learning Framework in Java

NetworkClient.java

Go to the documentation of this file.
00001 package rlpark.plugin.rltoys.experiments.scheduling.network;
00002 
00003 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.JobDoneEvent;
00004 import rlpark.plugin.rltoys.experiments.scheduling.schedulers.LocalScheduler;
00005 import zephyr.plugin.core.api.signals.Listener;
00006 import zephyr.plugin.core.api.synchronization.Chrono;
00007 
00008 public class NetworkClient {
00009   static private double maximumMinutesTime = -1;
00010   static private String serverHost = "";
00011   static private int serverPort = ServerScheduler.DefaultPort;
00012   static private int nbCore = LocalScheduler.getDefaultNbThreads();
00013 
00014   private final LocalScheduler localScheduler;
00015   final protected NetworkJobQueue networkJobQueue;
00016 
00017   public NetworkClient(int nbThread, String serverHost, int port, boolean multipleAttempts) {
00018     this(new LocalScheduler(nbThread, createJobQueue(serverHost, port, nbThread, multipleAttempts)));
00019   }
00020 
00021   public NetworkClient(final LocalScheduler localScheduler) {
00022     this.localScheduler = localScheduler;
00023     networkJobQueue = (NetworkJobQueue) localScheduler.queue();
00024   }
00025 
00026   private static NetworkJobQueue createJobQueue(String serverHost, int port, int nbCore,
00027       boolean multipleConnectionAttempts) {
00028     return new NetworkJobQueue(serverHost, port, nbCore, multipleConnectionAttempts);
00029   }
00030 
00031   private void setMaximumTime(final double wallTime) {
00032     networkJobQueue.onJobDone().connect(new Listener<JobDoneEvent>() {
00033       final Chrono chrono = new Chrono();
00034 
00035       @Override
00036       public void listen(JobDoneEvent event) {
00037         if (chrono.getCurrentChrono() > wallTime)
00038           networkJobQueue.denyNewJobRequest();
00039       }
00040     });
00041   }
00042 
00043   public void run() {
00044     localScheduler.start();
00045     localScheduler.waitAll();
00046   }
00047 
00048   public void asyncRun() {
00049     Thread thread = new Thread(new Runnable() {
00050       @Override
00051       public void run() {
00052         NetworkClient.this.run();
00053       }
00054     });
00055     thread.setDaemon(true);
00056     thread.start();
00057   }
00058 
00059   public void dispose() {
00060     localScheduler.dispose();
00061     networkJobQueue.dispose();
00062   }
00063 
00064   private static void readParams(String[] args) {
00065     for (String arg : args)
00066       if (arg.startsWith("-"))
00067         readOption(arg);
00068       else
00069         readServerInfo(arg);
00070   }
00071 
00072   private static void readOption(String arg) {
00073     switch (arg.charAt(1)) {
00074     case 't':
00075       maximumMinutesTime = Double.parseDouble(arg.substring(2));
00076       break;
00077     case 'c':
00078       nbCore = Integer.parseInt(arg.substring(2));
00079       break;
00080     default:
00081       System.err.println("Unknown option: " + arg);
00082     }
00083   }
00084 
00085   private static void readServerInfo(String arg) {
00086     int portSeparator = arg.lastIndexOf(":");
00087     serverHost = portSeparator >= 0 ? arg.substring(0, portSeparator) : arg;
00088     if (portSeparator >= 0)
00089       serverPort = Integer.parseInt(arg.substring(portSeparator + 1));
00090   }
00091 
00092   public static void runClient() {
00093     NetworkClient scheduler = new NetworkClient(nbCore, serverHost, serverPort, true);
00094     if (maximumMinutesTime > 0)
00095       scheduler.setMaximumTime(maximumMinutesTime * 60);
00096     scheduler.run();
00097     scheduler.dispose();
00098   }
00099 
00100   private static void printParams() {
00101     System.out.println("maximumMinutesTime: " + String.valueOf(maximumMinutesTime));
00102     System.out.println("nbCore: " + String.valueOf(nbCore));
00103   }
00104 
00105   public NetworkJobQueue queue() {
00106     return networkJobQueue;
00107   }
00108 
00109   public static void main(String[] args) {
00110     if (args.length < 1) {
00111       System.err.println("Usage: java -jar <jarfile.jar> -t<max time: 30,60,... mins> -c<nb cores> <hostname:port>");
00112       return;
00113     }
00114     readParams(args);
00115     printParams();
00116     try {
00117       runClient();
00118     } catch (Exception e) {
00119       e.printStackTrace();
00120     }
00121   }
00122 }
 All Classes Namespaces Files Functions Variables Enumerations
Zephyr
RLPark