3
votes

With previous versions of Akka Streams, groupBy returned a Source of Sources that could be materialized into a Source[Seq[A]].

With Akka Streams 2.4, I see that groupBy returns a SubFlow; it's not clear to me how to use this. The transformations I need to apply to the flow require the whole Seq to be available, so I can't just map over the SubFlow (I think).

I've written a class extending GraphStage that does the aggregation via a mutable collection in the GraphStageLogic, but is there built-in functionality for this? Am I missing the point of SubFlow?

1

1 Answer

0
votes

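I ended up writing a GraphStage. It collects consecutive elements that share the same key (as computed by `f`) into one Seq, so the input must already be ordered by key if you want exactly one Seq per key: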

/** Collects runs of consecutive elements that share the same key into a single `Seq`.
  *
  * Elements are buffered while `f(element)` equals the key of the group currently being built.
  * When an element with a different key arrives, the finished group is pushed downstream and a
  * new group is started with that element. When upstream completes, the last group (if any) is
  * emitted and the stage completes. An empty upstream produces no output.
  *
  * Only *consecutive* elements are grouped: keyed by identity, the input `1, 1, 2, 1` yields
  * `Seq(1, 1)`, `Seq(2)`, `Seq(1)`.
  *
  * @param f derives the grouping key of an element; keys are compared with `==`
  * @tparam A element type
  * @tparam B key type
  */
class FlowAggregation[A, B](f: A => B) extends GraphStage[FlowShape[A, Seq[A]]] {
  val in: Inlet[A] = Inlet("in")
  val out: Outlet[Seq[A]] = Outlet("out")
  override val shape: FlowShape[A, Seq[A]] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      // Key of the group currently being accumulated; None until the first element arrives.
      private var currentKey: Option[B] = None
      // Elements of the current group. This buffer is never handed downstream: an immutable
      // snapshot is pushed instead, so the buffer can be cleared and reused.
      private val aggregate = scala.collection.mutable.ArrayBuffer.empty[A]

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val element = grab(in)
          val key = f(element) // evaluate the key function once per element

          currentKey match {
            case None =>
              // First element: start the first group. Downstream's pull is still outstanding,
              // so request more input.
              currentKey = Some(key)
              aggregate += element
              pull(in)

            case Some(k) if key == k =>
              // Same key: keep accumulating; downstream's pull is still outstanding.
              aggregate += element
              pull(in)

            case Some(_) =>
              // Key changed: the current group is complete. Snapshot it, start the next group
              // with this element, and answer downstream's pull.
              val finished = aggregate.toVector
              aggregate.clear()
              aggregate += element
              currentKey = Some(key)
              push(out, finished)
          }
        }

        override def onUpstreamFinish(): Unit = {
          // Flush the last group, but never emit an empty Seq for an empty stream.
          // `emit` waits for demand if needed; `complete` is queued behind a pending emit.
          if (aggregate.nonEmpty) emit(out, aggregate.toVector)
          complete(out)
        }
      })

      setHandler(out, new OutHandler {
        // Each downstream pull is satisfied by exactly one completed group, so pull upstream
        // until a key change (or upstream completion) produces one.
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
}