RLPark 1.0.0
Reinforcement Learning Framework in Java
|
00001 package rlpark.plugin.rltoys.experiments.scheduling.network; 00002 00003 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.JobDoneEvent; 00004 import rlpark.plugin.rltoys.experiments.scheduling.schedulers.LocalScheduler; 00005 import zephyr.plugin.core.api.signals.Listener; 00006 import zephyr.plugin.core.api.synchronization.Chrono; 00007 00008 public class NetworkClient { 00009 static private double maximumMinutesTime = -1; 00010 static private String serverHost = ""; 00011 static private int serverPort = ServerScheduler.DefaultPort; 00012 static private int nbCore = LocalScheduler.getDefaultNbThreads(); 00013 00014 private final LocalScheduler localScheduler; 00015 final protected NetworkJobQueue networkJobQueue; 00016 00017 public NetworkClient(int nbThread, String serverHost, int port, boolean multipleAttempts) { 00018 this(new LocalScheduler(nbThread, createJobQueue(serverHost, port, nbThread, multipleAttempts))); 00019 } 00020 00021 public NetworkClient(final LocalScheduler localScheduler) { 00022 this.localScheduler = localScheduler; 00023 networkJobQueue = (NetworkJobQueue) localScheduler.queue(); 00024 } 00025 00026 private static NetworkJobQueue createJobQueue(String serverHost, int port, int nbCore, 00027 boolean multipleConnectionAttempts) { 00028 return new NetworkJobQueue(serverHost, port, nbCore, multipleConnectionAttempts); 00029 } 00030 00031 private void setMaximumTime(final double wallTime) { 00032 networkJobQueue.onJobDone().connect(new Listener<JobDoneEvent>() { 00033 final Chrono chrono = new Chrono(); 00034 00035 @Override 00036 public void listen(JobDoneEvent event) { 00037 if (chrono.getCurrentChrono() > wallTime) 00038 networkJobQueue.denyNewJobRequest(); 00039 } 00040 }); 00041 } 00042 00043 public void run() { 00044 localScheduler.start(); 00045 localScheduler.waitAll(); 00046 } 00047 00048 public void asyncRun() { 00049 Thread thread = new Thread(new Runnable() { 00050 @Override 00051 public void run() { 00052 NetworkClient.this.run(); 00053 } 00054 }); 00055 thread.setDaemon(true); 00056 thread.start(); 00057 } 00058 00059 public void dispose() { 00060 localScheduler.dispose(); 00061 networkJobQueue.dispose(); 00062 } 00063 00064 private static void readParams(String[] args) { 00065 for (String arg : args) 00066 if (arg.startsWith("-")) 00067 readOption(arg); 00068 else 00069 readServerInfo(arg); 00070 } 00071 00072 private static void readOption(String arg) { 00073 switch (arg.charAt(1)) { 00074 case 't': 00075 maximumMinutesTime = Double.parseDouble(arg.substring(2)); 00076 break; 00077 case 'c': 00078 nbCore = Integer.parseInt(arg.substring(2)); 00079 break; 00080 default: 00081 System.err.println("Unknown option: " + arg); 00082 } 00083 } 00084 00085 private static void readServerInfo(String arg) { 00086 int portSeparator = arg.lastIndexOf(":"); 00087 serverHost = portSeparator >= 0 ? arg.substring(0, portSeparator) : arg; 00088 if (portSeparator >= 0) 00089 serverPort = Integer.parseInt(arg.substring(portSeparator + 1)); 00090 } 00091 00092 public static void runClient() { 00093 NetworkClient scheduler = new NetworkClient(nbCore, serverHost, serverPort, true); 00094 if (maximumMinutesTime > 0) 00095 scheduler.setMaximumTime(maximumMinutesTime * 60); 00096 scheduler.run(); 00097 scheduler.dispose(); 00098 } 00099 00100 private static void printParams() { 00101 System.out.println("maximumMinutesTime: " + String.valueOf(maximumMinutesTime)); 00102 System.out.println("nbCore: " + String.valueOf(nbCore)); 00103 } 00104 00105 public NetworkJobQueue queue() { 00106 return networkJobQueue; 00107 } 00108 00109 public static void main(String[] args) { 00110 if (args.length < 1) { 00111 System.err.println("Usage: java -jar <jarfile.jar> -t<max time: 30,60,... mins> -c<nb cores> <hostname:port>"); 00112 return; 00113 } 00114 readParams(args); 00115 printParams(); 00116 try { 00117 runClient(); 00118 } catch (Exception e) { 00119 e.printStackTrace(); 00120 } 00121 } 00122 }