I've got a SourceQueue. When I offer an element to it, I want the element to pass through the Stream and, when it reaches the Sink, have the output returned to the code that offered the element (similar to how Sink.head returns an element to the RunnableGraph.run() call).
How do I achieve this? A simple example of my problem would be:
val source = Source.queue[String](100, OverflowStrategy.fail)
val flow = Flow[String].map(element => s"Modified $element")
val sink = Sink.ReturnTheStringSomehow
val graph = source.via(flow).to(sink).run()
val x = graph.offer("foo")
println(x) // Output should be "Modified foo"
val y = graph.offer("bar")
println(y) // Output should be "Modified bar"
val z = graph.offer("baz")
println(z) // Output should be "Modified baz"
Edit: For the example I have given in this question, Vladimir Matveev provided the best answer. However, it should be noted that this solution only works if the elements reach the sink in the same order they were offered to the source. If this cannot be guaranteed, the order of the elements in the sink may differ and the outcome might be different from what is expected.
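
For cases where the order cannot be guaranteed, one possible approach (a sketch only, and not necessarily what the answer mentioned above does) is to send a Promise through the stream together with each element and complete it in the sink, so every result finds its way back to the caller that offered it. The object name QueueRoundTrip and the offer helper are hypothetical names made up for illustration, and the code assumes Akka 2.6, where an implicit ActorSystem is enough to materialize a graph.

```scala
import akka.actor.ActorSystem
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Hypothetical example object; not part of Akka.
object QueueRoundTrip {
  implicit val system: ActorSystem = ActorSystem("queue-round-trip")
  implicit val ec: ExecutionContext = system.dispatcher

  // Each element travels together with the Promise that its caller is waiting on.
  type Request = (String, Promise[String])

  val source = Source.queue[Request](100, OverflowStrategy.fail)

  // The flow transforms the payload and passes the Promise along untouched.
  val flow = Flow[Request].map { case (element, promise) =>
    (s"Modified $element", promise)
  }

  // The sink completes the Promise with whatever arrived for it,
  // so the arrival order of elements does not matter.
  val sink = Sink.foreach[Request] { case (result, promise) =>
    promise.success(result)
  }

  val queue = source.via(flow).to(sink).run()

  // Hypothetical helper: offers an element and returns a Future of its processed result.
  def offer(element: String): Future[String] = {
    val promise = Promise[String]()
    queue.offer((element, promise)).flatMap {
      case QueueOfferResult.Enqueued => promise.future
      case other =>
        Future.failed(new IllegalStateException(s"Element was not enqueued: $other"))
    }
  }

  def main(args: Array[String]): Unit = {
    // Blocking with Await is only for demonstration purposes.
    println(Await.result(offer("foo"), 3.seconds)) // Modified foo
    println(Await.result(offer("bar"), 3.seconds)) // Modified bar
    println(Await.result(offer("baz"), 3.seconds)) // Modified baz
    system.terminate()
  }
}
```

Because each Promise is tied to exactly one offered element, a stage that reorders elements would still deliver the right result to each caller. The trade-off is that every stage in the flow has to carry the Promise along with the payload.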