2
votes

I'm testing code which streams messages over an outgoing stream TCP connection obtained via:

(IO(StreamTcp) ? StreamTcp.Connect(settings, address))
.mapTo[StreamTcp.OutgoingTcpConnection]
.map(_.outputStream)

In my tests, I substitute the resulting Subscriber[ByteString] with a dummy subscriber, trigger some outgoing messages, and assert that have arrived as expected. I use the method below to produce the dummy subscriber and stream result future. (So far, so good)

def testSubscriber[T](settings: FlowMaterializer)(implicit ec: ExecutionContext): (Subscriber[T], Future[Seq[T]]) = {
  var sent = Seq.empty[T]
  val (subscriber, streamComplete) = 
    Duct[T].foreach( bs => sent = sent :+ bs)(settings)
  (subscriber, streamComplete.map( _ => sent ))
}

My question is this: is there some canonical method for testing that streams output the expected values, something similar to Akka's TestActorRef? And if not, is there some library function similar to the above function?

1

1 Answers

3
votes

Testing streams is possible with the akka-streams-testkit.
Read about it here: http://doc.akka.io/docs/akka/current/scala/stream/stream-testkit.html