RLPark 1.0.0
Reinforcement Learning Framework in Java
|
00001 package rlpark.plugin.rltoys.experiments.scheduling.schedulers; 00002 00003 import java.util.ArrayList; 00004 import java.util.List; 00005 import java.util.concurrent.ExecutorService; 00006 import java.util.concurrent.Future; 00007 00008 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.JobQueue; 00009 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.Scheduler; 00010 import rlpark.plugin.rltoys.experiments.scheduling.internal.SchedulingThreadFactory; 00011 import rlpark.plugin.rltoys.experiments.scheduling.queue.LocalQueue; 00012 import zephyr.plugin.core.api.synchronization.Chrono; 00013 00014 public class LocalScheduler implements Scheduler { 00015 protected class RunnableProcessor implements Runnable { 00016 @Override 00017 public void run() { 00018 try { 00019 Runnable runnable = runnables.request(); 00020 while (exceptionThrown == null && runnable != null) { 00021 runnable.run(); 00022 runnables.done(runnable, runnable); 00023 runnable = runnables.request(); 00024 } 00025 } catch (Throwable exception) { 00026 if (exceptionThrown == null) 00027 exceptionThrown = exception; 00028 exception.printStackTrace(); 00029 } 00030 } 00031 } 00032 00033 Throwable exceptionThrown = null; 00034 final ExecutorService executor; 00035 private final List<RunnableProcessor> updaters = new ArrayList<RunnableProcessor>(); 00036 private final Future<?>[] futurs; 00037 protected final JobQueue runnables; 00038 private final Chrono chrono = new Chrono(); 00039 protected final int nbThread; 00040 00041 public LocalScheduler() { 00042 this(getDefaultNbThreads() + 1); 00043 } 00044 00045 public LocalScheduler(int nbThread) { 00046 this(nbThread, new LocalQueue()); 00047 } 00048 00049 public LocalScheduler(JobQueue runnables) { 00050 this(getDefaultNbThreads() + 1, runnables); 00051 } 00052 00053 public static int getDefaultNbThreads() { 00054 return Runtime.getRuntime().availableProcessors(); 00055 } 00056 00057 public LocalScheduler(int nbThread, JobQueue runnables) { 00058 this.nbThread = nbThread; 00059 this.runnables = runnables; 00060 for (int i = 0; i < nbThread; i++) 00061 updaters.add(new RunnableProcessor()); 00062 futurs = new Future<?>[nbThread]; 00063 executor = SchedulingThreadFactory.newFixedThreadPool("LocalScheduler", nbThread); 00064 } 00065 00066 @Override 00067 public void start() { 00068 exceptionThrown = null; 00069 chrono.start(); 00070 for (int i = 0; i < updaters.size(); i++) 00071 if (futurs[i] == null || futurs[i].isDone()) 00072 futurs[i] = executor.submit(updaters.get(i)); 00073 } 00074 00075 @Override 00076 public void waitAll() { 00077 if (runnables instanceof LocalQueue) 00078 LocalQueue.waitAllDone((LocalQueue) runnables); 00079 else 00080 waitFuturs(); 00081 if (exceptionThrown != null) 00082 throw new RuntimeException(exceptionThrown); 00083 } 00084 00085 private void waitFuturs() { 00086 for (Future<?> future : futurs) 00087 try { 00088 future.get(); 00089 } catch (Exception e) { 00090 exceptionThrown = e; 00091 break; 00092 } 00093 } 00094 00095 public long updateTimeAverage() { 00096 return chrono.getCurrentNano(); 00097 } 00098 00099 @Override 00100 public void dispose() { 00101 executor.shutdown(); 00102 runnables.dispose(); 00103 } 00104 00105 public Chrono chrono() { 00106 return chrono; 00107 } 00108 00109 @Override 00110 public JobQueue queue() { 00111 return runnables; 00112 } 00113 00114 public Throwable exceptionOccured() { 00115 return exceptionThrown; 00116 } 00117 00118 public boolean isShutdown() { 00119 return executor.isShutdown(); 00120 } 00121 }