outputstreams / buffers

I am trying to write about 120kB of data through a TCP-connected socket. This data is divided into records that are each approximately 100B or so in size. The problem I’m running into is that somewhere about halfway through, my client gets out of sync and eventually crashes because of invalid size information on the records. after some experimentation, it seems to me that this behavior is what i might expect if there is a buffer (i’m not using bufferedoutputstream) that is being unpredictably overwritten before the queued bytes are being flushed through the stream to the client. i have tried calling flush() between each record, but that doesn’t seem to have any effect on this. i’m not really seeing anything else in the javadocs that seems relevant. can anyone help me on this?

Not sure wether I got you right.

Basically, writing n bytes to a TCP stream does not mean that n bytes can be read. It is a stream.

I struggled with that myself when doing my NIO stuff, receiving messages partly or getting more than one a time. I solved that easily by truely following the ‘stream’ paradigma. Collecting data until a message is complete, placing the rest of the read into the next message buffer.

Is that your problem?

If I understand your question correctly, the answer is no. I only pull out a message if I have all of it’s data; I do this by checking if there is enough data available(). This is what I’m using for that purpose:


      /**
       * decodeMessage is called internally when there is data in the input stream
       */
      private void decodeIncomingMessages() {


            if(!connected) return; // no need to mess with this now
            try {
                  if(input.available() > 2) { // at least the size information and one more byte
                        input.mark(BUFFER_SIZE);
                        int size = input.readShort(); // message size
                        if(input.available() < size) {
                              input.reset(); // return to the mark; not enough data is available
                              return;
                        }
//                        System.out.println("size of incoming: " + size);
                        ByteBuffer buffer = bufferPool.checkoutBuffer();
                        input.read(buffer.array(), 0, size);
                        buffer.mark(); // so we can go back to this point if it shouldn't be processed on this low level
                        short type = buffer.getShort();
                        switch(type) { // check for low-level protocol messages
                              case Protocol.DISCONNECT:
                                    try {
                                          socket.close();
                                    } catch(Exception e) {
                                          System.out.println("Error closing Session socket on Protocol.DISCONNECT.");
                                    }
                                    connected = false;
                                    bufferPool.checkinBuffer(buffer);
                                    break;
                              case Protocol.CONNECTED:
                                    connected = true;
                                    bufferPool.checkinBuffer(buffer);
                                    break;
                              default: 
                                    buffer.reset(); // return it to the point where the message type can be read
                                    enqueueIncomingMessage(buffer); // put this back on top
                                    break;
                        }
                  }
            }
            catch(IOException e) {
                  e.printStackTrace();
                  handleSessionError();
                  return;
            }
      }

      /**
       * sendMessage is called internally when flushing all queued messages to the output stream
       * @param message the message to be sent
       */
      private void flushOutgoingMessages() {
      
            if(!connected) return; // it's behavior will probably just cause more errors
            try {
                  while(hasOutgoingMessage()) {
                        ByteBuffer buffer = dequeueOutgoingMessage();
//                        System.out.println("outgoing type: " + buffer.getShort(2));
                        output.write(buffer.array(), 0, buffer.position());
                        output.flush();
                        bufferPool.checkinBuffer(buffer);
                  }
            } catch (IOException e) {
                  e.printStackTrace();
                  handleSessionError();
            }
      }

Hm, hard to see. I’d feel a bit uncomfortable concerning input.mark()/reset() bc. they establishe states that easily might get shaken…

Maybe you could try to just plain read the socket to the end in a test environment, just to make sure wether the problem is on the sender or the receiver side?

Anyway, I provide a snippet of what I do. It different for I always read into my buffer succesivly until input is exhausted or the buffer it full. Than I evaluate it and separate the messages from it. If I reach the end of the buffer, I try to read further.
This does not require to mix ByteBuffer and Socket actions.


      /**
       * Helper:
       * Prepare a BusTicket and notify the station.
       */
      private final void evaluateMessage( SelectionKey key )
      {
            // Get the BusLine where the ticket came in.
            NetLine line = (NetLine)key.attachment();
            
        // What channel?
        SocketChannel socketchannel = (SocketChannel)key.channel();
                
        // Prepare the buffer.
            mBuffer.clear();
        
        while( ! readAndParse( line, socketchannel ) )
            ;
      }


    /**
     * Fill the prepared buffer and evaluate it.
     * @return Parsed to end.
     */
    private final boolean readAndParse( NetLine line, SocketChannel socketchannel )
    {
      
        //
        // Try to read from the socket. 
        // If it fails, close the connection.
        //
        try
        {
            int nbytes = socketchannel.read( mBuffer ); 
            
            if ( -1 == nbytes )
            {
                Log.logger.fine( "Channel has EOF, closing line." );
                line.close();
                return true;
            }
        }
        catch ( Exception ex )
        {
.... exception handling ommited...
            return true;                        // --> LEAVE
        }

            // Prepare buffer for reading.
        mBuffer.flip();

        //
        // Parse the buffer and make BusTickets from it.
        // Notify the listeners about what happened.
        //
            while ( mBuffer.position() < mBuffer.limit() )
            {
            // Can we read at least one short completely?
            if ( mBuffer.remaining() < 2 )
            {
                prepareBufferForNextRead();
                return false;                   // --> LEAVE, we are NOT ready!
            }

            // Retrieve length of the message.
            short ticketlen = mBuffer.getShort();
            
            //
            // Is the following ticket completely in the buffer?
            //
            if ( ticketlen > mBuffer.remaining() )
            {
                // Step back two byte to set reading position at the length-short again.
                mBuffer.position( mBuffer.position() - 2 );
                prepareBufferForNextRead();
                return false;                   // --> LEAVE, we are NOT ready!
            }
            
            
                  // Grap a suitable copy of the buffer.
                  ByteBuffer ticketbuffer = mBuffer.slice();
//            Log.logger.finer( "Prepared buffer slice with capacity: " + ticketbuffer.capacity() );

                  // Configure it to its nominal size.
                  ticketbuffer.limit( (int)ticketlen );
           
                  // Move buffer reading position forward
                  mBuffer.position( mBuffer.position() + ticketlen );
                  
                  // Use package-local ctor to contruct the incoming ticket.
                  BusTicket ticket = new BusTicket( ticketbuffer );

                  // Dispatch the message.
                  this.notifyIncomingBus( line, ticket );
            }
        
        return true;
    }
    
    
    /**
     * Helper:
     * Prepare the buffer for the next read.
     */ 
    private final void prepareBufferForNextRead()
    {        
        ByteBuffer rest = mBuffer.slice();

        // Set writing position to the beginning
        mBuffer.clear();
        
        // Put the rest-buffer to the beginning.
        mBuffer.put( rest );        
    }