SSL with plain old IO is downright simple, yet with NIO it’s a whole different story.
This class uses SSLEngine to allow running multiple SSL connections on a single I/O thread.
The SSL handshake itself is performed by N worker threads to prevent them blocking all I/O.
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;
public abstract class NonBlockingSSL implements Runnable
{
final ByteBuffer wrapSrc, unwrapSrc;
final ByteBuffer wrapDst, unwrapDst;
final SSLEngine engine;
final Executor ioWorker, taskWorkers;
public NonBlockingSSL(SSLEngine engine, int bufferSize, Executor ioWorker, Executor taskWorkers)
{
this.wrapSrc = ByteBuffer.allocateDirect(bufferSize);
this.wrapDst = ByteBuffer.allocateDirect(bufferSize);
this.unwrapSrc = ByteBuffer.allocateDirect(bufferSize);
this.unwrapDst = ByteBuffer.allocateDirect(bufferSize);
this.unwrapSrc.limit(0);
this.engine = engine;
this.ioWorker = ioWorker;
this.taskWorkers = taskWorkers;
this.ioWorker.execute(this);
}
public abstract void onInboundData(ByteBuffer decrypted);
public abstract void onOutboundData(ByteBuffer encrypted);
public abstract void onHandshakeFailure(Exception cause);
public abstract void onHandshakeSuccess();
public abstract void onClosed();
public void sendLater(final ByteBuffer data)
{
this.ioWorker.execute(new Runnable()
{
@Override
public void run()
{
wrapSrc.put(data);
NonBlockingSSL.this.run();
}
});
}
public void notifyReceived(final ByteBuffer data)
{
this.ioWorker.execute(new Runnable()
{
@Override
public void run()
{
unwrapSrc.put(data);
NonBlockingSSL.this.run();
}
});
}
public void run()
{
// executes non-blocking tasks on the IO-Worker
while (this.step())
{
continue;
}
// apparently we hit a blocking-task...
}
private boolean step()
{
switch (engine.getHandshakeStatus())
{
case NOT_HANDSHAKING:
boolean anything = false;
{
if (wrapSrc.position() > 0)
anything |= this.wrap();
if (unwrapSrc.position() > 0)
anything |= this.unwrap();
}
return anything;
case NEED_WRAP:
if (!this.wrap())
return false;
break;
case NEED_UNWRAP:
if (!this.unwrap())
return false;
break;
case NEED_TASK:
final Runnable sslTask = engine.getDelegatedTask();
Runnable wrappedTask = new Runnable()
{
@Override
public void run()
{
System.out.println("async SSL task: " + sslTask);
long t0 = System.nanoTime();
sslTask.run();
long t1 = System.nanoTime();
System.out.println("async SSL task took: " + (t1 - t0) / 1000000 + "ms");
// continue handling I/O
ioWorker.execute(NonBlockingSSL.this);
}
};
taskWorkers.execute(wrappedTask);
return false;
case FINISHED:
throw new IllegalStateException("FINISHED");
}
return true;
}
private boolean wrap()
{
SSLEngineResult wrapResult;
try
{
wrapSrc.flip();
wrapResult = engine.wrap(wrapSrc, wrapDst);
wrapSrc.compact();
}
catch (SSLException exc)
{
this.onHandshakeFailure(exc);
return false;
}
switch (wrapResult.getStatus())
{
case OK:
if (wrapDst.position() > 0)
{
wrapDst.flip();
this.onOutboundData(wrapDst);
wrapDst.compact();
}
break;
case BUFFER_UNDERFLOW:
// try again later
break;
case BUFFER_OVERFLOW:
throw new IllegalStateException("failed to wrap");
case CLOSED:
this.onClosed();
return false;
}
return true;
}
private boolean unwrap()
{
SSLEngineResult unwrapResult;
try
{
unwrapSrc.flip();
unwrapResult = engine.unwrap(unwrapSrc, unwrapDst);
unwrapSrc.compact();
}
catch (SSLException exc)
{
this.onHandshakeFailure(exc);
return false;
}
switch (unwrapResult.getStatus())
{
case OK:
if (unwrapDst.position() > 0)
{
unwrapDst.flip();
this.onInboundData(unwrapDst);
unwrapDst.compact();
}
break;
case CLOSED:
this.onClosed();
return false;
case BUFFER_OVERFLOW:
throw new IllegalStateException("failed to unwrap");
case BUFFER_UNDERFLOW:
return false;
}
switch (unwrapResult.getHandshakeStatus())
{
case FINISHED:
this.onHandshakeSuccess();
return false;
}
return true;
}
}
The SSL code is actually independant of NIO, but I added the following code to support NIO.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.Executor;
import javax.net.ssl.SSLEngine;
public abstract class NioNonBlockingSSL extends NonBlockingSSL
{
private final SelectionKey key;
public NioNonBlockingSSL(SelectionKey key, SSLEngine engine, int bufferSize, Executor ioWorker, Executor taskWorkers)
{
super(engine, bufferSize, ioWorker, taskWorkers);
this.key = key;
}
private final ByteBuffer big = ByteBuffer.allocateDirect(64 * 1024);
public boolean processIncomingData()
{
big.clear();
int bytes;
try
{
bytes = ((ReadableByteChannel) this.key.channel()).read(big);
}
catch (IOException exc)
{
bytes = -1;
}
if (bytes == -1)
return false;
big.flip();
ByteBuffer copy = ByteBuffer.allocateDirect(bytes);
copy.put(big);
copy.flip();
this.notifyReceived(copy);
return true;
}
@Override
public void onOutboundData(ByteBuffer encrypted)
{
try
{
((WritableByteChannel) this.key.channel()).write(encrypted);
if (encrypted.hasRemaining())
{
throw new IllegalStateException("failed to bulk-write");
}
}
catch (IOException exc)
{
throw new IllegalStateException(exc);
}
}
}