In my implementation, after calling selectedKeys() and checking readability with key.isReadable(),
I hand the incoming message over to another thread for processing.
My problem is that selectedKeys() keeps returning the key that is already being processed until the processing thread has actually read the data from the channel.
I tried removing OP_READ from the key's interest set with interestOps() and setting it again after the thread has finished
reading the data from the channel.
The problem with this solution is that setting the OP_READ interest again takes 40-500 ms, which is far too slow!
Is there any way to read data from a channel in another thread (without blocking the main thread, so that it can keep monitoring the other keys)
without changing the interest set of the key that the other thread is currently reading from?
Here is some code from my main thread, CommServerSocket:
public void run()
{
    int keysNumber = 1;
    while ( !this.isInterrupted())
    {
        // Key currently being processed
        SelectionKey key = null;
        try
        {
            // Log the select call only if the previous call returned keys
            if ( keysNumber != 0 )
            {
                if ( this.trace.isTraceLevelOn(
                        ATrace.DETAIL | JCubaTrace.COMMUNICATION ))
                {
                    this.trace.communication( ATrace.DETAIL,
                        "Select-Call..." );
                }
            }
            //keysNumber = this.srvSelector.select( 500 );
            keysNumber = this.srvSelector.select( 50 );
            //keysNumber = this.srvSelector.select();
            if ( keysNumber > 0 )
            {
                if ( this.trace.isTraceLevelOn( ATrace.DETAIL | JCubaTrace.COMMUNICATION ))
                {
                    this.trace.communication( ATrace.DETAIL,
                        "Select-Call returns <"
                        + keysNumber + ">" );
                }
                Set acceptSelectedSet = this.srvSelector.selectedKeys();
                // Iterate over all selected keys
                Iterator it = acceptSelectedSet.iterator();
                while ( it.hasNext())
                {
                    // Key to process
                    key = (SelectionKey) it.next();
                    // Remove the key from the selected set
                    it.remove();
                    if ( key.isValid())
                    {
                        this.trace.communication( ATrace.ALL,
                            "Key <" +key +"> for CommId <"
                            + key.attachment() +">" );
                        try
                        {
                            if( key.isConnectable())
                            {
                                this.trace.communication( ATrace.ALL,
                                    "Key for CommId <" +key.attachment()
                                    + "> is Connectable" );
                            }
                            // New client
                            if ( key.isAcceptable())
                            {
                                ...
                            }
                            // Process data
                            if ( key.isReadable())
                            {
                                this.trace.communication( ATrace.ALL,
                                    "Key for CommId <"
                                    + key.attachment()
                                    + "> is Readable" );
                                // Create and use a thread for processing the request
                                this.processClientRequest( key );
                            }
                        }
                        catch( CancelledKeyException e )
                        {
                            this.trace.exception( e );
                            try
                            {
                                this.trace.communication( ATrace.ALL,
                                    ATrace.ERROR +"Key for CommId <"
                                    + key.attachment()
                                    + "> not valid anymore!");
                                key.cancel();
                            }
                            catch( Exception err )
                            {
                                // Log the inner exception, not the outer one
                                this.trace.exception( err );
                            }
                        }
                    }
                    else
                    {
                        this.trace.communication( ATrace.ALL,
                            ATrace.ERROR +"Key for CommId <"
                            + key.attachment()
                            + "> not valid anymore!");
                        key.cancel();
                    }
                }
            }
            else
            {
                // No keys were selected for any registered channel
            }
        }
        catch( CancelledKeyException e )
        {
            this.trace.exception( e );
            try
            {
                this.trace.communication( ATrace.ALL,
                    ATrace.ERROR +"Key for CommId <"
                    + key.attachment()
                    + "> not valid anymore!");
            }
            catch( Exception err )
            {
                // Log the inner exception, not the outer one
                this.trace.exception( err );
            }
        }
        catch ( Exception exc )
        {
            // TODO Auto-generated catch block
            exc.printStackTrace();
            this.trace.exception( exc );
        }
    }
}
...
...
// Called from processClientRequest
public void processDataStart()
{
    this.trace.communication( ATrace.ALL,
        "Reading data for CommId <" + this.commId + "> ..." );
    long startTime = System.currentTimeMillis();
    // Reset to the globally stored interestOps without OP_READ (usually 0)
    // This call takes 0 ms
    this.selectionKey.interestOps(
        this.interestOps );
    long endTime = System.currentTimeMillis();
    this.trace.communication( ATrace.ALL,
        "<SelectionKey.OP_READ> for CommId <" + this.commId + "> "
        + "deleted! Duration <" +( endTime - startTime ) +">" );
    // Thread for reading the data
    ClientCommThread processThread = new ClientCommThread( this );
    processThread.start();
}
....
....
// Called at the end of the thread (when reading is finished)
private void processDataEnd()
{
    this.trace.communication( ATrace.ALL,
        "Data for CommId <" + this.commId + "> read!" );
    long startTime = System.currentTimeMillis();
    // IMPORTANT: setting the interest ops here takes more than 40 ms
    this.selectionKey.interestOps(
        this.interestOps | (SelectionKey.OP_READ));
    // this.selectionKey.selector().wakeup();
    long endTime = System.currentTimeMillis();
    this.trace.communication( ATrace.ALL,
        "<SelectionKey.OP_READ> for CommId <" + this.commId + "> "
        + "set again! Duration <" +( endTime - startTime ) +">" );
    // Should the socket connection be closed?
    if ( this.deregisterFlag )
    {
        // Close the socket connection...
        this.commServerSocket.deregit( this.commId );
    }
}
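The measured delay is in the same range as the select() timeouts tried above (50 and 500 ms). That would be consistent with interestOps() blocking while a selection operation is in progress, which the SelectionKey documentation leaves implementation-dependent; I have not verified this for 1.4.2. One possible workaround, as a rough sketch only: the worker thread does not touch the key itself but queues the change and calls wakeup(), and the selector thread applies the queued changes between two select() calls. The class InterestOpsQueue and its method names are hypothetical and not part of my code, and ConcurrentLinkedQueue and the lambda assume a newer Java than 1.4 (on 1.4 a synchronized LinkedList and an anonymous Runnable could play the same role).

```java
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper: collects interest-set changes requested by worker
// threads so that only the selector thread ever calls interestOps().
final class InterestOpsQueue
{
    private final Selector selector;
    private final Queue<Runnable> pending = new ConcurrentLinkedQueue<Runnable>();

    InterestOpsQueue( Selector selector )
    {
        this.selector = selector;
    }

    // Called from a worker thread once it has finished reading.
    // It only queues the change and wakes the selector; it does not
    // touch the key itself.
    void requestReadInterest( final SelectionKey key )
    {
        pending.add( () -> {
            if ( key.isValid())
            {
                key.interestOps( key.interestOps() | SelectionKey.OP_READ );
            }
        } );
        // Makes a select() in progress (or the next one) return immediately
        selector.wakeup();
    }

    // Called from the selector thread right before each select().
    // Returns the number of changes that were applied.
    int applyPending()
    {
        int applied = 0;
        Runnable change;
        while (( change = pending.poll()) != null )
        {
            change.run();
            applied++;
        }
        return applied;
    }
}
```

With such a helper, run() would call applyPending() at the top of each loop iteration, and processDataEnd() would call requestReadInterest( this.selectionKey ) instead of interestOps(). Removing OP_READ in processDataStart() already happens on the selector thread, so it could stay as it is.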
I hope someone can help me!
Thanks,
mikr
PS: Upgrading from 1.4.2_05 to 1.4.2_08 slightly reduced the time needed to set the interest again.