
I'm designing a small tool that will generate CSV test data. I want to use Akka Streams (1.0-RC4) to implement the data flow. There will be a Source that generates random numbers, a transformation into CSV strings, a rate limiter, and a Sink that writes to a file.

There should also be a clean way to stop the tool through a small REST interface.

This is where I'm struggling. Once the stream has been started (RunnableFlow.run()), there seems to be no way to stop it. Source and Sink are both infinite (at least until the disk fills up :)), so they will never stop the stream on their own.

Adding control logic to the Source or Sink feels wrong, and so does calling ActorSystem.shutdown(). What would be a good way to stop the stream?

I think you could use the Zip stage for this: zip your source with another source that keeps producing values until some condition is met, and use your REST API to control that second source. I'm not sure this is the best way, though, so I preferred not to post it as an answer; I took inspiration from this: doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC4/… - Aldo Stracquadanio
Akka seems to follow a similar approach in its Tcp server implementation: when binding a server socket, the source emits incoming connections and materializes into a (Future of a) ServerBinding object that can be used to close the socket. Given this, and for lack of better ideas, I'd accept your response as the correct one. - ErosC
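The Zip suggestion can be modeled with plain Scala iterators. This is a sketch of the completion behavior only, not Akka stages; `ZipStopModel.zipWithControl` and the `AtomicBoolean` flag are hypothetical stand-ins for the second source and for whatever the REST endpoint would flip. `Iterator.zip` ends as soon as either side ends, which mirrors Zip completing when one of its inputs completes.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Plain-iterator model of "zip the data with a control source"; not Akka.
object ZipStopModel {
  // Hypothetical control source: keeps producing () until the flag is set.
  def controlSource(stop: AtomicBoolean): Iterator[Unit] =
    Iterator.continually(()).takeWhile(_ => !stop.get())

  // zip ends when either side ends; keep only the data element of each pair.
  def zipWithControl[A](data: Iterator[A], stop: AtomicBoolean): Iterator[A] =
    data.zip(controlSource(stop)).map(_._1)
}
```

Note that the control side has to emit one element for every data element; a control source that never emits would let no data through at all.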

2 Answers


OK, so I found a decent solution. It was sitting right under my nose; I just didn't see it. Source.lazyEmpty materializes into a Promise that, when completed, terminates the Source and the stream behind it.

The remaining question was how to combine it with the infinite stream of random numbers. I tried Zip, but no random numbers made it through the stream, because Zip needs an element from every input and lazyEmpty never emits any values (doh). I tried Merge, but the stream never terminated, because Merge keeps running until all of its sources have completed.

So I wrote my own merge. It forwards all values from one input port and completes as soon as any input completes.
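The completion rule of this custom merge can be sketched in plain Scala, under the assumption that the stopper's completion is represented by a `Future[Unit]`. This models the behavior only and is not the FlexiMerge API; `StopperMergeModel` is a hypothetical name.

```scala
import scala.concurrent.Future

// Plain-Scala model of StopperMerge's rules, not the FlexiMerge API:
// - elements are only ever read from `in` (the stopper never emits);
// - the output ends as soon as EITHER input has completed (eager close).
// `stopperDone` stands in for completion of the lazyEmpty stopper source.
final class StopperMergeModel[A](in: Iterator[A], stopperDone: Future[Unit])
    extends Iterator[A] {
  def hasNext: Boolean = !stopperDone.isCompleted && in.hasNext

  def next(): A =
    if (hasNext) in.next()
    else throw new NoSuchElementException("stopper completed or input exhausted")
}
```

Compared with the built-in stages: Zip would wait for an element from the silent stopper, and Merge would wait for the infinite data input to complete; ending on the first completion avoids both problems.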

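import akka.stream.{Attributes, FanInShape}
import akka.stream.FanInShape.{Init, Name}
import akka.stream.scaladsl.{FlexiMerge, Flow, Source}
import akka.stream.scaladsl.FlowGraph.Implicits._
import scala.concurrent.Promise

// Written against the Akka Streams 1.0-RC4 API; FlexiMerge and FlowGraph were removed in later versions.
object StopperFlow {

  // Fan-in shape with a data inlet, a control ("stopper") inlet and one outlet.
  private class StopperMergeShape[A](_init: Init[A] = Name("StopperFlow")) extends FanInShape[A](_init) {
    val in = newInlet[A]("in")
    val stopper = newInlet[Unit]("stopper")

    override protected def construct(init: Init[A]): FanInShape[A] = new StopperMergeShape[A](init)
  }

  private class StopperMerge[In] extends FlexiMerge[In, StopperMergeShape[In]](
    new StopperMergeShape(), Attributes.name("StopperMerge")) {
    import FlexiMerge._

    override def createMergeLogic(p: PortT) = new MergeLogic[In] {
      // Only ever read from the data inlet and pass each element through.
      override def initialState =
        State[In](Read(p.in)) { (ctx, input, element) =>
          ctx.emit(element)
          SameState
        }

      // Complete the output as soon as ANY input completes (e.g. the stopper).
      override def initialCompletionHandling = eagerClose
    }
  }

  def apply[In](): Flow[In, In, Promise[Unit]] = {
    // Emits nothing; completing its materialized Promise completes this source.
    val stopperSource = Source.lazyEmpty[Unit]

    Flow(stopperSource) { implicit builder =>
      stopper =>
        val stopperMerge = builder.add(new StopperMerge[In]())

        stopper ~> stopperMerge.stopper

        (stopperMerge.in, stopperMerge.out)
    }
  }
}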
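StopperFlow is a blueprint: every time a stream containing it is materialized, Source.lazyEmpty creates a fresh Promise, so each run gets its own stop handle. The following plain-Scala model (iterators, not Akka; `StopperBlueprint.run` is a hypothetical name) wraps any source and returns a new Promise per run, mirroring the Flow[In, In, Promise[Unit]] signature.

```scala
import scala.concurrent.Promise

// Plain-Scala model of StopperFlow as a reusable blueprint; not Akka.
object StopperBlueprint {
  // Each "run" wraps a source and creates a fresh Promise, like each
  // materialization of the Flow creates a fresh lazyEmpty Promise.
  def run[A](source: Iterator[A]): (Iterator[A], Promise[Unit]) = {
    val stop = Promise[Unit]()
    val stoppable = new Iterator[A] {
      // Checked before every element: ends once the promise is completed.
      def hasNext: Boolean = !stop.isCompleted && source.hasNext
      def next(): A = if (hasNext) source.next() else Iterator.empty.next()
    }
    (stoppable, stop)
  }
}
```

Completing one run's promise leaves every other run untouched, which is what lets the same flow be plugged into several streams.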

The flow can be plugged into any stream. When materialized it will return a Promise that terminates the stream on completion. Here's my test for it.

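import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.util.Random

object StopperFlowTest extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  val startTime = System.currentTimeMillis()

  def dumpToConsole(f: Float) = {
    val timeSinceStart = System.currentTimeMillis() - startTime
    System.out.println(s"[$timeSinceStart] - Random number: $f")
  }

  val randomSource = Source(() => Iterator.continually(Random.nextFloat()))
  val consoleSink = Sink.foreach(dumpToConsole)
  // Keep.both keeps (Unit from randomSource, Promise[Unit] from StopperFlow)
  val flow = randomSource.viaMat(StopperFlow())(Keep.both).to(consoleSink)

  val (_, promise) = flow.run()

  Thread.sleep(1000)
  val _ = promise.success(()) // completing the promise stops the stream
  Thread.sleep(1000)

  system.shutdown() // only so that the test program can exit
}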
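For the REST stop endpoint from the question, it may matter that the test completes the promise with promise.success(()): that call throws an IllegalStateException if the promise is already completed, so a second stop request would fail. A minimal sketch, assuming a hypothetical StopHandle wrapper that the REST route would call, uses trySuccess instead, which simply returns false when the stream was already stopped.

```scala
import scala.concurrent.Promise

// Hypothetical wrapper a REST "stop" route could call; not part of Akka.
final class StopHandle(stop: Promise[Unit]) {
  // trySuccess never throws: it returns true only for the call that
  // actually completed the promise (i.e. stopped the stream).
  def requestStop(): Boolean = stop.trySuccess(())

  def isStopped: Boolean = stop.isCompleted
}
```

The route can then report whether this particular request stopped the stream or found it already stopped.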

I hope this is useful for others too. It still puzzles me that there is no built-in way to terminate a stream from outside.


Not exactly stopping, but limiting: you can use limit or take.

Example from the Streams Cookbook:

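import scala.concurrent.Future
import akka.stream.scaladsl.{Sink, Source}

// Assumes an existing mySource: Source[String, _] and an implicit Materializer in scope.
val MAX_ALLOWED_SIZE = 100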

// OK. Future will fail with a `StreamLimitReachedException`
// if the number of incoming elements is larger than max
val limited: Future[Seq[String]] =
  mySource.limit(MAX_ALLOWED_SIZE).runWith(Sink.seq)

// OK. Collect up until max-th elements only, then cancel upstream
val ignoreOverflow: Future[Seq[String]] =
  mySource.take(MAX_ALLOWED_SIZE).runWith(Sink.seq)
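The difference between the two operators can be modeled on plain Scala iterators. This is not the Akka API: `LimitVsTake` and `LimitReached` are hypothetical names, the latter standing in for StreamLimitReachedException. limit inspects up to max + 1 elements and fails if that extra element exists, while take just stops after max.

```scala
// Plain-iterator model of the cookbook operators, not Akka's implementation.
// LimitReached is a hypothetical stand-in for StreamLimitReachedException.
final case class LimitReached(max: Int)

object LimitVsTake {
  // limit(max): fail if more than `max` elements arrive.
  // Reads at most max + 1 elements, so it also terminates on infinite input.
  def limit[A](elems: Iterator[A], max: Int): Either[LimitReached, List[A]] = {
    val upToMaxPlusOne = elems.take(max + 1).toList
    if (upToMaxPlusOne.length > max) Left(LimitReached(max))
    else Right(upToMaxPlusOne)
  }

  // take(max): keep the first `max` elements and silently ignore the rest.
  def take[A](elems: Iterator[A], max: Int): List[A] =
    elems.take(max).toList
}
```

Either way, an infinite source such as the random-number generator is bounded, but only by element count; neither operator can be triggered from outside the stream.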