/*
 * RLPark 1.0.0
 * Reinforcement Learning Framework in Java
 */
00001 package rlpark.plugin.robot.internal.disco.io; 00002 00003 import java.io.DataInputStream; 00004 import java.io.DataOutputStream; 00005 import java.io.IOException; 00006 import java.net.Socket; 00007 import java.net.UnknownHostException; 00008 import java.nio.ByteOrder; 00009 import java.util.Arrays; 00010 00011 import rlpark.plugin.robot.internal.disco.drops.Drop; 00012 import rlpark.plugin.robot.internal.disco.drops.DropString; 00013 import rlpark.plugin.robot.internal.disco.io.DiscoPacket.Direction; 00014 import rlpark.plugin.robot.internal.sync.LiteByteBuffer; 00015 import zephyr.plugin.core.api.signals.Signal; 00016 import zephyr.plugin.core.api.synchronization.Chrono; 00017 00018 public class DiscoSocket { 00019 static final boolean Verbose = false; 00020 static final float TIMEOUT = 30; 00021 private final ByteOrder byteOrder; 00022 protected Socket socket; 00023 private final DataInputStream in; 00024 private final DataOutputStream out; 00025 final public Signal<DiscoPacket> onPacket = new Signal<DiscoPacket>(); 00026 public double readLatency; 00027 private final LiteByteBuffer sizeBuffer; 00028 00029 public DiscoSocket(int port) throws UnknownHostException, IOException { 00030 this("localhost", port); 00031 } 00032 00033 public DiscoSocket(String host, int serverPort) throws UnknownHostException, IOException { 00034 this(host, serverPort, ByteOrder.BIG_ENDIAN); 00035 } 00036 00037 public DiscoSocket(String host, int serverPort, ByteOrder byteOrder) throws UnknownHostException, IOException { 00038 this(createSocket(host, serverPort), byteOrder); 00039 } 00040 00041 static private void log(String message) { 00042 if (Verbose) 00043 System.out.print(message); 00044 System.out.flush(); 00045 } 00046 00047 static private Socket createSocket(String host, int serverPort) throws UnknownHostException, IOException { 00048 Chrono chrono = new Chrono(); 00049 Socket socket = null; 00050 log(String.format("Connecting to %s:%d", host, serverPort)); 00051 while 
(socket == null) 00052 try { 00053 socket = new Socket(host, serverPort); 00054 } catch (java.net.ConnectException e) { 00055 if (chrono.getCurrentChrono() > TIMEOUT) 00056 throw e; 00057 try { 00058 Thread.sleep(1000); 00059 log("."); 00060 } catch (InterruptedException ie) { 00061 throw new RuntimeException(e); 00062 } 00063 } 00064 log(" ok\n"); 00065 return socket; 00066 } 00067 00068 public DiscoSocket(Socket socket, ByteOrder byteOrder) throws IOException { 00069 this.socket = socket; 00070 in = new DataInputStream(socket.getInputStream()); 00071 out = new DataOutputStream(socket.getOutputStream()); 00072 this.byteOrder = byteOrder; 00073 sizeBuffer = allocate(4); 00074 } 00075 00076 private LiteByteBuffer allocate(int capacity) { 00077 return new LiteByteBuffer(capacity, byteOrder); 00078 } 00079 00080 synchronized public void send(Drop sendDrop) throws IOException { 00081 LiteByteBuffer buffer = allocate(sendDrop.packetSize()); 00082 sendDrop.putData(buffer); 00083 byte[] byteArray = Arrays.copyOfRange(buffer.array(), sendDrop.headerSize(), buffer.array().length); 00084 onPacket.fire(new DiscoPacket(Direction.Send, sendDrop.name(), buffer.order(), byteArray)); 00085 out.write(buffer.array(), 0, buffer.capacity()); 00086 } 00087 00088 private String readName() throws IOException { 00089 int stringSize = readSize(); 00090 if (stringSize > 100 || stringSize <= 0) 00091 throw new RuntimeException("Name error: length is not > 0 && < 100"); 00092 LiteByteBuffer stringBuffer = allocate(stringSize); 00093 in.readFully(stringBuffer.array(), 0, stringSize); 00094 return DropString.getData(stringBuffer, 0); 00095 } 00096 00097 protected int readSize() throws IOException { 00098 sizeBuffer.clear(); 00099 in.readFully(sizeBuffer.array()); 00100 return sizeBuffer.getInt(); 00101 } 00102 00103 synchronized public void close() { 00104 try { 00105 socket.close(); 00106 } catch (IOException e) { 00107 e.printStackTrace(); 00108 } 00109 } 00110 00111 public DiscoPacket recv() 
throws IOException { 00112 String name = readName(); 00113 LiteByteBuffer buffer = allocate(readSize()); 00114 in.readFully(buffer.array(), 0, buffer.capacity()); 00115 DiscoPacket packet = new DiscoPacket(Direction.Recv, name, buffer); 00116 onPacket.fire(packet); 00117 return packet; 00118 } 00119 00120 public int dataAvailable() throws IOException { 00121 return in.available(); 00122 } 00123 00124 public boolean isSocketClosed() { 00125 return socket == null || socket.isClosed() || !socket.isConnected() || !socket.isBound(); 00126 } 00127 }