I'm creating a simple message delivery service using Akka Streams. The service works like mail delivery: each element from the source includes a destination and content, like:
case class Message(destination: String, content: String)
The service should deliver each message to the appropriate sink based on its destination field. I created a DeliverySink class so that each sink has a name:
case class DeliverySink(name: String, sink: Sink[String, Future[Done]])
Now, I instantiated two DeliverySink instances, let me call them sinkX and sinkY, and created a map keyed by their names. In practice, I want to provide a list of sink names, and that list should be configurable.
The challenge I'm facing is how to dynamically choose the appropriate sink based on the destination field.
Eventually, I want to connect a Flow[Message] to a sink. I tried:
val sinkNames: List[String] = List("sinkX", "sinkY")
val sinkMapping: Map[String, DeliverySink] =
  sinkNames.map { name => name -> DeliverySink(name, ???) }.toMap
// Does not compile: msg is only in scope inside the map block.
Flow[Message].map { msg => msg.content }.to(sinkMapping(msg.destination).sink)
Obviously this doesn't work, because msg cannot be referenced outside of map.
I guess this is not the right approach. I also thought about using filter with broadcast, but if the number of destinations grows to 100, I cannot write out every route by hand. What is the right way to achieve my goal?
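For a configurable but fixed list of names, one option that avoids writing each route by hand might be the Partition stage, wired up in a loop with the GraphDSL. The following is only a sketch, assuming Akka 2.6 or later (where the implicit ActorSystem supplies the materializer). The println sinks, the sample messages, the portOf lookup, and the extra "unknown destination" port are placeholders introduced for illustration, not part of my actual code.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, Partition, RunnableGraph, Sink, Source}
import scala.concurrent.Future

object StaticRouting extends App {
  implicit val system: ActorSystem = ActorSystem("delivery")

  case class Message(destination: String, content: String)
  case class DeliverySink(name: String, sink: Sink[String, Future[Done]])

  // In practice this list would come from configuration.
  val sinkNames: Vector[String] = Vector("sinkX", "sinkY")

  // Placeholder sinks that only print; substitute the real ones.
  val sinks: Vector[DeliverySink] =
    sinkNames.map { name =>
      DeliverySink(name, Sink.foreach[String](c => println(s"[$name] $c")))
    }

  // Output port index for each known destination name.
  val portOf: Map[String, Int] = sinkNames.zipWithIndex.toMap
  // One extra port collects messages whose destination is not in the list.
  val unknownPort: Int = sinks.size

  // Sample input (assumption for the sketch).
  val messages = Source(List(
    Message("sinkX", "hello"),
    Message("sinkY", "world"),
    Message("sinkZ", "no such sink")
  ))

  val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Partition sends each element to exactly one output port.
    val partition = builder.add(
      Partition[Message](sinks.size + 1, m => portOf.getOrElse(m.destination, unknownPort))
    )

    messages ~> partition.in
    // Connect every configured sink in a loop instead of typing each route.
    sinks.zipWithIndex.foreach { case (ds, port) =>
      partition.out(port) ~> Flow[Message].map(_.content) ~> ds.sink
    }
    // Messages for unknown destinations are silently dropped here.
    partition.out(unknownPort) ~> Sink.ignore

    ClosedShape
  })

  // The actor system keeps running; call system.terminate() when finished.
  graph.run()
}
```

Compared with filter plus broadcast, each message is looked up once in the map rather than tested against every destination, but the set of ports is still fixed when the graph is built.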
[Edit]
Ideally, I would like the destinations to be dynamic, so I cannot hard-code all destinations in the filter or routing logic. If no sink has been connected for a destination yet, a new sink should be created dynamically.
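One way this dynamic behaviour could perhaps be approximated is to keep a map from destination to a Source.queue that feeds that destination's sink, and to materialize a new queue-plus-sink stream the first time a destination is seen. This is a sketch under assumptions, not a definitive design: it assumes Akka 2.6 or later, and newSinkFor, routeFor, the buffer size of 16, and the sample messages are hypothetical names and values chosen for illustration.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete}
import scala.collection.mutable
import scala.concurrent.Future

object DynamicRouting extends App {
  implicit val system: ActorSystem = ActorSystem("delivery")
  import system.dispatcher

  case class Message(destination: String, content: String)

  // Hypothetical factory: builds the sink for a destination on demand.
  def newSinkFor(name: String): Sink[String, Future[Done]] =
    Sink.foreach[String](content => println(s"[$name] $content"))

  // Destination -> (queue feeding its sink, completion future of that sink).
  // Only touched from the single routing stage below, then once after it ends.
  val routes = mutable.Map.empty[String, (SourceQueueWithComplete[String], Future[Done])]

  // Returns the existing route or materializes a new queue -> sink stream.
  def routeFor(destination: String): (SourceQueueWithComplete[String], Future[Done]) =
    routes.getOrElseUpdate(
      destination,
      Source.queue[String](16, OverflowStrategy.backpressure)
        .toMat(newSinkFor(destination))(Keep.both)
        .run()
    )

  // Sample input (assumption for the sketch).
  val messages = Source(List(
    Message("sinkX", "hello"),
    Message("sinkY", "world"),
    Message("sinkX", "again")
  ))

  // mapAsync(1) keeps one offer in flight, so a full queue backpressures the source.
  val routed: Future[Done] =
    messages
      .mapAsync(1)(m => routeFor(m.destination)._1.offer(m.content))
      .runWith(Sink.ignore)

  // When the input ends, complete every queue and wait for the sinks to drain.
  routed
    .flatMap { _ =>
      routes.values.foreach(_._1.complete())
      Future.sequence(routes.values.map(_._2).toList)
    }
    .onComplete(_ => system.terminate())
}
```

A trade-off of this sketch is that one slow destination stalls the whole source, because offers are made one at a time; a different overflow strategy or a higher parallelism with per-destination ordering guarantees would need more care.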