Java Continuations and GreenThreads

Introduction

Continuations can be used to implement green threads: virtual threads that are multiplexed onto real (native) threads, such as java.lang.Thread. The difference is that we can run thousands, if not millions, of green threads on a single native thread.

Use case

This enables us to (for example) write AI code with the usual Java control flow, without suffering from thread overhead and multithreading-related complexity (locks, race conditions, etc.).

For example, we could write an AI like this and run it on its own green thread:


public void run() {
   // this is the AI of the unit, it will go on forever
   while(true) {
      Vec2 originalLocation = new Vec2(this.position);
      Water water = this.findWater();
      this.moveTo(water.position);
      this.drink(water);
      this.moveTo(originalLocation); // walk back
   }
}

public void drink(Water water) {
   while(!this.isFull() && !water.isEmpty()) {
      water.level--;
      this.water++;
      sleep(1500); // drinking takes a while
   }
}

public void moveTo(Vec2 dst) {
   Vec2 src = new Vec2(this.position);
   float distance = Vec2.distance(src, dst);

   // advance this.speed units per game tick until the destination is reached
   for(float traveled = 0.0f; traveled < distance; traveled += this.speed) {
      float ratio = traveled / distance;
      this.position.x = src.x + ratio * (dst.x - src.x);
      this.position.y = src.y + ratio * (dst.y - src.y);
      yield(); // wake up *here* in the next game tick
   }

   // snap to the destination, so we never overshoot or stop short
   this.position.x = dst.x;
   this.position.y = dst.y;
}

Implementation

My continuations library is written on top of Matthias Mann’s continuations library. It provides the concept of ‘yield return’, as found in C# and other languages, and builds Green Threads on top of it. It works by rewriting bytecode, to be able to store and restore the Java stack.
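To make the idea of storing and restoring the stack a bit more concrete, here is a hand-written sketch of what a suspendable method conceptually turns into: a state machine that keeps its resume point and its local variables in fields. This is not the output of the bytecode rewriter, and the class name HandWrittenYield and its fields are made up for illustration only.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical illustration: a 'yield return' style producer written by hand.
public class HandWrittenYield implements Iterator<String> {
	private int state = 0;   // saved "instruction pointer": where to resume
	private int i = 0;       // saved local variable of the loop
	private String next;     // value handed to the consumer
	private boolean ready;   // true when 'next' holds an unconsumed value

	// Runs the method body up to the next produced value.
	private boolean advance() {
		switch (state) {
			case 0:
				next = "a";
				state = 1;
				return true;
			case 1:
				if (i < 3) {
					next = "c" + i;
					i++;
					return true; // resume inside the loop next time
				}
				next = "d";
				state = 2;
				return true;
			default:
				return false; // method body has finished
		}
	}

	@Override
	public boolean hasNext() {
		if (!ready) {
			ready = advance();
		}
		return ready;
	}

	@Override
	public String next() {
		if (!hasNext()) {
			throw new NoSuchElementException();
		}
		ready = false;
		return next;
	}

	static List<String> collect() {
		List<String> out = new ArrayList<>();
		Iterator<String> it = new HandWrittenYield();
		while (it.hasNext()) {
			out.add(it.next());
		}
		return out;
	}

	public static void main(String[] args) {
		System.out.println(collect()); // [a, c0, c1, c2, d]
	}
}
```

The point of the rewriting approach is that you never have to write this by hand: you keep the plain loop, and the saved state is generated for you.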

There are two options to rewrite the bytecode:

  • Ahead of time: an Ant task will discover your class files and rewrite them
  • At runtime: a java-agent will intercept classloading, and rewrite each class, supporting hot-swapping of code

With either option, you still have full debugging functionality in your IDE: step into/over/out and breakpoints work just fine.

Download files

Matthias Mann’s continuations library (see blog post, mercurial repository)


import java.util.Iterator;
import de.matthiasmann.continuations.CoIterator;
import de.matthiasmann.continuations.SuspendExecution;

public class ContinuationsTestCoIterator {

	static class YieldReturn extends CoIterator<String> {
		@Override
		protected void run() throws SuspendExecution {
			this.produce("a");
			this.produce("b");
			for (int i = 0; i < 4; i++) {
				this.produce("c" + i);
			}
			this.produce("d");
			this.produce("e");
		}
	}

	public static void main(String[] args) {
		Iterator<String> iterator = new YieldReturn();
		for (String str : asIterable(iterator)) {
			System.out.println("got: " + str);
		}
	}

	private static final <E> Iterable<E> asIterable(final Iterator<E> iterator) {
		return new Iterable<E>() {
			@Override
			public Iterator<E> iterator() {
				return iterator;
			}
		};
	}
}

Output:


got: a
got: b
got: c0
got: c1
got: c2
got: c3
got: d
got: e

Bare-bones example of the high-level continuations library:


import static net.indiespot.continuations.VirtualThread.*;
import net.indiespot.continuations.*;
import de.matthiasmann.continuations.SuspendExecution;

public class VirtualThreadTestSchedule {
	public static void main(String[] args) {
		final VirtualProcessor processor = new VirtualProcessor();

		// create virtual threads (or green threads, if you will)

		final long started = now();
		final long finalWake = now() + 5000;

		new VirtualThread(new VirtualRunnable() {
			@Override
			public void run() throws SuspendExecution {
				System.out.println("thread 1: a (" + (now() - started) + "ms)");
				sleep(1000);
				System.out.println("thread 1: b (" + (now() - started) + "ms)");
				sleep(1000);
				System.out.println("thread 1: c (" + (now() - started) + "ms)");
				sleep(1000);
				System.out.println("thread 1: d (" + (now() - started) + "ms)");
				yield();
				System.out.println("thread 1: e (" + (now() - started) + "ms)");
				wakeupAt(finalWake);
				System.out.println("thread 1: f (" + (now() - started) + "ms)");
			}
		}).start();

		new VirtualThread(new VirtualRunnable() {
			@Override
			public void run() throws SuspendExecution {
				sleep(500);
				System.out.println("thread 2: a (" + (now() - started) + "ms)");
				sleep(2000);
				System.out.println("thread 2: b (" + (now() - started) + "ms)");
			}
		}).start();

		new VirtualThread(new VirtualRunnable() {
			@Override
			public void run() throws SuspendExecution {
				sleep(1500);
				System.out.println("thread 3: a (" + (now() - started) + "ms)");
				sleep(100);
				System.out.println("thread 3: b (" + (now() - started) + "ms)");
				sleep(100);
				System.out.println("thread 3: c (" + (now() - started) + "ms)");
				sleep(100);
				System.out.println("thread 3: d (" + (now() - started) + "ms)");
			}
		}).start();

		// game loop
		do {
			processor.tick(now());

			try {
				Thread.sleep(1);
			} catch (InterruptedException exc) {
				// ignore
			}
		} while (processor.hasPendingTasks());
	}

	static long now() {
		return System.nanoTime() / 1_000_000L;
	}
}

Output: (the entire program runs on 1 thread!)


thread 1: a (27ms)
thread 2: a (527ms)
thread 1: b (1027ms)
thread 3: a (1527ms)
thread 3: b (1627ms)
thread 3: c (1727ms)
thread 3: d (1827ms)
thread 1: c (2027ms)
thread 2: b (2527ms)
thread 1: d (3027ms)
thread 1: e (3028ms)
thread 1: f (5000ms)

The overhead of continuations is next to zero: you can have at least tens of thousands of units, each with a few green threads for its AI, ticking at 60Hz.

Isn’t there at least some significant overhead with saving and restoring the stack?

Cas :)

It’s all insignificant until, of course, you store/restore a really deep (recursive) call chain.

So how are the threads linked to the processor, and what does the processor.tick(long time) function do?

VirtualThread.start() attaches the virtual thread to the last defined VirtualProcessor on the current thread, as a convenient shortcut to: VirtualThread.start(processor)

The VirtualProcessor schedules VirtualThreads by their wakeup time and pops them off an ordered queue. Upon popping, the VirtualThread is restored and executed until it reaches a sleep, yield or suspend. At that point all local variables that were on the stack, and the instruction pointers for every method, are stored, and control goes back to the VirtualProcessor. It then peeks at the next VirtualThread and checks whether it is scheduled to run in the future; if not, it pops it off and executes it.

In short, the VirtualThread is only interrupted by calls of yield/sleep/suspend from the green thread itself. Therefore you control when a thread is pre-empted, not the processor. To give an example:
[icode]while(true) { continue; }[/icode] would hang your mainloop / VirtualProcessor, while [icode]while(true) { yield(); }[/icode] would not.
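The scheduling described above can be sketched with a plain priority queue. This is a toy model under stated assumptions, not the library’s code: ToyScheduler, Task and runSlice are made-up names, and because plain Java cannot suspend a method halfway, each task is an object that returns its next wakeup time instead of calling sleep(). The sketch also assumes that a task runs at most once per tick.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical toy scheduler: tasks ordered by wakeup time, run cooperatively.
public class ToyScheduler {
	/** Runs one slice; returns the next wakeup time, or -1 when finished. */
	interface Task {
		long runSlice(long now);
	}

	private static final class Entry implements Comparable<Entry> {
		final long wakeAt;
		final long seq; // tie-breaker: FIFO order for equal wakeup times
		final Task task;

		Entry(long wakeAt, long seq, Task task) {
			this.wakeAt = wakeAt;
			this.seq = seq;
			this.task = task;
		}

		@Override
		public int compareTo(Entry other) {
			int byTime = Long.compare(wakeAt, other.wakeAt);
			return byTime != 0 ? byTime : Long.compare(seq, other.seq);
		}
	}

	private final PriorityQueue<Entry> queue = new PriorityQueue<>();
	private long nextSeq = 0;

	void schedule(Task task, long wakeAt) {
		queue.add(new Entry(wakeAt, nextSeq++, task));
	}

	void tick(long now) {
		// collect everything that is due first, so a task that reschedules
		// itself for 'now' runs again in the next tick, not in this one
		List<Entry> due = new ArrayList<>();
		while (!queue.isEmpty() && queue.peek().wakeAt <= now) {
			due.add(queue.poll());
		}
		for (Entry entry : due) {
			long next = entry.task.runSlice(now); // control returns here
			if (next >= 0) {
				schedule(entry.task, next);
			}
		}
	}

	boolean hasPendingTasks() {
		return !queue.isEmpty();
	}

	static List<String> demo() {
		final List<String> log = new ArrayList<>();
		ToyScheduler scheduler = new ToyScheduler();
		scheduler.schedule(new Task() { // task A: three slices, 1000 apart
			int step = 0;
			public long runSlice(long now) {
				log.add("A" + step + "@" + now);
				return ++step < 3 ? now + 1000 : -1;
			}
		}, 0);
		scheduler.schedule(new Task() { // task B: two slices, 2000 apart
			int step = 0;
			public long runSlice(long now) {
				log.add("B" + step + "@" + now);
				return ++step < 2 ? now + 2000 : -1;
			}
		}, 500);
		// simulated clock, advancing 500 per tick
		for (long now = 0; scheduler.hasPendingTasks(); now += 500) {
			scheduler.tick(now);
		}
		return log;
	}

	public static void main(String[] args) {
		System.out.println(demo()); // [A0@0, B0@500, A1@1000, A2@2000, B1@2500]
	}
}
```

A runSlice() that never returns is the equivalent of the endless loop without yield(): tick() would never get control back.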

This looks neat.

It would be nice if the processor could have a thread pool, so we can dedicate more than one native thread to servicing green threads.

How does it compare to other green thread OSS? I’m not familiar with any libs in particular, just curious if you looked into other stuff and how it compares. Yours seems very simple and straightforward, which is great.

If you run N green threads on M native threads, your app is subject to all the multithreading issues there are. With green threads you shouldn’t use synchronized or calls like java.lang.Object.wait(), or any blocking calls for that matter, as that will hijack the VirtualProcessor (the VirtualThread never releases control).

I’m in the process of adding VirtualLock and VirtualCondition (see Sun’s Lock and Lock.newCondition() for usage).

Regarding the alternatives:
Apache’s continuations doesn’t work properly and (more importantly) is not maintained.
Kilim’s fibers require you to rewrite your whole program to the message-passing paradigm.

I prefer my library to be as close as possible to regular threads, runnables and locks.

I added VirtualLock to the continuations library, so now we can do locking and signaling on continuations too.

All demos are in the continuations-riven-test.jar file.

Example class (BoundedBuffer) ported from:
http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/locks/Condition.html



import static net.indiespot.continuations.VirtualThread.sleep;
import de.matthiasmann.continuations.SuspendExecution;
import net.indiespot.continuations.*;

public class VirtualThreadTestBoundedBuffer {

	static class BoundedBuffer {
		final VirtualLock lock = new VirtualLock();
		final VirtualCondition notFull = lock.newCondition();
		final VirtualCondition notEmpty = lock.newCondition();

		final Object[] items;
		public int putptr, takeptr, count;

		public BoundedBuffer(int cap) {
			items = new Object[cap];
		}

		public void put(Object x) throws SuspendExecution {
			lock.lock();
			try {
				while (count == items.length) {
					notFull.await();
				}

				items[putptr] = x;
				if (++putptr == items.length) {
					putptr = 0;
				}
				++count;

				notEmpty.signal();
			} finally {
				lock.unlock();
			}
		}

		public Object take() throws SuspendExecution {
			lock.lock();
			try {
				while (count == 0) {
					notEmpty.await();
				}

				Object x = items[takeptr];
				if (++takeptr == items.length) {
					takeptr = 0;
				}
				--count;

				notFull.signal();
				return x;
			} finally {
				lock.unlock();
			}
		}
	}

	public static void main(String[] args) {
		final VirtualProcessor processor = new VirtualProcessor();

		final BoundedBuffer bb = new BoundedBuffer(10);

		final int slowDownFactor = 10;

		new VirtualThread(new VirtualRunnable() {
			@Override
			public void run() throws SuspendExecution {
				for (int i = 0; true; i++) {
					bb.put(Integer.valueOf(i));

					// varying production rate
					sleep(slowDownFactor * (5 + (i / 10) % 15));
				}
			}
		}).start();

		new VirtualThread(new VirtualRunnable() {
			@Override
			public void run() throws SuspendExecution {
				while (true) {
					System.out.println("size=" + bb.count);
					bb.take();

					// constant consumption rate
					sleep(slowDownFactor * 10);
				}
			}
		}).start();

		// game loop
		do {
			processor.tick(now());

			try {
				Thread.sleep(1);
			} catch (InterruptedException exc) {
				// ignore
			}
		} while (processor.hasPendingTasks());
	}

	static long now() {
		return System.nanoTime() / 1_000_000L;
	}
}

I decided to give it a whirl. I need to display a progress bar, and the actual work that is progressing needs to happen on the GL thread. Doing it with a virtual thread was very easy. :) Kudos! The agent is easy to use, at least. I haven’t integrated it into my build yet. I’ll probably use code from InstrumentationTask, since my build is done with Scar in Java.

There might still be a use case for a thread pool, but that’s ok. :)

It might be nice to support specifying the processor when a virtual thread is created. This way two native threads could service separate tasks. I don’t actually have this need, just trying to help with feature creep. ;)

Have you considered optional thread and tasks names that could be included in exceptions?

Can I disable the agent’s logging? It keeps me from reading important logging!

AFAIK, Kilim doesn’t work on Java 7, unless it’s been fixed since the last time I looked.

Great to hear it was both functional for you and easy to integrate in an existing application.

It’s easy enough to add. Will look into that later today (once I’ve had breakfast and all…)

VirtualThread.start(VirtualProcessor) already provides that functionality. VirtualThread.start() uses a ThreadLocal pointing at the last created VirtualProcessor.
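The “last created processor on the current thread” shortcut can be sketched with a ThreadLocal. The classes below (Processor, GreenThread, lastCreatedOnThisThread) are stand-ins invented for this illustration; they only show the lookup pattern and do not mirror the library’s internals.

```java
// Hypothetical sketch of a per-thread default, looked up via ThreadLocal.
public class ThreadLocalDefault {
	static final class Processor {
		// each native thread remembers the processor most recently created on it
		private static final ThreadLocal<Processor> LAST_CREATED = new ThreadLocal<>();
		final String name;

		Processor(String name) {
			this.name = name;
			LAST_CREATED.set(this);
		}

		static Processor lastCreatedOnThisThread() {
			Processor processor = LAST_CREATED.get();
			if (processor == null) {
				throw new IllegalStateException("no processor created on this thread");
			}
			return processor;
		}
	}

	static final class GreenThread {
		Processor owner;

		// shortcut: attach to the default processor of the calling thread
		void start() {
			start(Processor.lastCreatedOnThisThread());
		}

		// explicit variant: the caller picks the processor
		void start(Processor processor) {
			this.owner = processor;
		}
	}

	static String demo() throws InterruptedException {
		Processor first = new Processor("main-1");
		new Processor("main-2"); // becomes the default for this thread

		GreenThread a = new GreenThread();
		a.start();               // picks main-2
		GreenThread b = new GreenThread();
		b.start(first);          // explicitly main-1

		final String[] seenOnWorker = new String[1];
		Thread worker = new Thread(() -> {
			new Processor("worker"); // default for the worker thread only
			GreenThread c = new GreenThread();
			c.start();
			seenOnWorker[0] = c.owner.name;
		});
		worker.start();
		worker.join();

		return a.owner.name + "," + b.owner.name + "," + seenOnWorker[0];
	}

	public static void main(String[] args) throws InterruptedException {
		System.out.println(demo()); // main-2,main-1,worker
	}
}
```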

Actually, no, but I do consider them now :)

It’s easy enough to alter the stacktraces of (uncaught!) Throwables to make them look even more like real threads. Currently uncaught Throwables reach all the way up to VirtualProcessor.tick() which might not be desirable.

That seems to be a regression after a revision in the ‘core’ of this library. Verbosity used to be suppressed by default, and at first glance at the code it still is, but somehow everything is still printed to stdout. I noticed this earlier, but had to ship a new release (for VirtualLocks) and decided it was not worth postponing.

I just added thread names, and support for VirtualUncaughtExceptionHandler:


virtualProcessor.setUncaughtExceptionHandler(new VirtualUncaughtExceptionHandler() {
   public void uncaughtException(VirtualThread thread, Throwable uncaught) {
      // do something
   }
});

The default behaviour (when there is no handler) is to dump the (cleaned-up) stack trace to stderr.
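As a rough sketch of that dispatch logic (not the library’s actual code): the scheduler catches whatever escapes a task, hands it to the handler if one is set, and otherwise prints to stderr. The names UncaughtSketch, Handler and runTask are invented here, and the stack trace clean-up is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: route uncaught throwables to an optional handler.
public class UncaughtSketch {
	interface Handler {
		void uncaught(String threadName, Throwable uncaught);
	}

	static final class Processor {
		private Handler handler; // null means: fall back to stderr

		void setHandler(Handler handler) {
			this.handler = handler;
		}

		void runTask(String threadName, Runnable task) {
			try {
				task.run();
			} catch (Throwable uncaught) {
				if (handler != null) {
					handler.uncaught(threadName, uncaught);
				} else {
					System.err.println("Uncaught in " + threadName + ":");
					uncaught.printStackTrace();
				}
			}
		}
	}

	static List<String> demo() {
		final List<String> log = new ArrayList<>();
		Processor processor = new Processor();
		processor.setHandler((name, t) -> log.add(name + ": " + t.getMessage()));

		// a failing task does not take the other tasks down with it
		processor.runTask("unit-1", () -> { throw new IllegalStateException("boom"); });
		processor.runTask("unit-2", () -> log.add("unit-2 ran"));
		return log;
	}

	public static void main(String[] args) {
		System.out.println(demo()); // [unit-1: boom, unit-2 ran]
	}
}
```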

After my preliminary fix, Matthias Mann swiftly fixed it properly.

Download files

I don’t have much time at the moment to think about what this lib could be useful for in my case.
Could you perhaps explain your progress bar a bit more, Nate?

I don’t think there is, actually. VirtualThreads are extremely lightweight, and making N VirtualProcessors pop continuations off a shared queue will require synchronization, slowing the thing down by at least an order of magnitude if you run tasks that yield/sleep very often.

There is an additional problem, which seems very, very nasty: with native threads you get guaranteed cache consistency (per thread), as otherwise not a single piece of code would work. If your VirtualThread would be pushed around real threads all the time, then this code would cause undefined behaviour:


for(int i=0; i<values.length; i++) {
   values[i] += 1; // race conditions within a single thread!
   yield();
   values[i] -= 1;
   yield();
}

TL;DR: No way! Feature creep successfully averted!

If you want N native threads handling tasks, create N VirtualProcessors on these native threads. Then spawn your VirtualThreads on these threads and you’re done. And it’d be perfectly safe too.
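A minimal sketch of that layout, assuming made-up names (ProcessorPerThread, Worker, Step): every native thread owns a private run queue, and tasks never migrate between threads. Because each counter is only touched by the tasks of one thread, it can be a plain unsynchronized field.

```java
import java.util.ArrayDeque;

// Hypothetical sketch: one private cooperative run queue per native thread.
public class ProcessorPerThread {
	/** One slice of a cooperative task; returns true while it wants to run again. */
	interface Step {
		boolean run();
	}

	static final class Worker extends Thread {
		private final ArrayDeque<Step> queue = new ArrayDeque<>();
		int counter; // plain field: only this worker's tasks touch it

		void add(Step step) { // call before start()
			queue.add(step);
		}

		@Override
		public void run() {
			// round-robin: run one slice, re-queue the task if it is not done
			while (!queue.isEmpty()) {
				Step step = queue.poll();
				if (step.run()) {
					queue.add(step);
				}
			}
		}
	}

	static int[] demo(int workers, int tasksPerWorker, final int slices)
			throws InterruptedException {
		Worker[] pool = new Worker[workers];
		for (int w = 0; w < workers; w++) {
			final Worker worker = new Worker();
			pool[w] = worker;
			for (int t = 0; t < tasksPerWorker; t++) {
				worker.add(new Step() {
					int remaining = slices;

					public boolean run() {
						worker.counter++; // unsynchronized, but thread-confined
						return --remaining > 0;
					}
				});
			}
		}
		for (Worker worker : pool) {
			worker.start();
		}
		int[] totals = new int[workers];
		for (int w = 0; w < workers; w++) {
			pool[w].join(); // join() makes the worker's writes visible here
			totals[w] = pool[w].counter;
		}
		return totals;
	}

	public static void main(String[] args) throws InterruptedException {
		for (int total : demo(4, 100, 1000)) {
			System.out.println(total); // 100000 for each worker
		}
	}
}
```

No update is lost even though nothing is synchronized, which is the property you give up as soon as tasks are allowed to hop between native threads.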

Sure. I have an export dialog. The user clicks OK and it creates an FBO, renders to it, gets the pixels for it, and saves them to a video file. It repeats this process many times, once for each frame in the video. It is rendering to the FBO with OpenGL, so has to do this processing in the GL thread. This means when the user clicks OK, the UI freezes for 10+ seconds while the processing happens.

To fix this, I use Riven and Matthias’ fancy magic thread stuff. I do the export in a virtual thread that is run on the GL thread. In my processing loop, I do a virtual sleep after outputting a frame. Now the UI is rendered while the processing virtual thread is sleeping, and only every once in a while do I halt the GL thread to render and output a frame. This allows me to show a progress dialog that fills up as frames are output.

Riven, good point that a processor per thread works just fine. :) Thanks for the fast fixes and new features! :D

Got this (while using the agent):

java.lang.VerifyError: Bad type on operand stack in method com.esotericsoftware.spine.editor.dialogs.ExportDialog.export()V at offset 1061

Narrowed it down to this line:

progressDialog.show(getStage());

getStage is just a getter that returns a field. It is defined in a parent class. The class hierarchy is deep, getStage is defined in the 6th parent class. Changing to this causes the error to go away:

progressDialog.show(editor.ui.stage);

I’m not sure how I could debug this further? There is a lot of code leading up to this, but it is structured this way:

	protected void result (Object object) {
		// ...
		new VirtualThread(new VirtualRunnable() {
			public void run () throws SuspendExecution {
				export();
			}
		}).start();
	}

	void export () throws SuspendExecution {
		EditorDialog progressDialog = new EditorDialog("Exporting...", skin);
		progressDialog.show(getStage());
		// ...
		try {
			// ...
			VirtualThread.yield();
			// ...
		} catch (Exception ex) {
			// ...
		} finally {
			// ...
		}
		// ...
	}

It’s always fun to see new code being used out in the open, breaking in unexpected ways! Keep the faith, though :)

Can you check whether ahead-of-time transformation works? (ant / scar task)

Either way, please send me:
[x] com/esotericsoftware/spine/editor/dialogs/ExportDialog.class (prior to transforming, and post transformation, if you want to go the extra mile)

Sent PM with the files. AOT transformation also results in an error. My code has changed slightly from when I first reported the error. With the agent I get:

java.lang.VerifyError: Bad local variable type in method com.esotericsoftware.spine.editor.dialogs.ExportDialog.export()V at offset 1512

And with AOT I get:

java.lang.VerifyError: (class: com/esotericsoftware/spine/editor/dialogs/ExportDialog, method: export signature: ()V) Register 16 contains wrong type

FWIW, Scar code to do instrumentation (ripped from InstrumentationTask):


MethodDatabase db = new MethodDatabase(YourClass.class.getClassLoader());
for (String classFile : paths("path/to/classes", "**.class"))
	db.checkClass(new File(classFile));
for (File file : db.getWorkList())
	instrumentClass(db, file);

	static private void instrumentClass (MethodDatabase db, File file) throws IOException {
		FileInputStream input = new FileInputStream(file);
		ClassReader reader = new ClassReader(input);
		input.close();

		ClassWriter writer = new DBClassWriter(db, reader);
		reader.accept(new InstrumentClass(writer, db, false), ClassReader.SKIP_FRAMES);
		byte[] newClass = writer.toByteArray();

		FileOutputStream output = new FileOutputStream(file);
		output.write(newClass);
		output.close();
	}

I updated the Continuations library - all test cases work now without issues.

The latest sources can be found here.
And a compiled version is available on my website.

Cool, works now, though I get these warnings with the agent:

[Continuations] WARNING: Method not found in class - assuming suspendable: com/badlogic/gdx/scenes/scene2d/ui/Table#getChildren()Lcom/badlogic/gdx/utils/SnapshotArray;
[Continuations] WARNING: Method not found in class - assuming suspendable: com/badlogic/gdx/scenes/scene2d/ui/Table#getStage()Lcom/badlogic/gdx/scenes/scene2d/Stage;
[Continuations] WARNING: Method not found in class - assuming suspendable: com/badlogic/gdx/scenes/scene2d/ui/Table#getChildren()Lcom/badlogic/gdx/utils/SnapshotArray;
[Continuations] WARNING: Method not found in class - assuming suspendable: com/badlogic/gdx/scenes/scene2d/ui/Table#add(Lcom/badlogic/gdx/scenes/scene2d/Actor;)Lcom/esotericsoftware/tablelayout/Cell;
[Continuations] WARNING: Method not found in class - assuming suspendable: com/badlogic/gdx/scenes/scene2d/ui/Slider#getValue()F
[Continuations] WARNING: Method not found in class - assuming suspendable: com/badlogic/gdx/scenes/scene2d/ui/Slider#getValue()F
[Continuations] WARNING: Method not found in class - assuming suspendable: com/badlogic/gdx/scenes/scene2d/ui/Slider#setValue(F)V
[Continuations] WARNING: Method not found in class - assuming suspendable: com/badlogic/gdx/scenes/scene2d/ui/Slider#setValue(F)V
[Continuations] WARNING: Method not found in class - assuming suspendable: com/esotericsoftware/spine/editor/dialogs/EditorDialog#addAction(Lcom/badlogic/gdx/scenes/scene2d/Action;)V
[Continuations] WARNING: Method not found in class - assuming suspendable: com/badlogic/gdx/scenes/scene2d/ui/Slider#setValue(F)V
[Continuations] WARNING: Method not found in class - assuming suspendable: com/esotericsoftware/spine/editor/dialogs/EditorDialog#addAction(Lcom/badlogic/gdx/scenes/scene2d/Action;)V
[Continuations] WARNING: Method not found in class - assuming suspendable: com/badlogic/gdx/scenes/scene2d/ui/Slider#setValue(F)V
[Continuations] WARNING: Method not found in class - assuming suspendable: com/esotericsoftware/spine/editor/dialogs/EditorDialog#addAction(Lcom/badlogic/gdx/scenes/scene2d/Action;)V
[Continuations] WARNING: Method not found in class - assuming suspendable: com/badlogic/gdx/scenes/scene2d/ui/CheckBox#getText()Ljava/lang/CharSequence;

I don’t see them when doing AOT instrumentation. Otherwise it works great! :)

I don’t have an immediate use for it, but I just want to express how cool this is. Maybe emphasizing the caveats a bit more, as stated on Matthias’ website, would be good.