0
votes

I'm trying to execute the following Scala code inside an Akka Actor.

class FilteringService(implicit timeout: Timeout) extends Actor {

  def receive: PartialFunction[Any, Unit] = {

    case GetProfiles ⇒
      val requester = sender
      def getProfiles = {

        var result = new Array[Profile](0)

        println("[GET-PROFILES] Entered, making request")
        val req = Get("http://localhost:9090/profiles")
        implicit val profileFormat = jsonFormat16(Profile)


        val responseFuture: Future[HttpResponse] = Http().singleRequest(req)
        println("[GET-PROFILES] Entered, request sent")
        responseFuture.onComplete {

          case Success(response) =>
            println("[RES - SUCCESS] Request returned with " + response.status)
            val responseAsProfiles =  Unmarshal(response.entity).to[Array[Profile]]
            responseAsProfiles.onComplete {
            println("[UNMARSH - SUCCESS] Unmarshaling Done!")
            _.get match {
              case profiles: Array[Profile] =>
                println("[UNMARSH - SUCCESS] Sending Profiles message to " + sender())
                requester ! profiles
                println("[UNMARSH - SUCCESS] Message sent to " + sender())
              case _ => println("error")
              }
            }

          case Failure(_)   =>
            sys.error("something wrong")
            //return Future[Array[Profile]]

        }
      }
      println("[RECEIVE] Message GetProfiles received from " + sender().toString())
      getProfiles
      println("[RECEIVE] Message GetProfiles invoked")

  }

When the Actor receives the message "GetProfiles":

1- it sends a request to a remote server, so the result of operation is a Future[HttpResponse]

2- in case of success it retrieves the response (a JSON array) and asks for unmarshalling the object to Array[Profile]. (It's not important the Profile model). The result of Unmarshall method is a Future[Array[Profile]]

3- In case of success, I want to send the result back to the original sender!

I managed to do this, but it's a trick because I'm saving the sender in a variable, that is visible in the scope (requester). I know that there is the pipe pattern, so I could send the responseAsProfiles object back to the sender in theory, but the object is created inside the onComplete method of the responseFuture object (we have to wait it, of course!)

So that's all! How could I send the result back to the sender using the pipe pattern in this case? Thanks in advance!!!

1

1 Answers

1
votes

General idea is that you compose futures using map and flatMap and try to avoid using onComplete as much as possible.

See if you can convert your code to following smaller pieces and then compose:

def getRawProfileData(): Future[HttpResponse] = {
 // ... here you make http request
}

def unmarshalProfiles(response: HttpResponse): Future[List[Profile]] = {
  // ... unmarshalling logic
}

def getProfiles(): Future[List[Profile]] = getRawProfileData().flatMape(unmarshalProfiles)

// now from receive block

case GetProfiles ⇒ getProfiles().pipeTo(sender())