1
votes

I'm starting to explore the akka-stream API and I'm looking to test it out. I have a specific use case I can't seem to find an appropriate streaming API to use.

The idea is similar to the following with a bit of a twist:

Source(1 to 10).mapAsync(num => actorRef ? Request(num)).to(Sink.foreach(println))

In this case the ask will return a single message through the future. I'd like to return more than one message for each request sent to the actor. For example:

  1. ActorRef gets sent Request(2)
  2. ActorRef sends N messages out of the flow.
  3. ActorRef gets sent Request(3)
  4. ActorRef sends M messages out of the flow.

I can add extra logic into the Actor to aggregate/buffer these extra messages into a Iterable of some sort but I'm wondering if I'm missing something in the API to handle a case like this.

The closest I've come while looking through the docs is using a Sink.actorRefWithAck and Source.queue with Flow.fromSinkAndSourceMat and passing the materialized SourceQueue to the actorRef before acking the onInitMessage. This allows control of up and downstream back pressure while handling one-to-many messages through the flow. It seems counter productive in making the Graph easy to follow.

2

2 Answers

0
votes

I believe you're looking for flatMapConcat.

Source(1 to 10).flatMapConcat(num => 
  // Dummy example - outputs first 5 successors for each incoming element
  Source(num to (num + 5))
).to(Sink.foreach(println))

This way you let Akka lift all the reactive streams weight, and you let no bare actor pollute your pipeline.

Documentation here.

0
votes

Unfortunately what you are looking for doesn't seem to exist. The ask pattern, e.g. the ? method, can only return 1 value. Therefore, when you do something like

type Input = ???
type Outupt = ???

def askActorFlow(actorRef : ActorRef) : Flow[Input, Output, NotUsed] = 
  Flow[Input] mapAsync { input => (actorRef ? input).mapTo[Output] }

You can only get 1 value at a time back from the ActorRef.

The work around you suggested, e.g. "add extra logic into the Actor to aggregate/buffer these extra messages into a Iterable", is a better solution anyway. It makes it easier for your Actor users to know they will get only 1 response back for any query. Otherwise, the user will never truly know if they've gotten the "complete" answer back from the Actor after a query or if more data is forthcoming.