// RLPark 1.0.0
// Reinforcement Learning Framework in Java
00001 package rlpark.plugin.rltoys.experiments.scheduling.queue; 00002 00003 import java.util.HashMap; 00004 import java.util.Iterator; 00005 import java.util.LinkedHashMap; 00006 import java.util.LinkedList; 00007 import java.util.Map; 00008 import java.util.Random; 00009 import java.util.concurrent.Semaphore; 00010 00011 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.JobDoneEvent; 00012 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.JobQueue; 00013 import rlpark.plugin.rltoys.experiments.scheduling.internal.JobDoneEventQueue; 00014 import rlpark.plugin.rltoys.utils.Utils; 00015 import zephyr.plugin.core.api.signals.Listener; 00016 import zephyr.plugin.core.api.signals.Signal; 00017 00018 00019 public class LocalQueue implements JobQueue { 00020 static class JobInfo { 00021 final Runnable job; 00022 final Listener<JobDoneEvent> listener; 00023 00024 JobInfo(Runnable job, Listener<JobDoneEvent> listener) { 00025 this.job = job; 00026 this.listener = listener; 00027 } 00028 } 00029 00030 private final Map<Iterator<? extends Runnable>, Listener<JobDoneEvent>> listeners = new HashMap<Iterator<? extends Runnable>, Listener<JobDoneEvent>>(); 00031 private final LinkedList<Iterator<? extends Runnable>> waiting = new LinkedList<Iterator<? extends Runnable>>(); 00032 private final Map<Runnable, Listener<JobDoneEvent>> pending = new LinkedHashMap<Runnable, Listener<JobDoneEvent>>(); 00033 private final LinkedList<JobInfo> canceled = new LinkedList<JobInfo>(); 00034 private final JobDoneEventQueue jobDoneEventQueue = new JobDoneEventQueue(); 00035 private final Random random = new Random(0); 00036 private Iterator<? 
extends Runnable> currentJobIterator = null; 00037 private int nbJobsDone = 0; 00038 private boolean poolFromPending = false; 00039 00040 synchronized public void requestCancel(Runnable pendingJob) { 00041 if (!pending.containsKey(pendingJob)) 00042 return; 00043 Listener<JobDoneEvent> listener = pending.remove(pendingJob); 00044 canceled.addFirst(new JobInfo(pendingJob, listener)); 00045 } 00046 00047 private JobInfo findJob() { 00048 JobInfo jobInfo = canceled.poll(); 00049 while (jobInfo == null) { 00050 if (currentJobIterator == null) 00051 currentJobIterator = waiting.poll(); 00052 if (currentJobIterator == null) 00053 break; 00054 if (currentJobIterator.hasNext()) { 00055 Runnable job = currentJobIterator.next(); 00056 jobInfo = new JobInfo(job, listeners.get(currentJobIterator)); 00057 } 00058 if (!currentJobIterator.hasNext()) { 00059 listeners.remove(currentJobIterator); 00060 currentJobIterator = null; 00061 } 00062 } 00063 return jobInfo; 00064 } 00065 00066 public void enablePoolFromPending() { 00067 poolFromPending = true; 00068 } 00069 00070 @Override 00071 synchronized public Runnable request() { 00072 JobInfo jobInfo = findJob(); 00073 if (jobInfo == null) 00074 return findPendingJob(); 00075 pending.put(jobInfo.job, jobInfo.listener); 00076 return jobInfo.job; 00077 } 00078 00079 private Runnable findPendingJob() { 00080 if (!poolFromPending || pending.isEmpty()) 00081 return null; 00082 Runnable[] jobs = new Runnable[pending.size()]; 00083 pending.keySet().toArray(jobs); 00084 return Utils.choose(random, jobs); 00085 } 00086 00087 @Override 00088 synchronized public void done(Runnable todo, Runnable done) { 00089 boolean removed = pending.containsKey(todo); 00090 if (!removed) 00091 return; 00092 Listener<JobDoneEvent> listener = pending.remove(todo); 00093 jobDoneEventQueue.onJobDone(new JobDoneEvent(todo, done), listener); 00094 nbJobsDone++; 00095 } 00096 00097 synchronized public boolean areAllDone() { 00098 return currentJobIterator == null 
&& waiting.isEmpty() && pending.isEmpty() && canceled.isEmpty() 00099 && jobDoneEventQueue.isEmpty(); 00100 } 00101 00102 synchronized public void add(Iterator<? extends Runnable> jobIterator, Listener<JobDoneEvent> listener) { 00103 listeners.put(jobIterator, listener); 00104 waiting.add(jobIterator); 00105 } 00106 00107 static public void waitAllDone(LocalQueue localQueue) { 00108 final Semaphore semaphore = new Semaphore(0); 00109 final Listener<JobDoneEvent> listener = new Listener<JobDoneEvent>() { 00110 @Override 00111 public void listen(JobDoneEvent eventInfo) { 00112 semaphore.release(); 00113 } 00114 }; 00115 localQueue.onJobDone().connect(listener); 00116 while (!localQueue.areAllDone()) { 00117 try { 00118 semaphore.acquire(); 00119 } catch (InterruptedException e) { 00120 } 00121 } 00122 localQueue.onJobDone().disconnect(listener); 00123 } 00124 00125 @Override 00126 public Signal<JobDoneEvent> onJobDone() { 00127 return jobDoneEventQueue.onJobDone; 00128 } 00129 00130 public int nbJobsDone() { 00131 return nbJobsDone; 00132 } 00133 00134 @Override 00135 public void dispose() { 00136 jobDoneEventQueue.dispose(); 00137 } 00138 }