How do I use the Ashley entity framework alongside Kryonet?

Hi,

I’ve been experimenting with both the Kryonet and Ashley frameworks separately and really like both, and now I’d like to use both in a single project.

My question is: how do I hook them up together? Kryonet works with listeners and there is no ‘loop’, but Ashley needs a loop with a delta passed into its update method.

I’ve seen a few posts where people do use a loop on the server, and Kryonet’s listener simply adds to a queue. This queue then gets processed during every update. Is this a good way to organise things? All of the examples in the Kryonet repository simply handle everything directly in the listener itself at the time of the event.

Thanks!
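
For reference, here is a minimal sketch of the queue-and-loop arrangement described in the question. It assumes nothing about the real project: `DeltaUpdatable` is a hypothetical stand-in for Ashley's `Engine` (whose `update` method takes a float delta), and `QueuedServerLoop`, `enqueue` and `tick` are made-up names. The idea is only that the Kryonet listener calls `enqueue`, and the loop thread calls `tick` once per frame.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for Ashley's Engine, which exposes update(float deltaTime).
interface DeltaUpdatable {
    void update(float deltaTime);
}

// Hypothetical server loop: the network thread enqueues, the loop thread drains.
final class QueuedServerLoop {
    // Thread-safe and non-blocking, so the listener never waits on the loop.
    private final Queue<Object> inbox = new ConcurrentLinkedQueue<>();
    private final DeltaUpdatable engine;

    // Only ever touched by the loop thread, so a plain ArrayList is fine here.
    final List<Object> handled = new ArrayList<>();

    QueuedServerLoop(DeltaUpdatable engine) {
        this.engine = engine;
    }

    // Intended to be called from the Kryonet listener's received(...) callback.
    void enqueue(Object message) {
        inbox.add(message);
    }

    // Intended to be called once per frame from the loop thread.
    // Drains everything queued so far, then advances the simulation.
    int tick(float deltaTime) {
        int drained = 0;
        Object message;
        while ((message = inbox.poll()) != null) {
            handled.add(message); // placeholder for "create or modify entities"
            drained++;
        }
        engine.update(deltaTime);
        return drained;
    }
}
```

With this shape, all entity changes happen on the loop thread, so the entity systems themselves would not need any locking; only the inbox is shared between threads.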

Hi,

A couple of days ago I read a recommendation from Nate: because the listener thread also does the (de)serializing of the objects, he recommended passing the work off to another thread in busy network applications. I changed my code around to use another thread and it resolved a couple of queuing issues as well, so I’d say go for it! :)

Mike
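
One possible way to do that hand-off, sketched with a single-threaded `ExecutorService` from the JDK rather than a hand-written thread. `MessageDispatcher` and its methods are hypothetical names, not part of Kryonet; the assumption is that the listener's `received(...)` callback simply forwards each object to `received(Object)` here and returns right away.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical dispatcher: the network thread submits, one worker thread processes.
final class MessageDispatcher {
    // A single worker thread runs tasks in the order they were submitted.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    final AtomicInteger processed = new AtomicInteger();

    // Intended to be called from the listener; returns without doing the work.
    void received(final Object message) {
        worker.execute(() -> handle(message));
    }

    // Runs on the worker thread. One failing message must not kill the worker.
    private void handle(Object message) {
        try {
            // Application-specific processing of 'message' would go here.
            processed.incrementAndGet();
        } catch (RuntimeException e) {
            System.err.println("Error processing message " + message + ": " + e);
        }
    }

    // Stops accepting work and waits for queued messages to finish.
    // Returns false if the timeout elapsed or the caller was interrupted.
    boolean shutdown(long timeoutMillis) {
        worker.shutdown();
        try {
            return worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The executor owns its own thread-safe queue, so there is no shared list to synchronize by hand. Note that its queue is unbounded by default, so a slow handler can let it grow.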

Hi Mike,

Thanks for that reply. So you just have a loop running in its own thread which processes the queue? Would it be possible to post a code snippet of your loop setup?

Hi,

Sure. There is probably a much nicer way to do it, but this works fine for me. As I’m not iterating over the ArrayList with a for-each loop, I haven’t seen any issues so far from not synchronizing it.

I’m sure there are a lot of people on the forum who know a much neater way to do it, though :)

Mike

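import java.util.ArrayList;

import com.esotericsoftware.kryonet.Connection;

public class HandleMessageThread extends Thread {

    // volatile so that a stopThread() call from another thread is seen by run()
    private volatile boolean threadRunning = true;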

    private final static ArrayList<Message> receivedMessages = new ArrayList<Message>();

    @Override
    public void run() {
        while (isAlive() && threadRunning) {
            try {
                while (receivedMessages.size() > 0) {
                    Message message = null;
                    try {
                        message = receivedMessages.get(0);
                        recievedMessage(message.getConnection(), message.getMessage());
                        receivedMessages.remove(0);
                    } catch (final Throwable t) {
                        if (message != null) {
                            Main.getKryoClient().submitError("Error processing message " + message.getMessage().toString(), t);
                        } else {
                            Main.getKryoClient().submitError("Error processing message", t);
                        }
                        receivedMessages.remove(0);
                    }
                }
            } catch (final Throwable e) {
                Main.getKryoClient().submitError("Executing message", e);
            }
            try {
                Thread.sleep(10);
            } catch (final InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void recievedMessage(Connection c, Object p) {
        if (p instanceof SessionReturnMessage) {
            SessionReturn.process((SessionReturnMessage)p);
        } else if (p instanceof PingMessage) {
            [...]
        } else {
            Main.getKryoClient().submitError("Cannot find method: " + p.toString());
        }
    }

    public void stopThread() {
        threadRunning = false;
    }

    public void received(Connection c, Object p) {
        receivedMessages.add(new Message(c, p));
    }
}

public class Message {

    private final Connection connection;

    private final Object message;

    public Message(Connection c, Object p) {
        connection = c;
        message = p;
    }

    public Connection getConnection() {
        return connection;
    }

    public Object getMessage() {
        return message;
    }

}

If you add items to an ArrayList on thread A, and get/remove them from thread B, you must synchronize access to this ArrayList, or the behavior of the ArrayList is undefined, as shown by the following example:


import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;

public class Queue {
	public static void main(String[] args) {
		final int pushCount = 16 * 1024 * 1024;

		new Thread(new Runnable() {
			@Override
			public void run() {
				for(int i = 0; i < pushCount; i++) {
					push(Integer.valueOf(i));
				}
			}
		}).start();

		new Thread(new Runnable() {
			@Override
			public void run() {
				Set<Integer> set = new HashSet<>();
				for(int i = 0; i < pushCount; i++) {
					System.out.println("queue size: " + queue.size() + ", polled count: " + set.size());
					Integer got = poll();
					if(got == null)
						continue;
					if(!set.add(got))
						throw new IllegalStateException("repeated poll result!");
				}
			}
		}).start();
	}

	static final ArrayList<Integer> queue = new ArrayList<Integer>();

	public static void push(Integer s) {
		queue.add(s);
	}

	public static Integer poll() {
		return queue.isEmpty() ? null : queue.remove(0);
	}
}


queue size: 3313, polled count: 0
queue size: 4569, polled count: 1
queue size: 4786, polled count: 2
queue size: 4989, polled count: 3
queue size: 5245, polled count: 4
queue size: 5499, polled count: 5
queue size: 5621, polled count: 6
queue size: 5902, polled count: 7
queue size: 6222, polled count: 8
queue size: 9369, polled count: 9
queue size: 9629, polled count: 10
queue size: 10100, polled count: 11
queue size: 10548, polled count: 12
queue size: 12009, polled count: 13
queue size: 12779, polled count: 14
queue size: 13314, polled count: 15
queue size: 13764, polled count: 16
queue size: 14052, polled count: 17
queue size: 14416, polled count: 18
queue size: 15165, polled count: 19
queue size: 15738, polled count: 20
queue size: 16326, polled count: 21
queue size: 16884, polled count: 22
queue size: 17444, polled count: 23
queue size: 18025, polled count: 24
queue size: 18696, polled count: 25
queue size: 19918, polled count: 26
queue size: 20622, polled count: 27
queue size: 21036, polled count: 28
queue size: 21078, polled count: 29
queue size: 21773, polled count: 30
Exception in thread "Thread-1" java.lang.IllegalStateException: repeated poll result!
	at nav.Queue$2.run(Queue.java:30)
	at java.lang.Thread.run(Thread.java:745)

Solve with:


	static final ArrayList<Integer> queue = new ArrayList<Integer>();

	public static void push(Integer s) {
		synchronized (queue) {
			queue.add(s);
		}
	}

	public static Integer poll() {
		synchronized (queue) {
			return queue.isEmpty() ? null : queue.remove(0);
		}
	}

Or, much better (among other things, because the queue size is capped):


	// Requires: import java.util.concurrent.ArrayBlockingQueue; n is the fixed capacity.
	static final ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(n);

	public static void push(Integer s) {
		while (true) {
			try {
				queue.put(s);
				break;
			}
			catch (InterruptedException e) {
				// interrupted while waiting for free space; loop and try again
			}
		}
	}

	public static Integer poll() {
		return queue.poll();
	}
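
As a follow-up sketch (not taken from the code above): with a blocking queue, the consumer thread can also wait in `take()` instead of polling and sleeping, and the producer can use `offer`, which returns false when the capped queue is full rather than blocking. `BlockingConsumer` and its members are hypothetical names, and stopping the thread by interrupting it is just one possible choice.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical consumer: waits for messages instead of polling with sleep().
final class BlockingConsumer implements Runnable {
    private final BlockingQueue<Object> queue;
    final AtomicInteger handled = new AtomicInteger();

    BlockingConsumer(int capacity) {
        queue = new ArrayBlockingQueue<>(capacity);
    }

    // Producer side: never blocks; returns false if the queue is full.
    boolean offer(Object message) {
        return queue.offer(message);
    }

    @Override
    public void run() {
        try {
            while (true) {
                // Blocks until a message is available, or throws if interrupted.
                Object message = queue.take();
                handle(message);
            }
        } catch (InterruptedException e) {
            // Interruption is used as the stop signal: restore the flag and exit.
            Thread.currentThread().interrupt();
        }
    }

    private void handle(Object message) {
        // Application-specific processing of 'message' would go here.
        handled.incrementAndGet();
    }
}
```

Whether a full queue should drop the message, block the network thread, or disconnect the client is an application decision; `offer` just makes that case visible to the caller.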

Awesome as always, Riven. I guess I was just lucky that it hasn’t broken yet. I used to use a synchronized block but thought it wasn’t needed as I wasn’t looping through the list. I’m now using an ArrayBlockingQueue instead.

Mike