I have an Akka messaging engine that delivers millions of messages a day, both SMS and email. I need to introduce a new message type (PushNotification) in which each request calls a REST API; it will also handle millions of requests. I believe that calling a web service is a blocking operation, so from what I have read I need to add a separate dispatcher for this new actor. My question is: does it necessarily need to be a thread-pool-executor with a fixed-pool-size, as mentioned here (see https://doc.akka.io/docs/akka-http/current/handling-blocking-operations-in-akka-http-routes.html), or is it possible to use a fork-join-executor instead?

Also, what is the best approach to avoid affecting the two existing message types (SMS and email)? That is, how do I avoid starving their thread pools? Currently email uses a separate dispatcher and SMS uses the default dispatcher. Instead of creating a new dispatcher for the actor that performs the blocking operation (calling the web service), is there any other way, such as creating a reactive web service?

What REST client are you using? Akka HTTP works asynchronously using the same thread pool and integrates well with Akka actors. – Tim
@Tim, I am using Jersey (com.sun.jersey.api.client.ClientResponse). Do you mean Akka HTTP is reactive? – Diego Ramos

1 Answer

Calling a RESTful API from an actor does not have to be blocking.

An easy way to consume a RESTful API from an actor is to use the Akka HTTP client. It lets you send an HTTP request asynchronously and have the result delivered back to an actor as a message, using the pipeTo method.

This is a very cut-down example (lightly modified from the sample in the documentation).

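import akka.actor.{Actor, ActorRef, PoisonPill, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.pattern.pipe
import scala.concurrent.ExecutionContext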

object RestWorker {
  def props(replyTo: ActorRef): Props =
    Props(new RestWorker(replyTo))
}

class RestWorker(replyTo: ActorRef) extends Actor
{
  implicit val ec: ExecutionContext = context.system.dispatcher

  override def preStart() = {
    Http(context.system).singleRequest(HttpRequest(uri = "https://1.2.3.4/resource"))
      .pipeTo(self)
  }

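  def receive = {
    case resp: HttpResponse =>
      // Placeholder: build the reply here (the response entity must be
      // read or discarded so the connection is released)
      val response = ??? // Process response

      replyTo ! response

      // One worker per request: stop once the reply has been sent
      self ! PoisonPill

    case akka.actor.Status.Failure(cause) =>
      // pipeTo delivers a failed Future as Status.Failure; forward it and
      // stop, otherwise the worker would never terminate
      replyTo ! akka.actor.Status.Failure(cause)
      self ! PoisonPill
  }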
}
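
As a rough usage sketch of the pattern above, a parent actor could spawn one short-lived worker per push request, so that no thread is ever held while waiting for the HTTP response. Everything named here (SendPush, PushResult, PushFailed, PushWorker, PushSender) is hypothetical and not part of the original answer. The sketch also assumes Akka 2.6 or later (for SystemMaterializer) and passes the HTTP call in as a function so it can be replaced with a stub; in a real system that function would probably be something like `req => Http(system).singleRequest(req)`.

```scala
import akka.actor.{Actor, ActorRef, Props, Status}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.pattern.pipe
import akka.stream.SystemMaterializer
import scala.concurrent.Future

// Hypothetical protocol, introduced only for this sketch.
final case class SendPush(uri: String)
final case class PushResult(uri: String, status: Int)
final case class PushFailed(uri: String, reason: String)

object PushWorker {
  // The HTTP call is injected (an assumption made here for testability).
  type Send = HttpRequest => Future[HttpResponse]

  def props(uri: String, replyTo: ActorRef, send: Send): Props =
    Props(new PushWorker(uri, replyTo, send))
}

// One short-lived actor per request; it only reacts to the piped result.
class PushWorker(uri: String, replyTo: ActorRef, send: PushWorker.Send) extends Actor {
  import context.dispatcher // execution context required by pipeTo

  override def preStart(): Unit =
    send(HttpRequest(uri = uri)).pipeTo(self)

  def receive: Receive = {
    case resp: HttpResponse =>
      // Drain the body so the underlying connection can be released.
      resp.discardEntityBytes(SystemMaterializer(context.system).materializer)
      replyTo ! PushResult(uri, resp.status.intValue)
      context.stop(self)

    case Status.Failure(cause) =>
      // A failed Future arrives as Status.Failure when piped to an actor.
      replyTo ! PushFailed(uri, String.valueOf(cause.getMessage))
      context.stop(self)
  }
}

object PushSender {
  def props(send: PushWorker.Send): Props = Props(new PushSender(send))
}

// Entry point for push requests: delegates each one to a fresh worker.
class PushSender(send: PushWorker.Send) extends Actor {
  def receive: Receive = {
    case SendPush(uri) =>
      context.actorOf(PushWorker.props(uri, sender(), send))
  }
}
```

Because the workers never block, they can stay on whichever dispatcher the parent uses. The SMS and email actors would then only compete with short message-handling steps, not with threads parked on network I/O.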