RLPark 1.0.0
Reinforcement Learning Framework in Java

DiscoSocket.java

Go to the documentation of this file.
00001 package rlpark.plugin.robot.internal.disco.io;
00002 
00003 import java.io.DataInputStream;
00004 import java.io.DataOutputStream;
00005 import java.io.IOException;
00006 import java.net.Socket;
00007 import java.net.UnknownHostException;
00008 import java.nio.ByteOrder;
00009 import java.util.Arrays;
00010 
00011 import rlpark.plugin.robot.internal.disco.drops.Drop;
00012 import rlpark.plugin.robot.internal.disco.drops.DropString;
00013 import rlpark.plugin.robot.internal.disco.io.DiscoPacket.Direction;
00014 import rlpark.plugin.robot.internal.sync.LiteByteBuffer;
00015 import zephyr.plugin.core.api.signals.Signal;
00016 import zephyr.plugin.core.api.synchronization.Chrono;
00017 
00018 public class DiscoSocket {
00019   static final boolean Verbose = false;
00020   static final float TIMEOUT = 30;
00021   private final ByteOrder byteOrder;
00022   protected Socket socket;
00023   private final DataInputStream in;
00024   private final DataOutputStream out;
00025   final public Signal<DiscoPacket> onPacket = new Signal<DiscoPacket>();
00026   public double readLatency;
00027   private final LiteByteBuffer sizeBuffer;
00028 
00029   public DiscoSocket(int port) throws UnknownHostException, IOException {
00030     this("localhost", port);
00031   }
00032 
00033   public DiscoSocket(String host, int serverPort) throws UnknownHostException, IOException {
00034     this(host, serverPort, ByteOrder.BIG_ENDIAN);
00035   }
00036 
00037   public DiscoSocket(String host, int serverPort, ByteOrder byteOrder) throws UnknownHostException, IOException {
00038     this(createSocket(host, serverPort), byteOrder);
00039   }
00040 
00041   static private void log(String message) {
00042     if (Verbose)
00043       System.out.print(message);
00044     System.out.flush();
00045   }
00046 
00047   static private Socket createSocket(String host, int serverPort) throws UnknownHostException, IOException {
00048     Chrono chrono = new Chrono();
00049     Socket socket = null;
00050     log(String.format("Connecting to %s:%d", host, serverPort));
00051     while (socket == null)
00052       try {
00053         socket = new Socket(host, serverPort);
00054       } catch (java.net.ConnectException e) {
00055         if (chrono.getCurrentChrono() > TIMEOUT)
00056           throw e;
00057         try {
00058           Thread.sleep(1000);
00059           log(".");
00060         } catch (InterruptedException ie) {
00061           throw new RuntimeException(e);
00062         }
00063       }
00064     log(" ok\n");
00065     return socket;
00066   }
00067 
00068   public DiscoSocket(Socket socket, ByteOrder byteOrder) throws IOException {
00069     this.socket = socket;
00070     in = new DataInputStream(socket.getInputStream());
00071     out = new DataOutputStream(socket.getOutputStream());
00072     this.byteOrder = byteOrder;
00073     sizeBuffer = allocate(4);
00074   }
00075 
00076   private LiteByteBuffer allocate(int capacity) {
00077     return new LiteByteBuffer(capacity, byteOrder);
00078   }
00079 
00080   synchronized public void send(Drop sendDrop) throws IOException {
00081     LiteByteBuffer buffer = allocate(sendDrop.packetSize());
00082     sendDrop.putData(buffer);
00083     byte[] byteArray = Arrays.copyOfRange(buffer.array(), sendDrop.headerSize(), buffer.array().length);
00084     onPacket.fire(new DiscoPacket(Direction.Send, sendDrop.name(), buffer.order(), byteArray));
00085     out.write(buffer.array(), 0, buffer.capacity());
00086   }
00087 
00088   private String readName() throws IOException {
00089     int stringSize = readSize();
00090     if (stringSize > 100 || stringSize <= 0)
00091       throw new RuntimeException("Name error: length is not > 0 && < 100");
00092     LiteByteBuffer stringBuffer = allocate(stringSize);
00093     in.readFully(stringBuffer.array(), 0, stringSize);
00094     return DropString.getData(stringBuffer, 0);
00095   }
00096 
00097   protected int readSize() throws IOException {
00098     sizeBuffer.clear();
00099     in.readFully(sizeBuffer.array());
00100     return sizeBuffer.getInt();
00101   }
00102 
00103   synchronized public void close() {
00104     try {
00105       socket.close();
00106     } catch (IOException e) {
00107       e.printStackTrace();
00108     }
00109   }
00110 
00111   public DiscoPacket recv() throws IOException {
00112     String name = readName();
00113     LiteByteBuffer buffer = allocate(readSize());
00114     in.readFully(buffer.array(), 0, buffer.capacity());
00115     DiscoPacket packet = new DiscoPacket(Direction.Recv, name, buffer);
00116     onPacket.fire(packet);
00117     return packet;
00118   }
00119 
00120   public int dataAvailable() throws IOException {
00121     return in.available();
00122   }
00123 
00124   public boolean isSocketClosed() {
00125     return socket == null || socket.isClosed() || !socket.isConnected() || !socket.isBound();
00126   }
00127 }
 All Classes Namespaces Files Functions Variables Enumerations
Zephyr
RLPark