6
votes

I'm currently trying to rewrite an existing untyped actor into a typed one. Since the actor is talking to a MySQL database using ScalikeJDBC, and since I'd like to have that done asynchronously, I'm dealing with Futures coming out of a separate (non-actor) repository class.

With untyped Akka, in an actor's receive method, I could do this:

import akka.pattern.pipe
val horseList : Future[Seq[Horse]] = horseRepository.listHorses(...)
horseList pipeTo sender()

And the sender actor would eventually receive a list of horses. I can't figure out how to do this inside a Behavior, like:

val behaviour : Behavior[ListHorses] = Behaviors.receive { 
    (ctx,msg) => msg match {
        case ListHorses(replyTo) => 
            val horseListF : Future[Seq[Horse]] = horseRepository.listHorses(...)
            // -> how do I make horseListF's content end up at replyTo? <-
            Behaviors.same
    }
}

The pipe pattern doesn't work (as it expects an untyped ActorRef), and so far I haven't found anything else in the akka-actor-typed (2.5.12) dependency I'm using to make this work.

How do I do this?

3
How can you be sure that another message from another "sender" hasn't arrived in the mailbox in your sample? If it had, you would be sending the database response to the wrong client. - Emiliano Martinez
As far as I know, sender() is guaranteed to be the sender of the current message for any single invocation of receive(), which as described is where the sample code would live. - 1flx

3 Answers

3
votes

In Akka 2.5.22 (maybe earlier) there is context.pipeToSelf:

  def pipeToSelf[Value](future: Future[Value])(mapResult: Try[Value] => T): Unit

You still have to provide a pattern match for Success and Failure, which in my code I've reduced with this sugar:

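import scala.util.{Failure, Success, Try}

// Builds the Try => message function that pipeToSelf expects.
def mapPipe[A, T](success: A => T, failure: Throwable => T): Try[A] => T = {
  case Success(value) => success(value)
  case Failure(e) => failure(e)
}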

Resulting in a call like this:

case class Horses(horses: Seq[Horse]) extends Command
case class HorseFailure(e: Throwable) extends Command

...

context.pipeToSelf(horseList) {
  mapPipe(Horses,HorseFailure)
}
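
For context, here is a minimal sketch of how the pieces could fit together in one behavior, assuming Akka 2.5.22 or later. Horse, HorseRepository, the reply types and the two internal messages (HorsesLoaded, HorsesFailed) are hypothetical names made up for illustration, not something from the question. The idea is that replyTo travels inside the message that is piped back to the actor, so the reply is sent from inside the actor once the future has completed:

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.Future
import scala.util.{Failure, Success}

// Hypothetical stand-ins for the question's domain types.
final case class Horse(name: String)
trait HorseRepository { def listHorses(): Future[Seq[Horse]] }

// Hypothetical reply protocol, so that failures can be reported as well.
sealed trait ListHorsesReply
final case class HorsesFound(horses: Seq[Horse]) extends ListHorsesReply
final case class HorsesUnavailable(reason: String) extends ListHorsesReply

object HorseActor {
  sealed trait Command
  final case class ListHorses(replyTo: ActorRef[ListHorsesReply]) extends Command
  // Internal messages: the actor receives these from itself when the future completes.
  private final case class HorsesLoaded(horses: Seq[Horse], replyTo: ActorRef[ListHorsesReply]) extends Command
  private final case class HorsesFailed(cause: Throwable, replyTo: ActorRef[ListHorsesReply]) extends Command

  def apply(repo: HorseRepository): Behavior[Command] =
    Behaviors.receive[Command] { (context, message) =>
      message match {
        case ListHorses(replyTo) =>
          // Turn the future's outcome into a message for this actor.
          context.pipeToSelf(repo.listHorses()) {
            case Success(horses) => HorsesLoaded(horses, replyTo)
            case Failure(cause)  => HorsesFailed(cause, replyTo)
          }
          Behaviors.same
        case HorsesLoaded(horses, replyTo) =>
          replyTo ! HorsesFound(horses)
          Behaviors.same
        case HorsesFailed(cause, replyTo) =>
          replyTo ! HorsesUnavailable(String.valueOf(cause.getMessage))
          Behaviors.same
      }
    }
}
```

Because the result comes back as an ordinary message, it is handled on the actor's own thread like any other message, which matters if the actor needs to update state before replying.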

2
votes

You can simply send a message to replyTo when the future completes successfully:

case ListHorses(replyTo) => 
    horseRepository.listHorses(...) foreach { horses => replyTo ! horses }
    Behaviors.same

Or if you want to handle errors as well:

case ListHorses(replyTo) =>
    horseRepository.listHorses(...) onComplete { 
        case Success(horses) => replyTo ! horses
        case Failure(e) => // error handling 
    }
    Behaviors.same

In order for this to work, you need an ExecutionContext. It usually makes sense to use the same one as the actor, so you will have to make it available to onComplete or foreach first:

implicit val ec: scala.concurrent.ExecutionContext = ctx.executionContext
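
Put together, a complete behavior using this approach might look roughly like the sketch below. Horse, HorseRepository and the reply types (HorsesFound, HorsesUnavailable) are hypothetical names introduced for illustration; only the onComplete and ctx.executionContext parts come from the snippets above.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Hypothetical stand-ins for the question's domain types.
final case class Horse(name: String)
trait HorseRepository { def listHorses(): Future[Seq[Horse]] }

// Hypothetical reply protocol, so that the failure branch has something to send.
sealed trait ListHorsesReply
final case class HorsesFound(horses: Seq[Horse]) extends ListHorsesReply
final case class HorsesUnavailable(reason: String) extends ListHorsesReply

final case class ListHorses(replyTo: ActorRef[ListHorsesReply])

object HorseLister {
  def apply(repo: HorseRepository): Behavior[ListHorses] =
    Behaviors.receive[ListHorses] { (ctx, msg) =>
      // Read the execution context while still on the actor's thread.
      implicit val ec: ExecutionContext = ctx.executionContext
      msg match {
        case ListHorses(replyTo) =>
          // The callback runs outside the actor, so it only uses replyTo.
          repo.listHorses().onComplete {
            case Success(horses) => replyTo ! HorsesFound(horses)
            case Failure(cause)  => replyTo ! HorsesUnavailable(String.valueOf(cause.getMessage))
          }
          Behaviors.same
      }
    }
}
```

One thing to keep in mind: the future's callback does not run as part of the actor's message processing, so it should not touch ctx or any mutable actor state. Sending to replyTo is fine, since an ActorRef can be used safely from any thread.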

0
votes

Why don't you use the future's callback to send your message back? Check this example; maybe you could use a similar approach:

import akka.typed.{ActorRef, ActorSystem, Behavior}
import akka.typed.scaladsl.Actor

import scala.concurrent.Future
import scala.util.{Failure, Success}
import scala.concurrent.ExecutionContext.Implicits.global

sealed trait Response
case class Message(msg: String) extends Response

case class Greet(whom: String, replayTo: ActorRef[Response])

object Methods {
  def GetRecipient : Future[String] = Future { "Me" }
}

object Greeter {
  import Methods._
  import akka.typed.scaladsl.Actor

  val behavior =
    Actor.immutable[Greet] { (ctx, greet) =>
      println(greet)
      GetRecipient onComplete {
        case Success(str) => {
          // Use the future's callback instead of pipeTo
          greet.replayTo ! Message("Hi!")
        }
        case Failure(err) => greet.replayTo ! Message("Error")
      }
      Actor.same
    }
}

object Man extends App {

  import Greeter._
  import scala.concurrent.duration._

  val main: Behavior[Response] = {
    Actor.deferred[Response] { ctx =>
      val enricherRef = ctx.spawn(behavior, "greet")
      enricherRef ! Greet("hey", ctx.self)

      Actor.immutable[Response] {
        case (ctx, m: Response) => {
          println(m)
          Actor.same
        }
      }
    }
  }

  val system = ActorSystem( "GreetDemo", main)

  Thread.sleep(5000)
}

This example only sends a message to a newly spawned actor, but in your case I'd use a new actor for each query, for example.