RLPark 1.0.0
Reinforcement Learning Framework in Java
|
00001 package rlpark.plugin.robot.internal.sync; 00002 00003 import java.util.LinkedList; 00004 import java.util.concurrent.Semaphore; 00005 import java.util.concurrent.TimeUnit; 00006 00007 import rlpark.plugin.robot.observations.ObservationReceiver; 00008 import rlpark.plugin.robot.observations.ObservationVersatile; 00009 import rlpark.plugin.robot.observations.ObservationVersatileArray; 00010 00011 public class ObservationSynchronizer { 00012 static private int BufferSize = 100; 00013 private final ObservationReceiver receiver; 00014 private final LinkedList<ObservationVersatile> lastObsBuffer = new LinkedList<ObservationVersatile>(); 00015 private boolean terminated = false; 00016 private final Runnable observationReader = new Runnable() { 00017 @Override 00018 public void run() { 00019 observationReaderMainLoop(); 00020 } 00021 }; 00022 private boolean persistent = false; 00023 private final Semaphore firstInitialization = new Semaphore(0); 00024 00025 public ObservationSynchronizer(ObservationReceiver receiver, boolean persistent) { 00026 assert receiver != null; 00027 this.receiver = receiver; 00028 this.persistent = persistent; 00029 start(); 00030 try { 00031 if (firstInitialization.tryAcquire(5, TimeUnit.SECONDS)) 00032 firstInitialization.release(); 00033 } catch (InterruptedException e) { 00034 e.printStackTrace(); 00035 } 00036 } 00037 00038 protected void observationReaderMainLoop() { 00039 while (!terminated) { 00040 receiver.initialize(); 00041 firstInitialization.release(); 00042 while (!receiver.isClosed() && !terminated) { 00043 ObservationVersatile obs = receiver.waitForData(); 00044 if (obs != null) 00045 setLastObs(obs); 00046 } 00047 if (!persistent) { 00048 terminate(); 00049 break; 00050 } 00051 } 00052 } 00053 00054 private void start() { 00055 Thread thread = new Thread(observationReader); 00056 thread.setName("ObservationReader"); 00057 thread.setDaemon(true); 00058 thread.start(); 00059 } 00060 00061 public void setPersistent(boolean persistent) { 00062 this.persistent = true; 00063 } 00064 00065 synchronized private void setLastObs(ObservationVersatile obs) { 00066 lastObsBuffer.add(obs); 00067 if (lastObsBuffer.size() > BufferSize) 00068 lastObsBuffer.poll(); 00069 notifyAll(); 00070 } 00071 00072 synchronized public ObservationVersatileArray waitNewObs() { 00073 if (lastObsBuffer.size() > 0) 00074 return useLastObs(); 00075 try { 00076 wait(); 00077 } catch (InterruptedException e) { 00078 e.printStackTrace(); 00079 } 00080 return useLastObs(); 00081 } 00082 00083 synchronized public ObservationVersatileArray newObsNow() { 00084 return useLastObs(); 00085 } 00086 00087 synchronized private ObservationVersatileArray useLastObs() { 00088 if (lastObsBuffer.size() == 0) 00089 return null; 00090 ObservationVersatileArray result = new ObservationVersatileArray(lastObsBuffer); 00091 lastObsBuffer.clear(); 00092 return result; 00093 } 00094 00095 public ObservationReceiver receiver() { 00096 return receiver; 00097 } 00098 00099 synchronized public void terminate() { 00100 receiver.close(); 00101 terminated = true; 00102 notifyAll(); 00103 } 00104 00105 public boolean isTerminated() { 00106 return terminated; 00107 } 00108 }