// RLPark 1.0.0
// Reinforcement Learning Framework in Java
package rlpark.plugin.rltoys.experiments.scheduling.internal.network;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.ClientInfo;
import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.Message;
import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.MessageBinary;
import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.MessageClassData;
import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.MessageJob;
import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.MessageRequestClass;
import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.MessageRequestJob;
import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.MessageSendClientInfo;
import rlpark.plugin.rltoys.experiments.scheduling.internal.messages.Messages;

/**
 * Wraps a {@link Socket} and exchanges {@link Message} objects over its input and output streams.
 *
 * <p>
 * Locking: reads are guarded by the monitor of the input stream and writes by the monitor of the
 * output stream. A request/reply transaction holds both monitors, always acquired in the order
 * output stream, then input stream. No method locks on the {@code SyncSocket} instance itself, so
 * this ordering cannot form a cycle inside this class.
 *
 * <p>
 * Failure handling: I/O errors are reported through {@link Messages#displayError} and close the
 * socket. After that, reading methods return {@code null} and {@link #write(Message)} does nothing.
 */
public class SyncSocket {
  private final Socket socket;
  /** Input stream of the socket, or {@code null} if it could not be obtained. */
  private final InputStream in;
  /** Output stream of the socket, or {@code null} if it could not be obtained. */
  private final OutputStream out;

  /**
   * Binds this object to the given socket and fetches its streams. If either stream cannot be
   * obtained the error is reported and the socket is closed.
   *
   * @param socket the socket to communicate over
   */
  public SyncSocket(Socket socket) {
    this.socket = socket;
    out = outputStream();
    in = inputStream();
  }

  /**
   * @return the input stream of the socket, or {@code null} (with the socket closed) on failure
   */
  private InputStream inputStream() {
    try {
      return socket.getInputStream();
    } catch (IOException e) {
      // Report through Messages like every other error path in this class.
      Messages.displayError(e);
      close();
      return null;
    }
  }

  /**
   * @return the output stream of the socket, or {@code null} (with the socket closed) on failure
   */
  private OutputStream outputStream() {
    try {
      return socket.getOutputStream();
    } catch (IOException e) {
      // Report through Messages like every other error path in this class.
      Messages.displayError(e);
      close();
      return null;
    }
  }

  /**
   * Sends a message and reads the reply while holding both stream monitors, so that no other
   * thread using this object can interleave a read or a write between the two.
   *
   * @param message the request to send
   * @return the reply, or {@code null} if the socket is unusable or the read failed
   */
  private MessageBinary transaction(Message message) {
    // The streams are null when they could not be obtained in the constructor; synchronizing on
    // them would throw a NullPointerException.
    if (isClosed() || in == null || out == null)
      return null;
    synchronized (out) {
      synchronized (in) {
        write(message);
        return read();
      }
    }
  }

  /**
   * Reads the next binary message from the socket.
   *
   * @return the message read, or {@code null} if the socket is closed, the input stream is
   *         unavailable, or an I/O error occurred (in which case the socket is closed)
   */
  public MessageBinary read() {
    if (isClosed() || in == null)
      return null;
    MessageBinary messageBinary = new MessageBinary();
    synchronized (in) {
      try {
        messageBinary.read(in);
      } catch (IOException e) {
        Messages.displayError(e);
        close();
        return null;
      }
    }
    Messages.debug(this.toString() + " reads " + messageBinary.type().toString());
    return messageBinary;
  }

  /**
   * Writes a message to the socket. Does nothing if the socket is closed or the output stream is
   * unavailable. On an I/O error the error is reported and the socket is closed.
   *
   * <p>
   * This method is deliberately NOT declared {@code synchronized}: {@link #transaction(Message)}
   * calls it while already holding the output stream monitor. Locking this instance first here
   * (instance, then stream) while a transaction needs it last (stream, then instance) allowed two
   * threads to deadlock. The output stream monitor alone serializes writers.
   *
   * @param message the message to send
   */
  public void write(Message message) {
    Messages.debug(this.toString() + " writes " + message.type().toString());
    if (isClosed() || out == null)
      return;
    synchronized (out) {
      try {
        message.write(out);
      } catch (IOException e) {
        Messages.displayError(e);
        close();
      }
    }
  }

  /**
   * Requests the data of a class from the peer.
   *
   * @param className name of the class to request
   * @return the reply cast to {@link MessageClassData}, or {@code null} if no reply could be read
   *         or the reply could not be converted (in the latter case the socket is closed)
   */
  public MessageClassData classTransaction(String className) {
    Messages.println("Downloading code for " + className);
    MessageClassData messageClassData = null;
    try {
      MessageBinary message = transaction(new MessageRequestClass(className));
      // Same guard as jobTransaction: no reply means there is nothing to convert.
      if (message == null)
        return null;
      messageClassData = (MessageClassData) Messages.cast(message, null);
    } catch (Throwable e) {
      Messages.displayError(e);
      close();
    }
    return messageClassData;
  }

  /**
   * Requests a job from the peer.
   *
   * @param classLoader class loader handed to {@link Messages#cast} when converting the reply
   * @return the reply cast to {@link MessageJob}, or {@code null} if no reply could be read or the
   *         reply could not be converted (in the latter case the socket is closed)
   */
  public MessageJob jobTransaction(ClassLoader classLoader) {
    MessageJob messageJobTodo = null;
    try {
      MessageBinary message = transaction(new MessageRequestJob());
      if (message == null)
        return null;
      messageJobTodo = (MessageJob) Messages.cast(message, classLoader);
    } catch (Throwable e) {
      Messages.displayError(e);
      close();
    }
    return messageJobTodo;
  }

  /**
   * Sends the given client information to the peer, wrapped in a {@link MessageSendClientInfo}.
   *
   * @param clientInfo the information to send
   */
  public void sendClientInfo(ClientInfo clientInfo) {
    write(new MessageSendClientInfo(clientInfo));
  }

  /**
   * Closes the underlying socket. An I/O error raised while closing is reported, not propagated.
   */
  public void close() {
    try {
      socket.close();
    } catch (IOException e) {
      Messages.displayError(e);
    }
  }

  /**
   * @return {@code true} if the underlying socket reports itself closed
   */
  public boolean isClosed() {
    return socket.isClosed();
  }

  /**
   * Reads and converts the next message from the given socket, passing a {@code null} class loader
   * to the conversion.
   *
   * @param clientSocket the socket to read from
   * @return the converted message, or {@code null} if nothing could be read or converted
   */
  public static Message readNextMessage(SyncSocket clientSocket) {
    return readNextMessage(clientSocket, null);
  }

  /**
   * Reads the next binary message from the given socket and converts it with
   * {@link Messages#cast}.
   *
   * @param clientSocket the socket to read from
   * @param classLoader class loader handed to {@link Messages#cast}; may be {@code null}
   * @return the converted message, or {@code null} if nothing could be read or if reading or
   *         converting threw (in which case the error is reported and the socket is closed)
   */
  public static Message readNextMessage(SyncSocket clientSocket, ClassLoader classLoader) {
    Message message = null;
    try {
      MessageBinary nextClientMessage = clientSocket.read();
      if (nextClientMessage == null)
        return null;
      message = Messages.cast(nextClientMessage, classLoader);
    } catch (Throwable e) {
      Messages.displayError(e);
      clientSocket.close();
      return null;
    }
    return message;
  }
}