
Not sure if that's the right stackexchange for this question

I've got an akka-http application that acts as a front end to some heavy computation. The requests it handles vary in how long they take to process: some finish within one second, some take longer. The computation is purely asynchronous, there's no Await at any point, and I complete requests with a Future, i.e.:

    val spotsJsonF: Future[String] = spotsF.map(spots => DebugFormatter.produceJson(text, spots._1, spots._2, env))

    complete(spotsJsonF.map { t => HttpEntity(ContentTypes.`application/json`, t) })

My requirements/assumptions:

  • I need to maximise parallelism, i.e. connections shouldn't be rejected under heavy load
  • I can live with some (even small) requests taking longer if the service is busy
  • I can live with some extremely long requests timing out under heavy load, as long as they don't affect the parallelism too much after the HTTP request has finished with a timeout.

To do that, I provided a separate execution context (i.e. Scala's default ExecutionContext.global) for the heavy computation, so it spawns and transforms Futures on a different thread pool from the one used by the akka-http dispatcher. I thought this would stop the computation "sitting" on Akka's threads, so that Akka could accept more connections. At the moment Akka uses its default dispatcher (my reference.conf is empty):

    "default-dispatcher": {
      "attempt-teamwork": "on",
      "default-executor": {
        "fallback": "fork-join-executor"
      },
      "executor": "default-executor",
      "fork-join-executor": {
        "parallelism-factor": 3,
        "parallelism-max": 64,
        "parallelism-min": 8,
        "task-peeking-mode": "FIFO"
      },
      "mailbox-requirement": "",
      "shutdown-timeout": "1s",
      "thread-pool-executor": {
        "allow-core-timeout": "on",
        "core-pool-size-factor": 3,
        "core-pool-size-max": 64,
        "core-pool-size-min": 8,
        "fixed-pool-size": "off",
        "keep-alive-time": "60s",
        "max-pool-size-factor": 3,
        "max-pool-size-max": 64,
        "max-pool-size-min": 8,
        "task-queue-size": -1,
        "task-queue-type": "linked"
      },
      "throughput": 5,
      "throughput-deadline-time": "0ms",
      "type": "Dispatcher"
    },
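The setup described above can be sketched roughly as follows. This is only an illustration: `HeavyWork`, `countWords` and `asJson` are hypothetical stand-ins, not the real computation, and the point is only that both the work and the follow-up `map` are given the compute pool explicitly.

```scala
import scala.concurrent.{ExecutionContext, Future}

object HeavyWork {
  // The pool the heavy computation runs on; here Scala's global context, as described above
  val computeEc: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for the real computation
  def countWords(text: String): Future[Int] =
    Future(text.split("\\s+").count(_.nonEmpty))(computeEc)

  // Follow-up transformations stay on the same pool instead of Akka's dispatcher
  def asJson(text: String): Future[String] =
    countWords(text).map(n => s"""{"words":$n}""")(computeEc)
}
```

Passing the context explicitly, rather than importing an implicit one, makes it easier to see which pool each stage runs on.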

What happens, though, is that a long-running computation keeps executing long after Akka has cancelled the request due to the timeout. With a limited number of cores, this means that the number of rejected requests starts increasing even though the computation that caused the overload is no longer needed.

Clearly, I have no idea how to properly manage threads in this application.

What's the best way to satisfy my requirements? Several thread pools - good/bad idea? Do I need to explicitly cancel things? Maybe using Scala's vanilla Future isn't the best option at this point?

Have you considered creating separate actors, one per resource? It should be quite easy to scale, proxying heavy computations to other nodes, and the master node would be used only as a router in this scenario. - Mariusz Beltowski
We don't use this as a distributed application, as it has to keep a very heavy (13G) structure in memory, so we only deploy it on large EC2 instances. - Anton

1 Answer


To me this sounds like it isn't really so much about managing threads (you have already isolated the heavy work on a separate dispatcher) but about managing the actual processing.

To be able to stop a long-running process mid-work, you need to split it into smaller chunks so that it can be aborted between chunks once it is no longer needed.
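As a minimal sketch of that idea using only the standard library: the work is modelled as a sequence of chunks, and a shared flag is checked before each one. The names `ChunkedWork`, `run`, `process` and `cancelled` are made up for illustration, and summing `Long` values stands in for the real computation.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{ExecutionContext, Future}

object ChunkedWork {
  /** Sums `process` over `chunks`, checking `cancelled` before each chunk.
    * Yields None if the work was abandoned part-way, Some(total) otherwise. */
  def run(chunks: Seq[Int], cancelled: AtomicBoolean)(process: Int => Long)(
      implicit ec: ExecutionContext): Future[Option[Long]] =
    Future {
      var total = 0L
      val it = chunks.iterator
      // Stop pulling chunks as soon as someone has flipped the flag
      while (it.hasNext && !cancelled.get()) total += process(it.next())
      // The work is complete only if every chunk was consumed
      if (it.hasNext) None else Some(total)
    }
}
```

Whatever code notices that the HTTP request has timed out would call `cancelled.set(true)`; the worker then frees its thread after at most one more chunk, so the chunk size bounds how long an unwanted computation can linger.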

A common pattern with actors is to have a processing actor either store the result "so far" or send it to itself as a message. This way it can react to a "stop working" message between chunks, or check whether it has been processing for so long in wall-clock time that it should abort. The message that triggers the workload could, for example, contain such a timeout value so that the "client" can specify it.
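The same shape can be sketched without actors, using plain Futures as a stand-in: each chunk is scheduled as its own task, the result "so far" is passed along to the next task (much like a self-sent message), and a caller-supplied deadline is checked in between. This is an analogue under assumptions, not Akka code; `DeadlineWork`, `Completed` and `Aborted` are hypothetical names.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object DeadlineWork {
  sealed trait Outcome
  final case class Completed(total: Long) extends Outcome
  final case class Aborted(partial: Long, remaining: Int) extends Outcome

  /** Processes one chunk per task, carrying the result "so far" forward.
    * Before each chunk it checks the deadline and aborts once it has passed. */
  def run(chunks: List[Int], deadline: Deadline, soFar: Long = 0L)(
      implicit ec: ExecutionContext): Future[Outcome] =
    chunks match {
      case Nil => Future.successful(Completed(soFar))
      case _ if deadline.isOverdue() =>
        Future.successful(Aborted(soFar, chunks.size))
      case head :: tail =>
        // Each step is a separate task, so other work can be scheduled in between
        Future(soFar + head).flatMap(next => run(tail, deadline, next))
    }
}
```

A caller would pass something like `Deadline.now + 5.seconds`, derived from the request timeout. In an actor, the `Aborted` branch corresponds to noticing the elapsed wall time (or a "stop working" message) before handling the next self-sent chunk.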

(This is essentially the same thing as dealing with InterruptedException and Thread.isInterrupted correctly in a hand-threaded, blocking application.)
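For comparison, the hand-threaded version of that cooperative check might look like the sketch below. `InterruptibleLoop` and `demo` are illustrative names, and the counter increment stands in for one unit of real work.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

object InterruptibleLoop {
  /** Starts a worker that runs until interrupted, interrupts it, and
    * reports whether the worker observed the interrupt and exited. */
  def demo(): Boolean = {
    val started = new CountDownLatch(1)
    val sawInterrupt = new AtomicBoolean(false)
    val worker = new Thread(() => {
      started.countDown()
      var units = 0L
      // Cooperative check: poll the interrupt flag between units of work
      while (!Thread.currentThread().isInterrupted) units += 1
      sawInterrupt.set(true)
    })
    worker.start()
    started.await()    // make sure the worker is running before interrupting it
    worker.interrupt() // request cancellation; the worker decides when to stop
    worker.join()
    sawInterrupt.get()
  }
}
```

As with the chunked approach, the interrupt is only a request: the thread stops because its own loop checks the flag, not because anything forces it to.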