RLPark 1.0.0
Reinforcement Learning Framework in Java

LocalScheduler.java

Go to the documentation of this file.
00001 package rlpark.plugin.rltoys.experiments.scheduling.schedulers;
00002 
00003 import java.util.ArrayList;
00004 import java.util.List;
00005 import java.util.concurrent.ExecutorService;
00006 import java.util.concurrent.Future;
00007 
00008 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.JobQueue;
00009 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.Scheduler;
00010 import rlpark.plugin.rltoys.experiments.scheduling.internal.SchedulingThreadFactory;
00011 import rlpark.plugin.rltoys.experiments.scheduling.queue.LocalQueue;
00012 import zephyr.plugin.core.api.synchronization.Chrono;
00013 
00014 public class LocalScheduler implements Scheduler {
00015   protected class RunnableProcessor implements Runnable {
00016     @Override
00017     public void run() {
00018       try {
00019         Runnable runnable = runnables.request();
00020         while (exceptionThrown == null && runnable != null) {
00021           runnable.run();
00022           runnables.done(runnable, runnable);
00023           runnable = runnables.request();
00024         }
00025       } catch (Throwable exception) {
00026         if (exceptionThrown == null)
00027           exceptionThrown = exception;
00028         exception.printStackTrace();
00029       }
00030     }
00031   }
00032 
00033   Throwable exceptionThrown = null;
00034   final ExecutorService executor;
00035   private final List<RunnableProcessor> updaters = new ArrayList<RunnableProcessor>();
00036   private final Future<?>[] futurs;
00037   protected final JobQueue runnables;
00038   private final Chrono chrono = new Chrono();
00039   protected final int nbThread;
00040 
00041   public LocalScheduler() {
00042     this(getDefaultNbThreads() + 1);
00043   }
00044 
00045   public LocalScheduler(int nbThread) {
00046     this(nbThread, new LocalQueue());
00047   }
00048 
00049   public LocalScheduler(JobQueue runnables) {
00050     this(getDefaultNbThreads() + 1, runnables);
00051   }
00052 
00053   public static int getDefaultNbThreads() {
00054     return Runtime.getRuntime().availableProcessors();
00055   }
00056 
00057   public LocalScheduler(int nbThread, JobQueue runnables) {
00058     this.nbThread = nbThread;
00059     this.runnables = runnables;
00060     for (int i = 0; i < nbThread; i++)
00061       updaters.add(new RunnableProcessor());
00062     futurs = new Future<?>[nbThread];
00063     executor = SchedulingThreadFactory.newFixedThreadPool("LocalScheduler", nbThread);
00064   }
00065 
00066   @Override
00067   public void start() {
00068     exceptionThrown = null;
00069     chrono.start();
00070     for (int i = 0; i < updaters.size(); i++)
00071       if (futurs[i] == null || futurs[i].isDone())
00072         futurs[i] = executor.submit(updaters.get(i));
00073   }
00074 
00075   @Override
00076   public void waitAll() {
00077     if (runnables instanceof LocalQueue)
00078       LocalQueue.waitAllDone((LocalQueue) runnables);
00079     else
00080       waitFuturs();
00081     if (exceptionThrown != null)
00082       throw new RuntimeException(exceptionThrown);
00083   }
00084 
00085   private void waitFuturs() {
00086     for (Future<?> future : futurs)
00087       try {
00088         future.get();
00089       } catch (Exception e) {
00090         exceptionThrown = e;
00091         break;
00092       }
00093   }
00094 
00095   public long updateTimeAverage() {
00096     return chrono.getCurrentNano();
00097   }
00098 
00099   @Override
00100   public void dispose() {
00101     executor.shutdown();
00102     runnables.dispose();
00103   }
00104 
00105   public Chrono chrono() {
00106     return chrono;
00107   }
00108 
00109   @Override
00110   public JobQueue queue() {
00111     return runnables;
00112   }
00113 
00114   public Throwable exceptionOccured() {
00115     return exceptionThrown;
00116   }
00117 
00118   public boolean isShutdown() {
00119     return executor.isShutdown();
00120   }
00121 }
 All Classes Namespaces Files Functions Variables Enumerations
Zephyr
RLPark