Say I have a worker actor that receives a message, does a bit of processing and returns a result. And I have a sequence of messages that need to be converted into a sequence of results:
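import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.postfixOps

object Test {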
case class Message(str: String)
case class Result(str: String)
class Worker extends Actor {
def receive = {
case Message(data) =>
println("Sleeping: " + data)
Thread.sleep(10000)
val result = Result(data + " - result")
println("Sending result: " + result)
sender ! result
}
}
def test(messages: Seq[Message]): Future[Seq[Result]] = {
val worker = ActorSystem().actorOf(Props(new Worker))
val results = messages.map { m =>
implicit val timeout = Timeout(20 seconds)
println("Sending: " + m)
val result = worker ? m
result.asInstanceOf[Future[Result]]
}
Future.sequence(results)
}
def main(args: Array[String]): Unit = {
val messages: Seq[Message] = args.map(Message(_))
test(messages).foreach { r =>
println("Result: " + r)
}
}
}
If I run the above with just "message-1" as an argument, it runs fine and gives the output below:
Sending: Message(message-1)
Sleeping: message-1
Sending result: Result(message-1 - result)
Result: ArraySeq(Result(message-1 - result))
However, if I run it with "message-1" "message-2" "message-3", the result for the last message ends up being sent to deadLetters:
Sending: Message(message-1)
Sending: Message(message-2)
Sleeping: message-1
Sending: Message(message-3)
Sending result: Result(message-1 - result)
Sleeping: message-2
Sending result: Result(message-2 - result)
Sleeping: message-3
Sending result: Result(message-3 - result)
[INFO] [07/15/2016 09:07:49.832] [default-akka.actor.default-dispatcher-2] [akka://default/deadLetters] Message [util.Tester$Result] from Actor[akka://default/user/$a#1776546850] to Actor[akka://default/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
I am guessing this is because my calling thread has gone out of scope by the time the last result is sent. How can I correctly collect all results into a sequence?
Note that changing my test method to the version below gives the same results:
def test(messages: Seq[Message]): Future[Seq[Result]] = {
val worker = ActorSystem().actorOf(Props(new Worker))
Future.traverse(messages) { m =>
implicit val timeout = Timeout(20 seconds)
println("Sending: " + m)
val result = worker ? m
result.asInstanceOf[Future[Result]]
}
}
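One other possible reading of the log, offered as a guess rather than a confirmed diagnosis: all three asks are sent almost immediately, but the single worker handles one message at a time and sleeps 10 seconds for each, so the third reply would only be ready after roughly 30 seconds, which is later than the 20 second ask timeout. The small sketch below just does that arithmetic in plain Scala, without Akka. The names AskTimingSketch, replyTimes and repliesAfterTimeout are made up for illustration, and the two constants are copied from the code above.

```scala
object AskTimingSketch {
  // Assumed values, copied from the question: Thread.sleep(10000) and Timeout(20 seconds).
  val processingSeconds = 10
  val askTimeoutSeconds = 20

  // A single actor processes its mailbox sequentially, so if all messages are
  // sent at about time zero, reply n is ready after roughly n * processingSeconds.
  def replyTimes(messageCount: Int): Seq[Int] =
    (1 to messageCount).map(_ * processingSeconds)

  // 1-based positions of the messages whose reply would be ready only after the ask timeout.
  def repliesAfterTimeout(messageCount: Int): Seq[Int] =
    replyTimes(messageCount).zipWithIndex.collect {
      case (seconds, index) if seconds > askTimeoutSeconds => index + 1
    }

  def main(args: Array[String]): Unit = {
    replyTimes(3).zipWithIndex.foreach { case (seconds, index) =>
      println(s"message-${index + 1}: reply ready after roughly ${seconds}s (ask timeout: ${askTimeoutSeconds}s)")
    }
    println("Replies ready after the timeout: " + repliesAfterTimeout(3).mkString(", "))
  }
}
```

If this reading is right, the reply for the second message sits right on the 20 second boundary, and a timeout that grows with the number of queued messages might be worth trying.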
actor doesn't exist, do you mean worker here? Not sure why you're seeing dead letters, unless your worker is failing to respond correctly, like maybe using sender() improperly. Also, you may want to look into Future.sequence to get all of your results as a single Future. But I can't say for sure if any of these fixes your problem. – acjay