Concurrency, asynchronous and distributed

Let's talk about the more interesting topics...

Synchronous vs async message execution

Message execution is naturally asynchronous in the engine's internal implementation. Logically, however, execution is treated as synchronous. Internally, the engine uses Akka actors to execute each message individually, in its own separate execution context.

This way, one message does not meaningfully impact another in terms of resources, timeouts, thread starvation etc. At the same time, not having to deal with all this asynchronicity by default makes the user's life easier.

For instance:

$when do.start
=> do.step1
=> do.step2

In a sync implementation, step1 could hold up resources for step2. However, in a fully async implementation, the two would start in parallel and make it really hard for the user to understand what is going on - for instance, consider this case:

$when do.start
=> do.step1
=> (ages = (student => get.age(student))
=> do.step2(ages)

If these were run async and in parallel, you'd have no way of knowing what is going on - in fact, most of the time they'd fail, because step2 could start before ages is computed.

This is why, although the engine internally runs asynchronously, it runs logically in sequence. If you are familiar with futures and promises, this is what we have - the sequence above is equivalent to:

do.start = do.step1 andThen do.step2
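
For readers who think in futures, the same idea can be sketched with plain Scala futures. This is only an illustration, not the engine's code: step1, step2 and start are hypothetical stand-ins for the messages above, and the sequencing is written with flatMap, which is how one future is chained after another in Scala's standard library.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequencedSteps {
  // Hypothetical stand-ins for do.step1 and do.step2; each one runs asynchronously
  // and appends its name to the log it receives.
  def step1(log: Vector[String]): Future[Vector[String]] = Future(log :+ "step1")
  def step2(log: Vector[String]): Future[Vector[String]] = Future(log :+ "step2")

  // do.start: step2 is only started once step1 has completed.
  def start(): Future[Vector[String]] = step1(Vector.empty).flatMap(step2)

  def main(args: Array[String]): Unit =
    println(Await.result(start(), 5.seconds)) // prints Vector(step1, step2)
}
```

Each step still runs on a thread pool, but the caller only ever sees them in order.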

Parallel execution

There are a few ways to use parallel and async execution:

  • $flow to introduce sequence/parallel execution
  • spawn
  • streams


If you want to orchestrate the nodes decomposed for a specific message do.this, then you can declare a $flow that matches it:

$flow do.this => (do.step1 + (do.step2 | do.step3) + do.step4)

Note that simply because you use $flow, the actions are not guaranteed to run in parallel. The engine merely removes the implicit dependencies between them and can start them in parallel - but they truly run in parallel only if the respective executors are asynchronous - see engine implementation details below.
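
As a rough analogy, the shape of that $flow expression can be written with Scala futures. This is a sketch under assumptions, not how the engine evaluates a flow: step is a hypothetical asynchronous step, "+" is modelled as "start after the previous one completes" and "|" as "start both before waiting for either".

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FlowSketch {
  // Hypothetical asynchronous step: completes with its own name.
  def step(name: String): Future[String] = Future(name)

  // step1 + (step2 | step3) + step4
  def run(): Future[List[String]] =
    step("step1").flatMap { s1 =>
      // Both futures are created before either is awaited, so they may overlap.
      val f2 = step("step2")
      val f3 = step("step3")
      for {
        s2 <- f2
        s3 <- f3
        s4 <- step("step4") // only started after step2 and step3 are both done
      } yield List(s1, s2, s3, s4)
    }
}
```

If step were a blocking call instead of a future, step2 and step3 would simply run one after the other, which is the point of the note above.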


You can spawn new engines, using the ==> and <=> operators:

$when dt.spawnNoWait(var)
==> dt.child1(var)

$when dt.child1(var)
=> ctx.sleep(duration=500)

$when dt.spawnWait(var)
<=> dt.child1(var)

The first one is the "no wait" or fire and forget option, where you spawn a new engine with the respective root message and forget about it - your engine will complete right away, while the other one processes in the background.

The second option will wait for the spawned engine to finish. This can be a little tricky:

  • that node will wait for the spawned engine to finish
  • if you want something else to go in parallel, use $flow

For instance:

$when dt.forkJoin(var)
<=> dt.child1(var)
<=> dt.child1(var)
<=> dt.child1(var)

$flow dt.forkJoin => dt.child1

See more examples in mt-spec and mt-story.
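
The difference between the two spawn styles can be sketched with plain futures. The names below (child1, spawnNoWait, forkJoin) are hypothetical and only mirror the messages above; spawned engines are modelled as futures, which is an assumption made for illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SpawnSketch {
  // Counts how many children have finished, so the effect is observable.
  val finished = new AtomicInteger(0)

  // Hypothetical stand-in for dt.child1: sleeps briefly, then reports its id.
  def child1(id: Int): Future[Int] = Future {
    Thread.sleep(50)
    finished.incrementAndGet()
    id
  }

  // "==>" style: start the child and return right away, ignoring its result.
  def spawnNoWait(id: Int): Unit = { child1(id); () }

  // "<=>" combined with $flow: start three children, then join on all of them.
  def forkJoin(): Future[List[Int]] =
    Future.sequence(List(child1(1), child1(2), child1(3)))
}
```

In the fork-join case the three children are all started before the join, so the total wait is roughly one child's duration rather than three, provided the thread pool has room for them.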


Streams are a modern abstraction, derived from the older producer-consumer protocols. A diesel stream is a pipeline of elements processed asynchronously to the producer.

In diesel we have a simple stream implementation for inter-flow communication. One flow can create a stream and listen to data on it, while one or more other flows can generate data on the stream.
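
The producer-consumer idea behind a stream can be sketched with a blocking queue. This is an analogy only and says nothing about how diesel streams are implemented: Elem, Data, Done, generate and consume are hypothetical names, and a queue element marks the end of the stream.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object StreamSketch {
  sealed trait Elem
  final case class Data(value: Int) extends Elem
  case object Done extends Elem // marks the end of the stream

  // Generator flow: puts the values on the stream, then closes it.
  def generate(stream: LinkedBlockingQueue[Elem], values: Seq[Int]): Future[Unit] =
    Future {
      values.foreach(v => stream.put(Data(v)))
      stream.put(Done)
    }

  // Consumer flow: blocks on the stream and sums the data until Done arrives.
  def consume(stream: LinkedBlockingQueue[Elem]): Int = {
    var total = 0
    var open = true
    while (open) {
      stream.take() match {
        case Data(v) => total += v
        case Done    => open = false
      }
    }
    total
  }
}
```

Several generators can share the same queue; in that case only one of them should send Done, or the consumer should count the producers it expects.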

Stream lifecycle:

  • a new stream belongs to the consumer engine. This engine will stop all streams when exiting, so use to stop the engine from exiting.
  • as elements are inserted into the stream, the elements are created under the parent. Only the last N of these are kept for logging purposes; the oldest are deleted as new nodes are added, so there is no resource creep.

Create a stream, owned by the current engine

Start a generator in same flow - this will spawn a separate flow sending data on the stream
$send testdiesel.streams.startGenerator(stream="s1")

Now this original flow, the stream owner/consumer, starts consuming
$send"s1", timeoutMillis=1000)

This flow will now be stopped here until the stream is done.

This will put 5 elements on the stream and then close the stream (`done`).
$when testdiesel.streams.generate(stream)
=>, data=[1,2,3,4,5])

And this is the receiver:
$when == "s1", data)
=> ctx.echo(data=data)
=> (total = total + data)

This is an asynchronous generator, to use with large collections:
=>, start=1, end=5)

Stream context

When creating a stream, any extra arguments are stored in a "context" and are passed into any onXX method as a context argument:

$send"s1", p1="haha")


$when == "s1", data, context)
=> ctx.echo(p1 = context.p1)

Note some properties that control some internal stream behaviours:

  • - max size of stream, config in localhost property file. Default 10k in localhost, 100 on cloud

Streams and timeouts

You can have two types of timeouts when using streams:

Timeout on consumption - use a timeout on consume. This is good when you don't want the entire process to last longer than X.

$send"s1", timeoutMillis=1000)

The other is an activity based timeout - when the stream has been empty for X duration, then close it and be done:

$send"s1", timeoutMillis=1000)
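
The difference between the two timeouts can be sketched on a plain blocking queue. This is a hypothetical illustration, not the diesel stream API: drain, TotalTimeout and IdleTimeout are made-up names, and the queue stands in for the stream.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object StreamTimeouts {
  sealed trait Outcome
  case object TotalTimeout extends Outcome // the whole consumption took too long
  case object IdleTimeout extends Outcome  // the stream stayed empty for too long

  /** Reads until one of the two timeouts fires; returns what was read and why it stopped. */
  def drain(q: LinkedBlockingQueue[String],
            totalMillis: Long,
            idleMillis: Long): (List[String], Outcome) = {
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(totalMillis)

    @scala.annotation.tailrec
    def loop(acc: List[String]): (List[String], Outcome) = {
      val remaining = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime())
      if (remaining <= 0) (acc.reverse, TotalTimeout)
      else {
        // Wait at most the idle limit, and never past the overall deadline.
        val waitMillis = math.min(idleMillis, remaining)
        Option(q.poll(waitMillis, TimeUnit.MILLISECONDS)) match {
          case Some(v)                         => loop(v :: acc)
          case None if remaining <= idleMillis => (acc.reverse, TotalTimeout)
          case None                            => (acc.reverse, IdleTimeout)
        }
      }
    }
    loop(Nil)
  }
}
```

The total timeout bounds the whole run no matter how busy the stream is, while the idle timeout only fires when nothing has arrived for the given duration.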

Internal engine design and API

This is probably too much for most, but has good details for the curious.

Async message executors

TODO just

This represents the ultimate in flexibility but also adds some complexity for the users:

  • when is a message really "done" ?

For instance, the message $msg <POST> subDb.createSub is an asynchronous REST call, consisting of a request and then a reply on a different actor/thread.

Thus, when the message is triggered initially, it is not in fact complete, as logically, it should wait for the response.

The executor of the message is responsible for notifying the engine when the message is completed. At this point, the engine will consider the next messages in a sequence.


A good example is the implementation of the ctx.sleep message. Instead of sleeping in the current thread, the executor will return an EEngSuspend, instructing the engine to suspend sync execution and await a later message. At the same time, it will set an asynchronous timer, which will then send a DEComplete to the engine, causing it to continue execution where the ctx.sleep was.

This is the actual code for ctx.sleep - see the apply method:

/** a custom executor to sleep asynchronously */
class EEMySleep extends EExecutor("my.sleep") {

/** well, then apply it */
override def apply(in: EMsg, destSpec: Option[EMsg])(implicit ctx: ECtx): List[Any] = {
   // our message was passed a duration integer
   val d = in.attrs.find( == "duration").map(_.calculatedTypedValue.asInt).getOrElse(1000)

   // an executor returns a list of nodes it generated.
   // we'll return an info node and an EEngSuspend, which will suspend this branch of
   // the engine until that DEComplete. 
   // we'll also use the DELater message to execute this reply later (i.e. wait)
   new EInfo("ctx.sleep - slept " + d) :: 
   ext.EEngSuspend("ctx.sleep", "", Some((e,a,l) => { ! DELater(, d, DEComplete(, a, true, l, Nil)))
   })) :: Nil
}

  override val messages: List[EMsg] =
    EMsg("ctx", "sleep") :: Nil
}

Note that:

  • it will return an EEngSuspend which will cause the engine to suspend.
  • at the time the engine is suspended, it will send a DELater scheduled to trigger a DEComplete - this last one is the final asynchronous completion of the ctx.sleep message.

Because it is asynchronous, you can start many in parallel in the same engine, and they would truly run in parallel.
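
The same suspend-then-complete idea can be sketched without the engine, using a promise and a timer. This is an assumption-laden analogy, not the engine's API: returning the promise's future plays the role of EEngSuspend, and the timer firing plays the role of the DELater that delivers DEComplete. All names in the sketch are hypothetical.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AsyncSleepSketch {
  // A single daemon timer thread serves all sleeps; no thread is parked per sleep.
  private val timer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "sleep-timer")
      t.setDaemon(true)
      t
    }
  })

  // Non-blocking sleep: returns immediately ("suspend"); the future is completed
  // later, when the timer fires ("complete").
  def sleep(millis: Long): Future[String] = {
    val done = Promise[String]()
    timer.schedule(new Runnable {
      def run(): Unit = { done.success(s"slept $millis"); () }
    }, millis, TimeUnit.MILLISECONDS)
    done.future
  }

  // Many sleeps can be in flight at once, because none of them holds a thread.
  def sleepMany(n: Int, millis: Long): Future[List[String]] =
    Future.sequence(List.fill(n)(sleep(millis)))
}
```

Since every sleep is just a scheduled timer entry, starting many of them takes about as long as one, which matches the observation above about running in parallel.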

This is truly an ask pattern. Be careful with launching many activities in parallel and especially long running ones and expecting the original engine to be around when they return... the ask pattern is evil.

By: Razie | 2020-10-03 .. 2023-03-15 | Tags: academy , reference , engine

© Copyright DieselApps, 2012-2024, all rights reserved.