RLPark 1.0.0
Reinforcement Learning Framework in Java
|
00001 package rlpark.plugin.rltoys.experiments.scheduling.pools; 00002 00003 00004 import java.io.BufferedInputStream; 00005 import java.io.BufferedOutputStream; 00006 import java.io.Closeable; 00007 import java.io.EOFException; 00008 import java.io.File; 00009 import java.io.FileInputStream; 00010 import java.io.FileNotFoundException; 00011 import java.io.FileOutputStream; 00012 import java.io.IOException; 00013 import java.io.ObjectInputStream; 00014 import java.io.ObjectOutputStream; 00015 import java.util.Iterator; 00016 import java.util.zip.GZIPInputStream; 00017 import java.util.zip.GZIPOutputStream; 00018 00019 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.JobDoneEvent; 00020 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.PoolResult; 00021 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.Scheduler; 00022 import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.Messages; 00023 import zephyr.plugin.core.api.signals.Listener; 00024 import zephyr.plugin.core.api.synchronization.Chrono; 00025 00026 00027 public class FileJobPool extends AbstractJobPool { 00028 class FileJobIterator implements Iterator<Runnable> { 00029 final ObjectInputStream objin; 00030 private Runnable nextJob; 00031 00032 FileJobIterator(File file) { 00033 try { 00034 objin = new ObjectInputStream(new GZIPInputStream(new BufferedInputStream(new FileInputStream(file)))); 00035 } catch (FileNotFoundException e) { 00036 throw new RuntimeException(e); 00037 } catch (IOException e) { 00038 throw new RuntimeException(e); 00039 } 00040 nextJob = readJob(); 00041 } 00042 00043 private Runnable readJob() { 00044 try { 00045 return (Runnable) objin.readObject(); 00046 } catch (EOFException e) { 00047 close(objin); 00048 return null; 00049 } catch (Exception e) { 00050 e.printStackTrace(); 00051 } 00052 return null; 00053 } 00054 00055 @Override 00056 public boolean hasNext() { 00057 return nextJob != null; 00058 } 00059 00060 @Override 00061 public Runnable next() { 00062 Runnable job = nextJob; 00063 nextJob = readJob(); 00064 return job; 00065 } 00066 00067 @Override 00068 public void remove() { 00069 } 00070 } 00071 00072 static private int nbFilePool = 0; 00073 private final ObjectOutputStream objout; 00074 private final File file; 00075 private final String name; 00076 private final Chrono chrono = new Chrono(); 00077 private int nbJobs = 0; 00078 00079 public FileJobPool(JobPoolListener onAllJobDone, Listener<JobDoneEvent> onJobDone) { 00080 this("pool" + nbFilePool, onAllJobDone, onJobDone); 00081 } 00082 00083 public FileJobPool(String name, JobPoolListener onAllJobDone, Listener<JobDoneEvent> onJobDone) { 00084 super(onAllJobDone, onJobDone); 00085 this.name = name; 00086 nbFilePool++; 00087 try { 00088 file = File.createTempFile("jobpool", null); 00089 objout = new ObjectOutputStream(new BufferedOutputStream(new GZIPOutputStream(new FileOutputStream(file)))); 00090 } catch (IOException e) { 00091 throw new RuntimeException(e); 00092 } 00093 } 00094 00095 @Override 00096 protected void onPoolStart() { 00097 Messages.println(String.format("Starting %s: %d jobs to do", name, nbJobs)); 00098 chrono.start(); 00099 } 00100 00101 @Override 00102 protected void onPoolEnd() { 00103 Messages.println(String.format("Closing %s: %d jobs in %s", name, nbJobs, chrono.toString())); 00104 } 00105 00106 00107 @Override 00108 public void add(Runnable job) { 00109 checkHasBeenSubmitted(); 00110 nbJobs++; 00111 try { 00112 objout.writeObject(job); 00113 objout.reset(); 00114 } catch (IOException e) { 00115 throw new RuntimeException(e); 00116 } 00117 } 00118 00119 @Override 00120 public PoolResult submitTo(Scheduler scheduler) { 00121 close(objout); 00122 if (nbJobs > 0) 00123 return super.submitTo(scheduler); 00124 poolResult = new PoolResult(); 00125 poolResult.poolDone(); 00126 return poolResult; 00127 } 00128 00129 void close(Closeable closeable) { 00130 try { 00131 closeable.close(); 00132 } catch (IOException e) { 00133 e.printStackTrace(); 00134 } 00135 } 00136 00137 @Override 00138 protected Iterator<Runnable> createIterator() { 00139 return new FileJobIterator(file); 00140 } 00141 }