1
votes

I have a TCP connection in Akka Stream that ends in a Sink. Right now all messages go into one Sink. I want to split the stream into an unknown number of Sinks given some function.

The use case is as follows: from the TCP connection I get a continuous stream of something like List[DeltaValue]. I want to create an actorSink for each DeltaValue.id so that I can continuously accumulate and implement behaviour for each DeltaValue.id. This seems like a standard use case in stream processing, but I have not been able to find a good example with Akka Stream.

This is what I have right now:

def connect(): ActorRef = tcpConnection
    // SOMEHOW SPLIT HERE and create a ReceiverActor for each message
    .to(Sink.actorRef(system.actorOf(ReceiverActor.props(), ReceiverActor.name), akka.Done))
    .run()

Update: I now have the following. It does not feel very stable, but it should work:

  private def spawnActorOrSendMessage(m: ResponseMessage): Unit = {
    implicit val timeout = Timeout(FiniteDuration(1, TimeUnit.SECONDS))
    system.actorSelection("user/" + m.id.toString).resolveOne().onComplete {
      case Success(actorRef) => actorRef ! m
      case Failure(ex) => (system.actorOf(ReceiverActor.props(), m.id.toString)) ! m
    }
  }

  def connect(): ActorRef = tcpConnection
    .to(Sink.foreachParallel(10)(spawnActorOrSendMessage))
    .run()
user3139545

2 Answers

1
votes

EventStream

You can send messages to the ActorSystem's EventStream within a stream sink and separately have the Actors subscribe to the stream.
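
A minimal sketch of the EventStream idea, assuming Akka 2.6+ (where an implicit ActorSystem supplies the materializer). IdReceiver and the in-memory Source are hypothetical stand-ins for the question's ReceiverActor and TCP source. Note that every subscriber sees every DeltaValue, so each actor has to filter by its own id:

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.stream.scaladsl.Source

object EventStreamSplit extends App {
  // Assumption: Akka 2.6+, so the implicit system also acts as materializer.
  implicit val system: ActorSystem = ActorSystem("event-stream-demo")

  final case class DeltaValue(id: String, value: Double)

  // Hypothetical subscriber: receives every DeltaValue, keeps only its own id.
  class IdReceiver(id: String) extends Actor {
    private var total = 0d
    def receive: Receive = {
      case DeltaValue(`id`, v) =>
        total += v
        println(s"$id -> $total")
      case _: DeltaValue => // belongs to another id, ignore
    }
  }

  // Subscribe the actors before the stream starts publishing.
  for (id <- List("a", "b")) {
    val ref = system.actorOf(Props(new IdReceiver(id)), id)
    system.eventStream.subscribe(ref, classOf[DeltaValue])
  }

  // Stand-in for the TCP source: publish each element to the EventStream.
  // The system is left running so the subscribers can process the messages;
  // call system.terminate() when done.
  Source(List(DeltaValue("a", 1.0), DeltaValue("b", 10.0), DeltaValue("a", 2.0)))
    .runForeach(d => system.eventStream.publish(d))
}
```

This still requires knowing the ids up front (or creating subscribers elsewhere), and publishing to the EventStream gives no backpressure.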

Split At Stream Level

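You can split the stream at the stream level using Broadcast. The documentation has a good example of this. Note that Broadcast sends every element to each of a fixed number of outputs, so each downstream would still need to filter by id.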

Split At Actor Level

You could also use Sink.actorRef in combination with a BroadcastPool to broadcast the messages to multiple Actors.

1
votes

The code below should be a somewhat improved version of what was added in the question's update. The main improvement is that the actors are kept in a data structure, which avoids an actorSelection resolution for every incoming message.

  case class DeltaValue(id: String, value: Double)

  val src: Source[DeltaValue, NotUsed] = ???

  src.runFold(Map[String, ActorRef]()){
    case (actors, elem) if actors.contains(elem.id) ⇒
      actors(elem.id) ! elem.value
      actors
    case (actors, elem) ⇒
      // name each actor after its id; reusing one fixed name would fail on the second actorOf call
      val newActor = system.actorOf(ReceiverActor.props(), elem.id)
      newActor ! elem.value
      actors.updated(elem.id, newActor)
  }

Keep in mind that when you integrate Akka Streams with bare actors by simply sending them messages, you lose backpressure support. This is one of the reasons why you should try to implement your logic within the boundaries of Akka Streams whenever possible. That is not always possible, for example when remoting is needed.
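
If an actor is unavoidable, one way to keep backpressure is to ask the actor and wait for its reply inside mapAsync, so the stream only pulls the next element once the previous one has been acknowledged. The sketch below assumes Akka 2.6+; Receiver, Ack and the in-memory Source are hypothetical names used for illustration, not part of the question's code:

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout
import scala.concurrent.duration._

object AskWithBackpressure extends App {
  // Assumption: Akka 2.6+, so the implicit system also acts as materializer.
  implicit val system: ActorSystem = ActorSystem("ask-demo")
  import system.dispatcher
  implicit val timeout: Timeout = Timeout(1.second)

  final case class DeltaValue(id: String, value: Double)
  case object Ack

  // Hypothetical receiver: accumulates values and acknowledges each message.
  class Receiver extends Actor {
    private var total = 0d
    def receive: Receive = {
      case d: DeltaValue =>
        total += d.value
        sender() ! Ack
    }
  }

  val receiver: ActorRef = system.actorOf(Props(new Receiver), "receiver")

  // parallelism = 1 means at most one unacknowledged message is in flight.
  Source(List(DeltaValue("a", 1.0), DeltaValue("a", 2.0)))
    .mapAsync(parallelism = 1)(d => receiver ? d)
    .runWith(Sink.ignore)
    .onComplete(_ => system.terminate())
}
```

If the actor does not reply within the timeout, the ask future fails and, with the default supervision strategy, so does the stream.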

In your case, you could consider leveraging groupBy and the concept of substreams. The example below folds the elements of each substream by summing them, just to give an idea; note that fold emits its result only when the substream completes:

  src.groupBy(maxSubstreams = Int.MaxValue, f = _.id)
    .fold("" → 0d) {
      // take the id from the current element; the parentheses are needed for operator precedence
      case ((_, acc), delta) ⇒ delta.id → (delta.value + acc)
    }
    .mergeSubstreams
    .runForeach(println)
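
Since a TCP source typically never completes, a fold per substream would never emit anything. A variant using scan emits a running total per id after every element instead. This is only a sketch, assuming Akka 2.6+; the in-memory Source and the substream limit of 1024 are placeholders chosen for illustration:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

object RunningTotals extends App {
  // Assumption: Akka 2.6+, so the implicit system also acts as materializer.
  implicit val system: ActorSystem = ActorSystem("running-totals")
  import system.dispatcher

  final case class DeltaValue(id: String, value: Double)

  // Hypothetical stand-in for the TCP source in the question.
  val src = Source(List(
    DeltaValue("a", 1.0), DeltaValue("b", 10.0),
    DeltaValue("a", 2.0), DeltaValue("b", 20.0)))

  src
    .groupBy(1024, _.id) // one substream per id, at most 1024 distinct ids
    .scan("" -> 0d) { case ((_, acc), delta) => delta.id -> (acc + delta.value) }
    .drop(1) // scan emits its initial value first; skip it in each substream
    .mergeSubstreams
    .runForeach(println)
    .onComplete(_ => system.terminate())
}
```

The order in which totals from different ids are printed is not deterministic, because the substreams are merged as their elements become available.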