Building Data Pipeline with Kotlin Coroutines Actors
Table of Contents
In this post I will show how to build simple data-enriching pipeline using Kotlin coroutines. I will use Channels and Actors abstractions provided by kotlinx-coroutines.
In Actors model “actors” are the universal primitives of concurrent computation. In response to a message that it receives, an actor can: make local decisions, create more actors, send more messages, and determine how to respond to the next message received. Actors may modify their own private state, but can only affect each other through messages (avoiding the need for any locks).
Let’s start with high-level definition of the pipeline:
(π€Producer) -βοΈβ π¬(π€Enricher) -βοΈβ π¬(π€Updater)
RawData RichData
- Pipeline will have Producer Actor, which will get some raw data from database or some mock data and will send it to pipeline for enrichment.
- Then Enricher Actor will handle raw data object and add some attributes to it.
- Finally, Updater Actor will store enriched data to the database.
For the sake of simplicity, let’s implement squaring function: we will take integers as raw data and will enrich them by squaring.
Let’s define our data model:
data class RawData(val value: Int)
Enriched data will be represented by data class RichData
:
data class RichData(val value: Int, val square: Int)
Following Actors model, we will use Kotlin Actors to represent processing units in a pipeline and Channels to communicate with Actors.
In Kotlin actors are implemented as part of kotlinx-coroutines library:
An actor is an entity made up of a combination of a coroutine, the state that is confined and encapsulated into this coroutine, and a channel to communicate with other coroutines. A simple actor can be written as a function, but an actor with a complex state is better suited for a class.
There is an actor coroutine builder that conveniently combines actor’s mailbox channel into its scope to receive messages from and combines the send channel into the resulting job object, so that a single reference to the actor can be carried around as its handle.
Defining Messages #
Actor always reacts to some external message or set of messages. It’s a good idea to define an Envelope to transfer metadata along the payload. It is very important that all messages are immutable, so it is safe to pass messages to different threads.
data class Metadata(val timestampMillis: Long, val correlationId: UUID = UUID.randomUUID())
data class Envelope<T>(val payload: T, val metadata: Metadata)
In this case Metadata
contains timestamp in milliseconds and correlation Id.
Following function will be useful for data transformation and copy metadata to new envelope.
fun <T, R> transformMessage(input: Envelope<T>, block: (T) -> R): Envelope<R> {
val result = block(input.payload)
return Envelope(result, input.metadata)
}
Defining Actors #
Let’s define our Producer:
private val context = Executors.newFixedThreadPool(5, NamedThreadFactory("producer")).asCoroutineDispatcher()
@InternalCoroutinesApi
@ExperimentalCoroutinesApi
fun CoroutineScope.producerActor(total: Int) = produce<Envelope<RawData>>(
context,
capacity = 10,
onCompletion = {
context.close() // close context on stopping the actor
log("π Completed. Exception: $it")
}
) {
for (i in 1..total) {
val rawData = RawData(i)
val result = Envelope(rawData, Metadata(Instant.now().toEpochMilli()))
log("π₯Producing $result")
channel.send(result)
}
channel.close()
}
Coroutines are always executed in some CoroutineContext
.
If we want to control how many threads will be available for actor,
we can use ExecutorCoroutineDispatcher
.
As you can see, we defined the Dispatcher with 5 threads with FixedThreadPool.
Actor will use this thread pool to run.
You may notice, that actor is defined with channel buffer with capacity=100
.
Actor will be able to send messages to the channel unless it’s buffer is full.
Also, onCompletion
function will be called when actor will be stopped or canceled.
ExecutorCoroutineDispatcher
needs to be stopped (closed) when our actor is completed,
either successfully or exceptionally.
That’s why context.close()
is called in onCompletion
function.
Now let’s define Enricher Actor:
private val context = Executors.newFixedThreadPool(3, NamedThreadFactory("enricher")).asCoroutineDispatcher()
@ExperimentalCoroutinesApi
@InternalCoroutinesApi
fun CoroutineScope.enricherActor(inbox: ReceiveChannel<Envelope<RawData>>): ReceiveChannel<Envelope<RichData>> =
produce(
context,
capacity = 10,
onCompletion = {
context.close() // close context on stopping the actor
log("π Completed. Exception: $it")
}
) {
for (msg in inbox) { // iterate over incoming messages
log("π₯ Processing $msg")
val result = transformMessage(msg) { enrich(msg.payload) }
log("π₯ Enriched $result")
channel.send(result) // send to next
}
}
private fun enrich(rawData: RawData): RichData {
val value = rawData.value
val square = value * value
return RichData(value, square)
}
The implementation is very similar to Producer.
The difference is that it receives messages from inbox
which is ReceiveChannel<Envelope<RichData>>
and sends them to it’s channel.
You may notice that actor’s thread pool has only 3 threads now.
Updater will receive RichData
message and print it.
In real-life case it should save a message to database.
private val context = Executors.newFixedThreadPool(2, NamedThreadFactory("updater")).asCoroutineDispatcher()
@ExperimentalCoroutinesApi
@InternalCoroutinesApi
fun CoroutineScope.updaterActor(inbox: ReceiveChannel<Envelope<RichData>>): ReceiveChannel<Envelope<RichData>> =
produce(
context,
capacity = 100,
onCompletion = {
context.close() // close context on stopping the actor
log("π Completed. Exception: $it")
}
)
{
for (msg in inbox) { // iterate over incoming messages
val created = msg.metadata.timestampMillis
log("π Writing $msg, processed in ${Instant.now().toEpochMilli() - created}ms")
Thread.sleep(100) // to simulate blocking operation
log("β
Done with $msg")
channel.send(msg)
}
}
Actor will print received messages and simulate IO operation by calling Thread.sleep(500)
.
It has only 2 threads.
When message is processed we send it again to outgoing channel,
so it could be handled externally.
Building Pipeline #
@ExperimentalCoroutinesApi
@InternalCoroutinesApi
fun main() {
val total = 15
val time = measureTimeMillis {
runBlocking {
val raw = producerActor(total)
val enriched = enricherActor(raw)
val updated = updaterActor(enriched)
var counter = 0
for (msg in updated) {
counter++
log("π Processed ${counter} : ${msg}")
}
log("The End")
}
}
log("Done in $time ms")
}
As you can see, it’s now very easy to create pipeline. Although it is not as visual as in Akka Streams, but still very clear.
Let’s run it:
2019-01-30T22:45:43.678798Z [producer-0] π₯Producing Envelope(payload=RawData(value=1), metadata=Metadata(timestampMillis=1548888343659, correlationId=8704b3b5-c357-4044-bc5d-995fe6a4797e))
2019-01-30T22:45:43.692628Z [producer-0] π₯Producing Envelope(payload=RawData(value=2), metadata=Metadata(timestampMillis=1548888343692, correlationId=efaa86ff-292d-4657-bd63-925779caa133))
2019-01-30T22:45:43.692860Z [producer-0] π₯Producing Envelope(payload=RawData(value=3), metadata=Metadata(timestampMillis=1548888343692, correlationId=b266e28d-bffd-4066-8bd9-b103dc67d5bc))
2019-01-30T22:45:43.693005Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=1), metadata=Metadata(timestampMillis=1548888343659, correlationId=8704b3b5-c357-4044-bc5d-995fe6a4797e))
2019-01-30T22:45:43.693075Z [producer-0] π₯Producing Envelope(payload=RawData(value=4), metadata=Metadata(timestampMillis=1548888343693, correlationId=f5d468be-6845-4973-ba25-ebf773152712))
2019-01-30T22:45:43.693259Z [producer-0] π₯Producing Envelope(payload=RawData(value=5), metadata=Metadata(timestampMillis=1548888343693, correlationId=039b9f12-35f1-4b82-80af-07279eb17c8f))
2019-01-30T22:45:43.693468Z [producer-0] π₯Producing Envelope(payload=RawData(value=6), metadata=Metadata(timestampMillis=1548888343693, correlationId=718d484d-fa21-4345-940c-6ad738914725))
2019-01-30T22:45:43.693638Z [producer-0] π₯Producing Envelope(payload=RawData(value=7), metadata=Metadata(timestampMillis=1548888343693, correlationId=fee9b735-745c-4bf6-988d-89d2adc42df4))
2019-01-30T22:45:43.693826Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=1, square=1), metadata=Metadata(timestampMillis=1548888343659, correlationId=8704b3b5-c357-4044-bc5d-995fe6a4797e))
2019-01-30T22:45:43.693836Z [producer-0] π₯Producing Envelope(payload=RawData(value=8), metadata=Metadata(timestampMillis=1548888343693, correlationId=9d66c785-f765-4044-930f-4b7a87668c59))
2019-01-30T22:45:43.694165Z [producer-0] π₯Producing Envelope(payload=RawData(value=9), metadata=Metadata(timestampMillis=1548888343694, correlationId=e0449c6b-a720-4a0d-953b-3f2e25930177))
2019-01-30T22:45:43.694342Z [producer-0] π₯Producing Envelope(payload=RawData(value=10), metadata=Metadata(timestampMillis=1548888343694, correlationId=7b8ceefa-eebf-47ca-bc3a-1322193d2d64))
2019-01-30T22:45:43.694343Z [updater-1] π Writing Envelope(payload=RichData(value=1, square=1), metadata=Metadata(timestampMillis=1548888343659, correlationId=8704b3b5-c357-4044-bc5d-995fe6a4797e)), processed in 35ms
2019-01-30T22:45:43.694643Z [producer-0] π₯Producing Envelope(payload=RawData(value=11), metadata=Metadata(timestampMillis=1548888343694, correlationId=f87da826-7a94-4373-8dae-8c5bd72bb27e))
2019-01-30T22:45:43.694730Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=2), metadata=Metadata(timestampMillis=1548888343692, correlationId=efaa86ff-292d-4657-bd63-925779caa133))
2019-01-30T22:45:43.694885Z [producer-0] π₯Producing Envelope(payload=RawData(value=12), metadata=Metadata(timestampMillis=1548888343694, correlationId=2188daea-4dc0-4004-adc8-af634413284f))
2019-01-30T22:45:43.694891Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=2, square=4), metadata=Metadata(timestampMillis=1548888343692, correlationId=efaa86ff-292d-4657-bd63-925779caa133))
2019-01-30T22:45:43.695178Z [producer-0] π₯Producing Envelope(payload=RawData(value=13), metadata=Metadata(timestampMillis=1548888343695, correlationId=1942a54d-f914-4be0-b511-485c3fb4c138))
2019-01-30T22:45:43.695178Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=3), metadata=Metadata(timestampMillis=1548888343692, correlationId=b266e28d-bffd-4066-8bd9-b103dc67d5bc))
2019-01-30T22:45:43.695391Z [producer-0] π₯Producing Envelope(payload=RawData(value=14), metadata=Metadata(timestampMillis=1548888343695, correlationId=61f6f513-b1b3-4fb2-ab16-09c5e42408ab))
2019-01-30T22:45:43.695414Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=3, square=9), metadata=Metadata(timestampMillis=1548888343692, correlationId=b266e28d-bffd-4066-8bd9-b103dc67d5bc))
2019-01-30T22:45:43.695642Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=4), metadata=Metadata(timestampMillis=1548888343693, correlationId=f5d468be-6845-4973-ba25-ebf773152712))
2019-01-30T22:45:43.695772Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=4, square=16), metadata=Metadata(timestampMillis=1548888343693, correlationId=f5d468be-6845-4973-ba25-ebf773152712))
2019-01-30T22:45:43.695936Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=5), metadata=Metadata(timestampMillis=1548888343693, correlationId=039b9f12-35f1-4b82-80af-07279eb17c8f))
2019-01-30T22:45:43.696087Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=5, square=25), metadata=Metadata(timestampMillis=1548888343693, correlationId=039b9f12-35f1-4b82-80af-07279eb17c8f))
2019-01-30T22:45:43.696225Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=6), metadata=Metadata(timestampMillis=1548888343693, correlationId=718d484d-fa21-4345-940c-6ad738914725))
2019-01-30T22:45:43.696359Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=6, square=36), metadata=Metadata(timestampMillis=1548888343693, correlationId=718d484d-fa21-4345-940c-6ad738914725))
2019-01-30T22:45:43.696498Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=7), metadata=Metadata(timestampMillis=1548888343693, correlationId=fee9b735-745c-4bf6-988d-89d2adc42df4))
2019-01-30T22:45:43.696621Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=7, square=49), metadata=Metadata(timestampMillis=1548888343693, correlationId=fee9b735-745c-4bf6-988d-89d2adc42df4))
2019-01-30T22:45:43.696753Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=8), metadata=Metadata(timestampMillis=1548888343693, correlationId=9d66c785-f765-4044-930f-4b7a87668c59))
2019-01-30T22:45:43.696875Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=8, square=64), metadata=Metadata(timestampMillis=1548888343693, correlationId=9d66c785-f765-4044-930f-4b7a87668c59))
2019-01-30T22:45:43.697006Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=9), metadata=Metadata(timestampMillis=1548888343694, correlationId=e0449c6b-a720-4a0d-953b-3f2e25930177))
2019-01-30T22:45:43.697142Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=9, square=81), metadata=Metadata(timestampMillis=1548888343694, correlationId=e0449c6b-a720-4a0d-953b-3f2e25930177))
2019-01-30T22:45:43.697278Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=10), metadata=Metadata(timestampMillis=1548888343694, correlationId=7b8ceefa-eebf-47ca-bc3a-1322193d2d64))
2019-01-30T22:45:43.697403Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=10, square=100), metadata=Metadata(timestampMillis=1548888343694, correlationId=7b8ceefa-eebf-47ca-bc3a-1322193d2d64))
2019-01-30T22:45:43.697537Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=11), metadata=Metadata(timestampMillis=1548888343694, correlationId=f87da826-7a94-4373-8dae-8c5bd72bb27e))
2019-01-30T22:45:43.697548Z [producer-0] π₯Producing Envelope(payload=RawData(value=15), metadata=Metadata(timestampMillis=1548888343697, correlationId=bc7bd5e0-2745-46d0-b01b-f7c86d179559))
2019-01-30T22:45:43.697676Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=11, square=121), metadata=Metadata(timestampMillis=1548888343694, correlationId=f87da826-7a94-4373-8dae-8c5bd72bb27e))
2019-01-30T22:45:43.697892Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=12), metadata=Metadata(timestampMillis=1548888343694, correlationId=2188daea-4dc0-4004-adc8-af634413284f))
2019-01-30T22:45:43.698051Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=12, square=144), metadata=Metadata(timestampMillis=1548888343694, correlationId=2188daea-4dc0-4004-adc8-af634413284f))
2019-01-30T22:45:43.698650Z [producer-0] π Completed. Exception: null
2019-01-30T22:45:43.797951Z [updater-1] β
Done with Envelope(payload=RichData(value=1, square=1), metadata=Metadata(timestampMillis=1548888343659, correlationId=8704b3b5-c357-4044-bc5d-995fe6a4797e))
2019-01-30T22:45:43.798638Z [main] π Processed 1 : Envelope(payload=RichData(value=1, square=1), metadata=Metadata(timestampMillis=1548888343659, correlationId=8704b3b5-c357-4044-bc5d-995fe6a4797e))
2019-01-30T22:45:43.798806Z [updater-1] π Writing Envelope(payload=RichData(value=2, square=4), metadata=Metadata(timestampMillis=1548888343692, correlationId=efaa86ff-292d-4657-bd63-925779caa133)), processed in 106ms
2019-01-30T22:45:43.798936Z [enricher-2] π₯ Processing Envelope(payload=RawData(value=13), metadata=Metadata(timestampMillis=1548888343695, correlationId=1942a54d-f914-4be0-b511-485c3fb4c138))
2019-01-30T22:45:43.799176Z [enricher-2] π₯ Enriched Envelope(payload=RichData(value=13, square=169), metadata=Metadata(timestampMillis=1548888343695, correlationId=1942a54d-f914-4be0-b511-485c3fb4c138))
2019-01-30T22:45:43.901131Z [updater-1] β
Done with Envelope(payload=RichData(value=2, square=4), metadata=Metadata(timestampMillis=1548888343692, correlationId=efaa86ff-292d-4657-bd63-925779caa133))
2019-01-30T22:45:43.901515Z [main] π Processed 2 : Envelope(payload=RichData(value=2, square=4), metadata=Metadata(timestampMillis=1548888343692, correlationId=efaa86ff-292d-4657-bd63-925779caa133))
2019-01-30T22:45:43.902077Z [updater-1] π Writing Envelope(payload=RichData(value=3, square=9), metadata=Metadata(timestampMillis=1548888343692, correlationId=b266e28d-bffd-4066-8bd9-b103dc67d5bc)), processed in 210ms
2019-01-30T22:45:43.902125Z [enricher-0] π₯ Processing Envelope(payload=RawData(value=14), metadata=Metadata(timestampMillis=1548888343695, correlationId=61f6f513-b1b3-4fb2-ab16-09c5e42408ab))
2019-01-30T22:45:43.902342Z [enricher-0] π₯ Enriched Envelope(payload=RichData(value=14, square=196), metadata=Metadata(timestampMillis=1548888343695, correlationId=61f6f513-b1b3-4fb2-ab16-09c5e42408ab))
2019-01-30T22:45:44.002874Z [updater-1] β
Done with Envelope(payload=RichData(value=3, square=9), metadata=Metadata(timestampMillis=1548888343692, correlationId=b266e28d-bffd-4066-8bd9-b103dc67d5bc))
2019-01-30T22:45:44.003432Z [main] π Processed 3 : Envelope(payload=RichData(value=3, square=9), metadata=Metadata(timestampMillis=1548888343692, correlationId=b266e28d-bffd-4066-8bd9-b103dc67d5bc))
2019-01-30T22:45:44.003488Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=15), metadata=Metadata(timestampMillis=1548888343697, correlationId=bc7bd5e0-2745-46d0-b01b-f7c86d179559))
2019-01-30T22:45:44.003448Z [updater-1] π Writing Envelope(payload=RichData(value=4, square=16), metadata=Metadata(timestampMillis=1548888343693, correlationId=f5d468be-6845-4973-ba25-ebf773152712)), processed in 310ms
2019-01-30T22:45:44.003834Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=15, square=225), metadata=Metadata(timestampMillis=1548888343697, correlationId=bc7bd5e0-2745-46d0-b01b-f7c86d179559))
2019-01-30T22:45:44.108139Z [updater-1] β
Done with Envelope(payload=RichData(value=4, square=16), metadata=Metadata(timestampMillis=1548888343693, correlationId=f5d468be-6845-4973-ba25-ebf773152712))
2019-01-30T22:45:44.108577Z [updater-1] π Writing Envelope(payload=RichData(value=5, square=25), metadata=Metadata(timestampMillis=1548888343693, correlationId=039b9f12-35f1-4b82-80af-07279eb17c8f)), processed in 415ms
2019-01-30T22:45:44.108548Z [main] π Processed 4 : Envelope(payload=RichData(value=4, square=16), metadata=Metadata(timestampMillis=1548888343693, correlationId=f5d468be-6845-4973-ba25-ebf773152712))
2019-01-30T22:45:44.109303Z [enricher-2] π Completed. Exception: null
2019-01-30T22:45:44.210843Z [updater-1] β
Done with Envelope(payload=RichData(value=5, square=25), metadata=Metadata(timestampMillis=1548888343693, correlationId=039b9f12-35f1-4b82-80af-07279eb17c8f))
2019-01-30T22:45:44.211289Z [updater-1] π Writing Envelope(payload=RichData(value=6, square=36), metadata=Metadata(timestampMillis=1548888343693, correlationId=718d484d-fa21-4345-940c-6ad738914725)), processed in 518ms
2019-01-30T22:45:44.211250Z [main] π Processed 5 : Envelope(payload=RichData(value=5, square=25), metadata=Metadata(timestampMillis=1548888343693, correlationId=039b9f12-35f1-4b82-80af-07279eb17c8f))
2019-01-30T22:45:44.316362Z [updater-1] β
Done with Envelope(payload=RichData(value=6, square=36), metadata=Metadata(timestampMillis=1548888343693, correlationId=718d484d-fa21-4345-940c-6ad738914725))
2019-01-30T22:45:44.316923Z [main] π Processed 6 : Envelope(payload=RichData(value=6, square=36), metadata=Metadata(timestampMillis=1548888343693, correlationId=718d484d-fa21-4345-940c-6ad738914725))
2019-01-30T22:45:44.316908Z [updater-1] π Writing Envelope(payload=RichData(value=7, square=49), metadata=Metadata(timestampMillis=1548888343693, correlationId=fee9b735-745c-4bf6-988d-89d2adc42df4)), processed in 623ms
2019-01-30T22:45:44.417721Z [updater-1] β
Done with Envelope(payload=RichData(value=7, square=49), metadata=Metadata(timestampMillis=1548888343693, correlationId=fee9b735-745c-4bf6-988d-89d2adc42df4))
2019-01-30T22:45:44.418917Z [updater-1] π Writing Envelope(payload=RichData(value=8, square=64), metadata=Metadata(timestampMillis=1548888343693, correlationId=9d66c785-f765-4044-930f-4b7a87668c59)), processed in 725ms
2019-01-30T22:45:44.418966Z [main] π Processed 7 : Envelope(payload=RichData(value=7, square=49), metadata=Metadata(timestampMillis=1548888343693, correlationId=fee9b735-745c-4bf6-988d-89d2adc42df4))
2019-01-30T22:45:44.522580Z [updater-1] β
Done with Envelope(payload=RichData(value=8, square=64), metadata=Metadata(timestampMillis=1548888343693, correlationId=9d66c785-f765-4044-930f-4b7a87668c59))
2019-01-30T22:45:44.522889Z [updater-1] π Writing Envelope(payload=RichData(value=9, square=81), metadata=Metadata(timestampMillis=1548888343694, correlationId=e0449c6b-a720-4a0d-953b-3f2e25930177)), processed in 828ms
2019-01-30T22:45:44.522913Z [main] π Processed 8 : Envelope(payload=RichData(value=8, square=64), metadata=Metadata(timestampMillis=1548888343693, correlationId=9d66c785-f765-4044-930f-4b7a87668c59))
2019-01-30T22:45:44.626517Z [updater-1] β
Done with Envelope(payload=RichData(value=9, square=81), metadata=Metadata(timestampMillis=1548888343694, correlationId=e0449c6b-a720-4a0d-953b-3f2e25930177))
2019-01-30T22:45:44.627721Z [updater-1] π Writing Envelope(payload=RichData(value=10, square=100), metadata=Metadata(timestampMillis=1548888343694, correlationId=7b8ceefa-eebf-47ca-bc3a-1322193d2d64)), processed in 933ms
2019-01-30T22:45:44.627687Z [main] π Processed 9 : Envelope(payload=RichData(value=9, square=81), metadata=Metadata(timestampMillis=1548888343694, correlationId=e0449c6b-a720-4a0d-953b-3f2e25930177))
2019-01-30T22:45:44.730284Z [updater-1] β
Done with Envelope(payload=RichData(value=10, square=100), metadata=Metadata(timestampMillis=1548888343694, correlationId=7b8ceefa-eebf-47ca-bc3a-1322193d2d64))
2019-01-30T22:45:44.730587Z [updater-1] π Writing Envelope(payload=RichData(value=11, square=121), metadata=Metadata(timestampMillis=1548888343694, correlationId=f87da826-7a94-4373-8dae-8c5bd72bb27e)), processed in 1036ms
2019-01-30T22:45:44.730599Z [main] π Processed 10 : Envelope(payload=RichData(value=10, square=100), metadata=Metadata(timestampMillis=1548888343694, correlationId=7b8ceefa-eebf-47ca-bc3a-1322193d2d64))
2019-01-30T22:45:44.834934Z [updater-1] β
Done with Envelope(payload=RichData(value=11, square=121), metadata=Metadata(timestampMillis=1548888343694, correlationId=f87da826-7a94-4373-8dae-8c5bd72bb27e))
2019-01-30T22:45:44.835236Z [updater-1] π Writing Envelope(payload=RichData(value=12, square=144), metadata=Metadata(timestampMillis=1548888343694, correlationId=2188daea-4dc0-4004-adc8-af634413284f)), processed in 1141ms
2019-01-30T22:45:44.835253Z [main] π Processed 11 : Envelope(payload=RichData(value=11, square=121), metadata=Metadata(timestampMillis=1548888343694, correlationId=f87da826-7a94-4373-8dae-8c5bd72bb27e))
2019-01-30T22:45:44.936474Z [updater-1] β
Done with Envelope(payload=RichData(value=12, square=144), metadata=Metadata(timestampMillis=1548888343694, correlationId=2188daea-4dc0-4004-adc8-af634413284f))
2019-01-30T22:45:44.936821Z [main] π Processed 12 : Envelope(payload=RichData(value=12, square=144), metadata=Metadata(timestampMillis=1548888343694, correlationId=2188daea-4dc0-4004-adc8-af634413284f))
2019-01-30T22:45:44.936799Z [updater-1] π Writing Envelope(payload=RichData(value=13, square=169), metadata=Metadata(timestampMillis=1548888343695, correlationId=1942a54d-f914-4be0-b511-485c3fb4c138)), processed in 1241ms
2019-01-30T22:45:45.037599Z [updater-1] β
Done with Envelope(payload=RichData(value=13, square=169), metadata=Metadata(timestampMillis=1548888343695, correlationId=1942a54d-f914-4be0-b511-485c3fb4c138))
2019-01-30T22:45:45.038629Z [updater-1] π Writing Envelope(payload=RichData(value=14, square=196), metadata=Metadata(timestampMillis=1548888343695, correlationId=61f6f513-b1b3-4fb2-ab16-09c5e42408ab)), processed in 1343ms
2019-01-30T22:45:45.038636Z [main] π Processed 13 : Envelope(payload=RichData(value=13, square=169), metadata=Metadata(timestampMillis=1548888343695, correlationId=1942a54d-f914-4be0-b511-485c3fb4c138))
2019-01-30T22:45:45.142661Z [updater-1] β
Done with Envelope(payload=RichData(value=14, square=196), metadata=Metadata(timestampMillis=1548888343695, correlationId=61f6f513-b1b3-4fb2-ab16-09c5e42408ab))
2019-01-30T22:45:45.143027Z [updater-1] π Writing Envelope(payload=RichData(value=15, square=225), metadata=Metadata(timestampMillis=1548888343697, correlationId=bc7bd5e0-2745-46d0-b01b-f7c86d179559)), processed in 1446ms
2019-01-30T22:45:45.143067Z [main] π Processed 14 : Envelope(payload=RichData(value=14, square=196), metadata=Metadata(timestampMillis=1548888343695, correlationId=61f6f513-b1b3-4fb2-ab16-09c5e42408ab))
2019-01-30T22:45:45.244791Z [updater-1] β
Done with Envelope(payload=RichData(value=15, square=225), metadata=Metadata(timestampMillis=1548888343697, correlationId=bc7bd5e0-2745-46d0-b01b-f7c86d179559))
2019-01-30T22:45:45.245192Z [main] π Processed 15 : Envelope(payload=RichData(value=15, square=225), metadata=Metadata(timestampMillis=1548888343697, correlationId=bc7bd5e0-2745-46d0-b01b-f7c86d179559))
2019-01-30T22:45:45.245485Z [updater-1] π Completed. Exception: null
2019-01-30T22:45:45.247817Z [main] The End
2019-01-30T22:45:45.248840Z [main] Done in 1678 ms
As you can see, we have simulated fast producer and slow consumer.
Producer initially started and was producing messages
unless it’s buffer became full.
Then Enricher started processing messages unblocking producer.
Next bottleneck was in Updater (Thread.sleep(500)
did the job).
At steady mode Producer and Enricher are limited by the Updater performance.
Automatic back-pressure support is really nice feature of Kotlin coroutines.
You may noticed that processing time is O(N)
: 1678ms ~= (15 * 100ms)
.
Even if we have created thread pools, it is only one actor of each type working at a time.
Let’s change our pipeline so multiple actor work in parallel.
-β (π€Updater-0) -βοΈβ
(π€Producer) -βοΈβ π¬(π€Enricher) -βοΈβ π¬ -β (π€Updater-1) -βοΈβ π¬
RawData RichData -β (π€Updater-2) -βοΈβ
Done
We introduce Done
message so Updaters could signal that it has processed all messages.
Let’s add new message type to Messages.kotlin:
object Done
Our pipeline will change to:
const val TOTAL = 15
const val PARALLEL_ACTORS = 5
@ExperimentalCoroutinesApi
@InternalCoroutinesApi
fun main() {
val time = measureTimeMillis {
runBlocking {
val raw = producerActor(TOTAL)
val enriched = enricherActor(raw)
val completed = Channel<Done>(5)
repeat(PARALLEL_ACTORS) { // launch 5 Updaters in parallel
updaterActor(enriched, completed)
}
var counter = 0
for (msg in completed) {
if (msg === Done) {
counter++
log("π Updater is finished")
if (counter == PARALLEL_ACTORS) {
break // break when all Updaters have finished
}
}
}
log("The End")
coroutineContext.cancelChildren()
}
}
log("Done in $time ms")
}
Updater will be changed to:
private val context = Executors.newFixedThreadPool(5, NamedThreadFactory("updater")).asCoroutineDispatcher()
@ExperimentalCoroutinesApi
@InternalCoroutinesApi
fun CoroutineScope.updaterActor(
inbox: ReceiveChannel<Envelope<RichData>>,
updated: Channel<Done>
) = launch(context = context) {
for (msg in inbox) { // iterate over incoming messages
val created = msg.metadata.startMillis
log("π Writing $msg, processed in ${Instant.now().toEpochMilli() - created}ms")
Thread.sleep(100) // to simulate blocking operation
log("β
Done with $msg")
}
updated.send(Done)
}
Now let’s run:
2019-03-25T09:10:52.538Z [producer-0] π₯Producing Envelope(payload=RawData(value=1), metadata=Metadata(startMillis=1553505052537))
2019-03-25T09:10:52.617Z [producer-0] π₯Producing Envelope(payload=RawData(value=2), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.617Z [producer-0] π₯Producing Envelope(payload=RawData(value=3), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.617Z [producer-0] π₯Producing Envelope(payload=RawData(value=4), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.618Z [producer-0] π₯Producing Envelope(payload=RawData(value=5), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.618Z [producer-0] π₯Producing Envelope(payload=RawData(value=6), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.618Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=1), metadata=Metadata(startMillis=1553505052537))
2019-03-25T09:10:52.618Z [producer-0] π₯Producing Envelope(payload=RawData(value=7), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.618Z [producer-0] π₯Producing Envelope(payload=RawData(value=8), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.618Z [producer-0] π₯Producing Envelope(payload=RawData(value=9), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.618Z [producer-0] π₯Producing Envelope(payload=RawData(value=10), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.618Z [producer-0] π₯Producing Envelope(payload=RawData(value=11), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.618Z [producer-0] π₯Producing Envelope(payload=RawData(value=12), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.618Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=1, square=1), metadata=Metadata(startMillis=1553505052537))
2019-03-25T09:10:52.619Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=2), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.619Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=2, square=4), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.619Z [updater-0] π Writing Envelope(payload=RichData(value=1, square=1), metadata=Metadata(startMillis=1553505052537)), processed in 82ms
2019-03-25T09:10:52.619Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=3), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.619Z [updater-1] π Writing Envelope(payload=RichData(value=2, square=4), metadata=Metadata(startMillis=1553505052617)), processed in 2ms
2019-03-25T09:10:52.619Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=3, square=9), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.619Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=4), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.619Z [updater-2] π Writing Envelope(payload=RichData(value=3, square=9), metadata=Metadata(startMillis=1553505052617)), processed in 2ms
2019-03-25T09:10:52.619Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=4, square=16), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.619Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=5), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.619Z [updater-3] π Writing Envelope(payload=RichData(value=4, square=16), metadata=Metadata(startMillis=1553505052617)), processed in 2ms
2019-03-25T09:10:52.620Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=5, square=25), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.620Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=6), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.620Z [updater-4] π Writing Envelope(payload=RichData(value=5, square=25), metadata=Metadata(startMillis=1553505052618)), processed in 2ms
2019-03-25T09:10:52.620Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=6, square=36), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.620Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=7), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.620Z [producer-0] π₯Producing Envelope(payload=RawData(value=13), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.620Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=7, square=49), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.620Z [producer-0] π₯Producing Envelope(payload=RawData(value=14), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.620Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=8), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.620Z [producer-0] π₯Producing Envelope(payload=RawData(value=15), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.620Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=8, square=64), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.620Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=9), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.621Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=9, square=81), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.621Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=10), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.621Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=10, square=100), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.621Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=11), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.621Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=11, square=121), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.621Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=12), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.621Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=12, square=144), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.621Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=13), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.621Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=13, square=169), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.621Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=14), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.622Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=14, square=196), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.622Z [enricher-1] π₯ Processing Envelope(payload=RawData(value=15), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.622Z [enricher-1] π₯ Enriched Envelope(payload=RichData(value=15, square=225), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.622Z [producer-0] π Completed. Exception: null
2019-03-25T09:10:52.622Z [enricher-1] π Completed. Exception: null
2019-03-25T09:10:52.721Z [updater-1] β
Done with Envelope(payload=RichData(value=2, square=4), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.721Z [updater-3] β
Done with Envelope(payload=RichData(value=4, square=16), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.721Z [updater-3] π Writing Envelope(payload=RichData(value=7, square=49), metadata=Metadata(startMillis=1553505052618)), processed in 103ms
2019-03-25T09:10:52.721Z [updater-0] β
Done with Envelope(payload=RichData(value=1, square=1), metadata=Metadata(startMillis=1553505052537))
2019-03-25T09:10:52.721Z [updater-0] π Writing Envelope(payload=RichData(value=8, square=64), metadata=Metadata(startMillis=1553505052618)), processed in 103ms
2019-03-25T09:10:52.721Z [updater-2] β
Done with Envelope(payload=RichData(value=3, square=9), metadata=Metadata(startMillis=1553505052617))
2019-03-25T09:10:52.721Z [updater-4] β
Done with Envelope(payload=RichData(value=5, square=25), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.721Z [updater-2] π Writing Envelope(payload=RichData(value=9, square=81), metadata=Metadata(startMillis=1553505052618)), processed in 103ms
2019-03-25T09:10:52.721Z [updater-1] π Writing Envelope(payload=RichData(value=6, square=36), metadata=Metadata(startMillis=1553505052618)), processed in 103ms
2019-03-25T09:10:52.721Z [updater-4] π Writing Envelope(payload=RichData(value=10, square=100), metadata=Metadata(startMillis=1553505052618)), processed in 103ms
2019-03-25T09:10:52.826Z [updater-2] β
Done with Envelope(payload=RichData(value=9, square=81), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.826Z [updater-4] β
Done with Envelope(payload=RichData(value=10, square=100), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.826Z [updater-0] β
Done with Envelope(payload=RichData(value=8, square=64), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.826Z [updater-3] β
Done with Envelope(payload=RichData(value=7, square=49), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.826Z [updater-1] β
Done with Envelope(payload=RichData(value=6, square=36), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.826Z [updater-3] π Writing Envelope(payload=RichData(value=14, square=196), metadata=Metadata(startMillis=1553505052620)), processed in 206ms
2019-03-25T09:10:52.826Z [updater-0] π Writing Envelope(payload=RichData(value=13, square=169), metadata=Metadata(startMillis=1553505052620)), processed in 206ms
2019-03-25T09:10:52.826Z [updater-4] π Writing Envelope(payload=RichData(value=12, square=144), metadata=Metadata(startMillis=1553505052618)), processed in 208ms
2019-03-25T09:10:52.826Z [updater-2] π Writing Envelope(payload=RichData(value=11, square=121), metadata=Metadata(startMillis=1553505052618)), processed in 208ms
2019-03-25T09:10:52.826Z [updater-1] π Writing Envelope(payload=RichData(value=15, square=225), metadata=Metadata(startMillis=1553505052620)), processed in 206ms
2019-03-25T09:10:52.929Z [updater-2] β
Done with Envelope(payload=RichData(value=11, square=121), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.929Z [updater-4] β
Done with Envelope(payload=RichData(value=12, square=144), metadata=Metadata(startMillis=1553505052618))
2019-03-25T09:10:52.929Z [updater-3] β
Done with Envelope(payload=RichData(value=14, square=196), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.929Z [updater-1] β
Done with Envelope(payload=RichData(value=15, square=225), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.929Z [updater-0] β
Done with Envelope(payload=RichData(value=13, square=169), metadata=Metadata(startMillis=1553505052620))
2019-03-25T09:10:52.931Z [main] π Updater is finished
2019-03-25T09:10:52.931Z [main] π Updater is finished
2019-03-25T09:10:52.931Z [main] π Updater is finished
2019-03-25T09:10:52.931Z [main] π Updater is finished
2019-03-25T09:10:52.931Z [main] π Updater is finished
2019-03-25T09:10:52.931Z [main] The End
2019-03-25T09:10:52.958Z [main] Done in 544 ms
Now we see that execution time reduced to 544ms and Updaters work in parallel.
The sources code you may find here
Links #
I recommend following resources to learn more about coroutines and structured concurrency in Kotlin:
- Coroutines Guide
- KotlinConf 2018 - “Exploring Coroutines in Kotlin” by Venkat Subramariam
- Google DevFest 2018 - “Kotlin Coroutines” by Svetlana Isakova
- Structured concurrency - by Roman Elizarov
- Deadlocks in non-hierarchical CSP - by Roman Elizarov
- Writing complex actors - Discussion about writing actors with coroutines