RLPark 1.0.0
Reinforcement Learning Framework in Java

AbstractJobPool.java

Go to the documentation of this file.
00001 package rlpark.plugin.rltoys.experiments.scheduling.pools;
00002 
00003 import java.util.ArrayList;
00004 import java.util.Collections;
00005 import java.util.Iterator;
00006 import java.util.List;
00007 
00008 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.JobDoneEvent;
00009 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.JobPool;
00010 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.PoolResult;
00011 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.Scheduler;
00012 import rlpark.plugin.rltoys.experiments.scheduling.queue.LocalQueue;
00013 import zephyr.plugin.core.api.signals.Listener;
00014 
00015 
00016 public abstract class AbstractJobPool implements JobPool {
00017   class RunnableIterator implements Iterator<Runnable> {
00018     private final Iterator<Runnable> iterator;
00019 
00020     RunnableIterator(Iterator<Runnable> iterator) {
00021       this.iterator = iterator;
00022     }
00023 
00024     @Override
00025     synchronized public boolean hasNext() {
00026       return iterator.hasNext();
00027     }
00028 
00029     @Override
00030     synchronized public Runnable next() {
00031       if (nbRequestedJob == 0)
00032         onPoolStart();
00033       Runnable next = iterator.next();
00034       jobSubmitted.add(next);
00035       nbRequestedJob++;
00036       return next;
00037     }
00038 
00039     @Override
00040     public void remove() {
00041     }
00042 
00043     synchronized public boolean noRemainingJob() {
00044       return jobSubmitted.isEmpty() && !hasNext();
00045     }
00046   }
00047 
00048   protected final JobPoolListener onAllJobDone;
00049   private final Listener<JobDoneEvent> poolListener = new Listener<JobDoneEvent>() {
00050     @Override
00051     public void listen(JobDoneEvent eventInfo) {
00052       onJobDone(eventInfo);
00053     }
00054   };
00055   protected final Listener<JobDoneEvent> onJobDone;
00056   final List<Runnable> jobSubmitted = Collections.synchronizedList(new ArrayList<Runnable>());
00057   protected RunnableIterator jobIterator = null;
00058   protected int nbRequestedJob = 0;
00059   protected PoolResult poolResult = null;
00060 
00061   public AbstractJobPool(JobPoolListener onAllJobDone, Listener<JobDoneEvent> onJobDone) {
00062     this.onAllJobDone = onAllJobDone;
00063     this.onJobDone = onJobDone;
00064   }
00065 
00066   protected void onPoolStart() {
00067   }
00068 
00069   protected void onPoolEnd() {
00070   }
00071 
00072   protected boolean hasBeenSubmitted() {
00073     return jobIterator != null && poolResult != null;
00074   }
00075 
00076   @Override
00077   public PoolResult submitTo(Scheduler scheduler) {
00078     checkHasBeenSubmitted();
00079     poolResult = new PoolResult();
00080     jobIterator = new RunnableIterator(createIterator());
00081     ((LocalQueue) scheduler.queue()).add(jobIterator, poolListener);
00082     return poolResult;
00083   }
00084 
00085   protected void checkHasBeenSubmitted() {
00086     if (hasBeenSubmitted())
00087       throw new RuntimeException("The pool has already been submitted");
00088   }
00089 
00090   protected void onJobDone(JobDoneEvent event) {
00091     assert jobSubmitted.contains(event.todo);
00092     if (onJobDone != null)
00093       onJobDone.listen(event);
00094     jobSubmitted.remove(event.todo);
00095     if (jobIterator.noRemainingJob()) {
00096       onAllJobDone.listen(AbstractJobPool.this);
00097       onPoolEnd();
00098       poolResult.poolDone();
00099     }
00100   }
00101 
00102   abstract protected Iterator<Runnable> createIterator();
00103 }
 All Classes Namespaces Files Functions Variables Enumerations
Zephyr
RLPark