In Akka Stream 2.4.2, PushStage has been deprecated. For Streams 2.0.3 I was using the solution from this answer:
How does one close an Akka stream?
which was:
import akka.stream.stage._
val closeStage = new PushStage[Tpe, Tpe] {
override def onPush(elem: Tpe, ctx: Context[Tpe]) = elem match {
case elem if shouldCloseStream ⇒
// println("stream closed")
ctx.finish()
case elem ⇒
ctx.push(elem)
}
}
How would I close a stream in 2.4.2 immediately, from inside a GraphStage / onPush() ?