2
votes

We have a fairly complex system built with Akka HTTP and the actor model. Until now, we have used the ask pattern extensively and mixed Futures with actors.

For example, an actor receives a message, needs to execute 3 operations in parallel, combines their results, and returns the combined result to the sender. What we do is:

  1. declare a new variable in the actor's receive callback to store the sender (since we use Future.map, sender may refer to a different sender by the time the callback runs).
  2. execute all 3 futures in parallel using Future.sequence (sometimes a future comes from calling a function that returns one, and sometimes from an ask to another actor)
  3. combine the results of all 3 futures using map or flatMap on the result of Future.sequence
  4. pipe the final result to the sender using pipeTo

Here is the simplified code:

case RetrieveData(userId, `type`, id, lang, paging, timeRange, platform) => {
      val sen = sender

      val result: Future[Seq[Map[String, Any]]] = if (paging.getOrElse(Paging(0, 0)) == Paging(0, 0)) Future.successful(Seq.empty)
      else {
        val start = System.currentTimeMillis()

        val profileF = profileActor ? Get(userId)

        Future.sequence(Seq(profileF, getSymbols(`type`, id), getData(paging, timeRange, platform))).map { result =>
          logger.info(s"Got ${result.size} news in ${System.currentTimeMillis() - start} ms")
          result
        }.recover { case ex: Throwable =>
          logger.error(s"Failure on getting data: ${ex.getMessage}", ex)
          Seq.empty
        }
      }

      result.pipeTo(sen)
    }

The function getAndProcessData contains a Future.sequence that executes 3 futures in parallel.

Now, as I read more about Akka, I see that each ask creates another listener actor. My questions are:

  1. Since we use ask extensively, can it lead to too many threads being used in the system, and perhaps to occasional thread starvation?
  2. Using Future.map a lot also means that callbacks often run on a different thread. I read about the single-threaded illusion of actors, which can easily be broken by mixing in Futures.
  3. Also, can this hurt performance?
  4. Do we need to store sender in the temp variable sen, since we're using pipeTo? Could we just call pipeTo(sender)? Also, does declaring sen in almost every receive callback waste too many resources? I would expect the reference to be released once the operation is complete.
  5. Is there a way to design such a system better, meaning that we don't use map or ask so much? I looked at examples where you pass a replyTo reference to some actor and then use tell instead of ask. Also, sending a message to self and then replying to the original sender can replace working with Future.map in some scenarios. But how can it be designed given that we want to perform 3 async operations in parallel and return formatted data to the sender? We need all 3 operations to be complete before we can format the data.

I tried not to include too many examples; I hope you understand our concerns and problems. These are many questions, but I would really love to understand how it works, simply and clearly.

Thanks in advance

3
paging.getOrElse(Paging(0, 0)) == Paging(0, 0) => paging.forall(_ == Paging(0, 0)) - Tim

3 Answers

2
votes

If you want to do 3 things in parallel you are going to need to create 3 Future values which will potentially use 3 threads, and that can't be avoided.
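As a minimal sketch of that point, using only the Scala standard library: the three futures are created first, so all of them are already running, and only then combined. The functions loadProfile, loadSymbols and loadData are hypothetical stand-ins for the three operations in the question. Combining with a for-comprehension rather than Future.sequence is a choice made for this sketch; it keeps each result's type instead of widening everything to a common element type.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelFutures {
  // Hypothetical stand-ins for the three operations in the question.
  def loadProfile(): Future[String]    = Future { "profile" }
  def loadSymbols(): Future[Seq[String]] = Future { Seq("A", "B") }
  def loadData(): Future[Seq[Int]]     = Future { Seq(1, 2, 3) }

  def combined(): Future[(String, Seq[String], Seq[Int])] = {
    // Create all three futures up front: each one starts running immediately.
    val profileF = loadProfile()
    val symbolsF = loadSymbols()
    val dataF    = loadData()

    // The for-comprehension only waits for the already-running futures.
    for {
      profile <- profileF
      symbols <- symbolsF
      data    <- dataF
    } yield (profile, symbols, data)
  }

  def result(): (String, Seq[String], Seq[Int]) =
    Await.result(combined(), 5.seconds)
}
```

If the calls to loadProfile(), loadSymbols() and loadData() were written directly inside the for-comprehension, each one would start only after the previous one had completed.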

I'm not sure what the issue with map is, but there is only one call in this code and that is not necessary.

Here is one way to clean up the code to avoid creating unnecessary Future values (untested!):

case RetrieveData(userId, `type`, id, lang, paging, timeRange, platform) =>
  if (paging.forall(_ == Paging(0, 0))) {
    sender ! Seq.empty
  } else {
    val sen = sender
    val start = System.currentTimeMillis()

    val resF = Seq(
      profileActor ? Get(userId),
      getSymbols(`type`, id),
      getData(paging, timeRange, platform),
    )

    Future.sequence(resF).onComplete {
      case Success(result) =>
        val dur = System.currentTimeMillis() - start
        logger.info(s"Got ${result.size} news in $dur ms")

        sen ! result
      case Failure(ex) =>
        logger.error(s"Failure on getting data: ${ex.getMessage}", ex)

        sen ! Seq.empty
    }
  }

You can avoid ask by creating your own worker actor that collects the different results and then sends the combined result to the sender, but that is probably more complicated than is needed here.

1
votes

An actor only consumes a thread in the dispatcher while it is processing a message. Since the actor spawned to manage an ask processes only one message, it's very unlikely that the ask pattern by itself will cause thread starvation. If you're already very close to thread starvation, though, an ask might be the straw that breaks the camel's back.

Mixing Futures and actors can break the single-thread illusion, if and only if the code executing in the Future accesses actor state (meaning, basically, vars or mutable objects defined outside of a receive handler).
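The following sketch illustrates that rule without Akka, using only the standard library. FakeActor and its currentSender field are hypothetical stand-ins for an actor and its sender(), which changes with every message the actor processes. The unsafe handler reads the mutable field inside the Future callback; the safe one copies it into a local val first, which is what val sen = sender does in the question.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CaptureDemo {
  // Hypothetical stand-in for an actor; currentSender plays the role of sender().
  final class FakeActor {
    @volatile var currentSender: String = "none"

    // Unsafe: the callback reads the mutable field when the future completes.
    def handleUnsafe(from: String, work: Future[Int]): Future[String] = {
      currentSender = from
      work.map(n => s"$n -> $currentSender")
    }

    // Safe: copy the value into a local val before going asynchronous.
    def handleSafe(from: String, work: Future[Int]): Future[String] = {
      currentSender = from
      val replyTo = currentSender
      work.map(n => s"$n -> $replyTo")
    }
  }

  def run(): (String, String) = {
    val actor = new FakeActor
    val gate1 = Promise[Int]()
    val gate2 = Promise[Int]()

    val unsafe = actor.handleUnsafe("alice", gate1.future)
    val safe   = actor.handleSafe("alice", gate2.future)

    // The "actor" moves on to the next message before the work finishes.
    actor.currentSender = "bob"
    gate1.success(1)
    gate2.success(2)

    (Await.result(unsafe, 2.seconds), Await.result(safe, 2.seconds))
  }
}
```

Here run() returns ("1 -> bob", "2 -> alice"): the unsafe reply goes to the wrong recipient. Note that result.pipeTo(sender()) evaluates sender() as an ordinary argument while the message is still being handled, so the temporary variable is only needed when sender() is referenced inside a callback such as map or onComplete.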

Request-response and at-least-once (between them, they cover at least most of the motivations for the ask pattern) will in general limit throughput compared to at-most-once tells. Implementing request-response or at-least-once without the ask pattern might in some situations (e.g. using a replyTo ActorRef for the ultimate recipient) be less overhead than piping asks, but probably not significantly. Asks as the main entry-point to the actor system (e.g. in the streams handling HTTP requests or processing messages from some message bus) are generally OK, but asks from one actor to another are a good opportunity to streamline.

Note that, especially if your actor imports context.dispatcher as its implicit ExecutionContext, transformations on Futures are basically identical to single-use actors.

Situations where you want multiple things to happen, especially when you need to manage partial failure, are potential candidates for a saga actor that organizes one particular request/response. Future.sequence.recover is a possible sign of this situation, especially if the recover gets nontrivial.
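A rough sketch of such a per-request actor with classic Akka actors follows. The message types (ProfileResult, SymbolsResult, DataResult, Combined, RequestFailed) and the class name RetrieveDataSaga are assumptions made for illustration; they are not part of the question's code, and the real replies would need to be mapped to them.

```scala
import akka.actor.{Actor, ActorRef, Props, ReceiveTimeout}
import scala.concurrent.duration._

// Hypothetical message protocol for this sketch (not from the original code).
final case class ProfileResult(profile: Map[String, Any])
final case class SymbolsResult(symbols: Seq[String])
final case class DataResult(rows: Seq[Map[String, Any]])
final case class Combined(
  profile: Map[String, Any],
  symbols: Seq[String],
  rows: Seq[Map[String, Any]]
)
case object RequestFailed

// One short-lived actor per request: it collects the three replies,
// answers replyTo exactly once, and then stops itself.
class RetrieveDataSaga(replyTo: ActorRef, timeout: FiniteDuration) extends Actor {
  private var profile: Option[ProfileResult] = None
  private var symbols: Option[SymbolsResult] = None
  private var data: Option[DataResult] = None

  // Fires ReceiveTimeout if no message arrives for `timeout`.
  context.setReceiveTimeout(timeout)

  def receive: Receive = {
    case p: ProfileResult => profile = Some(p); replyIfComplete()
    case s: SymbolsResult => symbols = Some(s); replyIfComplete()
    case d: DataResult    => data = Some(d); replyIfComplete()
    case ReceiveTimeout   =>
      // Partial failure: report it instead of waiting forever.
      replyTo ! RequestFailed
      context.stop(self)
  }

  // Replies only once all three parts have arrived.
  private def replyIfComplete(): Unit =
    for (p <- profile; s <- symbols; d <- data) {
      replyTo ! Combined(p.profile, s.symbols, d.rows)
      context.stop(self)
    }
}

object RetrieveDataSaga {
  def props(replyTo: ActorRef, timeout: FiniteDuration): Props =
    Props(new RetrieveDataSaga(replyTo, timeout))
}
```

A parent actor could create one with context.actorOf(RetrieveDataSaga.props(sender(), 5.seconds)), send its request to the profile actor with the saga as the sender (profileActor.tell(Get(userId), saga)), and pipeTo(saga) the results of the Future-returning functions after wrapping them in the message types above. The mutable fields are safe here because they are only touched while the actor is processing a message.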

0
votes

I would suggest using Source from Akka Streams instead of Future.sequence. It also lets you set the parallelism, that is, how many futures are awaited at the same time. Here is the sample code:

// Needs an implicit Materializer in scope.
Source.fromIterator( () => Seq(profileF, getSymbols(`type`, id), getData(paging, timeRange, platform)).iterator )
        // Each element is already a Future; mapAsync awaits up to 3 at once and keeps their order.
        .mapAsync( parallelism = 3 )( future => future )
        .runWith( Sink.seq )

This will return Future[Seq[Map[String, Any]]]