Problem with the same SelectionKey being returned again and again

In my implementation, after calling selectedKeys() and checking readability with if ( key.isReadable()),
another thread processes the incoming message.

My problem is that selectedKeys() keeps returning the already dispatched key until the processing thread has read the data from the channel.

I tried removing OP_READ from the key’s interest set with interestOps() and setting it again after the thread has finished
reading the data from the channel.
The problem with this solution is that setting the OP_READ interest again takes 40-500 ms, which performs poorly.

Is there any way to read data from a channel in another thread (without blocking the main thread, so it can keep monitoring other keys)
without changing the interest set of the key that the other thread is currently reading from?

Here is some code from my main thread, CommServerSocket:


	public void run()
	{
		int keysNumber = 1;
		
		while ( !this.isInterrupted())
		{
			// Key
			SelectionKey key = null;

			try
			{
				if ( keysNumber != 0 )
				{
					if ( this.trace.isTraceLevelOn( 
								ATrace.DETAIL | JCubaTrace.COMMUNICATION ))
					{
						this.trace.communication( ATrace.DETAIL, 
							"Select-Call..." );
					}
				}

				//keysNumber = this.srvSelector.select( 500 );
				keysNumber = this.srvSelector.select( 50 );
				//keysNumber = this.srvSelector.select();

				if ( keysNumber > 0 )
				{
					if ( this.trace.isTraceLevelOn( ATrace.DETAIL | JCubaTrace.COMMUNICATION ))
					{
						this.trace.communication( ATrace.DETAIL,
							"Select-Call returns <"
							+ keysNumber + ">" );
					}

					Set acceptSelectedSet = this.srvSelector.selectedKeys();

					// All Key's
					Iterator it = acceptSelectedSet.iterator();

					while ( it.hasNext())
					{
						// performing Key
						key = (SelectionKey) it.next();

 						// Removing Key from Set
						it.remove();

						if ( key.isValid())
						{
							this.trace.communication( ATrace.ALL,
								"Key <" +key +"> for CommId <"
								+ key.attachment() +">" );

							try
							{
								if( key.isConnectable())
								{
									this.trace.communication( ATrace.ALL, 
										"Key for CommId <" +key.attachment()
										+ "> is Connectable" );
								}

								// New Client
								if ( key.isAcceptable())
								{
									...
								}
			
								// Process data
								if ( key.isReadable())
								{
									this.trace.communication( ATrace.ALL, 
										"Key for CommId <"
										+ key.attachment()
										+ "> is Readable" );

									// create and use Thread for processing the Request
									this.processClientRequest( key );
								}
							}
							catch( CancelledKeyException e )
							{
								this.trace.exception( e );

								try
								{
									this.trace.communication( ATrace.ALL, 
										ATrace.ERROR +"Key for CommId <"
										+ key.attachment()
										+ "> not valid anymore!");
	
									key.cancel();
								}
								catch( Exception err )
								{
									// Log the exception caught here, not the outer one again
									this.trace.exception( err );
								}
							}
						}
						else
						{
							this.trace.communication( ATrace.ALL, 
								ATrace.ERROR +"Key for CommId <"
								+ key.attachment()
								+ "> not valid anymore!");
							
							key.cancel();
						}
					}
				}
				else
				{
					// No keys ready on any registered channel
				}
			}
			catch( CancelledKeyException e )
			{
				this.trace.exception( e );

				try
				{
					this.trace.communication( ATrace.ALL, 
						ATrace.ERROR +"Key for CommId <"
							+ key.attachment()
							+ "> not valid anymore!");
				}
				catch( Exception err )
				{
					// Log the exception caught here, not the outer one again
					this.trace.exception( err );
				}
			}
			catch ( Exception exc )
			{
				// TODO Auto-generated catch block
				exc.printStackTrace();
				this.trace.exception( exc );
			}
		}

	...
	...

	// is Called from processClientRequest
	public void processDataStart()
	{
		this.trace.communication( ATrace.ALL, 
			"Reading data for CommId <" + this.commId + "> ..." );

		long startTime = System.currentTimeMillis();

		// Reset to the globally stored interestOps without OP_READ (usually 0)
		// This call takes 0 ms
		this.selectionKey.interestOps( 
			this.interestOps );

		long endTime = System.currentTimeMillis();
		this.trace.communication( ATrace.ALL,
			"<SelectionKey.OP_READ> for CommId <" + this.commId + "> "
			+ "deleted! Duration <" +( endTime - startTime ) +">" );

		// Thread for Reading the Data
		ClientCommThread processThread = new ClientCommThread( this );
		processThread.start();
	}

	....
	....

	// Method is called at the end of the Thread (when reading is finished)
	private void processDataEnd()
	{
		this.trace.communication( ATrace.ALL, 
			"Data for CommId <" + this.commId + "> read!" );

		long startTime = System.currentTimeMillis();

		// IMPORTANT: setting the interest ops takes more than 40 ms
		this.selectionKey.interestOps(
			this.interestOps | (SelectionKey.OP_READ));

//		this.selectionKey.selector().wakeup();

		long endTime = System.currentTimeMillis();

		this.trace.communication( ATrace.ALL,
			"<SelectionKey.OP_READ> for CommId <" + this.commId + "> "
			+ "set again! Duration <" +( endTime - startTime ) +">" );

		// Socket-Connection ends?
		if ( this.deregisterFlag )
		{
			// Socket-Connection close...
			this.commServerSocket.deregit( this.commId );
		}
	}

I hope someone can help me!
thx
mikr
PS: Upgrading from 1.4.2_05 to 1.4.2_08 slightly reduced the time needed to set the interest.

You’re missing the point, I think: you’re not supposed to have multiple threads. If you do that, you’re slowing things down, because the OS has already carefully removed the multi-threadedness for you so you can handle everything in a single thread.

If you’re desperate to use multiple threads, you can have the main thread parse each request, and send a completed “request” object to a thread pool, and then get on with parsing the next request.

i.e. decouple the multi-threadedness from the select() operations.
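
For illustration, here is a minimal sketch of that decoupling. It assumes a modern JDK (java.util.concurrent does not exist in 1.4.2) and newline-terminated requests, neither of which comes from the code above. The names SelectorHandOff, Connection, drainRequests, onReadable and handle are hypothetical. The selector thread does the non-blocking read itself, so no interestOps changes are needed, and only complete requests go to the pool:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;

final class SelectorHandOff {

    /** Hypothetical per-connection state, stored as the key's attachment. */
    static final class Connection {
        final ByteBuffer inbound = ByteBuffer.allocate(8192);
    }

    /**
     * Extracts every complete newline-terminated request from buf and keeps
     * an incomplete tail for the next read. buf is in write mode on entry
     * and on exit. The newline framing is an assumption of this sketch.
     */
    static List<String> drainRequests(ByteBuffer buf) {
        List<String> requests = new ArrayList<>();
        buf.flip();                               // switch to read mode
        int start = buf.position();
        for (int i = start; i < buf.limit(); i++) {
            if (buf.get(i) == '\n') {             // absolute get, position unchanged
                byte[] raw = new byte[i - start];
                buf.position(start);
                buf.get(raw);
                requests.add(new String(raw, StandardCharsets.UTF_8));
                start = i + 1;                    // skip the newline itself
            }
        }
        buf.position(start);
        buf.compact();                            // keep partial request, back to write mode
        return requests;
    }

    /** Called by the selector thread when key.isReadable() is true. */
    static void onReadable(SelectionKey key, ExecutorService workers) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        Connection conn = (Connection) key.attachment();

        int n = channel.read(conn.inbound);       // non-blocking read on the selector thread
        if (n < 0) {                              // peer closed the connection
            key.cancel();
            channel.close();
            return;
        }
        // Only complete requests leave the selector thread.
        // Oversized requests (buffer full, no newline) are not handled here.
        for (String request : drainRequests(conn.inbound)) {
            workers.execute(() -> handle(request));
        }
    }

    /** Hypothetical request handler, runs on a pool thread. */
    static void handle(String request) {
        System.out.println("handling <" + request + ">");
    }
}
```

In the select loop, the isReadable() branch would call onReadable( key, workers ) in place of starting a thread per key. Because the read consumes the pending data before select() is called again, the key is not reported as readable again until new data arrives.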