I discovered Matthias Mann’s continuations library when Riven showcased it on these forums. I then put (a modified version of) it on top of fork/join and the result was Quasar: https://github.com/puniverse/quasar. If you’re interested, you can read about Quasar, and its Clojure API, Pulsar, in this blog post: http://blog.paralleluniverse.co/post/49445260575/quasar-pulsar
Interesting stuff. I see in your blog post the code examples are in clojure. Do you have equivalent examples in Java?
Nice read.
Have you considered using the Disruptor for your queues? Unbounded queues enable simpler APIs but are almost always a bad idea. Just copy the behavior of Go’s channels: with no capacity you get a SynchronousQueue, otherwise you get a bounded queue. You should be able to emulate a SynchronousQueue using the Disruptor; then you only have to document that capacity has to be a power of 2. But I guess you already require that in your bounded queue implementation.
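To make the Go-channel suggestion concrete, here is a minimal sketch using only plain JDK queues (the JDK's rendezvous queue is `java.util.concurrent.SynchronousQueue`). It is not built on the Disruptor and it is not Quasar's API; the `Channels` class and `newChannel` method are hypothetical names made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical helper, for illustration only.
final class Channels {
    private Channels() {}

    // capacity == 0: rendezvous hand-off, like an unbuffered Go channel.
    // capacity  > 0: bounded buffer, like a buffered Go channel.
    static <T> BlockingQueue<T> newChannel(int capacity) {
        if (capacity < 0) {
            throw new IllegalArgumentException("capacity must be >= 0");
        }
        return capacity == 0
                ? new SynchronousQueue<T>()
                : new ArrayBlockingQueue<T>(capacity);
    }
}
```

With this shape there is no unbounded option at all: a producer either hands the item directly to a waiting consumer or blocks (with `put`) once the buffer is full.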
There are some Java examples pointed out on the project’s page.
You’ll find links there to the documentation, too, when it’s ready.
The bounded queues are implemented very similarly to the disruptor. Are you suggesting one disruptor to replace all queues? If so, I don’t think the disruptor would make a good choice. The disruptor especially shines when you have one producer and multiple consumers (each consumer updates a non-shared index to the last read item, while the producer simply needs to increment a volatile index to mark the last entry written). In fact, if you’ve seen the original LMAX talk, that’s precisely what the disruptor was designed for. But here the problem is the opposite: we have multiple producers but only a single consumer.
pron,
Thanks, I’m not sure how I missed it. I look forward to playing with it.
spasi,
Thanks for the pointer to the Disruptor. I hadn’t heard of it before.
[quote=“pron,post:4,topic:41694”]
Disruptor development, after the initial release, was mostly driven by exactly this case, multiple producers. Even as early as when instantiating a RingBuffer you have to choose between the two major use cases, see the createSingleProducer and createMultiProducer methods. The MultiProducerSequence class is what takes care of multi-thread publishing in the current version.
Yes, but even in that case, the disruptor only helps when you have multiple consumers. For multiple producers the disruptor does the only thing that can be done: a CAS on each insert. The whole idea behind the disruptor was improving the reads, not the writes. Having each consumer hold its own index into the queue and not having the consumers write contended variables is the disruptor’s contribution. If you have a single consumer, there’s no contention on the read side anyway.
In short, in the single-consumer multiple-producer case, the disruptor is identical to the bounded queue used in Quasar.
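For readers following along, this is roughly the shape being discussed: many producers claim a slot with a CAS on a shared tail sequence, and the single consumer advances a head sequence that only it writes. It is a simplified sketch written for this thread, not Quasar's or the Disruptor's actual code; the class name `MpscSketchQueue` is hypothetical, and it omits padding against false sharing and any blocking or wait strategy.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Illustrative bounded multi-producer, single-consumer queue.
final class MpscSketchQueue<E> {
    private final AtomicReferenceArray<E> slots;
    private final int mask;                            // capacity - 1, capacity is a power of 2
    private final AtomicLong tail = new AtomicLong();  // next sequence to claim (contended by producers)
    private final AtomicLong head = new AtomicLong();  // next sequence to read (written only by the consumer)

    MpscSketchQueue(int capacity) {
        if (capacity < 1 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of 2");
        }
        slots = new AtomicReferenceArray<>(capacity);
        mask = capacity - 1;
    }

    // May be called from any number of threads.
    boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        for (;;) {
            long t = tail.get();
            if (t - head.get() > mask) return false;   // full
            if (tail.compareAndSet(t, t + 1)) {         // the one CAS per insert
                slots.set((int) (t & mask), e);         // publish into the claimed slot
                return true;
            }
        }
    }

    // Must be called from a single consumer thread only.
    // Returns null if the queue is empty, or if a producer has claimed
    // the next slot but has not finished writing to it yet.
    E poll() {
        long h = head.get();
        int i = (int) (h & mask);
        E e = slots.get(i);
        if (e == null) return null;
        slots.set(i, null);   // clear the slot before releasing it to producers
        head.set(h + 1);
        return e;
    }
}
```

The read side has no CAS and no contention, which is the point above: with one consumer there is nothing left for per-consumer indices to improve.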
That’s my point, isn’t it better to use a proven solution instead of rolling your own? You also get batch processing, configurable wait strategies and dependency chaining out of the box.
[quote=“pron,post:7,topic:41694”]
I don’t see the point of what you’re saying. It’s obvious that it’s not as fast as having a single producer, but it’s many times faster than a JDK BlockingQueue in the multi-producer case. CAS-on-insert is much better than a lock, and it’s not the only thing that the Disruptor gets right (PoT size to avoid divisions, false-sharing elimination, etc.; you do these things in your implementation too).
Maybe you’re right. I don’t know, it just seemed like a bit of overkill if all I needed was one consumer. I didn’t need dependency chaining and configurable wait strategies, and the only wait strategy I needed wasn’t supported by the disruptor anyway – fiber-blocking – so I would have had to do that myself. Also, I wanted to have primitive-type bounded queues, and unbounded queues, too, so an existing solution for one specific type of queue wouldn’t have saved much work. You can look at my implementation here and here. It’s about 300 lines in total. Primitive queues require a bit more work. See here and here. (Those are just the bounded queues).
Actually, I just remembered, there was one tricky issue here. The consumer holds a pointer to the last item it read in the queue (I use this for selective receives), and because I use both bounded and unbounded queues, that pointer could be either an index or a ref to a linked-list node, so it’s gotta be an Object. In the bounded queue case, Martin’s queue uses a long index which always increases; the actual index in the array is that index mod the array length. That meant that any time the consumer read a value, a boxed long would have had to be created and could have been held for a long time (i.e., could be promoted GC-wise). I do something a bit ugly. Internally I use Martin’s long-index approach, but I give the consumer an int index, which is equal to the actual array index. This keeps the index a small int (always less than the size of the array), so raising the upper bound of Integer’s boxing cache by just a bit ensures that the boxed Integer instance is always cached.
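A small sketch of the boxing trick, for anyone curious. The names `ConsumerCursor`, `slotOf` and `cursorFor` are hypothetical and this is not the Quasar source. It relies on two things: for a power-of-2 capacity the array slot is `sequence & (capacity - 1)`, and `Integer.valueOf` is guaranteed to return cached instances for values from -128 to 127. On HotSpot the upper bound can be raised with `-XX:AutoBoxCacheMax=<n>`, which is an assumption about the JVM in use.

```java
// Illustrative only: hand the consumer a small boxed slot index
// instead of a boxed, ever-growing long sequence.
final class ConsumerCursor {
    private ConsumerCursor() {}

    // capacity is assumed to be a power of 2, so masking equals sequence % capacity.
    static int slotOf(long sequence, int capacity) {
        return (int) (sequence & (capacity - 1));
    }

    // Returned as Object because, in an unbounded queue, the cursor
    // would be a linked-list node rather than an index.
    static Object cursorFor(long sequence, int capacity) {
        // Integer.valueOf reuses cached instances for small values,
        // so no new object is allocated per read when the slot is in cache range.
        return Integer.valueOf(slotOf(sequence, capacity));
    }
}
```

For a 64-slot queue, sequences 5 and 69 both map to slot 5 and yield the very same cached `Integer` object, whereas boxing the raw sequence would allocate a fresh `Long` once the sequence grows past the cache range.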
We’ve re-written our spaceships demo to use Quasar’s lightweight Java threads. You can read about it here: http://blog.paralleluniverse.co/post/64210769930/spaceships2.