I am using telnet to test it out. I telnet in and it works find, but it always says I’m still connected on the other side. I can even call isConnected(); on the channel and it still returns true. You need Java 1.5 to compile this since I am using BlockingQueue and Generics. You can find the precompiled versions at http://files.amdfanboy.com under JChat.
//main chat program
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.io.*;
public class JChatServer
{
public static void main(String[] args)
{
ChatController controller = new ChatController();
try
{
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(true);
ssc.socket().bind(new InetSocketAddress(70));
while(true)
{
ChatThread t = new ChatThread(ssc.accept(), controller);
controller.register(t);
t.start();
}
}
catch(IOException ioe)
{
System.out.println(ioe);
}
}
}
//chat controller
import java.util.*;
public class ChatController
{
//ArrayList of the ChatThreads that have been registered
private ArrayList<ChatThread> registeredThreads;
//makes a new chatcontoller
public ChatController()
{
this.registeredThreads = new ArrayList<ChatThread>();
}
//adds ct to registeredThreads
public boolean register(ChatThread ct)
{
this.registeredThreads.add(ct);
return true;
}
//removes ct from registeredThreads
public boolean unregister(ChatThread ct)
{
return this.registeredThreads.remove(ct);
}
//distributes a message to all the threads located registerThreads
//using the addMessage(Message) method
public void pushToAll(Message m)
{
for(ChatThread ct : this.registeredThreads)
{
ct.addMessage(m);
}
}
//shuts down all the threads in registeredThreads b calling their
//shutdown() method
public void shutdownThreads()
{
for(ChatThread ct : this.registeredThreads)
{
ct.shutdown();
}
}
}
///the worker thread
import java.util.concurrent.*;
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
public class ChatThread extends Thread
{
//queue of messages
private BlockingQueue<Message> queue;
//channel to use
private SocketChannel channel;
//controller that was set in the constructor
private ChatController controller;
//shutdown flag; run checks if it should return based on it
private boolean shutdown = false;
//makes a new ChatThread object
public ChatThread(SocketChannel sc, ChatController cc)
{
this.channel = sc;
this.controller = cc;
this.queue = new LinkedBlockingQueue<Message>();
}
//used to run as thread
public void run()
{
try
{
this.channel.configureBlocking(false);
}
catch(IOException ioe)
{
}
while(!shutdown)
{
try
{
if(!this.channel.isConnected())
{
throw new IOException("Disconnected");
}
this.readMessage();
this.pushMessages();
TimeUnit.SECONDS.sleep(1);
}
catch(IOException ioe)
{
System.out.println(ioe);
shutdown();
break;
}
catch(InterruptedException ie)
{
}
}
}
//read the message from the channel or return if there is nothing to read
//i.e. non-blocking; assumes channel is already non-blocking
public void readMessage() throws IOException
{
if(!this.channel.isConnected())
{
throw new IOException("Disconnected");
}
System.out.println("Reading");
try
{
ByteBuffer buffer = ByteBuffer.allocate(1024);
StringBuilder strBuild = new StringBuilder();
buffer.clear();
boolean readMessage = false;
while(this.channel.read(buffer) > 0)
{
buffer.flip();
for(int i = 0; i < buffer.limit(); i++)
{
strBuild.append((char) buffer.get(i));
}
buffer.clear();
readMessage = true;
}
System.out.println("Done Reading");
if(readMessage)
{
this.controller.pushToAll(new Message(strBuild.toString()));
}
}
catch(IOException ioe)
{
System.out.println(ioe);
throw ioe;
}
}
//pushes all messages currently in queue and pushes
//them out to the clients with ObjectOutputStreams
//(temp sets blocking to true)
public void pushMessages() throws IOException
{
System.out.println("Pushing messages");
try
{
if(!this.channel.isConnected())
{
throw new IOException("Disconnected");
}
this.channel.configureBlocking(true);
while(this.queue.size() > 0)
{
Message m = this.queue.poll();
ObjectOutputStream oos = new ObjectOutputStream(this.channel.socket().getOutputStream());
oos.writeObject(m);
}
this.channel.configureBlocking(false);
}
catch(IOException ioe)
{
System.out.println(ioe);
throw ioe;
}
}
//adds Message m to queue(BlockingQueue)
public boolean addMessage(Message m)
{
boolean answer = this.queue.offer(m);
System.out.println("Got a message");
return answer;
}
//shuts down this thread by stting the shutdown flag to true
//and also unregisters this from the conroller
public void shutdown()
{
//deregister
this.controller.unregister(this);
this.shutdown = true;
}
}
///message class, used only internal so far
public class Message extends Object implements java.io.Serializable
{
private String data;
public Message(String s)
{
this.data = s;
}
public String getData()
{
return data;
}
}