PyroNet (low-level NIO Wrapper)

NIO networking as simple as it gets

Over the last months, I made some major improvements to my ‘NIO Wrapper’. I believe this is as simple as it gets. You get notified of I/O events via a callback mechanism, so you can act on ‘client connected’ or ‘data received from client’ events. The API is single-threaded (and immediately throws exceptions if used otherwise), regardless of how many servers are listening and how many clients are connected (as this is the point of NIO).

The API does not force any protocol on you, so the received byte[]s can be of any size, just as the OS received them. You can register ReadCallbacks, which notify you once a specified number of bytes has been received:

How to process incoming data: ReadCallback
      NioClient client = ...;
      byte[] fill = new byte[N];
      ReadCallback callback = new ReadCallback(fill)
      {
         public void done()
         {
            byte[] filled = this.data();
         }
      };
      callback.register(client);
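
To illustrate the idea behind a ReadCallback, here is a minimal, self-contained sketch of filling a fixed-size byte[] from arbitrarily sized chunks. It is not the wrapper's actual code: the class `FixedReadSketch` and its `offer` method are hypothetical names, and the real implementation may differ.

```java
import java.util.function.Consumer;

/** Hypothetical sketch: collects exactly fill.length bytes, then fires a callback once. */
final class FixedReadSketch {
    private final byte[] fill;
    private final Consumer<byte[]> done;
    private int filled = 0;

    FixedReadSketch(byte[] fill, Consumer<byte[]> done) {
        this.fill = fill;
        this.done = done;
    }

    /**
     * Offers a chunk as the OS delivered it, starting at offset off.
     * Returns how many bytes were consumed; the rest belongs to the next callback.
     */
    int offer(byte[] chunk, int off) {
        if (filled == fill.length) {
            return 0; // already complete, consume nothing
        }
        int n = Math.min(chunk.length - off, fill.length - filled);
        System.arraycopy(chunk, off, fill, filled, n);
        filled += n;
        if (filled == fill.length) {
            done.accept(fill); // fires exactly once
        }
        return n;
    }

    public static void main(String[] args) {
        FixedReadSketch read = new FixedReadSketch(new byte[4],
                data -> System.out.println("got " + data.length + " bytes"));
        read.offer(new byte[] {1, 2, 3}, 0); // not enough yet, callback does not fire
        read.offer(new byte[] {4, 5, 6}, 0); // completes the read, consumes only 1 byte
    }
}
```

The return value of `offer` matters: bytes that were not consumed have to be handed to whichever callback is registered next.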

How to send data:
client.enqueue(byte[]); // will combine your data, and send it in batches, whenever the OS feels like it

How to process incoming data even easier: PacketCallback (2 byte length header + payload)
      NioClient client = ...;
      PacketCallback callback = new PacketCallback()
      {
         public void received(byte[] payload)
         {
            // we got yet another packet!
            NioClient source = this.client();
         }
      };
      callback.loop(client); // will infinitely register itself on this client

How to send data even easier: (using Packets)
      Packet p = new Packet(byte[]);
      p.sendTo(clientA);
      p.sendTo(clientB); // prepends payload with 2b header and enqueues the write
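
The 2-byte length header can be sketched as follows. This is only an illustration of the framing idea: `PacketFramingSketch` is a hypothetical name, and the big-endian, unsigned interpretation of the header is an assumption, since the thread does not state the byte order the wrapper uses.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical sketch of a 2-byte length header followed by the payload. */
final class PacketFramingSketch {
    static final int MAX_PAYLOAD = 0xFFFF; // largest length a 16-bit header can express

    /** Prepends an unsigned 16-bit length (big-endian is an assumption). */
    static byte[] frame(byte[] payload) {
        if (payload.length > MAX_PAYLOAD) {
            throw new IllegalArgumentException("payload too large: " + payload.length);
        }
        ByteBuffer buf = ByteBuffer.allocate(2 + payload.length); // big-endian by default
        buf.putShort((short) payload.length);
        buf.put(payload);
        return buf.array();
    }

    /** Reads the length header back as an unsigned value. */
    static int payloadLength(byte[] framed) {
        return ((framed[0] & 0xFF) << 8) | (framed[1] & 0xFF);
    }

    public static void main(String[] args) {
        byte[] framed = frame("hi".getBytes(StandardCharsets.US_ASCII));
        System.out.println(Arrays.toString(framed)); // prints [0, 2, 104, 105]
    }
}
```

A 16-bit header caps a single packet at 65535 bytes of payload, so larger messages would have to be split by the application.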

Download it here: (source included)
www.indiespot.net/files/jawnae-net2b.jar

To connect to a server: (previously: Socket)
`
NioNetwork network = new NioNetwork(handler);
network.connect(new InetSocketAddress(HOST, PORT));

  while (true)
  {
     network.select();
  }

`

To listen for clients: (previously: ServerSocket)
`
NioNetwork network = new NioNetwork(handler);
network.listen(new InetSocketAddress(HOST, PORT));

  while (true)
  {
     network.select();
  }

`

NioNetworkHandler (you will implement this to handle all events)
`
public interface NioNetworkHandler
{
public void selectedKeys(int count);
public void selectFailure(IOException cause);

public void serverSelected(NioServer server);
public void clientSelected(NioClient client);

public void acceptedClient(NioClient client);
public void connectedClient(NioClient client);
public void unconnectableClient(NioClient client);
public void droppedClient(NioClient client, IOException cause);
public void disconnectedClient(NioClient client);

public void receivedData(NioClient client, int bytes);
public void sendingData(NioClient client);
public void sentData(NioClient client, int bytes);

public void executingTask(Runnable task);
public void taskCrashed(Runnable task, Throwable cause);
}
`

Of course, these are also provided for convenience:
NioNetworkAdapter, NioNetworkLogger, NioNetworkExtendableHandler

Executing tasks on the NetworkThread

All reads and writes must be performed on the NetworkThread. This is the thread on which the NioNetwork instance was constructed. Now you probably have more threads in your app, so what to do when you need to send a client a message at some random moment?

`
NioNetwork network = …;
NioServer server = …;

// on another thread:

Runnable broadcastTask = new Runnable()
{
   public void run()
   {
      byte[] payload = …;
      Packet packet = new Packet(payload);
      for (NioClient client : server) // NioServer implements Iterable and returns all NioClients
      {
         packet.sendTo(client);
      }
   }
};
network.scheduleOnNetworkThread(broadcastTask);
network.interrupt(); // network.select() will unblock
`
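
For readers wondering how such a hand-off can work on top of plain java.nio, here is a minimal sketch. It is not the wrapper's implementation: `NetworkLoopSketch`, `schedule` and `selectOnce` are hypothetical names, and the sketch uses `Selector.wakeup()` to unblock the select, which may or may not be what `network.interrupt()` does internally.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical sketch: other threads queue tasks, the network thread runs them. */
final class NetworkLoopSketch {
    private final Selector selector;
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();

    NetworkLoopSketch() throws IOException {
        this.selector = Selector.open();
    }

    /** Safe to call from any thread. */
    void schedule(Runnable task) {
        tasks.add(task);
        selector.wakeup(); // unblocks a pending select(), or makes the next one return at once
    }

    /** Must be called on the network thread only. Returns the number of tasks run. */
    int selectOnce(long timeoutMillis) throws IOException {
        selector.select(timeoutMillis);
        selector.selectedKeys().clear(); // real code would dispatch I/O events here
        int ran = 0;
        for (Runnable task; (task = tasks.poll()) != null; ran++) {
            task.run();
        }
        return ran;
    }

    void close() throws IOException {
        selector.close();
    }

    public static void main(String[] args) throws Exception {
        NetworkLoopSketch loop = new NetworkLoopSketch();
        Thread other = new Thread(() -> loop.schedule(
                () -> System.out.println("task runs on " + Thread.currentThread().getName())));
        other.start();
        other.join();
        loop.selectOnce(1000); // returns immediately because of the earlier wakeup()
        loop.close();
    }
}
```

The task is queued before the wakeup, so the network thread is guaranteed to see it once its select returns.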

Feedback greatly appreciated. Even if you mention MINA… :wink:

Simple testcases included:
[o] TimeServer + TimeClient
[o] ChatServer + ChatClient

The simplest task (like a Time Server/Client) may seem a bit complex, but keep in mind NIO is a very powerful API, so to take advantage of it, there is a bit of boilerplate code, even in a wrapper…

TimeClient:


      NioNetworkHandler handler = new NioNetworkAdapter()
      {
         @Override
         public void connectedClient(NioClient client)
         {
            new PacketCallback()
            {
               @Override
               public void received(byte[] payload)
               {
                  System.out.println("Received: " + new String(payload));
               }
            }.loop(client);
         }
      };

      NioNetwork network = new NioNetwork(handler);

      network.connect(new InetSocketAddress(HOST, PORT));

      while (true)
      {
         network.select();
      }

TimeServer:


      final NioNetwork network = new NioNetwork(new NioNetworkLogger());
      final NioServer server = network.listen(new InetSocketAddress(HOST, PORT));

      final long selectTimeout = 1000;
      final long broadcastInterval = 5000;
      long lastSent = 0;

      while (true)
      {
         network.select(selectTimeout);

         final long now = System.currentTimeMillis();

         if (now - lastSent < broadcastInterval)
         {
            continue;
         }

         String text = "Server time: " + now;
         byte[] data = text.getBytes();

         Packet p = new Packet(data);
         for (NioClient client : server)
         {
            p.sendTo(client);
         }

         lastSent = now;
      }

See sourcecode in JAR file

The ChatServer + ChatClient (see test.jawnae.net2.NioTcpChatServer/Client) are a much better example of how powerful the API is - I coded it within 10 minutes. It feels just like reading and writing lines in plain old java.io.* / java.net.*

The sample code is horrendous, but should give you a good idea how to use the API, and how to do better.

There are slightly less than 1000 lines of code in the core API handling the dirty bits of NIO, not counting interfaces, loggers and adapters. Keep this in mind when you want to roll your own: it is very close to the metal, and this simple wrapper will certainly save countless hours.

I can see this being very handy for my next networked application/game.

Thanks for the effort!

I haven’t come to the point in my projects to need networking, but this seems very useful. Thanks!

This looks very good, especially the Packet/PacketCallback stuff which waits for the whole message to be received. Almost all NIO frameworks don’t handle this out-of-the-box, including MINA! Unless streaming of audio or video is needed, I don’t know why anyone would be interested in receiving half a message.

I just got rid of the networking part of my game since it was holding me back from developing faster. When I refactor it back in I’ll use this API.

Thanks for open-sourcing it :smiley:

Riven, I love your APIs - they are just brilliantly designed. I must write a multiplayer game sometime!

Would anyone be interested in collaborating with Puppygames to write a Soldat-type game? (Mac is desperately missing Soldat so I think there’s a proper market for the game)

Cas :slight_smile:

can you list advantages / differences when comparing to nio2?

AFAIK NIO2 is mainly about the FileSystem and completely separating networking from the java.net.* classes. I’ve yet to find sample code (not dealing with file I/O) to compare the two.

Well, I haven’t been toying with it yet, so that’s why I was asking.
There is some info in the Javadocs of OpenJDK:
http://download.java.net/jdk7/docs/api/java/nio/channels/AsynchronousServerSocketChannel.html

final AsynchronousServerSocketChannel listener =
      AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(5000));

  listener.accept(null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
      public void completed(AsynchronousSocketChannel ch, Void att) {
          // accept the next connection
          listener.accept(null, this);

          // handle this connection
          handle(ch);
      }
      public void failed(Throwable exc, Void att) {
          ...
      }
      public void cancelled(Void att) {
          ...
      }
  });

http://download.java.net/jdk7/docs/api/java/nio/channels/AsynchronousSocketChannel.html

At least as far as naming goes your api looks a lot more gentle

I see. It is in fact quite similar.

Future<Long> f = channel.write(ByteBuffer)

But there is a major difference… see that you get that Long back? That means that your ByteBuffer might not be written fully.

Let’s say you do this:
Future<Long> f0 = channel.write(bb0); Future<Long> f1 = channel.write(bb1);

When f0 yields a value, f1 will be written sooner or later. Now bb0 might not be fully written, so you lose data when the data of bb1 gets appended. You’ll have to code around this: wait for f0 to yield, check how many bytes are remaining, and then either resubmit bb0 OR initiate the f1 write. You’ll have to loop this, because bb0 might still have bytes remaining after N writes…

Update:
Now that I read the javadocs, it seems to be even worse. The second write will throw a WritePendingException. What’s the use of having a ‘multithreaded API’ when you only allow 1 thread at a time, in an ASYNC API… I mean… even if you synchronize on your channel, it will throw exceptions at you for being ‘concurrent’ as the Future has not yet yielded. This basically SCREAMS for a queueing mechanism!

You simply need yet another abstraction layer, again.
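
As a rough idea of what such a queueing layer could look like, here is a minimal sketch that keeps at most one write in flight and resubmits partially written buffers. All names (`AsyncWriteQueueSketch`, `AsyncWriter`) are hypothetical; the small `AsyncWriter` interface is a stand-in for the channel's write call so the logic can be tried without a socket.

```java
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sketch: serializes writes so only one is ever in flight. */
final class AsyncWriteQueueSketch {

    /** Minimal stand-in for the write call of an asynchronous channel. */
    interface AsyncWriter {
        void write(ByteBuffer src, CompletionHandler<Integer, Void> handler);
    }

    private final AsyncWriter writer;
    private final Queue<ByteBuffer> pending = new ArrayDeque<>();
    private boolean writing = false;

    AsyncWriteQueueSketch(AsyncWriter writer) {
        this.writer = writer;
    }

    /** May be called from any thread; never starts a second concurrent write. */
    synchronized void enqueue(ByteBuffer buf) {
        pending.add(buf);
        if (!writing) {
            writing = true;
            writeHead();
        }
    }

    // Caller must hold the lock on this.
    private void writeHead() {
        final ByteBuffer head = pending.peek();
        writer.write(head, new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer written, Void att) {
                synchronized (AsyncWriteQueueSketch.this) {
                    if (!head.hasRemaining()) {
                        pending.poll(); // fully written, move on to the next buffer
                    }
                    if (pending.isEmpty()) {
                        writing = false;
                    } else {
                        writeHead(); // resubmit the remainder, or start the next buffer
                    }
                }
            }

            @Override
            public void failed(Throwable exc, Void att) {
                synchronized (AsyncWriteQueueSketch.this) {
                    pending.clear(); // a real implementation would also close the channel
                    writing = false;
                }
            }
        });
    }
}
```

With a real `AsynchronousSocketChannel` named `channel`, the writer could be supplied as `(buf, handler) -> channel.write(buf, null, handler)`.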

Further, the read/write-timeout mechanism is simply b0rked. If the read or write didn’t happen in time, it will throw an Exception (ok…) but it will also leave your channel in an UNDEFINED state! Totally worthless! Reconnect and try again, I guess. What’s the point of specifying the timeout per read/write, as opposed to per channel, if it destroys your channel…

Further, in my wrapper API, all data is batched up for you. If you write/enqueue, nothing happens until network.select() is called, which groups all enqueued data as efficiently as possible, and sends it to the socket. Same for reading: the API reads as much as possible, and spreads the bytes over the ReadCallbacks.
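
The batching idea can be sketched in a few lines. This is an illustration only, not the wrapper's code: `WriteBatchSketch` and its methods are hypothetical names, and the actual grouping strategy in the wrapper is not described in this thread.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sketch: enqueue() only queues, drain() merges everything into one buffer. */
final class WriteBatchSketch {
    private final Queue<byte[]> queue = new ArrayDeque<>();
    private int queuedBytes = 0;

    /** Nothing is sent here; the data just waits for the next select round. */
    void enqueue(byte[] data) {
        queue.add(data);
        queuedBytes += data.length;
    }

    /** Called once per select round: returns all queued data as one readable buffer. */
    ByteBuffer drain() {
        ByteBuffer batch = ByteBuffer.allocate(queuedBytes);
        for (byte[] data; (data = queue.poll()) != null; ) {
            batch.put(data);
        }
        queuedBytes = 0;
        batch.flip(); // switch from filling to reading
        return batch;
    }

    public static void main(String[] args) {
        WriteBatchSketch batcher = new WriteBatchSketch();
        batcher.enqueue(new byte[] {1, 2});
        batcher.enqueue(new byte[] {3});
        System.out.println(batcher.drain().remaining() + " bytes in one batch");
    }
}
```

Handing the socket one larger buffer per round tends to mean fewer system calls than writing each small byte[] separately.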

I guess my wrapper is just more convenient. (although NIO2 is a major improvement!).

Plus you get the Packet protocol (16 bit header) out of the box.

If you want to browse the sourcecode online:

http://www.katav.nl/html_stuff2/
–> check out the jawnae.net2 package.
–> check out the test.jawnae.net2 package.

I was programming this single-threaded proxy server, and noticed that in that situation my ReadCallbacks were a royal PITA, as they would only return once N bytes were fully read. If you don’t know the number of bytes to expect, as in a line-based protocol like HTTP, FTP, SMTP, POP3, etc etc etc… then you need to be able to handle the incoming bytes instantly…

So!

You can now pass a NioReadStrategy (enum) for each NioClient: READ_CALLBACK and READ_DIRECT.

NioNetworkHandler (interface)
      ...
      public NioReadStrategy acceptedClient(NioClient client);
      public NioReadStrategy connectedClient(NioClient client);
      ...
      public void receivedBytes(NioClient client, int bytes); // will be called in READ_CALLBACK mode
      public void receivedData(NioClient client, byte[] data); // will be called in READ_DIRECT mode
      ...

Now it gets very simple to dump data between two connections, so you can code a simple non-blocking proxy in very few lines:

NioSimpleProxy: (See http://java-gaming.org/ on http://localhost/)


public class NioGateway extends NioNetworkExtendableHandler
{
   public static void main(String[] args) throws IOException
   {
      InetSocketAddress src = new InetSocketAddress("127.0.0.1", 80);
      InetSocketAddress dst = new InetSocketAddress("java-gaming.org", 80);
      

      NioGateway nioGateway = new NioGateway(src, dst);
      NioNetwork network = new NioNetwork(nioGateway);
      network.listen(src);

      // now browse to http://localhost/

      while (true)
      {
         network.select();
      }
   }

   //

   private final InetSocketAddress src, dst;

   public NioGateway(InetSocketAddress src, InetSocketAddress dst)
   {
      super(new NioNetworkLogger());

      this.src = src;
      this.dst = dst;
   }

   //

   @Override
   public NioReadStrategy acceptedClient(NioClient client)
   {
      InetSocketAddress local = client.getLocalAddress();

      System.out.println("accepted: " + local);

      if (!this.src.equals(local))
      {
         client.disconnect();
         return null;
      }

      try
      {
         // make a connection to the other guy
         NioClient otherGuy = client.network().connect(this.dst);

         client.attach(otherGuy);
         otherGuy.attach(client);
      }
      catch (IOException exc)
      {
         exc.printStackTrace();
      }

      // no ReadCallbacks, just pass plain received byte[]
      return NioReadStrategy.READ_DIRECT;
   }

   @Override
   public NioReadStrategy connectedClient(NioClient client)
   {
      System.out.println("connected: " + client.getRemoteAddress());

      return NioReadStrategy.READ_DIRECT;
   }

   @Override
   public void receivedData(NioClient client, byte[] data)
   {
      // send what you received to the other guy
      NioClient otherGuy = (NioClient) client.attachment();
      otherGuy.enqueue(data);
   }

   @Override
   public void droppedClient(NioClient client, IOException cause)
   {
      System.err.println("dropped: [" + client + "] " + cause.getClass().getSimpleName());

      this.disconnectedClient(client);
   }

   @Override
   public void disconnectedClient(NioClient client)
   {
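      NioClient otherGuy = (NioClient) client.attachment();

      // also disconnect the other guy, if one was attached
      // (there is none when the client was rejected or connecting failed)
      if (otherGuy != null && !otherGuy.isDisconnected())
      {
         otherGuy.disconnect();
      }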

      // note that you don't know whether you are the
      // other guy, or the other guy is! if either end
      // disconnects, the _other_ guy disconnects too.
   }
}

The sourcecode on the server has been updated!
–> test.jawnae.net2.NioGateway

I downloaded the sources from http://www.katav.nl/html_stuff2/jars/jawnae.jar and tried the NioTcpTimeServer/Client examples.

As soon as the client is started, both parties throw an exception. A quick look indicates it’s the server crashing due to a NioReadStrategy attribute being null. The same error applies to the ChatClient/Server example. Do you have working examples?

This is what I modified, but how can I read the incoming reply packets from the server?

NioTcpTimeServer.java


package test.jawnae.net2;

import java.io.IOException;
import java.net.InetSocketAddress;

import jawnae.net2.*;

public class NioTcpTimeServer {
   public static final String HOST = "127.0.0.1";
   public static final int    PORT = 8421;

   public static void main(String[] args) throws IOException {
      final NioNetworkHandler handler = new NioNetworkAdapter() {
         @Override
         public NioReadStrategy acceptedClient(NioClient client) {
            System.out.println("acceptedClient: " + client);
            return NioReadStrategy.READ_CALLBACK;
         }
      };

      final NioNetwork network = new NioNetwork(handler);

      final NioServer server = network.listen(new InetSocketAddress(HOST, PORT));

      final long selectTimeout = 1000;
      final long broadcastInterval = 5000;
      long lastSent = 0;

      // post timestamp every 5 secs to all registered clients
      while (true) {
         network.select(selectTimeout);

         final long now = System.currentTimeMillis();

         if (now - lastSent < broadcastInterval)
            continue;

         String text = "Server time: " + now;
         byte[] data = text.getBytes();

         Packet p = new Packet(data);
         for (NioClient client : server) {
            p.sendTo(client);
         }

         lastSent = now;
      }
   }
}

NioTcpTimeClient.java


package test.jawnae.net2;

import java.io.IOException;
import java.net.InetSocketAddress;

import jawnae.net2.*;

public class NioTcpTimeClient {
   public static final String HOST = "127.0.0.1";
   public static final int    PORT = 8421;

   public static void main(String[] args) throws IOException {
      NioNetworkHandler handler1 = new NioNetworkLogger();
      NioNetworkHandler handler2 = new NioNetworkExtendableHandler(handler1) {
         @Override
         public NioReadStrategy connectedClient(NioClient client) {
            System.out.println("connectedClient: " + client);
            super.connectedClient(client);
            return NioReadStrategy.READ_CALLBACK;
         }

         @Override
         public void receivedBytes(NioClient client, int bytes) {
            System.out.println("receivedBytes: " + client  + ", len: " + bytes);
         }
      };

      NioNetwork network = new NioNetwork(handler2);

      network.connect(new InetSocketAddress(HOST, PORT));

      while (true) {
         network.select();
      }
   }

}

Ah… stupid mistake of not running the test-cases after modifying the core.

I fixed it, by using a default strategy if none is provided.

Maybe I’ll code up something better, but at least it should work now.

I uploaded the fix.

Updated in NioClient.java:


   protected NioReadStrategy defaultStrategy = NioReadStrategy.READ_CALLBACK;

...
      NioReadStrategy strategy = this.network.handler.connectedClient(this);
      if (strategy == null)
         strategy = this.defaultStrategy;
      this.applyNioReadStrategy(strategy);
...

As to how to get it to work: use a PacketCallback like in the provided testcase. That’s where all received data ends up if you use READ_CALLBACK.

      new PacketCallback()
      {
         @Override
         public void received(byte[] payload)
         {
            System.out.println("Received from [" + this.client() + "]: " + new String(payload));
         }
      }.loop(client);

If you use READ_DIRECT however, all data ends up at:
NioNetworkHandler.receivedData(NioClient client, byte[] data)
but in that case your packets won’t have a guaranteed length; you receive the data as received by the OS.
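
Because chunks arrive with arbitrary boundaries in that mode, a line-based protocol needs a little reassembly on top. A minimal sketch, assuming lines end in LF or CRLF; `LineAssemblerSketch` and `feed` are hypothetical names, not part of the wrapper:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: turns arbitrarily split chunks into complete lines. */
final class LineAssemblerSketch {
    private final ByteArrayOutputStream partial = new ByteArrayOutputStream();

    /** Feeds one chunk as received; returns the lines it completed (without CR/LF). */
    List<String> feed(byte[] data) {
        List<String> lines = new ArrayList<>();
        for (byte b : data) {
            if (b == '\n') {
                String line = new String(partial.toByteArray(), StandardCharsets.ISO_8859_1);
                partial.reset();
                if (line.endsWith("\r")) {
                    line = line.substring(0, line.length() - 1);
                }
                lines.add(line);
            } else {
                partial.write(b); // keep the unfinished line for the next chunk
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        LineAssemblerSketch assembler = new LineAssemblerSketch();
        System.out.println(assembler.feed("GET / HT".getBytes(StandardCharsets.ISO_8859_1)));
        System.out.println(assembler.feed("TP/1.1\r\nHost: x\r\n".getBytes(StandardCharsets.ISO_8859_1)));
    }
}
```

Something like `feed(data)` could be called from `receivedData(NioClient client, byte[] data)`, with one assembler kept per client.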

Before I read your reply I managed to implement this version. I will however download the updated code and continue experimenting with your NIO wrapper. I like the packet approach; it makes it so much easier to create application logic such as sending short XML messages.

Which call do you prefer? I am now a bit lost as to whether to call .register(client) once or to use loop(client). Any suggestions?

NioTcpTimeClient.java


package test.jawnae.net2;

import java.io.IOException;
import java.net.InetSocketAddress;

import jawnae.net2.*;

public class NioTcpTimeClient {
   public static final String HOST = "127.0.0.1";
   public static final int    PORT = 8421;

   public static void main(String[] args) throws IOException {
      final ReadCallback callback = new PacketCallback() {
         @Override
         public void received(byte[] payload) {
            System.out.println("Received: " + new String(payload));
         }
      };

      NioNetworkHandler handler1 = new NioNetworkLogger();
      NioNetworkHandler handler2 = new NioNetworkExtendableHandler(handler1) {
         @Override
         public NioReadStrategy connectedClient(NioClient client) {
            System.out.println("connectedClient: " + client);
            super.connectedClient(client);
            return NioReadStrategy.READ_CALLBACK;
         }

         @Override
         public void receivedBytes(NioClient client, int bytes) {
            System.out.println("receivedBytes: " + bytes);
            callback.register(client);
         }
      };

      NioNetwork network = new NioNetwork(handler2);

      network.connect(new InetSocketAddress(HOST, PORT));

      while (true) {
         network.select();
      }
   }

}

edit: Should I download all the individual files from http://www.katav.nl/html_stuff2/ or do you plan on releasing a zip build?
thx.

edit: I think the previous fix does not fix the original problem. My very limited understanding of the code logic indicates that the NioClient.registerReadCallback method should use a default strategy. Or, maybe better, the default strategy should be initialized much earlier.

base.jar and jawnae.jar contain both java source and class files.

PacketCallback does the job of registering itself (well, not exactly, look in the code), so you don’t have to.

Once you have called loop(client) there is no more house-keeping for you.

You might want to look at PacketCallback anyway, just to see how simple it is, and how it uses 2 alternating ReadCallbacks to fetch the header/payload/header/payload/header/etc/etc
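
The alternating header/payload idea can be sketched as a tiny state machine. This is not the PacketCallback source: `PacketDecoderSketch` is a hypothetical name, and the big-endian unsigned 16-bit header is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: alternates between reading a 2-byte header and its payload. */
final class PacketDecoderSketch {
    private byte[] target = new byte[2]; // start by expecting a header
    private int filled = 0;
    private boolean readingHeader = true;

    /** Feeds one chunk of any size; returns the payloads completed by it. */
    List<byte[]> feed(byte[] chunk) {
        List<byte[]> packets = new ArrayList<>();
        int off = 0;
        while (true) {
            int n = Math.min(chunk.length - off, target.length - filled);
            System.arraycopy(chunk, off, target, filled, n);
            off += n;
            filled += n;
            if (filled < target.length) {
                return packets; // chunk exhausted, wait for more data
            }
            if (readingHeader) {
                // header complete: the next target is a payload of that length
                int length = ((target[0] & 0xFF) << 8) | (target[1] & 0xFF);
                target = new byte[length];
            } else {
                // payload complete: emit it and expect a header again
                packets.add(target);
                target = new byte[2];
            }
            readingHeader = !readingHeader;
            filled = 0;
        }
    }

    public static void main(String[] args) {
        PacketDecoderSketch decoder = new PacketDecoderSketch();
        System.out.println(decoder.feed(new byte[] {0, 2, 'a'}).size());    // header plus half a payload
        System.out.println(decoder.feed(new byte[] {'b', 0, 0}).size());    // rest of payload, empty packet
    }
}
```

A zero-length payload completes immediately, which is why the loop re-checks the target after every state switch instead of only when new bytes arrive.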

Thank you for making me look silly :slight_smile: I deserve it.

Now I actually tested it, instead of just coding a bit and expecting it to work…

I’ve done away with the mandatory NioReadStrategy return value, it was a PITA anyway.

Now you simply don’t do anything, unless you have to switch to READ_DIRECT.
client.applyNioReadStrategy(READ_DIRECT);

Thx, I cleared the browser cache and was able to save the updated http://www.katav.nl/html_stuff2/jars/jawnae.jar file. Firefox and/or the webserver was unable to serve the changed .jar file for some reason.

The TimeServer/Client test works OK out of the box. The ChatServer/Client test app does nothing, but at least it does not crash anymore :slight_smile: It seems the server side does not trigger the packetReceived handler.

Sorry, I was very busy last night, and will probably look into it later today.

It’s all rather silly, and I have to confess I only tested the Time server/client…

Fixed it.