3
votes

How would I test that a given behavior sends the messages I expect?

Say, three messages of some type, one after the other...

With regular (untyped) actors there was the TestProbe from regular Akka, with methods like expectMsg:

http://doc.akka.io/api/akka/current/index.html#akka.testkit.TestProbe

With akka-typed I'm still scratching my head. There is something called EffectfulActorContext, but I have no idea how to use it.

Example

Say I am writing a simple PingPong service that, given a number n, replies with Pong(n) n times. So:

-> Ping(2)
Pong(2)
Pong(2)
-> Ping(0)
# nothing
-> Ping(1)
Pong(1)
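The trace above can also be written down as a pure function, which may be handy as an oracle when comparing against what the actor actually sent. This is only a sketch: `PingPongOracle` and `expectedPongs` are hypothetical names introduced for illustration, not part of akka-typed, and `Pong` is redeclared here so the snippet stands alone.

```scala
object PingPongOracle {
  // Mirrors the reply type used in this question.
  case class Pong(i: Int)

  // Hypothetical helper: the replies a single Ping(n) should produce,
  // i.e. n copies of Pong(n), and nothing at all for n <= 0.
  def expectedPongs(n: Int): List[Pong] = List.fill(n.max(0))(Pong(n))
}
```

A test could then compare the messages it collected against `expectedPongs(n)` instead of spelling out each list by hand.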

Here is how this behavior might look:

case class Ping(i: Int, replyTo: ActorRef[Pong])
case class Pong(i: Int)

val pingPong: Behavior[Ping] = {
    Static {
      case Ping(i, replyTo) => (0 until i.max(0)).foreach(_ => replyTo ! Pong(i))
    }
  }

My Hack

Since I can't figure out how to make this work, my current "hack" is to make the actor always reply with a single list of responses. So the behavior is:

case class Ping(i: Int, replyTo: ActorRef[List[Pong]])
case class Pong(i: Int)

val pingPong: Behavior[Ping] = {
    Static {
      case Ping(i, replyTo) => replyTo ! (0 until i.max(0)).map(_ => Pong(i)).toList
    }
  }

Given this hacky change, the tester is easy to write:

package com.test

import akka.typed.AskPattern._
import akka.typed.ScalaDSL._
import akka.typed.{ActorRef, ActorSystem, Behavior, Props}
import akka.util.Timeout
import com.test.PingPong.{Ping, Pong}
import org.scalatest.{FlatSpec, Matchers}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object PingPongTester {
  /* Expect that the given messages arrived in order */
  def expectMsgs(i: Int, msgs: List[Pong]): Unit = {
    implicit val timeout: Timeout = 5.seconds
    val pingPongBe: ActorSystem[Ping] = ActorSystem("pingPongTester", Props(PingPong.pingPong))

    val future: Future[List[Pong]] = pingPongBe ? (Ping(i, _))
    try {
      // Block for the reply so that a failed assertion surfaces in the calling test
      // instead of being swallowed inside the Future.
      val pongs = Await.result(future, 5.seconds)
      for ((actual, expected) <- pongs.zip(msgs)) {
        assert(actual == expected, s"Expected $expected, but received $actual")
      }
      assert(pongs.size == msgs.size, s"Expected ${msgs.size} messages, but received ${pongs.size}")
    } finally {
      // Shut the actor system down even when an assertion fails.
      pingPongBe.terminate()
      Await.ready(pingPongBe.whenTerminated, 5.seconds)
    }
  }
}


object PingPong {
  case class Ping(i: Int, replyTo: ActorRef[List[Pong]])
  case class Pong(i: Int)

  val pingPong: Behavior[Ping] = {
    Static {
      case Ping(i, replyTo) => replyTo ! (0 until i.max(0)).map(_=>Pong(i)).toList
    }
  }
}

class MainSpec extends FlatSpec with Matchers {
  "PingPong" should "reply with empty when Pinged with zero" in {
    PingPongTester.expectMsgs(0, List.empty)
  }
  it should "reply once when Pinged with one" in {
    PingPongTester.expectMsgs(1, List(Pong(1)))
  }
  it should "reply with empty when Pinged with negative" in {
    PingPongTester.expectMsgs(-1, List.empty)
  }
  it should "reply with as many pongs as Ping requested" in {
    PingPongTester.expectMsgs(5, List(Pong(5), Pong(5), Pong(5), Pong(5), Pong(5)))
  }
}

2 Answers

1
votes

I'm using EffectfulActorContext to test my akka-typed actors. Here is an untested example based on your question.

Note: I'm also using the guardian actor provided in the akka-typed test cases.

class Test extends TypedSpec {
  val system = ActorSystem("actor-system", Props(guardian()))
  val ctx: EffectfulActorContext[Ping] = new EffectfulActorContext[Ping]("ping", Ping.props(), system)

  // This will send the command to the Ping actor
  ctx.run(Ping)
  // This should get you the inbox of the Pong created inside the Ping actor.
  val pongInbox = ctx.getInbox("pong")
  assert(pongInbox.hasMessages)
  val pongMessages = pongInbox.receiveAll()
  pongMessages.size should be(1) // 1 or whatever number of messages you expect
}

Edit (some more info): In cases where I need to add a replyTo ActorRef to my messages, I do the following:

case class Pong(replyTo: ActorRef[Response])
val responseInbox: SyncInbox[Response] = Inbox.sync[Response]("responseInbox")
Pong(responseInbox.ref) 
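To illustrate what a synchronous inbox buys you in a test, here is a plain-Scala sketch that deliberately avoids the akka-typed API: the behavior under test sends to a recording reference, and the test inspects what was collected, in order. `Ref`, `RecordingRef` and the function-valued `pingPong` are hypothetical stand-ins made up for this sketch, not akka-typed classes; in a real test, the inbox and its `ref` from the snippet above would presumably take their place.

```scala
import scala.collection.mutable.ListBuffer

object SyncInboxSketch {
  // Hypothetical stand-in for ActorRef[T]: something you can send a message to.
  trait Ref[T] { def !(msg: T): Unit }

  // Hypothetical stand-in for a synchronous inbox: records every message in arrival order.
  final class RecordingRef[T] extends Ref[T] {
    private val buffer = ListBuffer.empty[T]
    def !(msg: T): Unit = { buffer += msg; () }
    def hasMessages: Boolean = buffer.nonEmpty
    // Returns everything received so far and empties the inbox.
    def receiveAll(): List[T] = { val all = buffer.toList; buffer.clear(); all }
  }

  case class Pong(i: Int)
  case class Ping(i: Int, replyTo: Ref[Pong])

  // The question's behavior, modelled as a plain function purely for illustration.
  val pingPong: Ping => Unit = {
    case Ping(i, replyTo) => (0 until i.max(0)).foreach(_ => replyTo ! Pong(i))
  }
}
```

With this shape, checking "three messages, one after the other" is a single list comparison on `receiveAll()`, and no timeouts or futures are involved because everything runs on the test thread.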
0
votes

My initial approach to testing was to extend the Behavior class:

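  import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
  import scala.concurrent.duration._
  import akka.typed.{ActorContext, Behavior, ScalaDSL, Signal}
  import akka.typed.ScalaDSL.Same

  class TestQueueBehavior[Protocol] extends Behavior[Protocol] {
    // Every message the actor receives is recorded here, in arrival order.
    val messages: BlockingQueue[Protocol] = new LinkedBlockingQueue[Protocol]()

    val behavior: Protocol => Unit = {
      (p: Protocol) => messages.put(p)
    }

    // Waits up to `timeout` for the next message; returns null if none arrives in time.
    def pollMessage(timeout: FiniteDuration = 3.seconds): Protocol = {
      messages.poll(timeout.toMillis, TimeUnit.MILLISECONDS)
    }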

    override def management(ctx: ActorContext[Protocol], msg: Signal): Behavior[Protocol] = msg match {
      case _ ⇒ ScalaDSL.Unhandled
    }

    override def message(ctx: ActorContext[Protocol], msg: Protocol): Behavior[Protocol] = msg match {
      case p =>
        behavior(p)
        Same
    }
  }

Then I could call behavior.pollMessage(2.seconds) shouldBe somethingToCompareTo, which was very similar to using TestProbe.
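The queue-based idea can be sketched on its own, independent of the Behavior class. The helper below is hypothetical (the names `QueueProbe`, `offer` and `expectMsg` are made up here and are not part of akka-typed or akka-testkit). It wraps the `poll` result in an `Option`, because `BlockingQueue.poll` returns null when the timeout elapses, and it fails with a descriptive error on a mismatch, loosely in the spirit of TestProbe.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.concurrent.duration._

// Hypothetical helper, shown only to illustrate the queue-based testing idea.
final class QueueProbe[T] {
  private val messages = new LinkedBlockingQueue[T]()

  // Called by whatever receives the messages (for example, a recording behavior).
  def offer(msg: T): Unit = messages.put(msg)

  // Waits up to `timeout` for the next message; None if nothing arrived in time.
  def pollMessage(timeout: FiniteDuration = 3.seconds): Option[T] =
    Option(messages.poll(timeout.toMillis, TimeUnit.MILLISECONDS))

  // Returns the next message if it equals `expected`; otherwise throws an AssertionError.
  def expectMsg(expected: T, timeout: FiniteDuration = 3.seconds): T =
    pollMessage(timeout) match {
      case Some(actual) if actual == expected => actual
      case Some(other) =>
        throw new AssertionError(s"Expected $expected, but received $other")
      case None =>
        throw new AssertionError(s"Timed out after $timeout waiting for $expected")
    }
}
```

Calling `expectMsg` three times in a row then checks three messages in order, and a short `pollMessage` afterwards can confirm that nothing further was sent.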

Although I think EffectfulActorContext is the right way to go, I unfortunately couldn't figure out how to use it properly.