RLPark 1.0.0
Reinforcement Learning Framework in Java

LocalQueue.java

Go to the documentation of this file.
00001 package rlpark.plugin.rltoys.experiments.scheduling.queue;
00002 
00003 import java.util.HashMap;
00004 import java.util.Iterator;
00005 import java.util.LinkedHashMap;
00006 import java.util.LinkedList;
00007 import java.util.Map;
00008 import java.util.Random;
00009 import java.util.concurrent.Semaphore;
00010 
00011 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.JobDoneEvent;
00012 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.JobQueue;
00013 import rlpark.plugin.rltoys.experiments.scheduling.internal.JobDoneEventQueue;
00014 import rlpark.plugin.rltoys.utils.Utils;
00015 import zephyr.plugin.core.api.signals.Listener;
00016 import zephyr.plugin.core.api.signals.Signal;
00017 
00018 
00019 public class LocalQueue implements JobQueue {
00020   static class JobInfo {
00021     final Runnable job;
00022     final Listener<JobDoneEvent> listener;
00023 
00024     JobInfo(Runnable job, Listener<JobDoneEvent> listener) {
00025       this.job = job;
00026       this.listener = listener;
00027     }
00028   }
00029 
00030   private final Map<Iterator<? extends Runnable>, Listener<JobDoneEvent>> listeners = new HashMap<Iterator<? extends Runnable>, Listener<JobDoneEvent>>();
00031   private final LinkedList<Iterator<? extends Runnable>> waiting = new LinkedList<Iterator<? extends Runnable>>();
00032   private final Map<Runnable, Listener<JobDoneEvent>> pending = new LinkedHashMap<Runnable, Listener<JobDoneEvent>>();
00033   private final LinkedList<JobInfo> canceled = new LinkedList<JobInfo>();
00034   private final JobDoneEventQueue jobDoneEventQueue = new JobDoneEventQueue();
00035   private final Random random = new Random(0);
00036   private Iterator<? extends Runnable> currentJobIterator = null;
00037   private int nbJobsDone = 0;
00038   private boolean poolFromPending = false;
00039 
00040   synchronized public void requestCancel(Runnable pendingJob) {
00041     if (!pending.containsKey(pendingJob))
00042       return;
00043     Listener<JobDoneEvent> listener = pending.remove(pendingJob);
00044     canceled.addFirst(new JobInfo(pendingJob, listener));
00045   }
00046 
00047   private JobInfo findJob() {
00048     JobInfo jobInfo = canceled.poll();
00049     while (jobInfo == null) {
00050       if (currentJobIterator == null)
00051         currentJobIterator = waiting.poll();
00052       if (currentJobIterator == null)
00053         break;
00054       if (currentJobIterator.hasNext()) {
00055         Runnable job = currentJobIterator.next();
00056         jobInfo = new JobInfo(job, listeners.get(currentJobIterator));
00057       }
00058       if (!currentJobIterator.hasNext()) {
00059         listeners.remove(currentJobIterator);
00060         currentJobIterator = null;
00061       }
00062     }
00063     return jobInfo;
00064   }
00065 
00066   public void enablePoolFromPending() {
00067     poolFromPending = true;
00068   }
00069 
00070   @Override
00071   synchronized public Runnable request() {
00072     JobInfo jobInfo = findJob();
00073     if (jobInfo == null)
00074       return findPendingJob();
00075     pending.put(jobInfo.job, jobInfo.listener);
00076     return jobInfo.job;
00077   }
00078 
00079   private Runnable findPendingJob() {
00080     if (!poolFromPending || pending.isEmpty())
00081       return null;
00082     Runnable[] jobs = new Runnable[pending.size()];
00083     pending.keySet().toArray(jobs);
00084     return Utils.choose(random, jobs);
00085   }
00086 
00087   @Override
00088   synchronized public void done(Runnable todo, Runnable done) {
00089     boolean removed = pending.containsKey(todo);
00090     if (!removed)
00091       return;
00092     Listener<JobDoneEvent> listener = pending.remove(todo);
00093     jobDoneEventQueue.onJobDone(new JobDoneEvent(todo, done), listener);
00094     nbJobsDone++;
00095   }
00096 
00097   synchronized public boolean areAllDone() {
00098     return currentJobIterator == null && waiting.isEmpty() && pending.isEmpty() && canceled.isEmpty()
00099         && jobDoneEventQueue.isEmpty();
00100   }
00101 
00102   synchronized public void add(Iterator<? extends Runnable> jobIterator, Listener<JobDoneEvent> listener) {
00103     listeners.put(jobIterator, listener);
00104     waiting.add(jobIterator);
00105   }
00106 
00107   static public void waitAllDone(LocalQueue localQueue) {
00108     final Semaphore semaphore = new Semaphore(0);
00109     final Listener<JobDoneEvent> listener = new Listener<JobDoneEvent>() {
00110       @Override
00111       public void listen(JobDoneEvent eventInfo) {
00112         semaphore.release();
00113       }
00114     };
00115     localQueue.onJobDone().connect(listener);
00116     while (!localQueue.areAllDone()) {
00117       try {
00118         semaphore.acquire();
00119       } catch (InterruptedException e) {
00120       }
00121     }
00122     localQueue.onJobDone().disconnect(listener);
00123   }
00124 
00125   @Override
00126   public Signal<JobDoneEvent> onJobDone() {
00127     return jobDoneEventQueue.onJobDone;
00128   }
00129 
00130   public int nbJobsDone() {
00131     return nbJobsDone;
00132   }
00133 
00134   @Override
00135   public void dispose() {
00136     jobDoneEventQueue.dispose();
00137   }
00138 }
 All Classes Namespaces Files Functions Variables Enumerations
Zephyr
RLPark