Let's talk about the more interesting topics...
Internally, message execution in the engine is naturally asynchronous. Logically, however, execution is treated as synchronous. The engine uses Akka actors internally to execute each message individually, in its own separate execution context.
This way, one message does not meaningfully impact another in terms of resources, timeouts, thread starvation, etc. At the same time, not having to deal with all this asynchronicity by default makes users' lives easier.
For instance:
$when do.start
=> do.step1
=> do.step2
In a synchronous implementation, step1 could hold up resources needed by step2. In a fully asynchronous implementation, however, the two would start in parallel, making it really hard for the user to understand what is going on. For instance, consider this case:
$when do.start
=> do.step1
=> (ages = payload.map (student => get.age(student)))
=> do.step2(ages)
If these ran asynchronously and in parallel, you'd have no way of knowing what is going on. In fact, most of the time they would fail, since do.step2 could start before ages has been computed.
This is why, although the engine internally runs asynchronously, it runs logically in sequence. If you are familiar with futures and promises, this is what we have - the sequence above is equivalent to:
do.start = do.step1 andThen do.step2
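As a rough analogy in Scala futures (an illustration only, not the engine's code), the sequence behaves like the for-comprehension below. The functions step1, getAge and step2 are hypothetical stand-ins for the do.step1, get.age and do.step2 messages. Note that Scala's own Future.andThen only attaches a side-effecting callback, so the sketch sequences the stages with flatMap (via for) instead.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-ins for the do.step1, get.age and do.step2 messages.
def step1(): Future[List[String]] = Future(List("ann", "bob", "carol"))
def getAge(student: String): Future[Int] = Future(student.length * 10)
def step2(ages: List[Int]): Future[Int] = Future(ages.sum)

// Logically sequential: each stage starts only after the previous one has
// completed, even though every stage runs asynchronously on a thread pool.
def start(): Future[Int] =
  for {
    students <- step1()
    ages     <- Future.traverse(students)(getAge) // all ages are computed here...
    total    <- step2(ages)                      // ...before step2 starts
  } yield total

val total = Await.result(start(), 5.seconds)
println(total)
```

Because step2 is only reached once ages is fully computed, the failure mode described above cannot occur in this shape.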
There are a couple of ways to use parallel and async execution:
- $flow, to introduce sequence/parallel execution
$flow
If you want to orchestrate the nodes decomposed for a specific message do.this, you can declare a $flow that matches it:
$flow do.this => (do.step1 + (do.step2 | do.step3) + do.step4)
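A rough Scala analogy for that $flow, reading + as sequence and | as parallel: the hypothetical seq and par helpers below compose futures the same way (they are not diesel's implementation). Whether step2 and step3 actually overlap depends on the thread pool, so the sketch only guarantees ordering: step1 ends before either of them starts, and step4 starts only after both have ended.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

// Records "start:x" / "end:x" events so the ordering can be inspected.
val log = new ConcurrentLinkedQueue[String]()

// A hypothetical step: logs its start, simulates some work, logs its end.
def step(name: String): () => Future[Unit] = () => Future {
  log.add(s"start:$name")
  Thread.sleep(50)
  log.add(s"end:$name")
  ()
}

// a + b : run b only after a has completed (sequence)
def seq(a: () => Future[Unit], b: () => Future[Unit]): () => Future[Unit] =
  () => a().flatMap(_ => b())

// a | b : start both, complete when both are done (parallel)
def par(a: () => Future[Unit], b: () => Future[Unit]): () => Future[Unit] =
  () => {
    val fa = a() // both futures are started before either is awaited
    val fb = b()
    fa.zip(fb).map(_ => ())
  }

// do.step1 + (do.step2 | do.step3) + do.step4
val flow = seq(seq(step("step1"), par(step("step2"), step("step3"))), step("step4"))
Await.result(flow(), 5.seconds)

val events = log.asScala.toList
println(events)
```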
Note that, even in a $flow, the actions are not guaranteed to run in parallel. The engine merely removes the implicit dependencies between them so that it can start them in parallel, but they only truly run in parallel if the respective executors are asynchronous (see the engine implementation details below).
You can spawn new engines using the ==> and <=> operators:
$when dt.spawnNoWait(var)
==> dt.child1(var)
$when dt.child1(var)
=> ctx.sleep(duration=500)
$when dt.spawnWait(var)
<=> dt.child1(var)
The first one is the "no wait", or fire-and-forget, option: you spawn a new engine with the respective root message and forget about it. Your engine will complete right away, while the other one processes in the background.
The second option waits for the spawned engine to finish. This can be a little tricky when combined with a $flow. For instance:
$when dt.forkJoin(var)
<=> dt.child1(var)
<=> dt.child1(var)
<=> dt.child1(var)
$flow dt.forkJoin => dt.child1
See more examples in mt-spec and mt-story.
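The spawning patterns above can be approximated with Scala futures, as a sketch only: ==> starts a child and does not wait for it, <=> waits for the child, and several <=> lines in one rule are treated here as a fork/join via Future.sequence, assuming the children may run in parallel. The child1 function is a hypothetical stand-in for dt.child1 that finishes only when a Promise is released, so each parent's behaviour can be observed deterministically.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical child engine for dt.child1(var): it finishes only when
// `release` is completed, so we can see what the parent does meanwhile.
def child1(v: Int, release: Promise[Unit]): Future[Int] =
  release.future.map(_ => v * 2)

// ==> : fire and forget, the parent completes without waiting for the child
def spawnNoWait(v: Int, release: Promise[Unit]): Future[String] = {
  child1(v, release) // result intentionally ignored
  Future.successful("parent done")
}

// <=> : the parent completes only after the child has finished
def spawnWait(v: Int, release: Promise[Unit]): Future[String] =
  child1(v, release).map(r => s"parent done, child returned $r")

// several <=> in one rule: start all children, then join on all of them
def forkJoin(v: Int, release: Promise[Unit]): Future[List[Int]] =
  Future.sequence(List.fill(3)(child1(v, release)))

val noWaitRelease = Promise[Unit]()
val noWait = spawnNoWait(1, noWaitRelease)
val noWaitCompletedEarly = noWait.isCompleted // the child is still pending

val waitRelease = Promise[Unit]()
val waiting = spawnWait(21, waitRelease)
val waitCompletedEarly = waiting.isCompleted  // still blocked on the child
waitRelease.success(())
val waitResult = Await.result(waiting, 5.seconds)

val joinRelease = Promise[Unit]()
val joined = forkJoin(21, joinRelease)
joinRelease.success(())
val joinResult = Await.result(joined, 5.seconds)

noWaitRelease.success(()) // let the forgotten child finish as well
println(s"$noWaitCompletedEarly $waitCompletedEarly $waitResult $joinResult")
```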
Streams are a modern abstraction, derived from the older producer-consumer protocols. A diesel stream is a pipeline of elements processed asynchronously with respect to the producer.
In diesel we have a simple stream implementation for inter-flow communication. One flow can create a stream and listen to data on it, while one or more other flows can generate data on the stream.
Stream lifecycle:
- the owner calls diesel.stream.consume to stop the engine from exiting.
- diesel.stream.onData elements are created under the diesel.stream.consume parent. Only the last N of these are kept for logging purposes; the oldest are deleted as new nodes are added, so there is no resource creep.
Create a stream, owned by the current engine:
$send diesel.stream.new(stream="s1")
Start a generator in the same flow; this will spawn a separate flow sending data on the stream:
$send testdiesel.streams.startGenerator(stream="s1")
Now this original flow, the stream owner/consumer, starts consuming
$send diesel.stream.consume(stream="s1", timeoutMillis=1000)
This flow will now be stopped here until the stream is done.
This will put 5 elements on the stream and then close the stream (`done`).
$when testdiesel.streams.generate(stream)
=> diesel.stream.putAll(stream, data=[1,2,3,4,5])
=> diesel.stream.done(stream)
And this is the receiver:
$when diesel.stream.onData(stream == "s1", data)
=> ctx.echo(data=data)
=> (total = total + data)
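The new/putAll/done/consume/onData lifecycle above maps roughly onto a blocking queue. The sketch below uses a hypothetical in-memory SimpleStream, not diesel's implementation: a generator running as a separate Future puts five elements and then a done marker, while the owner blocks in consume, calling onData for each element, until done arrives or the timeout expires.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Items travelling on the stream: data elements, then an end-of-stream marker.
sealed trait Item
final case class Data(value: Int) extends Item
case object Done extends Item

// A hypothetical in-memory stream, loosely modelled on diesel.stream.*.
final class SimpleStream(val name: String) {
  private val queue = new LinkedBlockingQueue[Item]()

  def putAll(values: Seq[Int]): Unit = values.foreach(v => queue.put(Data(v)))
  def done(): Unit = queue.put(Done)

  // Blocks the owner until `done` arrives or the timeout elapses, calling
  // onData for every element. Returns true if the stream completed normally.
  def consume(timeoutMillis: Long)(onData: Int => Unit): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMillis
    var finished = false
    var timedOut = false
    while (!finished && !timedOut) {
      val remaining = deadline - System.currentTimeMillis()
      val item = if (remaining > 0) queue.poll(remaining, TimeUnit.MILLISECONDS) else null
      item match {
        case null    => timedOut = true
        case Done    => finished = true
        case Data(v) => onData(v)
      }
    }
    finished
  }
}

// like diesel.stream.new(stream="s1")
val s1 = new SimpleStream("s1")

// the generator runs as a separate, asynchronous flow
Future {
  s1.putAll(List(1, 2, 3, 4, 5))
  s1.done()
}

// like diesel.stream.consume(stream="s1", timeoutMillis=1000) plus an onData rule
var total = 0
val completed = s1.consume(timeoutMillis = 1000) { data =>
  println(s"data: $data")
  total = total + data
}
println(s"completed=$completed total=$total")
```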
This is an asynchronous generator, for use with large collections:
=> diesel.stream.generate(stream, start=1, end=5)
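The point of an asynchronous generator is that elements are produced one at a time rather than materialised as a list first. A sketch under that assumption: the hypothetical generate below walks the range first..last (standing in for start/end) on another thread and feeds a small bounded queue, so memory stays constant however large the range is. The bounded queue (back-pressure) is a design choice of this sketch, not documented diesel behaviour.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical analogue of diesel.stream.generate(stream, start, end):
// values first..last are produced one at a time on another thread, so the
// full range is never built as an in-memory collection.
def generate(queue: ArrayBlockingQueue[Option[Int]], first: Int, last: Int): Future[Unit] =
  Future {
    var i = first
    while (i <= last) {
      queue.put(Some(i)) // blocks while the queue is full (back-pressure)
      i += 1
    }
    queue.put(None)      // end-of-stream marker, playing the role of done
  }

// A small capacity keeps memory bounded no matter how large the range is.
val queue = new ArrayBlockingQueue[Option[Int]](16)
generate(queue, first = 1, last = 5)

// Drain until the end marker; take() blocks until the next element arrives.
val received = Iterator.continually(queue.take()).takeWhile(_.isDefined).map(_.get).toList
println(received)
```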
When creating a stream, any extra arguments are stored in a "context" and are passed into any onXX method as a context argument:
$send diesel.stream.new(stream="s1", p1="haha")
...
$when diesel.stream.onData(stream == "s1", data, context)
=> ctx.echo(p1 = context.p1)
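The context mechanism amounts to storing the extra arguments once, when the stream is created, and handing them to every callback. A minimal Scala sketch of that bookkeeping, where the Streams registry and its create/deliver methods are invented for illustration:

```scala
import scala.collection.mutable

// Hypothetical stream record: extra creation arguments become the context.
final case class StreamDef(name: String, context: Map[String, String])

object Streams {
  private val registry = mutable.Map.empty[String, StreamDef]

  // like diesel.stream.new(stream="s1", p1="haha"): keep the extra args
  def create(name: String, extra: (String, String)*): Unit =
    registry(name) = StreamDef(name, extra.toMap)

  // like an onData delivery: the handler receives the data and the context
  def deliver(name: String, data: Int)(onData: (Int, Map[String, String]) => Unit): Unit =
    onData(data, registry(name).context)
}

Streams.create("s1", "p1" -> "haha")

val echoed = mutable.ListBuffer.empty[String]
Streams.deliver("s1", 1) { (data, context) =>
  echoed += s"data=$data p1=${context("p1")}"
}
println(echoed.mkString)
```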
A few properties control internal stream behaviours:
diesel.stream.maxSize - maximum size of a stream, configured in the localhost property file. Default: 10k on localhost, 100 in the cloud.
You can have two types of timeouts when using streams:
Timeout on consumption: use a timeout on diesel.stream.consume. This is good when you don't want the entire process to last longer than X:
$send diesel.stream.consume(stream="s1", timeoutMillis=1000)
The other is an activity-based timeout: when the stream has been empty for X, close it and be done:
$send diesel.stream.new(stream="s1", timeoutMillis=1000)
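The difference between the two timeouts can be sketched as one consumer loop with two limits. This is a hypothetical illustration, not diesel code: totalMillis plays the role of the timeoutMillis on consume (an overall deadline), and idleMillis the role of the timeoutMillis on new (how long the stream may stay empty).

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Why a (hypothetical) consumer stopped.
sealed trait Outcome
case object Completed extends Outcome    // the producer signalled done
case object TotalTimeout extends Outcome // overall limit, like consume(timeoutMillis)
case object IdleTimeout extends Outcome  // no data for a while, like new(timeoutMillis)

// `queue` carries Some(value) for data and None for done.
def consume(queue: LinkedBlockingQueue[Option[Int]], totalMillis: Long, idleMillis: Long): Outcome = {
  val deadline = System.currentTimeMillis() + totalMillis
  var outcome: Option[Outcome] = None
  while (outcome.isEmpty) {
    val remaining = deadline - System.currentTimeMillis()
    if (remaining <= 0) outcome = Some(TotalTimeout)
    else {
      // wait for the next element, but never beyond the overall deadline
      val wait = math.min(remaining, idleMillis)
      queue.poll(wait, TimeUnit.MILLISECONDS) match {
        case null    => outcome = Some(if (wait < idleMillis) TotalTimeout else IdleTimeout)
        case None    => outcome = Some(Completed)
        case Some(_) => () // data arrived, so the idle clock starts over
      }
    }
  }
  outcome.get
}

def queueOf(items: Option[Int]*): LinkedBlockingQueue[Option[Int]] = {
  val q = new LinkedBlockingQueue[Option[Int]]()
  items.foreach(item => q.put(item))
  q
}

val idle  = consume(queueOf(Some(1), Some(2), Some(3)), totalMillis = 5000, idleMillis = 100)
val total = consume(queueOf(), totalMillis = 100, idleMillis = 1000)
val done  = consume(queueOf(Some(1), None), totalMillis = 1000, idleMillis = 1000)
println(s"$idle $total $done")
```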
This is probably too much for most, but good details for the curious.
This represents the ultimate in flexibility, but also adds some complexity for users:
For instance, the message $msg <POST> subDb.createSub is an asynchronous REST call, consisting of a request and then a reply on a different actor/thread.
Thus, when the message is triggered initially, it is not in fact complete, as logically, it should wait for the response.
The executor of the message is responsible for notifying the engine when the message is completed. At this point, the engine will consider the next messages in a sequence.
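The contract can be sketched in plain Scala, as an illustration only: the Result, Done and Suspend types and the runAll loop below are hypothetical and merely mimic the roles EEngSuspend and DEComplete play in the engine. An executor either finishes immediately, or returns Suspend and later calls a completion callback; only then does the loop move on to the next message, and no thread is blocked while waiting.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// What a hypothetical executor returns to the engine.
sealed trait Result
final case class Done(info: String) extends Result
// Async work was started; the executor must call `complete` when it finishes.
final case class Suspend(start: (String => Unit) => Unit) extends Result

val scheduler = Executors.newSingleThreadScheduledExecutor()
val log = ListBuffer.empty[String]
def record(info: String): Unit = log.synchronized { log += info; () }

// Runs messages in sequence without blocking a thread: a suspended message
// resumes the rest of the sequence from inside its completion callback.
def runAll(messages: List[() => Result], finished: Promise[Unit]): Unit = messages match {
  case Nil => finished.success(())
  case exec :: rest =>
    exec() match {
      case Done(info)     => record(info); runAll(rest, finished)
      case Suspend(start) => start { info => record(info); runAll(rest, finished) }
    }
}

// Hypothetical executors: two synchronous steps around an asynchronous call
// whose "reply" arrives 100 ms later on the scheduler thread.
val step1: () => Result = () => Done("step1")
val createSub: () => Result = () => Suspend { complete =>
  scheduler.schedule(new Runnable {
    def run(): Unit = complete("subDb.createSub reply")
  }, 100, TimeUnit.MILLISECONDS)
  ()
}
val step2: () => Result = () => Done("step2")

val finished = Promise[Unit]()
runAll(List(step1, createSub, step2), finished)
Await.result(finished.future, 5.seconds)
scheduler.shutdown()
println(log.mkString(", "))
```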
A good example is the implementation of the ctx.sleep message. Instead of sleeping in the current thread, the executor returns an EEngSuspend, instructing the engine to suspend sync execution and await a later message. At the same time, it sets an asynchronous timer, which then sends a DEComplete to the engine, causing it to continue execution where the ctx.sleep was.
This is the actual code for ctx.sleep (see the apply method):
/** a custom executor to sleep asynchronously */
class EEMySleep extends EExecutor("my.sleep") {
  /** well, then apply it */
  override def apply(in: EMsg, destSpec: Option[EMsg])(implicit ctx: ECtx): List[Any] = {
    // our message was passed a duration integer
    val d = in.attrs.find(_.name == "duration").map(_.calculatedTypedValue.asInt).getOrElse(1000)
    // an executor returns a list of nodes it generated.
    // we'll return an info node and an EEngSuspend, which will suspend this branch of
    // the engine until that DEComplete.
    // we'll also use the DELater message to execute this reply later (i.e. wait)
    new EInfo("ctx.sleep - slept " + d) ::
      ext.EEngSuspend("ctx.sleep", "", Some((e, a, l) => {
        DieselAppContext.router.map(_ ! DELater(e.id, d, DEComplete(e.id, a, true, l, Nil)))
      })) :: Nil
  }
  override val messages: List[EMsg] =
    EMsg("ctx", "sleep") ::
      Nil
}
Note that:
- the executor returns an EEngSuspend, which will cause the engine to suspend.
- the suspend callback sends a DELater, scheduled to trigger a DEComplete; this last one is the final asynchronous completion of the ctx.sleep message.
Because it is asynchronous, you can start many of these sleeps in parallel in the same engine, and they will truly run in parallel.
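To see why this scales, here is a standard-library-only Scala sketch of the same idea. The asyncSleep function is hypothetical, and a ScheduledExecutorService stands in for DELater: each sleep returns immediately and is completed later by a single timer thread, so five 200 ms sleeps started together finish in roughly 200 ms rather than one second, without blocking any worker thread.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// One timer thread serves every sleep: no thread is blocked while "sleeping".
val timer = Executors.newSingleThreadScheduledExecutor()

// Hypothetical async sleep in the spirit of ctx.sleep: return at once (like
// EEngSuspend) and complete later from the timer (like DELater -> DEComplete).
def asyncSleep(millis: Long): Future[Long] = {
  val done = Promise[Long]()
  timer.schedule(new Runnable {
    def run(): Unit = done.success(millis)
  }, millis, TimeUnit.MILLISECONDS)
  done.future
}

val startedAt = System.nanoTime()
val sleeps = List.fill(5)(asyncSleep(200))
Await.result(Future.sequence(sleeps), 5.seconds)
val elapsedMillis = (System.nanoTime() - startedAt) / 1000000
timer.shutdown()
println(s"5 x 200 ms sleeps took about $elapsedMillis ms")
```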