Given simple source/sink/flows:

val source: Source[Int, NotUsed] = Source(1 to 5)
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

val intDoublerFlow: Flow[Int, Int, NotUsed] = Flow.fromFunction[Int, Int](i => i * 2)

I can combine a Source[T, M] with a Flow[T, T2, M2] and get a Source[T2, M] with the via method (via keeps the source's materialized value):

val sourceWithFlow: Source[Int, NotUsed] = source.via(intDoublerFlow)

How can I do the analogous operation and combine a Flow[T, T2, M] with a Sink[T2, M2] to get a Sink[T, M2]?

val sinkWithFlow: Sink[Int, Future[Done]] = ???
In this case a Sink[Any, Future[Done]] is also a Sink[Int, Future[Done]] already, since Sink is contravariant over its first type parameter. - Michael Zajac
That's true regarding the type signature, but to get the flow function applied to the sink, I needed to use flow.to(sink) or flow.toMat(sink)(Keep.right). - clay

1 Answer

You want to compose your flow and your sink while keeping the materialized value of the sink. By default the fluent DSL keeps the materialized value of the left-hand stage, so a simple intDoublerFlow.to(sink) compiles but gives you a Sink[Int, NotUsed] rather than the Sink[Int, Future[Done]] you are after. You need to be explicit: use toMat and pass Keep.right.

The resulting code is:

val sinkWithFlow: Sink[Int, Future[Done]] = intDoublerFlow.toMat(sink)(Keep.right)
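
To see the difference between to and toMat end to end, here is a minimal runnable sketch. It assumes Akka 2.6 or later, where an implicit ActorSystem is enough to materialize a stream; the object name SinkWithFlowExample, the system name and the 5-second timeout are made up for illustration.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical wrapper object, only here to make the snippet runnable.
object SinkWithFlowExample {
  val source: Source[Int, NotUsed] = Source(1 to 5)
  val sink: Sink[Any, Future[Done]] = Sink.foreach(println)
  val intDoublerFlow: Flow[Int, Int, NotUsed] = Flow.fromFunction[Int, Int](i => i * 2)

  // `to` keeps the left (flow) materialized value, so the Future[Done] is lost.
  val sinkKeepingLeft: Sink[Int, NotUsed] = intDoublerFlow.to(sink)

  // `toMat` with Keep.right keeps the sink's materialized value instead.
  val sinkWithFlow: Sink[Int, Future[Done]] = intDoublerFlow.toMat(sink)(Keep.right)

  def main(args: Array[String]): Unit = {
    // Assumption: Akka 2.6+, where the ActorSystem supplies the materializer.
    implicit val system: ActorSystem = ActorSystem("sink-with-flow")

    // Running the source into the composed sink yields the sink's Future[Done].
    val done: Future[Done] = source.runWith(sinkWithFlow)
    Await.result(done, 5.seconds) // arbitrary timeout for the example

    Await.result(system.terminate(), 5.seconds)
  }
}
```

Both composed sinks double each element before printing it; they differ only in what you get back on materialization, which matters as soon as you want to wait for the stream to complete.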

More info on composing graph stages with regards to materialized values can be found here.