
Wednesday, January 29, 2014

JAVA NIO Networking tutorial

In the NIO tutorial, I provided an overview of the nio package in JAVA, which lets developers build scalable applications using non-blocking techniques. In this tutorial I will focus on the networking parts of the nio package. We will build a simple Echo server and Echo client using non-blocking sockets.

The conventional way of writing networking applications is to use the Socket and ServerSocket classes in the java.net package. If you are new to network programming, you may read the tutorial from Oracle at All about Sockets. The Oracle tutorial covers sockets where the IO thread blocks. You might say: this works for me, so why should I care about non-blocking sockets? When writing a server using blocking sockets, once the server accepts a connection, a thread is created for the peer client socket, and that thread handles the IO with the client. There will be times when this thread is doing nothing but waiting for IO. That is not a big issue if the server has just a few clients. But if the server needs to handle tens of thousands of concurrent clients, you will end up creating tens of thousands of threads, many of them just waiting for IO, and this approach will exhaust operating system resources.
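For contrast, here is a minimal sketch of the blocking, thread-per-connection style described above. The class name BlockingEchoSketch, the helper roundTrip and the use of an ephemeral loopback port are assumptions made for illustration; they are not taken from the Oracle tutorial.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class BlockingEchoSketch {

    // Accepts connections until the server socket is closed.
    // Every accepted client gets its own thread.
    static void serve(ServerSocket server) {
        while (!server.isClosed()) {
            try {
                Socket client = server.accept();            // blocks until a client connects
                Thread t = new Thread(() -> echo(client));  // one thread per connection
                t.setDaemon(true);
                t.start();
            } catch (IOException e) {
                return; // server socket was closed
            }
        }
    }

    // Echoes lines back until the client closes the connection.
    static void echo(Socket client) {
        try (Socket s = client;
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true)) {
            String line;
            while ((line = in.readLine()) != null) {  // the thread sits here while the client is idle
                out.println(line);
            }
        } catch (IOException ignored) {
            // connection dropped; nothing more to echo
        }
    }

    // Hypothetical helper: starts the server on a free loopback port,
    // sends one line and returns what comes back.
    static String roundTrip(String message) {
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            Thread acceptor = new Thread(() -> serve(server));
            acceptor.setDaemon(true);
            acceptor.start();
            try (Socket s = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort());
                 PrintWriter out = new PrintWriter(
                         new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true);
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8))) {
                out.println(message);
                return in.readLine();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

Each connected client pins one thread inside readLine for as long as the connection stays open, which is the cost the rest of this tutorial tries to avoid.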

In NIO networking:
  • The networking work, such as accepting connections, reading from a peer socket, and writing to a socket, generally happens on a single thread.
  • Processing the data that is read and preparing the data to write is done in worker threads.
  • In the main thread, a ServerSocketChannel registers an interest in events with a selector.
  • The selector waits for events to occur. An event could be a socket requesting a connection, a socket being ready for writing, or a socket being ready for reading.
  • When events occur, the select method returns and the main thread retrieves each event and its channel.
  • If the event is an accept event, the ServerSocketChannel can accept the connection and register an interest in a read event for the peer socket.
  • If the event is a read event, the thread can read the data from the peer socket and hand it to a worker thread for processing. It can then register an interest in an event that signals the socket can be written to.
  • If the event is a write event, the thread can take data intended for this peer that is queued somewhere and write it to the peer socket. It may then register an interest in the next read event from this peer.

Note that reads and writes happen only when the sockets are ready for reading or writing. Let us use the above concepts and write a simple EchoServer and EchoClient. The EchoClient sends lines of text to the EchoServer, which echoes each line back.

EchoServer
The complete code is in the file EchoServer.java

Step 1: Create a ServerSocketChannel and bind the local address

        private ServerSocketChannel serverchannel ;
        private Selector selector ;
        // data queued for writing, keyed by the connection it belongs to
        private HashMap<SocketChannel, ByteBuffer> queuedWrites =
                   new HashMap<SocketChannel, ByteBuffer>();

        serverchannel = ServerSocketChannel.open() ;
        serverchannel.configureBlocking(false) ;
        serverchannel.socket().bind(new InetSocketAddress("localhost",8081));

The ServerSocketChannel is the channel on which the server will accept stream-oriented connections. The Selector is what lets you multiplex several connections without blocking for IO. The HashMap is used to queue data to be written on individual connections. This is necessary since we do not want to block on writes (or reads).

Step 2 : Create a selector and register interest in accept event. Run a loop waiting for events.

       selector = Selector.open();
       serverchannel.register(selector, SelectionKey.OP_ACCEPT) ;

       while(true) {
                selector.select() ;
                Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator() ;
                while(keysIterator.hasNext()) {
                    SelectionKey key = keysIterator.next();
                    // remove the key, or it stays in the selected set after it is handled
                    keysIterator.remove();

                    // handle the event
                    if (key.isAcceptable()) {
                    } else if (key.isReadable()) {
                    } else if (key.isWritable()) {
                    }
                }
       }

The outer while loop keeps the server up and running, waiting for events to happen. In a nutshell, it outlines what a server program does. A Selector is created and we register an interest in the ACCEPT event. The select call is the only call in the program that blocks waiting for events to happen. We are now waiting for connections. The select call returns when there is an event on a channel. The SelectionKeys tell us which event occurred on which channel. The server program takes action based on the event, which could be read, write, accept or connect.
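The events a key reports are stored as a bitmask, and isAcceptable, isReadable and the other checks simply test one bit of key.readyOps(). The small sketch below makes that visible by turning a bitmask into a label. The class ReadyOpsSketch and the method describe are hypothetical names used only for illustration.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

public class ReadyOpsSketch {

    // Turns an interest-set or ready-set bitmask into a readable label.
    static String describe(int ops) {
        List<String> names = new ArrayList<>();
        if ((ops & SelectionKey.OP_ACCEPT) != 0)  names.add("ACCEPT");
        if ((ops & SelectionKey.OP_CONNECT) != 0) names.add("CONNECT");
        if ((ops & SelectionKey.OP_READ) != 0)    names.add("READ");
        if ((ops & SelectionKey.OP_WRITE) != 0)   names.add("WRITE");
        return names.isEmpty() ? "NONE" : String.join("|", names);
    }

    public static void main(String[] args) {
        System.out.println(describe(SelectionKey.OP_ACCEPT));                       // prints ACCEPT
        System.out.println(describe(SelectionKey.OP_READ | SelectionKey.OP_WRITE)); // prints READ|WRITE
    }
}
```

Inside the event loop, describe(key.readyOps()) is a handy thing to log when it is unclear why select returned.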

Step 3. Accept a connection. Register interest in a read event.

        if (key.isAcceptable()) {
                ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
                SocketChannel sc = ssc.accept();
                if (sc != null) {  // accept() returns null if no connection is pending
                        sc.configureBlocking( false );
                        sc.register( selector, SelectionKey.OP_READ );
                }
        }

For an accept event, the SelectionKey holds the ServerSocketChannel, and calling accept on it returns the peer channel for the new connection. Once the connection is accepted, the server will need to read the data sent by the client. Hence we register interest in the read event.

Step 4. If there is a read event, read the socket. Hand off the data to the worker queue. Register interest in a write event for the socket.

        if (key.isReadable()) {
                SocketChannel sc = (SocketChannel)key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(8192);
                int numread ;
                while (true) { // read until no more data is available
                         numread = sc.read( readBuffer );
                         if (numread <=0) {
                              break;
                         }
                }
                if (numread == -1) {
                        // Remote entity shut the socket down cleanly. Do the
                        // same from our end and cancel the channel.
                        key.channel().close();
                        key.cancel();
                        continue ;
                }
                readBuffer.flip() ;
                queuedWrites.put(sc,readBuffer) ;
                key.interestOps(SelectionKey.OP_WRITE) ;
        }

The client has sent some data and the server reads it. Generally a server will want to send a response to the client. Since this is an EchoServer, the server just queues the data read for writing back when the channel is ready for writing.
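The readBuffer.flip() call in Step 4 is easy to overlook. Reading from a channel advances the buffer's position; flip sets the limit to that position and resets the position to zero so that a later write sends exactly the bytes that were read. The sketch below shows this without a socket, using put and get in place of the channel's read and write. The class FlipSketch and its methods are hypothetical names for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FlipSketch {

    // Returns {position, limit} right after filling the buffer and flipping it.
    // Assumes the text fits in 64 bytes; a longer input would overflow the buffer.
    static int[] positionAndLimitAfterFlip(String incoming) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.put(incoming.getBytes(StandardCharsets.UTF_8)); // like sc.read(buf): position advances
        buf.flip();                                          // limit = bytes filled, position = 0
        return new int[] { buf.position(), buf.limit() };
    }

    // Fills a buffer, flips it, then drains it the way a channel write would.
    static String fillThenDrain(String incoming) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.put(incoming.getBytes(StandardCharsets.UTF_8));
        buf.flip();
        byte[] out = new byte[buf.remaining()];              // remaining = limit - position
        buf.get(out);                                        // like sc.write(buf): consumes position..limit
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        int[] state = positionAndLimitAfterFlip("hello");
        System.out.println(state[0] + " " + state[1]); // prints 0 5
        System.out.println(fillThenDrain("hello"));    // prints hello
    }
}
```

Without the flip, the write would start at the current position and send the unused tail of the buffer instead of the data just read.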

Step 5. When socket is ready for write, get data from the queue and write it to the socket.

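         if (key.isWritable()) {
                 SocketChannel sc = (SocketChannel)key.channel();
                 ByteBuffer towrite = queuedWrites.get(sc) ;
                 while (towrite.hasRemaining()) {
                          int n = sc.write(towrite) ;
                          if (n == 0)   // socket buffer is full; try again on the next write event
                                break ;
                 }
                 if (!towrite.hasRemaining()) {
                          // everything echoed; go back to waiting for the next message
                          queuedWrites.remove(sc) ;
                          key.interestOps(SelectionKey.OP_READ) ;
                 }
        }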

After writing, be ready to read the next message from the client. To recap, the server is in a loop, accepting connections, reading and writing to channels.
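To see the accept, read and write steps working together, here is a compact, self-contained sketch of the same loop. It is not the EchoServer.java referred to above. As assumptions made for brevity, it keeps each connection's buffer as the key attachment instead of in a HashMap, binds to an ephemeral loopback port, and is exercised by a plain blocking Socket through the hypothetical helper roundTrip.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class NioEchoSketch {

    // Single-threaded event loop; runs until the selector is closed.
    static void serve(Selector selector) throws IOException {
        while (selector.isOpen()) {
            selector.select();                                   // the only blocking call
            if (!selector.isOpen()) break;
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (!key.isValid()) continue;
                if (key.isAcceptable()) {
                    SocketChannel sc = ((ServerSocketChannel) key.channel()).accept();
                    if (sc != null) {
                        sc.configureBlocking(false);
                        // the attachment is this connection's buffer
                        sc.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(8192));
                    }
                } else if (key.isReadable()) {
                    SocketChannel sc = (SocketChannel) key.channel();
                    ByteBuffer buf = (ByteBuffer) key.attachment();
                    if (sc.read(buf) == -1) { sc.close(); continue; } // peer closed; close cancels the key
                    buf.flip();                                   // prepare the bytes for writing back
                    key.interestOps(SelectionKey.OP_WRITE);
                } else if (key.isWritable()) {
                    SocketChannel sc = (SocketChannel) key.channel();
                    ByteBuffer buf = (ByteBuffer) key.attachment();
                    sc.write(buf);                                // may write only part of the buffer
                    if (!buf.hasRemaining()) {                    // all echoed: wait for the next read
                        buf.clear();
                        key.interestOps(SelectionKey.OP_READ);
                    }
                }
            }
        }
    }

    // Hypothetical helper: runs the loop on a daemon thread and echoes one message
    // through it. Shutdown is deliberately crude; it just closes the selector.
    static String roundTrip(String message) {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(selector, SelectionKey.OP_ACCEPT);
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            Thread loop = new Thread(() -> {
                try { serve(selector); }
                catch (IOException | ClosedSelectorException e) { /* shutting down */ }
            });
            loop.setDaemon(true);
            loop.start();
            byte[] out = message.getBytes(StandardCharsets.UTF_8);
            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), port)) {
                client.getOutputStream().write(out);
                byte[] echoed = new byte[out.length];
                int got = 0;
                while (got < echoed.length) {                     // TCP may deliver the echo in pieces
                    int n = client.getInputStream().read(echoed, got, echoed.length - got);
                    if (n == -1) break;
                    got += n;
                }
                return new String(echoed, 0, got, StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("This is line 1"));
    }
}
```

Attaching the buffer to the key avoids a map lookup per event; a HashMap keyed by channel, as in Step 1, works just as well when several buffers per connection need to be queued.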

EchoClient
The complete code is in the file EchoClient.java

Step 1: Create a Socket Channel and a Selector. Register interest in a Connect event.

        selector = Selector.open() ;
        clientChannel = SocketChannel.open();
        clientChannel.configureBlocking(false);
        clientChannel.register(selector, SelectionKey.OP_CONNECT) ;


Step 2 :  Initiate a connection to the EchoServer

       // Kick off connection establishment. Client initiates connection to server
        clientChannel.connect(new InetSocketAddress("localhost", 8081));


Step 3: Main client loop similar to the server loop

        while(true) {
                // queue a write so it can be written when channel ready
                writeQueue.add("This is line " + i) ;
                selector.select() ;  // wait for events

                Iterator skeys = selector.selectedKeys().iterator() ;
                while (skeys.hasNext()) {
                        SelectionKey key = (SelectionKey) skeys.next();
                        skeys.remove();
                        if (!key.isValid()) {
                             continue;
                        }

                        // Check what event is available and deal with it
                        if (key.isConnectable()) {  // connection is ready to be completed
                             finishConnection(key);
                        } else if (key.isReadable()) { // socket is ready for reading
                             read(key);
                        } else if (key.isWritable()) {  // socket ready for writing
                             write(key);
                        }
                }
        }


Step 3 : When the selector reports a connect event, finish the connection. Register interest in a Write event on the socket.

        private void finishConnection(SelectionKey key) throws IOException {
                clientChannel.finishConnect() ; // completes the TCP connection started by connect()
                key.interestOps(SelectionKey.OP_WRITE) ;
        }


Step 4 : When the socket is ready for write, write a queued message to the socket. Register interest in a read event from the socket.

        private void write(SelectionKey key) throws IOException {
                String toWrite = writeQueue.pollFirst() ;

                if (toWrite != null) {
                        ByteBuffer b ;
                        b = ByteBuffer.wrap(toWrite.getBytes()) ;
                        // Note: if the socket buffer fills up (n == 0) before all of b
                        // is written, the rest of this message is dropped.
                        while (true) {
                              int n = clientChannel.write(b) ;
                              if (n == 0 || b.remaining() == 0)
                                    break ;
                        }
                }
                key.interestOps(SelectionKey.OP_READ) ;
        }


Step 5: When the server echoes the message, a read event becomes available on the socket. Read the message and print out the echo.

        public void read(SelectionKey key) throws IOException {
                readBuf.clear() ;
                while (true) {
                       int numread = clientChannel.read( readBuf );
                       if (numread <=0) {
                             break;
                       }
                }
                // Decode only the bytes actually read, not the whole backing array
                System.out.println("Read Echo from server:" + new String(readBuf.array(), 0, readBuf.position()))  ;

                key.interestOps(SelectionKey.OP_WRITE) ;
        }

You will notice that in both the client and the server, the read and write operations are in a loop. This is because we are working with TCP, or stream-oriented, sockets. Each read returns only as much data as there is in the socket buffer, which might be less than what you need, while more data might still be arriving. Similarly, each write writes only as much data as there is space for in the socket buffer. So you might need to call write multiple times before all the data in your buffer is written. This is no different from traditional sockets programming.
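Because one read may return half a message or several messages at once, the receiver needs some way to find message boundaries. The sketch below shows one common approach: accumulate bytes and emit a line whenever a newline arrives. It assumes each message ends with a newline character, which the snippets in this tutorial do not actually send. The class LineAssemblerSketch and its methods are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LineAssemblerSketch {

    // Bytes of the line currently being assembled.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    // Feed the bytes returned by one read; returns the lines completed by this chunk.
    List<String> feed(byte[] chunk, int length) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < length; i++) {
            if (chunk[i] == '\n') {
                // In UTF-8 the newline byte never occurs inside a multi-byte character,
                // so it is safe to split on it before decoding.
                lines.add(new String(pending.toByteArray(), StandardCharsets.UTF_8));
                pending.reset();
            } else {
                pending.write(chunk[i]);
            }
        }
        return lines;
    }

    // Convenience for the demo: treats each string as the result of one read.
    static List<String> assemble(String... chunks) {
        LineAssemblerSketch assembler = new LineAssemblerSketch();
        List<String> all = new ArrayList<>();
        for (String chunk : chunks) {
            byte[] bytes = chunk.getBytes(StandardCharsets.UTF_8);
            all.addAll(assembler.feed(bytes, bytes.length));
        }
        return all;
    }

    public static void main(String[] args) {
        // Two lines arriving split across three reads
        System.out.println(assemble("This is li", "ne 1\nThis is", " line 2\n"));
        // prints [This is line 1, This is line 2]
    }
}
```

With one assembler per connection, the read handler can pass along whatever each read returned and act only on complete lines.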

In conclusion, writing NIO networking programs is a little more involved than traditional blocking sockets programming. But they scale much better and can handle many more concurrent connections.