Allrighty, here’s the code from the server, the client is just a loop which spits out as much packets (each with an incremental id) to the port this server is listening on. For the sake of simplicity this server assumes there can only connect 1 client to check the messages for (else the packetsLost etc should be implemented to be counted for each client instead global like now)
Here are the questions left I have about this code:
- I seem to need to call the Selector.open() method before registering it to a serversocketchannel, but I have to call it before, else it throws me an nullpointerexception…
-Sometimes, there will be packets not registered when the client pumps out a list of 50 sequential messages as fast as it can
-In the accept, should I add each new client to a list with it’s own dedicated bytebuffer and after i’ve read it’s channel use duplicate() on the read buffer to copy its contents to the buffer reserved for that client?
-Im not sure if the write(SelectionKey key) method is implemented as it should…?
Thanks!
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.io.IOException;
import java.util.Set;
public class TCPTestServer {
int packetsLost = 0, receivedID = 0, lastID = 0;
protected boolean isProcessing;
protected Selector selector = Selector.open();
protected ByteBuffer buffer;
protected Charset charset;
protected ServerSocketChannel serverSocketChannel;
public TCPTestServer(int bufferSize, String charset, String host, int port) throws Exception {
this.buffer = ByteBuffer.allocate(bufferSize);
this.charset = Charset.forName(charset);
this.serverSocketChannel = ServerSocketChannel.open();
this.serverSocketChannel.configureBlocking(false);
this.serverSocketChannel.socket().bind(new InetSocketAddress(host, port));
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
this.selector = Selector.open();
System.out.println("Server is listening at "+host+" at port "+port);
}
protected void accept(SelectionKey key) throws IOException {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel clientSocketChannel = ssc.accept();
clientSocketChannel.configureBlocking(false);
clientSocketChannel.register(this.selector, SelectionKey.OP_READ);
System.out.println("Accepted new connection from " + ((ServerSocketChannel)key.channel()).socket().getInetAddress().getHostAddress());
}
private final void read(SelectionKey key) throws IOException {
ReadableByteChannel channel = (ReadableByteChannel) key.channel();
this.buffer.clear();
try {
int numRead = channel.read(this.buffer);
if (numRead < 0) {
close(key);
return;
}
} catch (IOException e) {
close(key);
return;
}
this.buffer.flip();
this.processMessage(key);
}
protected void processMessage(SelectionKey key) throws IOException {
receivedID = (int)buffer.get();
packetsLost += Math.abs(1-(receivedID - lastID));
lastID = receivedID;
System.out.println("Received message id: "+receivedID);
}
protected void composeMessage(SelectionKey key) throws IOException {
System.out.println("tcptestserver compose message");
}
protected void write(SelectionKey key) throws IOException {
this.buffer.clear();
this.composeMessage(key);
this.buffer.flip();
setWriteRequest(key,true);
long writtenBytes = 0, totalBytes = this.buffer.remaining();
while (writtenBytes != totalBytes) {
writtenBytes += ((WritableByteChannel)key.channel()).write(this.buffer);
// sleep here? And could it happen the buffer isn't written in one shot?
}
setWriteRequest(key,false);
}
public void setWriteRequest(SelectionKey key, boolean write) throws IOException {
if (write) {
key.channel().register(this.selector,
key.interestOps() | SelectionKey.OP_WRITE);
} else {
key.channel().register(this.selector,
key.interestOps() & ~SelectionKey.OP_WRITE);
}
}
public void close(SelectionKey key) throws IOException {
key.cancel();
key.channel().close();
System.out.println("Client closed connection:");
System.out.println("packets lost: "+packetsLost);
receivedID = 0;
lastID = 0;
packetsLost = 0;
}
public void dispose() throws Exception {
this.serverSocketChannel.close();
this.selector.close();
}
public void run() {
this.isProcessing = true;
try {
while (this.isProcessing) {
this.selector.select();
Set<SelectionKey> keys = this.selector.selectedKeys();
for (SelectionKey key : keys) {
keys.remove(key);
if (key.isAcceptable()) {
this.accept(key);
} else if (key.isReadable()) {
this.read(key);
} else if (key.isWritable()) {
this.write(key);
}
}
}
} catch (IOException e) {
System.out.println("Error in AbstractServer.read: "+ e.getMessage());
}
}
public static void main(String[] args) {
int bufferSize = 255;
int port = 10000;
String charset = "UTF-8";
String host = "127.0.0.1";
try {
TCPTestServer server = new TCPTestServer(bufferSize, charset, host, port);
server.run();
}
catch (Exception e) { System.out.println("Can't instanciate TCPTestServer: "+e.toString()); }
}
}