package thorium; import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; /** * The main connection class. This is instanced once thorium is started up. * @param the td of this class. * @date 080213 * @author Stacx */ public class Connection implements EventSource { public interface Callback { void messages(Connection connection, Iterator messages); void idle(Connection connection); void closed(Connection connection); void garbledMessage(String message, byte[] data); } private List txBuffers = Collections.synchronizedList(new LinkedList()); private ByteBuffer rxBuffer = ByteBuffer.allocate(4096); private SelectionKey selectionKey; private SocketChannel channel; private MessageParser parser; private Callback callback; public static Connection attemptToConnect(InetSocketAddress address, MessageParser parser, Callback callback) throws IOException { return attemptToConnect(address, parser, callback, 3, 100); } public static Connection attemptToConnect(InetSocketAddress address, MessageParser parser, Callback callback, int maxNumAttempts, int retryDelay) throws IOException { for (int numAttempts = 0; numAttempts < maxNumAttempts; numAttempts++) { try { return Connection.connect(address, parser, callback); } catch (ConnectException e1) { try { Thread.sleep(retryDelay); } catch (InterruptedException e2) { } } } throw new ConnectException("Could not be connected"); } public static Connection connect(InetSocketAddress address, MessageParser parser, Callback callback) throws IOException { SocketChannel channel = SocketChannel.open(); channel.connect(address); channel.configureBlocking(false); return new Connection(channel, parser, callback); } public static Connection accept(InetSocketAddress address, MessageParser parser, Callback callback) throws IOException { ServerSocketChannel serverChannel = ServerSocketChannel.open(); ServerSocket socket = serverChannel.socket(); socket.bind(address); SocketChannel channel = serverChannel.accept(); serverChannel.close(); channel.configureBlocking(false); return new Connection(channel, parser, callback); } public Connection(SocketChannel channel, MessageParser parser, Callback callback) { this.channel = channel; this.callback = callback; this.parser = parser; } @Override public SelectionKey register(Selector selector, int ops) throws IOException { return selectionKey = channel.register(selector, ops); } @Override public void unregister() { callback.closed(this); } @Override public void read(SelectionKey key) throws IOException { SocketChannel sc = (SocketChannel) key.channel(); if (sc.isOpen()) { int len; try { len = sc.read(rxBuffer); } catch (IOException e) { len = -1; } if (len > 0) { Iterator messages = parse(); if (messages.hasNext()) { callback.messages(this, messages); } } else if (len < 0) { close(); } } } private Iterator parse() throws IOException { rxBuffer.flip(); List result = new ArrayList(); while (rxBuffer.hasRemaining()) { rxBuffer.mark(); try { result.add(parser.parse(rxBuffer)); } catch (PartialMessageException e) { rxBuffer.reset(); break; } catch (GarbledMessageException e) { callback.garbledMessage(e.getMessage(), e.getMessageData()); } } rxBuffer.compact(); return result.iterator(); } public void send(byte[] byteArray) { send(ByteBuffer.wrap(byteArray)); } public void send(Message message) { send(message.toByteBuffer()); } public void send(ByteBuffer buffer) { txBuffers.add(buffer); if (selectionKey == null) throw new IllegalStateException("Connection is not registered"); selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); selectionKey.selector().wakeup(); } @Override public void write(SelectionKey key) throws IOException { try { flush(); } catch (IOException e) { close(); } if (txBuffers.isEmpty()) key.interestOps(SelectionKey.OP_READ); } public void close() { try { while (!txBuffers.isEmpty()) flush(); } catch (IOException e) { } SocketChannel sc = (SocketChannel) selectionKey.channel(); Socket socket = sc.socket(); if (socket != null) { try { socket.shutdownInput(); } catch (IOException e) { } try { socket.shutdownOutput(); } catch (IOException e) { } try { socket.close(); } catch (IOException e) { } } try { sc.close(); } catch (IOException e) { } selectionKey.attach(null); selectionKey.cancel(); selectionKey.selector().wakeup(); } private void flush() throws IOException { synchronized(txBuffers) { while (!txBuffers.isEmpty()) { ByteBuffer txBuffer = txBuffers.get(0); if (!write(txBuffer)) { break; } txBuffers.remove(0); } } } private boolean write(ByteBuffer txBuffer) throws IOException { while (txBuffer.hasRemaining()) { if (channel.write(txBuffer) == 0) { return false; } } return true; } @Override public void timeout() { callback.idle(this); } @Override public EventSource accept(SelectionKey key) throws IOException { throw new UnsupportedOperationException(); } @Override public boolean isClosed() { SocketChannel sc = (SocketChannel) selectionKey.channel(); return !sc.isOpen(); } }