I'm testing code which streams messages over an outgoing stream TCP connection obtained via:
(IO(StreamTcp) ? StreamTcp.Connect(settings, address))
.mapTo[StreamTcp.OutgoingTcpConnection]
.map(_.outputStream)
In my tests, I substitute the resulting Subscriber[ByteString]
with a dummy subscriber, trigger some outgoing messages, and assert that have arrived as expected. I use the method below to produce the dummy subscriber and stream result future. (So far, so good)
def testSubscriber[T](settings: FlowMaterializer)(implicit ec: ExecutionContext): (Subscriber[T], Future[Seq[T]]) = {
var sent = Seq.empty[T]
val (subscriber, streamComplete) =
Duct[T].foreach( bs => sent = sent :+ bs)(settings)
(subscriber, streamComplete.map( _ => sent ))
}
My question is this: is there some canonical method for testing that streams output the expected values, something similar to Akka's TestActorRef
? And if not, is there some library function similar to the above function?