1
votes

I would like to write a GraphStage which can be paused/unpaused by sending a message from another actor.

The code snipped below shows a simple GraphStage which generates random numbers. When the stage gets materialized the GraphStageLogic sends a message (within preStart()) containing the StageActor to a supervisor. The supervisor keeps the stage's ActorRef and can therefore be used to control the stage.

object RandomNumberSource {
  case object Pause
  case object UnPause
}

class RandomNumberSource(supervisor: ActorRef) extends GraphStage[SourceShape[Int]] {

  val out: Outlet[Int] = Outlet("rnd.out")

  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new RandomNumberSourceLogic(shape)
  }

  private class RandomNumberSourceLogic(shape: Shape) extends GraphStageLogic(shape) with StageLogging {

    lazy val self: StageActor = getStageActor(onMessage)

    val numberGenerator: Random = Random
    var isPaused: Boolean = true

      override def preStart(): Unit = {
        supervisor ! AssignStageActor(self.ref)
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          if (!isPaused) {
            push(out, numberGenerator.nextInt())
            Thread.sleep(1000)
          }
        }
      })

      private def onMessage(x: (ActorRef, Any)): Unit =
      {
        x._2 match {
          case Pause =>
            isPaused = true
            log.info("Stream paused")
          case UnPause =>
            isPaused = false
            getHandler(out).onPull()
            log.info("Stream unpaused!")
          case _ =>
        }
      }
    }
}

This is a very simple implementation of the supervisor actor:

object Supervisor {
  case class AssignStageActor(ref: ActorRef)
}

class Supervisor extends Actor with ActorLogging {

  var stageActor: Option[ActorRef] = None

  override def receive: Receive = {

    case AssignStageActor(ref) =>
      log.info("Stage assigned!")
      stageActor = Some(ref)
      ref ! Done

    case Pause =>
      log.info("Pause stream!")
      stageActor match {
        case Some(ref) => ref ! Pause
        case _ =>
      }

    case UnPause =>
      log.info("UnPause stream!")
      stageActor match {
        case Some(ref) => ref ! UnPause
        case _ =>
      }
  }
}

I'm using the following application to run the stream:

object Application extends App {

  implicit val system = ActorSystem("my-actor-system")
  implicit val materializer = ActorMaterializer()

  val supervisor = system.actorOf(Props[Supervisor], "supervisor")

  val sourceGraph: Graph[SourceShape[Int], NotUsed] = new RandomNumberSource(supervisor)
  val randomNumberSource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)

  randomNumberSource.take(100).runForeach(println)

  println("Start stream by pressing any key")

  StdIn.readLine()

  supervisor ! UnPause

  StdIn.readLine()

  supervisor ! Pause

  StdIn.readLine()

  println("=== Terminating ===")
  system.terminate()
}

When the application starts the stage ia in 'paused' state and does not produce any number. When i press a key my stage starts to emit numbers. But my problem is that all messages sent to the stage after it has been started are ignored. I can not pause the stage.

I'm interested in changing the behavior of a stage based on a message received from an actor, but all examples i found pass an actor's message into the stream.

Does somebody has a guess why my code does not work or has an idea how to build such a GraphStage?

Thank you very much!

1

1 Answers

1
votes

The Akka Stream Contrib project has a Valve stage that materializes a value that can pause and resume a flow. From the Scaladoc for this class:

Materializes into a Future of ValveSwitch which provides a the method flip that stops or restarts the flow of elements passing through the stage. As long as the valve is closed it will backpressure.

For example:

val (switchFut, seqSink) = Source(1 to 10)
  .viaMat(new Valve(SwitchMode.Close))(Keep.right)
  .toMat(Sink.seq)(Keep.both)
  .run()

switchFut is a Future[ValveSwitch], and since the switch is closed initially, the valve backpressures and nothing is emitted downstream. To open the valve:

switchFut.onComplete {
  case Success(switch) =>  
    switch.flip(SwitchMode.Open) // Future[Boolean]
  case _ =>
    log.error("the valve failed")
}

More examples are in ValveSpec.