some NIO troubles

Hi all,

I’ve started programming an NIO server as an exercise, but I’ve run into some problems…

1.) I made a simple test with which I want to show the difference between TCP and UDP:

A TCP test client sends a sequence of 50 messages; each message consists of an int identifying its sequence number.
A TCP test server simply listens for incoming messages and detects whether all the messages are in order and whether any are lost.

Now with TCP, the list should be ordered and complete (1, 2, 3 … 50). Most of the time it is, but sometimes I seem to miss a couple of packets, often at the beginning of the list. (I run both the client and server locally.) Is this because of some kind of stack overflow or something, as I send the messages as fast as possible in the client, without pausing between them? On the other hand, NIO should support handling many simultaneous packets. Or does that not apply to many packets from the same sender?

2.) In my server I have one ByteBuffer which I use to read data from client channels. After I’ve read the contents into the buffer, I call processMessage for that client, which can process the buffer’s contents. This is how all the tutorials I’ve read seem to do it. But due to the asynchronous nature of NIO, wouldn’t this lead to problems, as I use only one ByteBuffer to read into? What would be a better solution? To assign a new ByteBuffer per connecting client to read/write in? And is it wise to use one ByteBuffer per client for both reading AND writing?

3.) I’ve read somewhere that if I want to write something back to the client(s), I should register their SocketChannel for SelectionKey.OP_WRITE and after writing set it back to SelectionKey.OP_READ. Is this true? Why not register each incoming client connection for both SelectionKey.OP_READ and SelectionKey.OP_WRITE?

OK, many questions; I hope there will be many answers as well :wink:

Thanks in advance!

Martijn

Doubtful. Without seeing the code, my wild guess is a race condition and/or a bug in how you are using the Buffers. Both can be tricky.

Well, reading should all happen on one thread, in turn. That’s sort of the point of NIO, so you shouldn’t race on read. HOWEVER, the question then arises as to what you are doing with that buffer. If your read thread is delivering it directly through a callback to your packet-handling code, then you may be all right because it’s still on one thread, though that would be bad in general NIO code because the client code could block up all reading.

A more general solution is to have a queue for each socket and to queue the packet on the appropriate queue. But then you will have to copy the packet once to put it on the queue, because the input buffer will be overwritten by the next input.
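A minimal sketch of the copy-before-queue idea, assuming one shared read buffer and a per-client queue. The class name CopyBeforeQueue, the helper copyOf and the queue name inbox are made up for illustration and do not come from the server code in this thread. Note that ByteBuffer.duplicate() on its own is not enough here: it creates a second view over the same bytes, so the data still has to be copied into a separate buffer.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;

final class CopyBeforeQueue {

    /** Copies the readable bytes of the shared buffer into a new, independent buffer. */
    static ByteBuffer copyOf(ByteBuffer shared) {
        ByteBuffer copy = ByteBuffer.allocate(shared.remaining());
        // duplicate() shares the bytes but has its own position/limit,
        // so reading through it leaves the shared buffer's position untouched.
        copy.put(shared.duplicate());
        copy.flip();
        return copy;
    }

    public static void main(String[] args) {
        ByteBuffer shared = ByteBuffer.allocate(16);   // the single read buffer
        Queue<ByteBuffer> inbox = new ArrayDeque<>();  // hypothetical per-client queue

        shared.putInt(1).flip();                       // pretend a read delivered id 1
        inbox.add(copyOf(shared));

        shared.clear();                                // the next read overwrites the shared buffer
        shared.putInt(2).flip();
        inbox.add(copyOf(shared));

        System.out.println(inbox.remove().getInt());   // prints 1
        System.out.println(inbox.remove().getInt());   // prints 2
    }
}
```

Because each queued buffer owns its bytes, the handler can consume the queue later (or on another thread) without caring what the read loop has done to the shared buffer in the meantime.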

Is that clear as mud?

Again, it depends. If both reads and writes are queued such that only one of either type is being processed at once, then it’s okay… BUT in that case you will have to make copies on the queue anyway when you queue your writes, so it’s not likely to even be an issue.

From the client thread’s point of view, it should not try to share a write buffer with the NIO code if the NIO code is running writes asynchronously. Again, you can get away with it if you aren’t queuing writes, but not queuing writes is a bad idea, because if the reader at the other end stops reading, your write call can block the application up indefinitely.

If you register for OP_WRITE, then you will wake up whenever it is possible to write to the socket, which in the case of properly functioning code should be almost all the time. Thus you will end up with a “busy loop” in your NIO code, burning massive amounts of CPU.
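One common way to avoid that busy loop is to keep OP_WRITE in the interest set only while there is output waiting. The sketch below shows just the bit manipulation; the class WriteInterest and its method names are hypothetical, and the decision of when output is "pending" is left to the caller.

```java
import java.nio.channels.SelectionKey;

final class WriteInterest {

    /** Returns the interest set with OP_WRITE switched on only while output is pending. */
    static int withWriteInterest(int currentOps, boolean hasPendingOutput) {
        return hasPendingOutput
                ? currentOps | SelectionKey.OP_WRITE
                : currentOps & ~SelectionKey.OP_WRITE;
    }

    /** Applies the change on the existing key; no second register() call is needed. */
    static void update(SelectionKey key, boolean hasPendingOutput) {
        key.interestOps(withWriteInterest(key.interestOps(), hasPendingOutput));
    }

    public static void main(String[] args) {
        int ops = SelectionKey.OP_READ;
        ops = withWriteInterest(ops, true);                       // data was queued
        System.out.println((ops & SelectionKey.OP_WRITE) != 0);   // prints true
        ops = withWriteInterest(ops, false);                      // queue drained
        System.out.println(ops == SelectionKey.OP_READ);          // prints true
    }
}
```

With this pattern the selector only reports the channel as writable while there is something to send, so an idle connection costs nothing in the select loop.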

Alrighty, here’s the code for the server. The client is just a loop which spits out as many packets as it can (each with an incremental id) to the port this server is listening on. For the sake of simplicity, this server assumes only one client can connect (otherwise packetsLost etc. would have to be counted per client instead of globally, as now).

Here are the questions I still have about this code:

  • I seem to need to call Selector.open() before registering the selector with the ServerSocketChannel; otherwise it throws a NullPointerException…

  • Sometimes packets are not registered when the client pumps out a list of 50 sequential messages as fast as it can.

  • In accept, should I add each new client to a list with its own dedicated ByteBuffer, and after I’ve read its channel use duplicate() on the read buffer to copy its contents to the buffer reserved for that client?

  • I’m not sure if the write(SelectionKey key) method is implemented as it should be…?

Thanks!

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.io.IOException;
import java.util.Set;

public class TCPTestServer {
int packetsLost = 0, receivedID = 0, lastID = 0;
protected boolean isProcessing;
protected Selector selector = Selector.open();
protected ByteBuffer buffer;
protected Charset charset;
protected ServerSocketChannel serverSocketChannel;

    public TCPTestServer(int bufferSize, String charset, String host, int port) throws Exception {
            this.buffer = ByteBuffer.allocate(bufferSize);
            this.charset = Charset.forName(charset);
            this.serverSocketChannel = ServerSocketChannel.open();
            this.serverSocketChannel.configureBlocking(false);
            this.serverSocketChannel.socket().bind(new InetSocketAddress(host, port));
            this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
            this.selector = Selector.open();
            System.out.println("Server is listening at "+host+" at port "+port);
    }

    protected void accept(SelectionKey key) throws IOException {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            SocketChannel clientSocketChannel = ssc.accept();
            clientSocketChannel.configureBlocking(false);
            clientSocketChannel.register(this.selector, SelectionKey.OP_READ);
	System.out.println("Accepted new connection from " + ((ServerSocketChannel)key.channel()).socket().getInetAddress().getHostAddress());
    }

    private final void read(SelectionKey key) throws IOException {
            ReadableByteChannel channel = (ReadableByteChannel) key.channel();
            this.buffer.clear();
            try {
                  int numRead = channel.read(this.buffer);
                  if (numRead < 0) {
                       close(key);
                       return;
                  }
           } catch (IOException e) {
                      close(key);
                      return;
           }
           this.buffer.flip();
           this.processMessage(key);
    }

    protected void processMessage(SelectionKey key) throws IOException {
            receivedID = (int)buffer.get();
            packetsLost += Math.abs(1-(receivedID - lastID));
            lastID = receivedID;
            System.out.println("Received message id: "+receivedID);
    }

    protected void composeMessage(SelectionKey key) throws IOException {
            System.out.println("tcptestserver compose message");
    }

    protected void write(SelectionKey key) throws IOException {
            this.buffer.clear();
            this.composeMessage(key);
            this.buffer.flip();
	
            setWriteRequest(key,true);
            long writtenBytes = 0, totalBytes = this.buffer.remaining();
            while (writtenBytes != totalBytes) {
                    writtenBytes += ((WritableByteChannel)key.channel()).write(this.buffer);
                    // sleep here? And could it happen the buffer isn't written in one shot?
            }
            setWriteRequest(key,false);
   }

   public void setWriteRequest(SelectionKey key, boolean write) throws IOException {
            if (write) {
                 key.channel().register(this.selector,
                 key.interestOps() | SelectionKey.OP_WRITE);
            } else {
                 key.channel().register(this.selector,
                 key.interestOps() & ~SelectionKey.OP_WRITE);
            }
   }

    public void close(SelectionKey key) throws IOException {
            key.cancel();
            key.channel().close();
            System.out.println("Client closed connection:");
            System.out.println("packets lost: "+packetsLost);
            receivedID = 0;
            lastID = 0;
            packetsLost = 0;
    }

    public void dispose() throws Exception {
            this.serverSocketChannel.close();
            this.selector.close();
    }

    public void run() {
            this.isProcessing = true;
            try {
                 while (this.isProcessing) {
                          this.selector.select();
                          Set<SelectionKey> keys = this.selector.selectedKeys();
                          for (SelectionKey key : keys) {
                                 keys.remove(key);
                                 if (key.isAcceptable()) {
                                      this.accept(key);
                                 } else if (key.isReadable()) {
                                      this.read(key);
                                 } else if (key.isWritable()) {
                                      this.write(key);
                                 }
                         }
                  }
            } catch (IOException e) {
                    System.out.println("Error in AbstractServer.read: "+ e.getMessage());
            }
    }

    public static void main(String[] args) {
	
            int bufferSize = 255;
            int port = 10000;
            String charset = "UTF-8";
            String host = "127.0.0.1";

            try {
                    TCPTestServer server = new TCPTestServer(bufferSize, charset, host, port);
                    server.run();
            }
            catch (Exception e) { System.out.println("Can't instanciate TCPTestServer: "+e.toString()); }
   }

}

Don’t have time to look at the whole thing now, but one thing I’ve found with OP_WRITE is that the channel will not get selected unless a packet arrives from the client. So after you’ve selected the channel and written some stuff, the next time you are able to write (if you only write when the key is selected by NIO) will be after a packet has arrived from the client.

I didn’t find documentation on this, but found out through experimenting… I think this might be a wrong implementation; it should, as Jeff says, get selected all the time (if nothing is wrong with the connection), but in reality it isn’t.

So I’m just writing whenever, and never bother to register with OP_WRITE… works fine for me.

The only danger there is that you can block up on a TCP connection if the read buffer fills up on the other end.

To be honest, my current personal NIO utils do writes wrong… I just write from the app thread, which as I mentioned above means that I can block up the app under the same circumstances as above.

I’ve been meaning to fix it, though I may have someone else on my project do it. I’ll watch for this OP_WRITE issue and let you guys know what I see.

@Jeff:
Would it generally be better to handle sending (and receiving?) in separate threads?

I could imagine a setup where each channel has a thread which accepts packets to send, checks whether the channel can be written to, and if so writes the next packet, else waits.

Does anyone have an idea why I sometimes seem to miss a couple of packets (with TCP using NIO)?

I tested it with the old java.net and java.io packages and everything seems fine then. Also, I don’t see ANY advantage of using NIO for UDP (?), so I guess I’ll switch back to the old ways of handling network code.

Maybe it is as Jeff said that your stream is blocking up?
You wait until you can write via the selector.

Besides that I have no idea (that might be because I have little knowledge of the whole thing anyway).
Sorry

[quote]Maybe it is as Jeff said that your stream is blocking up?
You wait until you can write via the selector.
[/quote]
I think that was a response to Aramaz; furthermore, I don’t use writing at all in my case. The method is implemented in the server, but not used for the scenario I explained. (I only wait for incoming messages from the client and echo them on the console.)

Your writing to the channel is very weird.

Before writing you do:
key.channel().register(this.selector, key.interestOps() | SelectionKey.OP_WRITE);

After writing you do:
key.channel().register(this.selector, key.interestOps() & ~SelectionKey.OP_WRITE);

I doubt you understand the callback mechanism of NIO. You are supposed to register your interest ops, then wait for the Selector to hand over the SelectionKeys, which will notify you when you can write to the channel. In this case it shouldn’t really matter, meaning it won’t cause any bugs, but it’s just plain wrong to do your writing this way.

Further, your “composeMessage(SelectionKey)” method doesn’t compose anything.

And you’re not sharing the code of the client connecting to the server, so it’s not really possible for us to see the results you are seeing.

Yes, the writing part isn’t complete yet; I haven’t looked into how to do it properly, thanks for the tip. But it doesn’t answer the question, as writing is not used.

The client code is pretty straightforward:

import java.io.*;
import java.net.Socket;

public class TCPTestClient {

public static void main(String[] args) {
	
	int id =1, port = 10000;
	String host = "127.0.0.1";
	Socket server;
	OutputStream out;

	try {
		server = new Socket(host, port);
		System.out.println("Connected to server at " + host + " port " + port);

		try { out = server.getOutputStream(); }
		catch (IOException ioe) { return; }

		while (id <= 500)
		{
			try
			{
				ByteArrayOutputStream bos = new ByteArrayOutputStream();
				DataOutputStream dos = new DataOutputStream((OutputStream)bos);
				dos.writeInt(id++);
				dos.writeLong(System.currentTimeMillis());
				System.out.println("sending packet id:" + id);
				out.write(bos.toByteArray());
			}
			catch (Exception e) { }
		}
	}
	catch (IOException e) { System.out.println("Can't instanciate TCPTestClient: "+e.toString()); }
}

}

You are doing Selector.open() twice, causing the wrong Selector to be used, resulting in a server that blocks on the wrong selector, never ever waking up.
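A sketch of the setup order with a single Selector, opened once and before register(). The class name SingleSelectorSetup is made up, and binding to port 0 (any free port) on 127.0.0.1 is an assumption to keep the demo self-contained; the server in this thread binds to port 10000.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class SingleSelectorSetup {

    /** Opens one selector, registers the server channel with it and checks the pairing. */
    static boolean registeredOnSameSelector() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Port 0 lets the OS pick a free port (demo assumption).
            server.socket().bind(new InetSocketAddress("127.0.0.1", 0));
            SelectionKey key = server.register(selector, SelectionKey.OP_ACCEPT);
            // The loop must later call select() on this same selector instance.
            return key.selector() == selector && selector.keys().contains(key);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(registeredOnSameSelector());
    }
}
```

If the field is reassigned with a second Selector.open() after register(), the key stays attached to the first selector, and select() on the new one has nothing to report.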

After fixing numerous bugs, I finally got it to work.

  1. using only 1 selector
  2. the server was reading 1 byte from the buffer per packet, while the client wrote an int (4 bytes) and a long (8 bytes) per packet
  3. your ID was wrong: it sent packet [x] and it printed “packet sent: [x+1]”
  4. not flushing your client’s output (!!)

I’d advise you to read the Javadocs very carefully, because there was so much wrong with your code that it seems like you were just copying and pasting from several sites, with some trial and error here and there. Not that that is wrong, but you pay for it in the long run.
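Regarding point 2: TCP delivers a byte stream, so one read() may return several messages at once or only part of one. A hedged sketch of how the server side could decode the client’s 12-byte records (an int id followed by a long timestamp) is shown below. FrameDecoder and drainIds are hypothetical names, and the sketch assumes each connection gets its own buffer so that a partial record can wait for the next read.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class FrameDecoder {

    /** One record as the client writes it: int id (4 bytes) + long timestamp (8 bytes). */
    static final int FRAME_SIZE = Integer.BYTES + Long.BYTES;

    /**
     * Drains every complete record from a buffer that was just filled by a read,
     * and keeps any trailing partial record at the start of the buffer.
     */
    static List<Integer> drainIds(ByteBuffer buffer) {
        List<Integer> ids = new ArrayList<>();
        buffer.flip();                       // switch from filling to draining
        while (buffer.remaining() >= FRAME_SIZE) {
            ids.add(buffer.getInt());
            buffer.getLong();                // timestamp, ignored in this sketch
        }
        buffer.compact();                    // keep leftover bytes, back to filling
        return ids;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(255);
        // Simulate one read that delivered two and a third of a record.
        buffer.putInt(1).putLong(0L).putInt(2).putLong(0L).putInt(3);
        System.out.println(drainIds(buffer)); // prints [1, 2]
        // The rest of record 3 arrives with the next read.
        buffer.putLong(0L);
        System.out.println(drainIds(buffer)); // prints [3]
    }
}
```

Reading a single byte per read event, as the posted processMessage does, silently discards every other message that arrived in the same read, which would look like "lost packets".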

Ouch, the Selector.open() slipped in because I tried several things before…
The message structure was something I was playing with; I copied/pasted a WIP version :-[
The long holds the timestamp at which the original packet was sent, so I can compare it on the server (it’s the same machine) to see how long sending and processing take.

About flushing the client output, do you mean the ByteBuffer I read its output into?

Thanks for taking the time to look at it!

The client has an OutputStream, flush() it.

Otherwise the bytes may not be sent yet, as they’ll be stored in the “OS’s socket buffer” (?), which only sends bytes (to the server) when it’s full.

When you close() your OutputStream, it gets flushed too, but normally you want to flush your bytes earlier.
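A small sketch of where flush() makes a visible difference. It assumes the stream is wrapped in a BufferedOutputStream, which the client in this thread does not do, and it uses a ByteArrayOutputStream as a stand-in for socket.getOutputStream() so it runs without a server. FlushDemo is a made-up name.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class FlushDemo {

    /** Returns { bytes that reached the sink before flush(), bytes after flush() }. */
    static int[] bytesBeforeAndAfterFlush() {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream(); // stand-in for the socket stream
            DataOutputStream out = new DataOutputStream(new BufferedOutputStream(sink));
            out.writeInt(1);                            // 4 bytes, held in the Java-side buffer
            out.writeLong(System.currentTimeMillis());  // 8 more bytes, still buffered
            int before = sink.size();
            out.flush();                                // pushes the 12 bytes down to the sink
            int after = sink.size();
            return new int[] { before, after };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] sizes = bytesBeforeAndAfterFlush();
        System.out.println(sizes[0] + " -> " + sizes[1]); // prints 0 -> 12
    }
}
```

Writing through one DataOutputStream on top of the socket stream and flushing after each message would also remove the need to build a fresh ByteArrayOutputStream per packet, as the posted client does.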