RLPark 1.0.0
Reinforcement Learning Framework in Java

FileJobPool.java

Go to the documentation of this file.
00001 package rlpark.plugin.rltoys.experiments.scheduling.pools;
00002 
00003 
00004 import java.io.BufferedInputStream;
00005 import java.io.BufferedOutputStream;
00006 import java.io.Closeable;
00007 import java.io.EOFException;
00008 import java.io.File;
00009 import java.io.FileInputStream;
00010 import java.io.FileNotFoundException;
00011 import java.io.FileOutputStream;
00012 import java.io.IOException;
00013 import java.io.ObjectInputStream;
00014 import java.io.ObjectOutputStream;
00015 import java.util.Iterator;
00016 import java.util.zip.GZIPInputStream;
00017 import java.util.zip.GZIPOutputStream;
00018 
00019 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.JobDoneEvent;
00020 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.PoolResult;
00021 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.Scheduler;
00022 import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.Messages;
00023 import zephyr.plugin.core.api.signals.Listener;
00024 import zephyr.plugin.core.api.synchronization.Chrono;
00025 
00026 
00027 public class FileJobPool extends AbstractJobPool {
00028   class FileJobIterator implements Iterator<Runnable> {
00029     final ObjectInputStream objin;
00030     private Runnable nextJob;
00031 
00032     FileJobIterator(File file) {
00033       try {
00034         objin = new ObjectInputStream(new GZIPInputStream(new BufferedInputStream(new FileInputStream(file))));
00035       } catch (FileNotFoundException e) {
00036         throw new RuntimeException(e);
00037       } catch (IOException e) {
00038         throw new RuntimeException(e);
00039       }
00040       nextJob = readJob();
00041     }
00042 
00043     private Runnable readJob() {
00044       try {
00045         return (Runnable) objin.readObject();
00046       } catch (EOFException e) {
00047         close(objin);
00048         return null;
00049       } catch (Exception e) {
00050         e.printStackTrace();
00051       }
00052       return null;
00053     }
00054 
00055     @Override
00056     public boolean hasNext() {
00057       return nextJob != null;
00058     }
00059 
00060     @Override
00061     public Runnable next() {
00062       Runnable job = nextJob;
00063       nextJob = readJob();
00064       return job;
00065     }
00066 
00067     @Override
00068     public void remove() {
00069     }
00070   }
00071 
00072   static private int nbFilePool = 0;
00073   private final ObjectOutputStream objout;
00074   private final File file;
00075   private final String name;
00076   private final Chrono chrono = new Chrono();
00077   private int nbJobs = 0;
00078 
00079   public FileJobPool(JobPoolListener onAllJobDone, Listener<JobDoneEvent> onJobDone) {
00080     this("pool" + nbFilePool, onAllJobDone, onJobDone);
00081   }
00082 
00083   public FileJobPool(String name, JobPoolListener onAllJobDone, Listener<JobDoneEvent> onJobDone) {
00084     super(onAllJobDone, onJobDone);
00085     this.name = name;
00086     nbFilePool++;
00087     try {
00088       file = File.createTempFile("jobpool", null);
00089       objout = new ObjectOutputStream(new BufferedOutputStream(new GZIPOutputStream(new FileOutputStream(file))));
00090     } catch (IOException e) {
00091       throw new RuntimeException(e);
00092     }
00093   }
00094 
00095   @Override
00096   protected void onPoolStart() {
00097     Messages.println(String.format("Starting %s: %d jobs to do", name, nbJobs));
00098     chrono.start();
00099   }
00100 
00101   @Override
00102   protected void onPoolEnd() {
00103     Messages.println(String.format("Closing %s: %d jobs in %s", name, nbJobs, chrono.toString()));
00104   }
00105 
00106 
00107   @Override
00108   public void add(Runnable job) {
00109     checkHasBeenSubmitted();
00110     nbJobs++;
00111     try {
00112       objout.writeObject(job);
00113       objout.reset();
00114     } catch (IOException e) {
00115       throw new RuntimeException(e);
00116     }
00117   }
00118 
00119   @Override
00120   public PoolResult submitTo(Scheduler scheduler) {
00121     close(objout);
00122     if (nbJobs > 0)
00123       return super.submitTo(scheduler);
00124     poolResult = new PoolResult();
00125     poolResult.poolDone();
00126     return poolResult;
00127   }
00128 
00129   void close(Closeable closeable) {
00130     try {
00131       closeable.close();
00132     } catch (IOException e) {
00133       e.printStackTrace();
00134     }
00135   }
00136 
00137   @Override
00138   protected Iterator<Runnable> createIterator() {
00139     return new FileJobIterator(file);
00140   }
00141 }
 All Classes Namespaces Files Functions Variables Enumerations
Zephyr
RLPark