I have a TCP connection in Akka Streams that ends in a Sink. Right now all messages go into one Sink. I want to split the stream into an unknown number of Sinks, given some function.
The use case is as follows: from the TCP connection I get a continuous stream of something like List[DeltaValue]. I want to create an actor Sink for each DeltaValue.id so that I can continuously accumulate values and implement behaviour for each DeltaValue.id. I find this to be a standard use case in stream processing, but I'm not able to find a good example with Akka Streams.
This is what I have right now:
def connect(): ActorRef = tcpConnection
  // SOMEHOW SPLIT HERE and create a ReceiverActor for each message
  .to(Sink.actorRef(system.actorOf(ReceiverActor.props(), ReceiverActor.name), akka.Done))
  .run()
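One way to read the "SOMEHOW SPLIT HERE" step is as a groupBy keyed on the id. The following is only a minimal sketch under assumptions, not a drop-in solution: the DeltaValue fields, the GroupByIdSketch object, the totals map, and the limit of 1024 substreams are all hypothetical, and it assumes an Akka Streams version where groupBy(maxSubstreams, f) is available. The Sink passed to .to(...) is materialized once per substream, so each id gets its own Sink instance.

```scala
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for the DeltaValue type from the question.
final case class DeltaValue(id: Int, delta: Long)

object GroupByIdSketch {
  // Latest running total per id; a stand-in for real per-id behaviour.
  val totals = TrieMap.empty[Int, Long]

  def run(batches: Source[List[DeltaValue], _])(implicit mat: Materializer): Unit =
    batches
      .mapConcat(identity)      // flatten each List[DeltaValue] batch
      .groupBy(1024, _.id)      // one substream per id (assumed upper bound: 1024 ids)
      .scan(Option.empty[DeltaValue]) { (acc, d) =>
        // accumulate the deltas seen so far for this id
        Some(acc.fold(d)(prev => prev.copy(delta = prev.delta + d.delta)))
      }
      .collect { case Some(total) => total } // drop scan's initial None
      .to(Sink.foreach[DeltaValue](t => totals.update(t.id, t.delta))) // one Sink per substream
      .run()
}
```

Note that groupBy fails the stream once more than maxSubstreams distinct keys are open, so the limit has to be chosen with the expected number of ids in mind.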
Update: I now have the following. It does not feel very stable, but it should work:
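private def spawnActorOrSendMessage(m: ResponseMessage): Unit = {
  implicit val timeout: Timeout = Timeout(FiniteDuration(1, TimeUnit.SECONDS))
  // onComplete needs an implicit ExecutionContext in scope (e.g. system.dispatcher).
  // Look up the actor for this id; create it if the lookup fails.
  system.actorSelection("user/" + m.id.toString).resolveOne().onComplete {
    case Success(actorRef) => actorRef ! m
    // Racy: two messages with a new id can both fail the lookup, and the second actorOf then throws InvalidActorNameException.
    case Failure(ex) => system.actorOf(ReceiverActor.props(), m.id.toString) ! m
  }
}

def connect(): ActorRef = tcpConnection
  .to(Sink.foreachParallel(10)(spawnActorOrSendMessage))
  .run()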
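A possibly more stable variant of the same idea is to let a single parent actor own the per-id actors, so that lookup and creation happen sequentially inside one actor instead of through asynchronous actorSelection calls. This is a hedged sketch with classic Akka actors: RouterActor is a hypothetical name, and the ResponseMessage and ReceiverActor definitions below are simplified stand-ins for the real ones.

```scala
import akka.actor.{Actor, Props}

// Hypothetical stand-ins for the types used in the question.
final case class ResponseMessage(id: Int, payload: String)

class ReceiverActor extends Actor {
  private var seen = 0 // per-id state lives here
  def receive: Receive = {
    case _: ResponseMessage => seen += 1
  }
}

object ReceiverActor {
  def props(): Props = Props(new ReceiverActor)
}

// Routes each message to a child named after its id, creating the child on first use.
class RouterActor extends Actor {
  def receive: Receive = {
    case m: ResponseMessage =>
      val name = m.id.toString
      // An actor handles one message at a time, so this check-then-create cannot race.
      val child = context.child(name).getOrElse(context.actorOf(ReceiverActor.props(), name))
      child.forward(m)
  }
}
```

The stream could then end in .to(Sink.actorRef(system.actorOf(Props(new RouterActor), "router"), akka.Done)), which also keeps messages for the same id in order. Sink.actorRef applies no backpressure, so this assumes the actors keep up with the TCP source.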