0
votes

Say I have a worker actor that receives a message, does a bit of processing and returns a result. And I have a sequence of messages that need to be converted into a sequence of results:

object Test {

  case class Message(str: String)
  case class Result(str: String)

  class Worker extends Actor {
    def receive = {
        case Message(data) =>
            println("Sleeping: " + data)
            Thread.sleep(10000)
            val result = Result(data + " - result")
            println("Sending result: " + result)
            sender ! result
    }
  }

  def test(messages: Seq[Message]): Future[Seq[Result]] = {
    val worker = ActorSystem().actorOf(Props(new Worker))
    val results = messages.map { m =>
        implicit val timeout = Timeout(20 seconds)
        println("Sending: " + m)
        val result = worker ? m
        result.asInstanceOf[Future[Result]]
    }
    Future.sequence(results)
  }

  def main(args: Array[String]): Unit = {
    val messages: Seq[Message] = args.map(Message(_))
    test(messages).foreach { r =>
        println("Result: " + r)
    }
}

}

If I run the above with just "message-1" as an argument it runs fine giving the the output is below:

Sending: Message(message-1)

Sleeping: message-1

Sending result: Result(message-1 - result)

Result: ArraySeq(Result(message-1 - result))

However say I do it with: "message-1" "message-2" "message-3" then the last message ends up being sent to deadLetters:

Sending: Message(message-1) Sending: Message(message-2) Sleeping: message-1 Sending: Message(message-3)

Sending result: Result(message-1 - result)

Sleeping: message-2

Sending result: Result(message-2 - result)

Sleeping: message-3

Sending result: Result(message-3 - result)

[INFO] [07/15/2016 09:07:49.832] [default-akka.actor.default-dispatcher-2] [akka://default/deadLetters] Message [util.Tester$Result] from Actor[akka://default/user/$a#1776546850] to Actor[akka://default/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

I am guessing this is because my calling thread has gone out of scope by the time the last message is sent. How can correctly collect all results into a sequence?

Note that changing my test method to below gives the same results:

def test(messages: Seq[Message]): Future[Seq[Result]] = {
    val worker = ActorSystem().actorOf(Props(new Worker))
    Future.traverse(messages) { m =>
        implicit val timeout = Timeout(20 seconds)
        println("Sending: " + m)
        val result = worker ? m
        result.asInstanceOf[Future[Result]]
    }
}
2
The variable actor doesn't exist, do you mean worker here? Not sure why you're seeing dead letters, unless your worker is failing to respond correctly, like maybe using sender() improperly. Also, you may want to look into Future.sequence to get all of your results as a single Future. But I can't say for sure if any of these fixes your problem.acjay

2 Answers

0
votes

The dumb answer is:

Future.traverse(messages)(m => actor ? m).map(_.asInstanceOf[Result])

But it might be better to send data all at once:

class Worker extends Actor { 
  def receive = {
    case Message(data) =>
      // Convert data into result
      ...
      sender ! result
    case seq: Seq[Message] =>
      ...
      sender ! results

  }
}
0
votes

Seems to be because my timeout was set too low. Should have been large enough to cover all the work - for example 40 seconds.