RLPark 1.0.0
Reinforcement Learning Framework in Java

ObservationSynchronizer.java

Go to the documentation of this file.
00001 package rlpark.plugin.robot.internal.sync;
00002 
00003 import java.util.LinkedList;
00004 import java.util.concurrent.Semaphore;
00005 import java.util.concurrent.TimeUnit;
00006 
00007 import rlpark.plugin.robot.observations.ObservationReceiver;
00008 import rlpark.plugin.robot.observations.ObservationVersatile;
00009 import rlpark.plugin.robot.observations.ObservationVersatileArray;
00010 
00011 public class ObservationSynchronizer {
00012   static private int BufferSize = 100;
00013   private final ObservationReceiver receiver;
00014   private final LinkedList<ObservationVersatile> lastObsBuffer = new LinkedList<ObservationVersatile>();
00015   private boolean terminated = false;
00016   private final Runnable observationReader = new Runnable() {
00017     @Override
00018     public void run() {
00019       observationReaderMainLoop();
00020     }
00021   };
00022   private boolean persistent = false;
00023   private final Semaphore firstInitialization = new Semaphore(0);
00024 
00025   public ObservationSynchronizer(ObservationReceiver receiver, boolean persistent) {
00026     assert receiver != null;
00027     this.receiver = receiver;
00028     this.persistent = persistent;
00029     start();
00030     try {
00031       if (firstInitialization.tryAcquire(5, TimeUnit.SECONDS))
00032         firstInitialization.release();
00033     } catch (InterruptedException e) {
00034       e.printStackTrace();
00035     }
00036   }
00037 
00038   protected void observationReaderMainLoop() {
00039     while (!terminated) {
00040       receiver.initialize();
00041       firstInitialization.release();
00042       while (!receiver.isClosed() && !terminated) {
00043         ObservationVersatile obs = receiver.waitForData();
00044         if (obs != null)
00045           setLastObs(obs);
00046       }
00047       if (!persistent) {
00048         terminate();
00049         break;
00050       }
00051     }
00052   }
00053 
00054   private void start() {
00055     Thread thread = new Thread(observationReader);
00056     thread.setName("ObservationReader");
00057     thread.setDaemon(true);
00058     thread.start();
00059   }
00060 
00061   public void setPersistent(boolean persistent) {
00062     this.persistent = true;
00063   }
00064 
00065   synchronized private void setLastObs(ObservationVersatile obs) {
00066     lastObsBuffer.add(obs);
00067     if (lastObsBuffer.size() > BufferSize)
00068       lastObsBuffer.poll();
00069     notifyAll();
00070   }
00071 
00072   synchronized public ObservationVersatileArray waitNewObs() {
00073     if (lastObsBuffer.size() > 0)
00074       return useLastObs();
00075     try {
00076       wait();
00077     } catch (InterruptedException e) {
00078       e.printStackTrace();
00079     }
00080     return useLastObs();
00081   }
00082 
00083   synchronized public ObservationVersatileArray newObsNow() {
00084     return useLastObs();
00085   }
00086 
00087   synchronized private ObservationVersatileArray useLastObs() {
00088     if (lastObsBuffer.size() == 0)
00089       return null;
00090     ObservationVersatileArray result = new ObservationVersatileArray(lastObsBuffer);
00091     lastObsBuffer.clear();
00092     return result;
00093   }
00094 
00095   public ObservationReceiver receiver() {
00096     return receiver;
00097   }
00098 
00099   synchronized public void terminate() {
00100     receiver.close();
00101     terminated = true;
00102     notifyAll();
00103   }
00104 
00105   public boolean isTerminated() {
00106     return terminated;
00107   }
00108 }
 All Classes Namespaces Files Functions Variables Enumerations
Zephyr
RLPark