Actor Model : Java short implementation

I just learned about the actor model a while ago and decided to see if I could do something similar in Java. The result is pretty short, only 2 class and it works.


public class Message {
	
	public final Object object;
	public final Actor source;
	
	public Message(Object object, Actor source){
		this.object = object;
		this.source = source;
	}

}


import java.util.ArrayList;
import java.util.HashMap;

public abstract class Actor implements Runnable{
	
	private Thread thread;
	private ArrayList<Message> messageList = new ArrayList<Message>();
	private HashMap<Object, ArrayList<Message>> waitMap = new HashMap<Object, ArrayList<Message>>();
	private Object current = null;
	
	private synchronized void add(Message m){
		messageList.add(m);
		notify();
	}
	
	private synchronized Message get(){
		return messageList.remove(0);
	}
	
	private synchronized void waiting(){
		if(messageList.size() == 0){
			try { wait(); } 
			catch (InterruptedException e) { } //Do nothing
		}
	}
	
	public void wait(Object key, Message m){
		ArrayList<Message> list = waitMap.get(key);
		if(list == null){
			list = new ArrayList<Message>();
			waitMap.put(key, list);
		}
		list.add(m);
	}
	
	public void notify(Object key){
		ArrayList<Message> list = waitMap.get(key);
		if(list != null && list.size() > 0){
			Message m = list.remove(0);
			add(m);
			if(list.size() == 0){
				waitMap.put(key, null);
			}
		}
	}
	
	public void notifyAll(Object key){
		ArrayList<Message> list = waitMap.get(key);
		if(list != null && list.size() > 0){
			for(Message m : list){
				add(m);
			}
			list.clear();
			waitMap.put(key, null);
		}
	}
	
	public void sleep(int milli){
		try { Thread.sleep(milli); }
		catch (InterruptedException e){ }; //Do nothing
	}
	
	public void println(Object o){
		System.out.println(o);
	}
	
	public void println(){
		System.out.println();
	}
	
	public void print(Object o){
		System.out.print(o);
	}
	
	public void send(Object o, Actor actor){
		if(actor != null){
			actor.add(new Message(o, this));
		}
	}
	
	public void act() {
		thread = new Thread(this);
		thread.start();
	}
	
	public void stop() {
		thread.interrupt();
	}

	@Override
	public void run() {
		init();
		while(!thread.isInterrupted()){
			if(messageList.size() > 0){ 
				Message m = get();
				current = m.object;
				receive(m); 
			}else{ waiting(); }
		}
		end();
	}
	
	public boolean is(Object o){
		if(current == null){ return false; }
		else{ return current.equals(o); }
	}
	
	public boolean isA(Object o){
		if(current == null || o == null){ return false; }
		else{ return o.getClass().isInstance(current); }
	}
	
	public <E> boolean isA(Class<E> c){
		if(current == null || c == null){ return false; }
		else{ return c.isInstance(current); }
	}

	public abstract void init();
	
	public abstract void receive(Message m);
	
	public abstract void end();

}

PingPong with Scala Actors

import scala.actors.Actor;

object PingPong {

case object Ping
case object Pong
case object Stop

class Ping(count: Int, pong: Pong) extends Actor{
	def act(){
	    var begin = System.nanoTime();
		var pingsLeft = count - 1;
		pong ! Ping;
		while (true) {
			receive {
			case Pong =>
			if (pingsLeft % 1000 == 0)
				Console.println("Ping: pong")
				if (pingsLeft > 0) {
					pong ! Ping
					pingsLeft -= 1
				} else {
					Console.println("Ping: stop")
					pong ! Stop
					var end = System.nanoTime();
					var delta = end-begin;
					println(delta);
					println(delta/(1000*1000));
					exit()
				}
			}
		}
		
	}
}

class Pong extends Actor{
	def act(){
		var pongCount = 0;
		while (true) {
			receive {
			case Ping =>
			if (pongCount % 1000 == 0)
				Console.println("Pong: ping "+pongCount)
				sender ! Pong
				pongCount = pongCount + 1
			case Stop =>
			Console.println("Pong: stop")
			exit()
			}
		}
	}
}

def main(args: Array[String]){
	val pong = new Pong;
	val ping = new Ping(100000, pong);
	ping.start
	pong.start
}

}

PingPong with my Java actors

import main.Actor;
import main.Message;

public class PingPong {
	
	private class Stop { }
	
	private static class Ping extends Actor {
		
		private int pingLeft;
		private Pong pong;
		
		public Ping(int count, Pong pong){
			pingLeft = count;
			this.pong = pong;
		}
		
		@Override
		public void init() {
			pingLeft--;
			send(Ping.class, pong);
		}

		@Override
		public void receive(Message m) {
			if(is(Pong.class)){
				if(pingLeft % 1000 == 0){
					println("Ping: pong");
				}
				pingLeft--;
				if(pingLeft > 0){
					send(Ping.class, pong);
				}else{
					println("Ping: stop");
					send(Stop.class, pong);
					stop();
				}
			}
		}

		@Override
		public void end() { }
		
	}
	
	private static class Pong extends Actor {
		
		private int pongCount = 0;

		@Override
		public void init() { }

		@Override
		public void receive(Message m) {
			if(is(Ping.class)){
				if(pongCount % 1000 == 0){
					println("Pong: ping " + pongCount);
				}
				send(Pong.class, m.source);
				pongCount++;
			}else if(is(Stop.class)){
				println("Pong: stop");
				stop();
			}
		}

		@Override
		public void end() { }

	}
	
	public static void main(String [] args){
		Pong pong = new Pong();
		Ping ping = new Ping(100000, pong);
		pong.act();
		ping.act();
	}

}

Actor’s implementation is already made in Java with JADE framework. It has good specification.

Functional Java also has a very high performance Actor implementation, which is essentially the same one that’s in Scalaz. The gold standard for actor systems these days though is Akka, which has both a Scala and Java API.

I see a number of big issues in the Java implementation up top. The big no-no is the use of a single thread per-actor, which you simply should never be doing for any kind of serious actor system. Use a thread pool, or better yet make the execution strategy pluggable.

The overloading of wait/notify/notifyAll is probably not a good idea either, since it’s too easily confused with the same-named methods on Object, but it’s minor compared to that previous one. I think everyone should explore the actor model, try inventing their own, but then look at an industrial-strength implementation and see what they do differently and why.

I don’t get it.
Reading the wikipedia (http://en.wikipedia.org/wiki/Actor_model) it sounds like these are nothing more than ‘nodes’ on a communication network?
I’m not seeing the point? Why have them? They don’t appear to be designed do any work at all other then shunt data off to other actors??

The point of an actor is that they do useful work whenever they get a message, which results in side effects and usually one or more different messages going to other actors. The akka docs make it a little clearer than the wikipedia article, and there are also tutorials you can look at.

I use actors in my code by having a web service send a message to an actor to process everything async, then after much expensive backend processing, get back a reply (also via an actor message) to send back. Inbetween there’s a network of service classes that are themselves actors that send each other messages.

If you’ve ever used a Message Driven Bean in a JavaEE app, those are also basically actors (with a very heavyweight mailbox).

Although I haven’t done much work with Actors, I always thought it would be interesting to try and set up GameEntity’s as Actors and handle things asynchronously, although you could simply have an Update message that it reacts too.

I’ve been tinkering with a game that does exactly that, and in fact goes totally nuts with the idea and makes every damn thing an actor, including every square on the map (it’s a tile-based game). It even uses libgdx’s scene2d actors for the graphics, which are of course a very limited form of actor (they’re not even threadsafe) but handy for what they do. It is not at all small or efficient that way, but hey it runs fast enough on my machine anyway.

Recently I did what I tend to do, tore the whole thing up and started over. My quest for purity and using scalaz actors will probably be falling by the wayside, and it’ll likely be stateful Akka actors this time, become() and all.

Cheers for the replies sproingie, I have worked with MQ Series stuff before (ie: set up JMS message queues to chit-chat with AS400 stuff) and did think to myself that perhaps Actors were an alternative to this. I’ll take a look at those links of yours soon (on lunchbreak atm)

Hmmm, just had a crazy thought. Could we use “Actors” to represent NPC’s in a game with the comms between them used to implement an enemy hive mind?

Actors localize state, they don’t share it very well, because unless you’re willing to do some fairly complex stuff around Lamport clocks (or vector clocks), you’re not going to get consensus among them on what the current state actually is. Akka has a few things around STM to help, but if you’re really going for a shared AI state, actors are not a great model for that.

Thx for pointing that out. I changed it to use a thread pool instead and my performance increased by 6 times. Strangely, the default Scala actors don’t seems so fast. For sending and receiving 100000 messages it takes :

Mine : 100 millis (1 micro per message in average)
Scala : 1000 millis (10 micro per message in average)

:persecutioncomplex:

Only problems is that by default, my thread pool is always running until I close my JVM. Wonder what is the best solution for that;

  • adding start()/stop() method to actor and when at least one actor is running the thread pool is running
  • manually starting/stopping the thread pool directly in the code

EDIT : Decided to make an ActorSystem like Akka. You can specify which ActorSystem an Actor use and you need to start/stop the ActorSystem.

If you do an empty benchmark with messages that don’t get any processing, scala is likely to underperform because it just plain has more overhead. Even then I’d try benchmarking it against scalaz or akka actors, since scala.actors doesn’t have the best latency characteristics.