\$\begingroup\$

For a messenger program I am writing a Server class that will run a non-blocking server in one thread. A messenger thread will use this server thread to communicate with other clients.

What the server should do

  • Accept connections from / connect to other instances of the server and associate the selection keys for the connections in Map<Integer, SelectionKey> keys with an ID so the messenger thread can access the connections by ID
  • Read from / write to connections
  • Store incoming messages in a queue
  • Messenger thread can
    • Fetch incoming messages
    • Queue messages to be sent: send_message(int id, String msg)

    package snserver;

    /* imports */

    //class SNServer (Simple non-blocking Server)
    public class SNServer extends Thread {
        private int port;
        private Selector selector;
        private ConcurrentMap<Integer, SelectionKey> keys; // ID -> associated key
        private ConcurrentMap<SocketChannel,List<byte[]>> dataMap_out;
        ConcurrentLinkedQueue<String> in_msg; //incoming messages to be fetched by messenger thread

        public SNServer(int port) {
            this.port = port;
            dataMap_out = new ConcurrentHashMap<SocketChannel, List<byte[]>>();
            keys = new ConcurrentHashMap<Integer, SelectionKey>();
        }

        public void start_server() throws IOException {
            // create selector and channel
            this.selector = Selector.open();
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);

            // bind to port
            InetSocketAddress listenAddr = new InetSocketAddress((InetAddress)null, this.port);
            serverChannel.socket().bind(listenAddr);
            serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);

            log("Echo server ready. Ctrl-C to stop.");

            // processing
            while (true) {
                // wait for events
                this.selector.select();

                // wakeup to work on selected keys
                Iterator keys = this.selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = (SelectionKey) keys.next();

                    // this is necessary to prevent the same key from coming up
                    // again the next time around.
                    keys.remove();

                    if (! key.isValid()) {
                        continue;
                    }

                    if (key.isAcceptable()) {
                        this.accept(key);
                    }
                    else if (key.isReadable()) {
                        this.read(key);
                    }
                    else if (key.isWritable()) {
                        this.write(key);
                    }
                    else if(key.isConnectable()) {
                        this.connect(key);
                    }
                }
            }
        }

        private void accept(SelectionKey key) throws IOException {
            ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
            SocketChannel channel = serverChannel.accept();
            channel.configureBlocking(false);

            send_message(key, "Welcome."); //DEBUG

            Socket socket = channel.socket();
            SocketAddress remoteAddr = socket.getRemoteSocketAddress();
            log("Connected to: " + remoteAddr);

            // register channel with selector for further IO
            dataMap_out.put(channel, new ArrayList<byte[]>());
            channel.register(this.selector, SelectionKey.OP_READ);

            //store key in 'keys' to be accessable by ID from messenger thread
            //TODO first get ID
            keys.put(0, key);
        }

        //TODO verify, test
        public void init_connect(String addr, int port){
            try {
                SocketChannel channel = createSocketChannel(addr, port);
                channel.register(this.selector, channel.validOps()/*, SelectionKey.OP_?*/);
            }
            catch (IOException e) {
                //TODO handle
            }
        }

        //TODO verify, test
        private void connect(SelectionKey key) {
            SocketChannel channel = (SocketChannel) key.channel();
            try {
                channel.finishConnect(); //try to finish connection - if 'false' is returned keep 'OP_CONNECT' registered
                //store key in 'keys' to be accessable by ID from messenger thread
                //TODO first get ID
                keys.put(0, key);
            }
            catch (IOException e0) {
                try {
                    //TODO handle ok?
                    channel.close();
                }
                catch (IOException e1) {
                    //TODO handle
                }
            }
        }

        private void read(SelectionKey key) throws IOException {
            SocketChannel channel = (SocketChannel) key.channel();

            ByteBuffer buffer = ByteBuffer.allocate(8192);
            int numRead = -1;
            try {
                numRead = channel.read(buffer);
            }
            catch (IOException e) {
                e.printStackTrace();
            }

            if (numRead == -1) {
                this.dataMap_out.remove(channel);
                Socket socket = channel.socket();
                SocketAddress remoteAddr = socket.getRemoteSocketAddress();
                log("Connection closed by client: " + remoteAddr); //TODO handle
                channel.close();
                return;
            }

            byte[] data = new byte[numRead];
            System.arraycopy(buffer.array(), 0, data, 0, numRead);
            in_msg.add(new String(data, "utf-8"));
        }

        private void write(SelectionKey key) throws IOException {
            SocketChannel channel = (SocketChannel) key.channel();
            List<byte[]> pendingData = this.dataMap_out.get(channel);
            Iterator<byte[]> items = pendingData.iterator();
            while (items.hasNext()) {
                byte[] item = items.next();
                items.remove();
                //TODO is this correct? -> re-doing write in loop with same buffer object
                ByteBuffer buffer = ByteBuffer.wrap(item);
                int bytes_to_write = buffer.capacity();
                while (bytes_to_write > 0) {
                    bytes_to_write -= channel.write(buffer);
                }
            }
            key.interestOps(SelectionKey.OP_READ);
        }

        public void queue_data(SelectionKey key, byte[] data) {
            SocketChannel channel = (SocketChannel) key.channel();
            List<byte[]> pendingData = this.dataMap_out.get(channel);
            key.interestOps(SelectionKey.OP_WRITE);

            pendingData.add(data);
        }

        public void send_message(int id, String msg) {
            SelectionKey key = keys.get(id);
            if (key != null)
                send_message(key, msg);
            //else
                //TODO handle
        }

        public void send_message(SelectionKey key, String msg) {
            try {
                queue_data(key, msg.getBytes("utf-8"));
            }
            catch (UnsupportedEncodingException ex) {
                //is not thrown: utf-8 is always defined
            }
        }

        public String get_message() {
            return in_msg.poll();
        }

        private static void log(String s) {
            System.out.println(s);
        }

        @Override
        public void run() {
            try {
                start_server();
            }
            catch (IOException e) {
                System.out.println("IOException: " + e);
                //TODO handle exception
            }
        }

        // Creates a non-blocking socket channel for the specified host name and port.
        // connect() is called on the new channel before it is returned.
        public static SocketChannel createSocketChannel(String hostName, int port) throws IOException {
            // Create a non-blocking socket channel
            SocketChannel sChannel = SocketChannel.open();
            sChannel.configureBlocking(false);

            // Send a connection request to the server; this method is non-blocking
            sChannel.connect(new InetSocketAddress(hostName, port));
            return sChannel;
        }
    }

General problem

Because I am new to Java and networking, several things in this code may be incorrect or poorly done. Please help me improve it so that it does what I'd like it to do. Suggestions for improving the concept are welcome too!

Current problems:

  • After calling init_connect() there seems to be no event on the selector, so the connection is never established.
\$\endgroup\$

3 Answers

\$\begingroup\$

Recently I came across a nice library for networking in Java called netty. It's not really a library but rather a framework for building scalable apps, so it imposes some decisions on the application architecture. Even though using it might be overkill in your situation, I suggest you check out its docs, as you might learn from the approaches implemented there.

What I think can be improved:

  • Code layering. In your code you have everything in one place: network handling, packet queuing and so on. That makes it difficult to see what is where, and to test and debug it. I'd try to separate the layers somehow.
  • Separation of network and business logic threads. If either your business logic or your I/O gets significantly more load than its counterpart, overall performance will degrade, since both run in a single thread. This might not be an issue in this particular case because the logic is rather simple, but it is worth keeping in mind. If you move the business logic into a separate thread, I/O won't block waiting for it to finish its work, and overall performance will increase. The pitfall is getting the communication/synchronization between the threads right so that they don't corrupt each other's data.
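To make the thread separation a bit more concrete, here is a minimal sketch of a hand-off object that sits between the selector thread and the messenger thread. It is only an illustration under stated assumptions: `MessageBridge`, `Envelope`, and all method names are hypothetical and do not appear in the question's code or in netty.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical hand-off between the I/O (selector) thread and the messenger thread.
// Neither thread touches the other's state; they only exchange immutable envelopes.
final class MessageBridge {

    /** An immutable message tagged with the ID of the connection it belongs to. */
    static final class Envelope {
        final int connectionId;
        final String text;

        Envelope(int connectionId, String text) {
            this.connectionId = connectionId;
            this.text = text;
        }
    }

    // Thread-safe queues, one per direction.
    private final BlockingQueue<Envelope> inbound = new LinkedBlockingQueue<>();
    private final BlockingQueue<Envelope> outbound = new LinkedBlockingQueue<>();

    /** Called by the I/O thread after it has decoded bytes read from a channel. */
    void publishInbound(int connectionId, String text) {
        inbound.add(new Envelope(connectionId, text));
    }

    /** Called by the messenger thread; returns null if nothing has arrived yet. */
    Envelope pollInbound() {
        return inbound.poll();
    }

    /** Called by the messenger thread to request that a message be sent. */
    void queueOutbound(int connectionId, String text) {
        outbound.add(new Envelope(connectionId, text));
    }

    /** Called by the I/O thread inside its loop; returns null if nothing is pending. */
    Envelope pollOutbound() {
        return outbound.poll();
    }
}
```

With something like this, the messenger thread would never touch a `SelectionKey` directly. After `queueOutbound(...)` it could call `Selector.wakeup()` so that the I/O thread leaves `select()`, drains `pollOutbound()`, and changes interest ops on its own thread.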

The points above are pretty much handled in that lib, so I suggest you peek into its docs and examples to understand the rationale and the way it is implemented. Even if you don't opt for it, you might learn something useful for your project.

P.S.: I'm affiliated with neither JBoss nor netty, but I really fell in love with it.

\$\endgroup\$
\$\begingroup\$

Short answer

Use a state machine.

Longer answer

I haven't read your code in detail, but I have worked with Java's non-blocking sockets before, so I can say they are pretty difficult to get right.

I once inherited a codebase that was similar to your code. My task was to write unit tests, in order to get close to 100% branch coverage. I tried to write tests without touching the code, but there were so many states to track (the selector, the buffers, the socket's state) that I was not able to test the code reliably.

So, I rewrote the code, using a state machine to encapsulate these states. I ended up with 7 small classes, each one representing the state of the sockets, or the state of the request processing. The resulting code was more robust (many missing edge cases became obvious), easier to understand and to maintain. Writing the tests then became easier.
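As a rough illustration of the idea, and not the seven classes described above, a per-connection state machine could be sketched with two enums. Everything here is an assumption made for the example: the state names, the event names, and the mapping to interest ops are hypothetical.

```java
import java.nio.channels.SelectionKey;

/** Hypothetical events that the selector loop could feed into the state machine. */
enum ConnectionEvent {
    CONNECT_FINISHED, DATA_READ, DATA_QUEUED, QUEUE_DRAINED, PEER_CLOSED, IO_ERROR
}

/** Hypothetical per-connection states with explicit, checked transitions. */
enum ConnectionState {
    CONNECTING, OPEN, WRITE_PENDING, CLOSED;

    /** Returns the next state, or throws if the event makes no sense in this state. */
    ConnectionState on(ConnectionEvent event) {
        // Closing is allowed from every state, so errors are handled in one place.
        if (event == ConnectionEvent.PEER_CLOSED || event == ConnectionEvent.IO_ERROR) {
            return CLOSED;
        }
        switch (this) {
            case CONNECTING:
                if (event == ConnectionEvent.CONNECT_FINISHED) return OPEN;
                break;
            case OPEN:
                if (event == ConnectionEvent.DATA_READ) return OPEN;
                if (event == ConnectionEvent.DATA_QUEUED) return WRITE_PENDING;
                break;
            case WRITE_PENDING:
                if (event == ConnectionEvent.DATA_READ) return WRITE_PENDING;
                if (event == ConnectionEvent.DATA_QUEUED) return WRITE_PENDING;
                if (event == ConnectionEvent.QUEUE_DRAINED) return OPEN;
                break;
            default:
                break;
        }
        throw new IllegalStateException(event + " is not valid in state " + this);
    }

    /** The selector interest ops that correspond to this state. */
    int interestOps() {
        switch (this) {
            case CONNECTING:
                return SelectionKey.OP_CONNECT;
            case OPEN:
                return SelectionKey.OP_READ;
            case WRITE_PENDING:
                return SelectionKey.OP_READ | SelectionKey.OP_WRITE;
            default:
                return 0;
        }
    }
}
```

The transition table can be unit-tested without any sockets, and an unexpected event (for example `DATA_READ` while still `CONNECTING`) fails loudly instead of being silently ignored.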

If I had to do it again, I'd try to use Netty (as Ihor suggested). Netty encapsulates the states too, and manages the connection errors in a more unified way.

\$\endgroup\$
\$\begingroup\$

I have little experience with networking, so I cannot criticize the logic itself, alas. I have some minor remarks on the form, though:

  • This is controversial, since some people favor the style, but I (like many others) avoid using this.fieldName outside of constructors because it is redundant; I see it as visual noise. It can help distinguish local variables from fields, but I chose to use a prefix for that, which is controversial too...
  • in_msg field isn't declared private, not sure if it is intentional.
  • Don't swallow the exception in init_connect, you might miss some problem...

That's most of what I can think of; you might need to clean up resources if you catch an exception at the level of run().
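A minimal sketch of what the last two remarks could look like in practice, assuming Java 7 or later for try-with-resources: the selector and the server channel are closed however the loop ends, and an `IOException` is recorded instead of being swallowed. The class `ServerLoopSketch` and its `stop()` and `failure()` methods are hypothetical names for this example, and the key dispatch is left out.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical skeleton of the server loop; real accept/read/write dispatch is omitted.
final class ServerLoopSketch implements Runnable {
    private final int port;
    private volatile boolean running = true;
    private volatile IOException failure; // last I/O failure, kept for the caller

    ServerLoopSketch(int port) {
        this.port = port;
    }

    @Override
    public void run() {
        // try-with-resources closes both resources on normal exit and on exceptions.
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));
            server.register(selector, SelectionKey.OP_ACCEPT);

            while (running) {
                // Wake up at least every 500 ms so the 'running' flag is re-checked.
                selector.select(500);
                // A real server would dispatch each selected key here.
                selector.selectedKeys().clear();
            }
        } catch (IOException e) {
            // Do not swallow the exception: keep it so the caller can react.
            failure = e;
        }
    }

    /** Asks the loop to end; it exits after the current select() call returns. */
    void stop() {
        running = false;
    }

    /** Returns the exception that ended the loop, or null if there was none. */
    IOException failure() {
        return failure;
    }
}
```

Accepted client channels would still need to be closed separately, for example by iterating over `selector.keys()` before the selector is closed. This sketch only covers the two resources it opens itself.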

\$\endgroup\$
  • \$\begingroup\$ Thanks for trying to help! 1) I normally do not use "this", but the code is based on examples from the internet and I didn't change everything. 2)+3) At the moment I am just trying to get the server working, so I didn't really care about data encapsulation etc. Also see the "TODO"s in the code... I do not really know how to handle the exception in init_connect, so I left it as a TODO. Anyway, I'll change the code a little now. \$\endgroup\$ Commented Aug 29, 2011 at 9:43
