Problem with first NIO code

I was playing with NIO and came up with the following questions to which I couldn’t find the answer even after hours of research:

  1. I want to write a very basic IRC Server and want to send a message to every user connected on the server. So when I read a message from a key in OP_READ state, I save the message somewhere and when a key arrives in OP_WRITE state, I write the message in the channel.

But how can I make sure the message is written to every connection exactly once? Suppose some channels are busy sending data; they might miss the message. Or perhaps a connection is really fast and becomes ready for OP_WRITE twice; then it gets the message twice.

Do I need to keep track of the clients and record which ones I have already written the message to?

  2. When I get a new connection, I register it for OP_READ|OP_WRITE, but since I am in an infinite loop and OP_WRITE is ready all the time, it gives me 100% CPU usage.

So should I leave OP_WRITE out when registering the channels and work out myself when to switch it on? Or how is this normally handled?
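To make the CPU symptom concrete: OP_WRITE is reported as ready whenever the socket's send buffer has room, which on an idle connection is practically always, so `select()` never blocks. Here is a minimal sketch over a loopback connection. The class and method names (`WriteInterestDemo`, `readyCount`) are made up for illustration and are not from any code in this thread.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class WriteInterestDemo {

    // Returns how many keys selectNow() reports when an idle, connected
    // channel is registered with the given interest ops.
    static int readyCount(int interestOps) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 lets the OS pick a free port on the loopback interface.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                accepted.configureBlocking(false);
                accepted.register(selector, interestOps);
                // Nothing has been sent by the client, so only OP_WRITE can be ready.
                return selector.selectNow();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("OP_READ only:       "
                + readyCount(SelectionKey.OP_READ));
        System.out.println("OP_READ | OP_WRITE: "
                + readyCount(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
    }
}
```

With OP_READ only, the idle channel is not selected; with OP_WRITE added, it is selected immediately. That is why OP_WRITE is usually switched on only while there is unsent data.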

I came up with something but have no idea whether this works or not:

  1. Register every key for READ only.
  2. Create a ByteBuffer for every key.
  3. When a message is received, get a list of all the keys and try to write to each channel from its ByteBuffer. If not everything could be written, register the key for WRITE as well; otherwise the message is sent and we are happy.
  4. When a key becomes ready for WRITE, write from its ByteBuffer as much as the channel accepts. If data remains, leave the key registered for WRITE; otherwise set it back to READ only.

Does this work ?
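For what it's worth, the four steps above could be sketched roughly like this. It is only an illustration under some assumptions: the names (`WriteOnDemand`, `send`, `flush`, `demo`) are invented, the per-key ByteBuffer is stored as the key's attachment, and a message is assumed to fit into the 512-byte buffer (a real server would have to handle overflow).

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

class WriteOnDemand {

    // Step 3: append the message to the key's buffer and try to send it at once.
    static void send(SelectionKey key, byte[] message) throws IOException {
        ByteBuffer out = (ByteBuffer) key.attachment(); // kept in "fill" mode
        out.put(message); // assumption: it fits; otherwise BufferOverflowException
        flush(key);
    }

    // Step 4: also meant to be called whenever the key is selected as writable.
    static void flush(SelectionKey key) throws IOException {
        ByteBuffer out = (ByteBuffer) key.attachment();
        out.flip();                                  // switch to "drain" mode
        ((SocketChannel) key.channel()).write(out);  // may write only part of it
        boolean drained = !out.hasRemaining();
        out.compact();                               // keep leftovers, back to "fill" mode
        int ops = key.interestOps();
        key.interestOps(drained ? ops & ~SelectionKey.OP_WRITE
                                : ops | SelectionKey.OP_WRITE);
    }

    // Loopback demo: pushes text through send() and returns what the client read.
    static String demo(String text) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                accepted.configureBlocking(false);
                // Step 1 and 2: READ only, with a ByteBuffer attached to the key.
                SelectionKey key = accepted.register(selector, SelectionKey.OP_READ,
                        ByteBuffer.allocate(512));
                byte[] bytes = text.getBytes(StandardCharsets.ISO_8859_1);
                send(key, bytes);
                ByteBuffer in = ByteBuffer.allocate(bytes.length);
                while (in.hasRemaining() && client.read(in) != -1) {
                    // blocking client: keep reading until the whole message arrived
                }
                in.flip();
                return StandardCharsets.ISO_8859_1.decode(in).toString();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo("hello"));
    }
}
```

Because new messages are appended to the buffer instead of replacing it, a second message that arrives before the first has been fully written does not wipe out the unsent bytes.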

http://javagamesfactory.com/articles.html

(look for the NIO one)

PS: I’ve made some fixes to that series of articles, and am about to republish them on javagamesfactory.org (when I do, the above URL will have a redirect that should push you to the right place), so if you can wait a few weeks they’ll be better.

I’ve already read them several times, as well as the whole:

http://forum.java.sun.com/thread.jspa?threadID=459338&start=60&tstart=0

My problem is fairly simple but I can’t seem to find a solution:

I want a simple chat server where, when a client sends a message, every client gets this message.

I have experimented with incoming, outgoing queues…complicating things with each try. There has to be a good solution but I can’t figure out which one. Until now, I came up with this:



import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.*;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;

public class Server {

      // HashMap where we store the ByteBuffer of each SelectionKey
      public HashMap out_buffers = new HashMap();

      public ArrayList keys = new ArrayList();

      public ArrayList messages = new ArrayList();

      Charset charset = Charset.forName("ISO-8859-1"); // England and most of Europe

      public Server(int port) throws IOException {
            System.out.println("Start init.");

            Selector selector = Selector.open();

            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);

            ServerSocket ss = ssc.socket();
            InetSocketAddress address = new InetSocketAddress(port);
            ss.bind(address);

            ssc.register(selector, SelectionKey.OP_ACCEPT);

            System.out.println("Finished init: listening on port " + port);

            for (;;) {
                  // Selector.select() is blocking
                  int num = selector.select();

                  Set selectedKeys = selector.selectedKeys();
                  Iterator it = selectedKeys.iterator();

                  while (it.hasNext()) {
                        SelectionKey key = (SelectionKey) it.next();
                        it.remove();

                        if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
                              // new Connection
                              System.out.println("New incoming connection.");

                              SocketChannel newsc = ((ServerSocketChannel) key.channel())
                                          .accept();
                              newsc.configureBlocking(false);

                              SelectionKey selectKey = newsc.register(selector,
                                          SelectionKey.OP_READ);

                              // create a bytebuffer for this SelectionKey
                              keys.add(selectKey);
                              out_buffers.put(selectKey, ByteBuffer.allocate(512));

                        } else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {

                              SocketChannel newsc = (SocketChannel) key.channel();

                              // we use the SelectionKey to get the buffer
                              String message = new String();
                              ByteBuffer buffer = ByteBuffer.allocate(1024);
                              int bytesEchoed = 0;

                              while (true) {
                                    buffer.clear();
                                    int r = -1;

                                    try {
                                          r = newsc.read(buffer);
                                    } catch (IOException e) {
                                          System.out.println("Client closed connection.");
                                    }

                                    if (r <= 0) {
                                          if (r == -1) {
                                                // the client has disconnected
                                                newsc.close();
                                                keys.remove(key);
                                          }
                                          break;
                                    }
                                    buffer.flip();
                                    message = message.concat(charset.decode(buffer)
                                                .toString());
                                    messages.add(message);
                              }

                              if(message.length() != 0)
                                    System.out.println("Message received:" + message);

                              for (int i = 0; i < keys.size(); i++) {
                                    SelectionKey tmpkey = (SelectionKey) keys.get(i);
                                    out_buffers.put(tmpkey, charset.encode(message));
                                    
                                    // the channel is already registered, so just change the key's interest set
                                    tmpkey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                              }

                        } else if ((key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
                              //System.err.println("Ready for write");
                              SocketChannel newsc = (SocketChannel) key.channel();
                              ByteBuffer out = (ByteBuffer)out_buffers.get(key);
                              newsc.write(out);
                              if (!out.hasRemaining()){
                                    // everything was written: stop asking for OP_WRITE
                                    key.interestOps(SelectionKey.OP_READ);
                                    out_buffers.remove(key);
                              }
                        } else {
                              System.err.println("Unknown key type");
                        }
                  }

            }
      }

      public static void main(String[] args) {
            if (args.length != 1) {
                  System.out.println("Usage: java Server port");
            } else {
                  try {
                        new Server(Integer.parseInt(args[0]));
                  } catch (NumberFormatException e) {
                        System.out.println("Portnumber must be an integer.");
                  } catch (IOException e) {
                        e.printStackTrace();
                  }
            }
      }
}

The problem I see here is when, say, a READ arrives before the WRITE has completed for every connection.

Then the out buffer will be overwritten?!

Um…if so, why don’t you put a two-element array on each attachment instead of just one BB? First element is incoming buffer, second is outgoing.
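A rough sketch of what such an attachment could look like. Only the "first element incoming, second element outgoing" layout comes from the suggestion above; the class name `TwoBufferAttachment`, the helper methods and the 1024-byte sizes are assumptions for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.charset.StandardCharsets;

class TwoBufferAttachment {
    static final int IN = 0;   // index of the incoming buffer
    static final int OUT = 1;  // index of the outgoing buffer

    // One pair of buffers per connection; intended use: key.attach(newBuffers()).
    static ByteBuffer[] newBuffers() {
        return new ByteBuffer[] { ByteBuffer.allocate(1024), ByteBuffer.allocate(1024) };
    }

    // Convenience overload for a key that carries the pair as its attachment.
    static void queueOutgoing(SelectionKey key, String message) {
        queueOutgoing((ByteBuffer[]) key.attachment(), message);
    }

    // Appends to the outgoing buffer, so earlier unsent data is not overwritten.
    static void queueOutgoing(ByteBuffer[] buffers, String message) {
        buffers[OUT].put(message.getBytes(StandardCharsets.ISO_8859_1));
    }

    // Decodes a snapshot of the unsent outgoing data without consuming it.
    static String pendingOutgoing(ByteBuffer[] buffers) {
        ByteBuffer view = buffers[OUT].duplicate(); // independent position and limit
        view.flip();
        return StandardCharsets.ISO_8859_1.decode(view).toString();
    }

    public static void main(String[] args) {
        ByteBuffer[] buffers = newBuffers();
        queueOutgoing(buffers, "first;");
        queueOutgoing(buffers, "second;");
        System.out.println(pendingOutgoing(buffers)); // first;second;
    }
}
```

Reads fill `buffers[IN]` and writes drain `buffers[OUT]`, so a READ that arrives while a WRITE is still pending no longer touches the outgoing data.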

This is what I used to do before I bit the bullet and just used multiple selectors (way easier, and, now that Sun have fixed all the critical bugs, the only thing I recommend).

Well well, as I really couldn’t get any satisfying answer at first and because I am so thickheaded, I forced myself and wrote a small SERVER/CLIENT Chat System with NIO to get some experience with NIO.

The complete source can be found here:

http://www.martialartsmovies.net/chat.zip

I hope it helps newbies to understand the NIO functionalities better.

The Server saves all the SelectionKeys and, for each SelectionKey, a queue (ArrayList) of messages (String) it still needs to send to that SelectionKey.
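Since the zip may not be available to everyone, here is a small sketch of that kind of bookkeeping. It is not the code from the zip: the class name `Outbox` and its methods are invented, and the type parameter `K` stands in for SelectionKey so the logic can be tried without any sockets.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// K stands in for SelectionKey; one queue of unsent messages per client.
class Outbox<K> {
    private final Map<K, Deque<String>> queues = new LinkedHashMap<>();

    void register(K client)   { queues.put(client, new ArrayDeque<>()); }
    void unregister(K client) { queues.remove(client); }

    // Every client registered right now gets exactly one copy of the message.
    void broadcast(String message) {
        for (Deque<String> queue : queues.values()) {
            queue.addLast(message);
        }
    }

    // Next message still owed to this client, or null if there is none.
    String poll(K client) {
        Deque<String> queue = queues.get(client);
        return queue == null ? null : queue.pollFirst();
    }

    // True while the client's key should stay registered for OP_WRITE.
    boolean hasPending(K client) {
        Deque<String> queue = queues.get(client);
        return queue != null && !queue.isEmpty();
    }

    public static void main(String[] args) {
        Outbox<String> outbox = new Outbox<>();
        outbox.register("alice");
        outbox.register("bob");
        outbox.broadcast("hello");
        outbox.poll("alice");                      // alice is fast and drains her queue
        outbox.broadcast("again");
        System.out.println(outbox.poll("alice"));  // again
        System.out.println(outbox.poll("bob"));    // hello
        System.out.println(outbox.poll("bob"));    // again
    }
}
```

A slow client simply accumulates messages in its own queue, and a fast client finds its queue empty, so nobody misses a message or receives one twice.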

I use synchronized functions in the Messages Class for the Client to avoid any problems when multiple messages are received before the messages are read, or when the client wants to read a message and there is none, etc. (the usual multithreading problems).
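For readers who cannot get the zip, a synchronized message holder along those lines might look like the sketch below. This is a guess at the general shape, not the original Messages class: the method names `add` and `take` and the wait/notifyAll handling are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

class Messages {
    private final List<String> pending = new ArrayList<>();

    // Called by the network thread for every message that arrives.
    synchronized void add(String message) {
        pending.add(message);
        notifyAll(); // wake any thread blocked in take()
    }

    // Called by the reading thread; blocks while there is no message yet.
    synchronized String take() throws InterruptedException {
        while (pending.isEmpty()) {
            wait(); // releases the lock until add() calls notifyAll()
        }
        return pending.remove(0); // oldest message first
    }

    public static void main(String[] args) throws InterruptedException {
        Messages messages = new Messages();
        Thread producer = new Thread(() -> {
            messages.add("first");
            messages.add("second");
        });
        producer.start();
        System.out.println(messages.take()); // first
        System.out.println(messages.take()); // second
        producer.join();
    }
}
```

Several messages arriving before the reader gets around to them just pile up in the list, and a reader that finds the list empty waits instead of failing.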

If you find any mistakes or if you have suggestions, please post them here.

THANKS !

http://www.martialartsmovies.net/chat.zip

Link is broken :(