RLPark 1.0.0
Reinforcement Learning Framework in Java
|
package rlpark.plugin.rltoys.experiments.scheduling.pools;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import rlpark.plugin.rltoys.experiments.scheduling.interfaces.JobDoneEvent;
import rlpark.plugin.rltoys.experiments.scheduling.interfaces.JobPool;
import rlpark.plugin.rltoys.experiments.scheduling.interfaces.PoolResult;
import rlpark.plugin.rltoys.experiments.scheduling.interfaces.Scheduler;
import rlpark.plugin.rltoys.experiments.scheduling.queue.LocalQueue;
import zephyr.plugin.core.api.signals.Listener;


/**
 * Base class for job pools: wraps the iterator of jobs supplied by
 * {@link #createIterator()}, hands it to a scheduler queue, tracks which jobs
 * have been handed out and not yet reported done, and signals completion once
 * no job is outstanding and the iterator is exhausted.
 */
public abstract class AbstractJobPool implements JobPool {
  /**
   * Iterator decorator that records every job it hands out in
   * {@code jobSubmitted} and calls {@link #onPoolStart()} before the very
   * first job is returned.
   */
  class RunnableIterator implements Iterator<Runnable> {
    private final Iterator<Runnable> delegate;

    RunnableIterator(Iterator<Runnable> iterator) {
      delegate = iterator;
    }

    /** @return whether the wrapped iterator still has jobs to hand out. */
    @Override
    public synchronized boolean hasNext() {
      return delegate.hasNext();
    }

    /**
     * Returns the next job, registering it as submitted and incrementing the
     * request counter. {@code onPoolStart()} runs first when no job has been
     * requested yet.
     */
    @Override
    public synchronized Runnable next() {
      final boolean firstRequest = nbRequestedJob == 0;
      if (firstRequest) {
        onPoolStart();
      }
      final Runnable job = delegate.next();
      jobSubmitted.add(job);
      nbRequestedJob += 1;
      return job;
    }

    /** No-op: the call is silently ignored and nothing is removed. */
    @Override
    public void remove() {
    }

    /**
     * @return {@code true} when no handed-out job is still pending and the
     *         wrapped iterator has nothing left to give.
     */
    public synchronized boolean noRemainingJob() {
      if (!jobSubmitted.isEmpty()) {
        return false;
      }
      return !hasNext();
    }
  }

  /** Notified with this pool once the last job has completed. */
  protected final JobPoolListener onAllJobDone;
  /** Listener registered with the queue; forwards each event to {@link #onJobDone(JobDoneEvent)}. */
  private final Listener<JobDoneEvent> poolListener = new Listener<JobDoneEvent>() {
    @Override
    public void listen(JobDoneEvent event) {
      AbstractJobPool.this.onJobDone(event);
    }
  };
  /** Optional per-job listener; may be {@code null}, in which case it is skipped. */
  protected final Listener<JobDoneEvent> onJobDone;
  /** Jobs handed out by the iterator and not yet reported done (synchronized list). */
  final List<Runnable> jobSubmitted = Collections.synchronizedList(new ArrayList<Runnable>());
  /** Wrapper around the job iterator; {@code null} until {@link #submitTo(Scheduler)} is called. */
  protected RunnableIterator jobIterator = null;
  /** Number of jobs requested from the iterator so far. */
  protected int nbRequestedJob = 0;
  /** Result handle returned by {@link #submitTo(Scheduler)}; {@code null} until then. */
  protected PoolResult poolResult = null;

  /**
   * @param onAllJobDone listener called with this pool when every job is done
   * @param onJobDone listener called for each finished job, or {@code null} for none
   */
  public AbstractJobPool(JobPoolListener onAllJobDone, Listener<JobDoneEvent> onJobDone) {
    this.onAllJobDone = onAllJobDone;
    this.onJobDone = onJobDone;
  }

  /** Hook called right before the first job is handed out; does nothing by default. */
  protected void onPoolStart() {
  }

  /** Hook called after the all-jobs-done listener has been notified; does nothing by default. */
  protected void onPoolEnd() {
  }

  /** @return {@code true} once both the job iterator and the pool result have been created. */
  protected boolean hasBeenSubmitted() {
    return !(jobIterator == null || poolResult == null);
  }

  /**
   * Creates the pool result and the job iterator, then adds the iterator and
   * the pool's listener to the scheduler's queue.
   *
   * @param scheduler scheduler whose queue receives the jobs; its queue is cast to {@link LocalQueue}
   * @return the result handle for this pool
   * @throws RuntimeException if the pool has already been submitted
   */
  @Override
  public PoolResult submitTo(Scheduler scheduler) {
    checkHasBeenSubmitted();
    poolResult = new PoolResult();
    jobIterator = new RunnableIterator(createIterator());
    final LocalQueue queue = (LocalQueue) scheduler.queue();
    queue.add(jobIterator, poolListener);
    return poolResult;
  }

  /**
   * Fails when the pool was already submitted.
   *
   * @throws RuntimeException if {@link #hasBeenSubmitted()} returns {@code true}
   */
  protected void checkHasBeenSubmitted() {
    if (!hasBeenSubmitted()) {
      return;
    }
    throw new RuntimeException("The pool has already been submitted");
  }

  /**
   * Handles the completion of one job: forwards the event to the optional
   * per-job listener, drops the job from the pending list and, when nothing
   * remains, notifies the all-jobs-done listener, runs {@link #onPoolEnd()}
   * and marks the pool result as done.
   *
   * @param event completion event; its {@code todo} is expected to be a pending job
   */
  protected void onJobDone(JobDoneEvent event) {
    assert jobSubmitted.contains(event.todo);
    if (this.onJobDone != null) {
      this.onJobDone.listen(event);
    }
    // The job is removed only after the per-job listener has been called.
    jobSubmitted.remove(event.todo);
    if (!jobIterator.noRemainingJob()) {
      return;
    }
    onAllJobDone.listen(this);
    onPoolEnd();
    poolResult.poolDone();
  }

  /** @return the iterator over the jobs this pool must run. */
  protected abstract Iterator<Runnable> createIterator();
}