RLPark 1.0.0
Reinforcement Learning Framework in Java
|
00001 package rlpark.plugin.rltoys.experiments.scheduling.internal; 00002 00003 import java.util.concurrent.BlockingQueue; 00004 import java.util.concurrent.LinkedBlockingQueue; 00005 00006 import rlpark.plugin.rltoys.experiments.scheduling.interfaces.JobDoneEvent; 00007 import zephyr.plugin.core.api.signals.Listener; 00008 import zephyr.plugin.core.api.signals.Signal; 00009 00010 public class JobDoneEventQueue { 00011 class JobEventInternal { 00012 final Listener<JobDoneEvent> listener; 00013 final JobDoneEvent jobDoneEvent; 00014 00015 JobEventInternal(Listener<JobDoneEvent> listener, JobDoneEvent event) { 00016 this.listener = listener; 00017 this.jobDoneEvent = event; 00018 } 00019 } 00020 00021 public final Signal<JobDoneEvent> onJobDone = new Signal<JobDoneEvent>(); 00022 private final BlockingQueue<JobEventInternal> queue = new LinkedBlockingQueue<JobEventInternal>(); 00023 private final Thread currentThread = new Thread(new Runnable() { 00024 @Override 00025 public void run() { 00026 while (!terminated) 00027 processEvent(); 00028 } 00029 }); 00030 boolean terminated = false; 00031 00032 public JobDoneEventQueue() { 00033 currentThread.setDaemon(true); 00034 currentThread.setName("JobDoneEventQueue"); 00035 currentThread.start(); 00036 } 00037 00038 public void onJobDone(JobDoneEvent event, Listener<JobDoneEvent> listener) { 00039 try { 00040 queue.put(new JobEventInternal(listener, event)); 00041 } catch (InterruptedException e) { 00042 e.printStackTrace(); 00043 } 00044 if (terminated) 00045 processEvents(); 00046 } 00047 00048 protected void processEvents() { 00049 while (!queue.isEmpty()) { 00050 processEvent(); 00051 } 00052 } 00053 00054 synchronized void processEvent() { 00055 try { 00056 JobEventInternal event = queue.take(); 00057 if (event.listener != null) 00058 event.listener.listen(event.jobDoneEvent); 00059 onJobDone.fire(event.jobDoneEvent); 00060 } catch (InterruptedException e) { 00061 } 00062 } 00063 00064 public boolean isEmpty() { 00065 return queue.isEmpty(); 00066 } 00067 00068 public void dispose() { 00069 terminated = true; 00070 currentThread.interrupt(); 00071 } 00072 }