For a messenger program I am writing a Server class that will run a non-blocking server in one thread. A messenger thread will use this server thread to communicate with other clients.
What the server should do
- Accept connections from / connect to other instances of the server and associate the selection keys for the connections in
Map<Integer, SelectionKey> keyswit an ID so the messenger thread can access the connections by ID - Read from / write to connections
- Store incoming messages in a queue
- Messenger thread can
- Fetch incoming messages
- Queue messages to be sent:
send_message(int id, String msg)
package snserver; /* imports */ //class SNServer (Simple non-blocking Server) public class SNServer extends Thread { private int port; private Selector selector; private ConcurrentMap<Integer, SelectionKey> keys; // ID -> associated key private ConcurrentMap<SocketChannel,List<byte[]>> dataMap_out; ConcurrentLinkedQueue<String> in_msg; //incoming messages to be fetched by messenger thread public SNServer(int port) { this.port = port; dataMap_out = new ConcurrentHashMap<SocketChannel, List<byte[]>>(); keys = new ConcurrentHashMap<Integer, SelectionKey>(); } public void start_server() throws IOException { // create selector and channel this.selector = Selector.open(); ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); // bind to port InetSocketAddress listenAddr = new InetSocketAddress((InetAddress)null, this.port); serverChannel.socket().bind(listenAddr); serverChannel.register(this.selector, SelectionKey.OP_ACCEPT); log("Echo server ready. Ctrl-C to stop."); // processing while (true) { // wait for events this.selector.select(); // wakeup to work on selected keys Iterator keys = this.selector.selectedKeys().iterator(); while (keys.hasNext()) { SelectionKey key = (SelectionKey) keys.next(); // this is necessary to prevent the same key from coming up // again the next time around. keys.remove(); if (! key.isValid()) { continue; } if (key.isAcceptable()) { this.accept(key); } else if (key.isReadable()) { this.read(key); } else if (key.isWritable()) { this.write(key); } else if(key.isConnectable()) { this.connect(key); } } } } private void accept(SelectionKey key) throws IOException { ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); SocketChannel channel = serverChannel.accept(); channel.configureBlocking(false); send_message(key, "Welcome."); //DEBUG Socket socket = channel.socket(); SocketAddress remoteAddr = socket.getRemoteSocketAddress(); log("Connected to: " + remoteAddr); // register channel with selector for further IO dataMap_out.put(channel, new ArrayList<byte[]>()); channel.register(this.selector, SelectionKey.OP_READ); //store key in 'keys' to be accessable by ID from messenger thread //TODO first get ID keys.put(0, key); } //TODO verify, test public void init_connect(String addr, int port){ try { SocketChannel channel = createSocketChannel(addr, port); channel.register(this.selector, channel.validOps()/*, SelectionKey.OP_?*/); } catch (IOException e) { //TODO handle } } //TODO verify, test private void connect(SelectionKey key) { SocketChannel channel = (SocketChannel) key.channel(); try { channel.finishConnect(); //try to finish connection - if 'false' is returned keep 'OP_CONNECT' registered //store key in 'keys' to be accessable by ID from messenger thread //TODO first get ID keys.put(0, key); } catch (IOException e0) { try { //TODO handle ok? channel.close(); } catch (IOException e1) { //TODO handle } } } private void read(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(8192); int numRead = -1; try { numRead = channel.read(buffer); } catch (IOException e) { e.printStackTrace(); } if (numRead == -1) { this.dataMap_out.remove(channel); Socket socket = channel.socket(); SocketAddress remoteAddr = socket.getRemoteSocketAddress(); log("Connection closed by client: " + remoteAddr); //TODO handle channel.close(); return; } byte[] data = new byte[numRead]; System.arraycopy(buffer.array(), 0, data, 0, numRead); in_msg.add(new String(data, "utf-8")); } private void write(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); List<byte[]> pendingData = this.dataMap_out.get(channel); Iterator<byte[]> items = pendingData.iterator(); while (items.hasNext()) { byte[] item = items.next(); items.remove(); //TODO is this correct? -> re-doing write in loop with same buffer object ByteBuffer buffer = ByteBuffer.wrap(item); int bytes_to_write = buffer.capacity(); while (bytes_to_write > 0) { bytes_to_write -= channel.write(buffer); } } key.interestOps(SelectionKey.OP_READ); } public void queue_data(SelectionKey key, byte[] data) { SocketChannel channel = (SocketChannel) key.channel(); List<byte[]> pendingData = this.dataMap_out.get(channel); key.interestOps(SelectionKey.OP_WRITE); pendingData.add(data); } public void send_message(int id, String msg) { SelectionKey key = keys.get(id); if (key != null) send_message(key, msg); //else //TODO handle } public void send_message(SelectionKey key, String msg) { try { queue_data(key, msg.getBytes("utf-8")); } catch (UnsupportedEncodingException ex) { //is not thrown: utf-8 is always defined } } public String get_message() { return in_msg.poll(); } private static void log(String s) { System.out.println(s); } @Override public void run() { try { start_server(); } catch (IOException e) { System.out.println("IOException: " + e); //TODO handle exception } } // Creates a non-blocking socket channel for the specified host name and port. // connect() is called on the new channel before it is returned. public static SocketChannel createSocketChannel(String hostName, int port) throws IOException { // Create a non-blocking socket channel SocketChannel sChannel = SocketChannel.open(); sChannel.configureBlocking(false); // Send a connection request to the server; this method is non-blocking sChannel.connect(new InetSocketAddress(hostName, port)); return sChannel; } } General problem
Because I am new to Java and networking there may be several things incorrect or not good in this code. Please help me improve this code so it does what I'd like it to do. Also give suggestions to improve the concept!
Current problems:
- After calling
init_connect()there seems to be no event on the selector so the connection is not built.