Ok, so I found a decent solution. It was already sitting there under my nose, I just did not see it. Source.lazyEmpty
materializes into a promise that when completed will terminate the Source and the stream behind it.
The remaining question is, how to include it into the infinite stream of random numbers. I tried Zip
. The result was that no random numbers made it through the stream because lazyEmpty
never emits values (doh). I tried Merge
but the stream never terminated because Merge
continues until all sources have completed.
So I wrote my own merge. It forwards all values from one of the input ports and terminates when any source completed.
object StopperFlow {
private class StopperMergeShape[A](_init: Init[A] = Name("StopperFlow")) extends FanInShape[A](_init) {
val in = newInlet[A]("in")
val stopper = newInlet[Unit]("stopper")
override protected def construct(init: Init[A]): FanInShape[A] = new StopperMergeShape[A](init)
}
private class StopperMerge[In] extends FlexiMerge[In, StopperMergeShape[In]](
new StopperMergeShape(), Attributes.name("StopperMerge")) {
import FlexiMerge._
override def createMergeLogic(p: PortT) = new MergeLogic[In] {
override def initialState =
State[In](Read(p.in)) { (ctx, input, element) =>
ctx.emit(element)
SameState
}
override def initialCompletionHandling = eagerClose
}
}
def apply[In](): Flow[In, In, Promise[Unit]] = {
val stopperSource = Source.lazyEmpty[Unit]
Flow(stopperSource) { implicit builder =>
stopper =>
val stopperMerge = builder.add(new StopperMerge[In]())
stopper ~> stopperMerge.stopper
(stopperMerge.in, stopperMerge.out)
}
}
}
The flow can be plugged into any stream. When materialized it will return a Promise
that terminates the stream on completion. Here's my test for it.
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val startTime = System.currentTimeMillis()
def dumpToConsole(f: Float) = {
val timeSinceStart = System.currentTimeMillis() - startTime
System.out.println(s"[$timeSinceStart] - Random number: $f")
}
val randomSource = Source(() => Iterator.continually(Random.nextFloat()))
val consoleSink = Sink.foreach(dumpToConsole)
val flow = randomSource.viaMat(StopperFlow())(Keep.both).to(consoleSink)
val (_, promise) = flow.run()
Thread.sleep(1000)
val _ = promise.success(())
Thread.sleep(1000)
I hope this is useful for other too. Still leaves me puzzled why there is no built in way for terminating streams from outside of the stream.
Zip
stage to achieve this: zip your source with another source that always produces a value until a certain condition is met and use your REST API to pilot the second source. I'm not sure that this is the best way though, so I preferred not answering the question; I took inspiration from this: doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC4/… - Aldo Stracquadanio