We have the following logic in place to manage jobs that target different backends. A Manager actor is started, and this actor:
- Loads the configuration required to target each backend (mutable map: backend name -> backend connector configuration);
- Loads a pool of actors (RoundRobinPool) to handle the jobs for each backend (mutable map: backend name -> RoundRobinPool ActorRef).
When a request is received by the Manager actor, it retrieves the backend name from the message and forwards the message to the corresponding pool of actors, which handles the job (assuming a configuration for this backend was registered). The result of the job is then sent from the pool actor straight back to the original sender, which is why we use forward.
This logic works very well, but because the backends are slow to handle jobs, we are in a typical fast publisher, slow consumer situation, and this causes issues when the load increases.
After doing some research, Akka Streams seems to be the way to go, as it supports back pressure and throttling, which would be perfect for our use case (for example, limiting a backend to 5 requests per second).
The idea is to keep the Manager Actor with the same routing logic but replace the pools of Actors with a Source.queue.
Each Source.queue would be registered like this:
val queue = Source
  .queue[RunBackendRequest](0, OverflowStrategy.backpressure)
  .throttle(5, 1.second)
  .map(r => runBackendRequest(r))
  .toMat(Sink.ignore)(Keep.left)
  .run()
Where the definition of RunBackendRequest is:
case class RunBackendRequest(originalSender: ActorRef, backendConnector: BackendConnector, request: BackendRequest)
And the function runBackendRequest is defined as follows:
private def runBackendRequest(runRequest: RunBackendRequest): Unit = {
  val connector = BackendConnectorFactory.getBackendConnector(
    configuration.underlying, runRequest.backendConnector.toConfig(), materializer, environment.asJava)
  // Run the (slow) backend call asynchronously, then reply to the original sender
  Future { connector.doSomeWork(runRequest.request) } map { result =>
    runRequest.originalSender ! Success(result)
  } recover {
    case e: Exception => runRequest.originalSender ! Failure(e)
  }
}
When the Manager actor receives a message, it will 'offer' it to the correct queue, based on the name of the target backend contained in the message.
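To make the routing step concrete, here is a rough sketch of what the Manager's receive could look like. It is not our real code: BackendRequest with a backendName field and the trimmed-down RunBackendRequest are hypothetical stand-ins, and the queues are passed in as an immutable Map only to keep the example short.

```scala
import akka.actor.{Actor, ActorRef, Status}
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.SourceQueueWithComplete
import scala.util.{Failure, Success}

// Simplified, hypothetical stand-ins for our real message classes.
final case class BackendRequest(backendName: String, payload: String)
final case class RunBackendRequest(originalSender: ActorRef, request: BackendRequest)

class Manager(queues: Map[String, SourceQueueWithComplete[RunBackendRequest]]) extends Actor {
  import context.dispatcher // execution context for the offer callbacks

  def receive: Receive = {
    case request: BackendRequest =>
      // Capture the sender now: calling sender() inside a Future callback is not safe.
      val originalSender = sender()
      queues.get(request.backendName) match {
        case Some(queue) =>
          queue.offer(RunBackendRequest(originalSender, request)).onComplete {
            case Success(QueueOfferResult.Enqueued) =>
              () // accepted: the stream is now responsible for answering
            case Success(other) =>
              originalSender ! Status.Failure(new IllegalStateException(s"Not enqueued: $other"))
            case Failure(e) =>
              originalSender ! Status.Failure(e)
          }
        case None =>
          originalSender ! Status.Failure(
            new IllegalArgumentException(s"No backend registered for '${request.backendName}'"))
      }
  }
}
```

One thing I am unsure about here: as far as I understand, with OverflowStrategy.backpressure an offer made while a previous offer is still pending fails its Future, so with a buffer size of 0 the Failure branch above would probably be hit often under load.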
Therefore, I have a few questions:
- Is this the correct way to use Akka Streams in this particular use case, or could it be written differently and more efficiently?
- Is it OK to put the ActorRef of the original sender in the RunBackendRequest object, so that the request is answered from within the Flow?
- Is there a way to get the result of the Flow as a Future instead, so that the Manager actor could return the result of the request itself?
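For the last question, what I have in mind is something like the sketch below: each request carries a Promise instead of an ActorRef, and the stream completes it. I have not validated this. Job, backendQueue and submit are names I made up for the example, `work` stands in for connector.doSomeWork, and the buffer size of 16 is arbitrary. I also swapped map for mapAsync, on the assumption that the stream should wait for the backend call to finish before pulling the next request.

```scala
import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Hypothetical wrapper: the request plus a Promise for its result,
// replacing the originalSender field of RunBackendRequest.
final case class Job[Req, Res](request: Req, result: Promise[Res])

object FutureResultSketch {

  // Builds one throttled queue for a backend; `work` stands in for connector.doSomeWork.
  def backendQueue[Req, Res](work: Req => Res)(
      implicit mat: Materializer, ec: ExecutionContext): SourceQueueWithComplete[Job[Req, Res]] =
    Source
      .queue[Job[Req, Res]](16, OverflowStrategy.backpressure) // buffer size is arbitrary
      .throttle(5, 1.second)
      // mapAsync(1) pulls the next job only after the current Future completes,
      // so a slow backend slows the whole stream down.
      .mapAsync(parallelism = 1) { job =>
        val outcome = Future(work(job.request))
        job.result.completeWith(outcome)
        // Keep the stream alive: failures are reported through the Promise only.
        outcome.map(_ => ()).recover { case _ => () }
      }
      .toMat(Sink.ignore)(Keep.left)
      .run()

  // Offers a request to the queue and returns a Future of its result.
  def submit[Req, Res](queue: SourceQueueWithComplete[Job[Req, Res]], request: Req)(
      implicit ec: ExecutionContext): Future[Res] = {
    val promise = Promise[Res]()
    queue.offer(Job(request, promise)).flatMap {
      case QueueOfferResult.Enqueued => promise.future
      case other => Future.failed(new IllegalStateException(s"Request not enqueued: $other"))
    }
  }
}
```

The Manager could then do something like `submit(queue, request).pipeTo(sender())` (with `import akka.pattern.pipe`), which would send the result, or a Status.Failure, back to the caller.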
Akka Streams seems to be very powerful, but there is clearly a learning curve!