SocketChannel remains open after disconnect

I am using telnet to test it out. I telnet in and it works find, but it always says I’m still connected on the other side. I can even call isConnected(); on the channel and it still returns true. You need Java 1.5 to compile this since I am using BlockingQueue and Generics. You can find the precompiled versions at http://files.amdfanboy.com under JChat.


//main chat program

import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.io.*;

public class JChatServer
{
      public static void main(String[] args)
      {
            ChatController controller = new ChatController();
            try
            {
                  ServerSocketChannel ssc = ServerSocketChannel.open();
                  ssc.configureBlocking(true);
                  ssc.socket().bind(new InetSocketAddress(70));
                  
                  while(true)
                  {
                        ChatThread t = new ChatThread(ssc.accept(), controller);
                        controller.register(t);
                        t.start();
                  }
            }
            catch(IOException ioe)
            {
                  System.out.println(ioe);
            }
            
      }
}





//chat controller

import java.util.*;

public class ChatController
{
      //ArrayList of the ChatThreads that have been registered
      private ArrayList<ChatThread> registeredThreads;
      
      //makes a new chatcontoller
      public ChatController()
      {
            this.registeredThreads = new ArrayList<ChatThread>();
      }
      
      //adds ct to registeredThreads
      public boolean register(ChatThread ct)
      {
            this.registeredThreads.add(ct);
            return true;
      }
      
      //removes ct from registeredThreads
      public boolean unregister(ChatThread ct)
      {
            return this.registeredThreads.remove(ct);
      }
      
      //distributes a message to all the threads located registerThreads
      //using the addMessage(Message) method
      public void pushToAll(Message m)
      {
            for(ChatThread ct : this.registeredThreads)
            {
                  ct.addMessage(m);
            }
      }
      
      //shuts down all the threads in registeredThreads b calling their
      //shutdown() method
      public void shutdownThreads()
      {
            for(ChatThread ct : this.registeredThreads)
            {
                  ct.shutdown();
            }
      }
}




///the worker thread

import java.util.concurrent.*;
import java.io.*;
import java.nio.*;
import java.nio.channels.*;

public class ChatThread extends Thread
{
      //queue of messages 
      private BlockingQueue<Message> queue;
      //channel to use
      private SocketChannel channel;
      //controller that was set in the constructor
      private ChatController controller;
      //shutdown flag; run checks if it should return based on it
      private boolean shutdown = false;
      
      //makes a new ChatThread object
      public ChatThread(SocketChannel sc, ChatController cc)
      {
            this.channel = sc;
            this.controller = cc;
            this.queue = new LinkedBlockingQueue<Message>();
      }
      
      //used to run as thread
      public void run()
      {
            try
            {
                  this.channel.configureBlocking(false);
            }
            catch(IOException ioe)
            {
                  
            }
            
            while(!shutdown)
            {
                  try
                  {
                        if(!this.channel.isConnected())
                        {
                              throw new IOException("Disconnected");
                        }
                        this.readMessage();      
                        this.pushMessages();
                        TimeUnit.SECONDS.sleep(1);
                  }
                  catch(IOException ioe)
                  {
                        System.out.println(ioe);
                        shutdown();
                        break;
                  }
                  catch(InterruptedException ie)
                  {
                  }
            }
      }
      
      //read the message from the channel or return if there is nothing to read
      //i.e. non-blocking; assumes channel is already non-blocking
      public void readMessage() throws IOException
      {
            
            if(!this.channel.isConnected())
            {
                  throw new IOException("Disconnected");
            }
            
            System.out.println("Reading");
            try
            {
                  ByteBuffer buffer = ByteBuffer.allocate(1024);
                  StringBuilder strBuild = new StringBuilder();
                  buffer.clear();
                  boolean readMessage = false;
                  while(this.channel.read(buffer) > 0)
                  {
                        buffer.flip();
                        for(int i = 0; i < buffer.limit(); i++)
                        {
                              strBuild.append((char) buffer.get(i));
                        }
                        buffer.clear();
                        readMessage = true;
                  }
                  System.out.println("Done Reading");
                  if(readMessage)
                  {
                        this.controller.pushToAll(new Message(strBuild.toString()));
                  }
                  
            }
            catch(IOException ioe)
            {
                  System.out.println(ioe);
                  throw ioe;
                  
            }

      }
      
      //pushes all messages currently in queue and pushes
      //them out to the clients with ObjectOutputStreams
      //(temp sets blocking to true)
      public void pushMessages() throws IOException 
      {
            System.out.println("Pushing messages");
            try
            {
                  if(!this.channel.isConnected())
                  {
                        throw new IOException("Disconnected");
                  }
                  this.channel.configureBlocking(true);
                  while(this.queue.size() > 0)
                  {
                        Message m = this.queue.poll();
                        ObjectOutputStream oos = new ObjectOutputStream(this.channel.socket().getOutputStream());
                        oos.writeObject(m);
                  }
                  this.channel.configureBlocking(false);
            }
            catch(IOException ioe)
            {
                  System.out.println(ioe);
                  throw ioe;
            }
      }
      
      //adds Message m to queue(BlockingQueue)
      public boolean addMessage(Message m)
      {
            boolean answer  = this.queue.offer(m);
            System.out.println("Got a message");
            return answer;
      }
      
      //shuts down this thread by stting the shutdown flag to true
      //and also unregisters this from the conroller
      public void shutdown()
      {
            //deregister
            this.controller.unregister(this);
            this.shutdown = true;
      }
}








///message class, used only internal  so far

public class Message extends Object implements java.io.Serializable 
{
      private String data;
      
      public Message(String s)
      {
            this.data = s;
      }
      
      public String getData()
      {
            return data;
      }
}