
I need to create a function with the following interface:

import akka.kafka.scaladsl.Consumer.Control
import akka.stream.scaladsl.Flow

object ItemConversionFlow {

  def build(config: StreamConfig): Flow[Item, OtherItem, Control] = {
    // Implementation goes here
  }

}

My problem is that I don't know how to define the flow so that it fits the interface above.

When I do something like this

val flow = Flow[Item]
    .map(item => doConversion(item))
    .filter(_.isDefined)
    .map(_.get)

the resulting type is Flow[Item, OtherItem, NotUsed]. I haven't found anything about this in the Akka documentation so far. Also, the functions on akka.stream.scaladsl.Flow only offer NotUsed instead of Control. It would be great if someone could point me in the right direction.

Some background: I need to set up several pipelines which differ only in the conversion part. These pipelines are sub-streams of a main stream which might be stopped for some reason (a corresponding message arrives in some Kafka topic). Therefore I need the Control part. The idea would be to create a graph template where I just insert the mentioned flow as an argument (a factory returning it). For a specific case we have a solution which works. To generalize it, I need this kind of flow.

What is Control, and when is it constructed? You could construct it in mapMaterializedValue, in a custom GraphStage, or when using the GraphDSL. - Viktor Klang
I will take a look. Regarding Control: Control = akka.kafka.scaladsl.Consumer.Control. @Viktor Klang - Stoecki

1 Answer


You actually have backpressure. However, think about what you really need from backpressure: you are not, for example, using asynchronous stages to increase your throughput. Backpressure prevents fast producers from overwhelming slower subscribers (https://doc.akka.io/docs/akka/2.5/stream/stream-rate.html). In your sample, don't worry about it: your stream will request new elements from the publisher depending on how long doConversion takes to complete.

If you want to obtain the result of the stream, use toMat or viaMat. For example, if your stream emits Item and transforms these into OtherItem:

val str = Source.fromIterator(() => List(Item(Some(1))).toIterator)
  .map(item => doConversion(item))
  .filter(_.isDefined)
  .map(_.get)
  .toMat(Sink.fold(List[OtherItem]())((a, b) => {
      // Examine the result of your stream
      b :: a
    }))(Keep.right)
  .run()

str will be a Future[List[OtherItem]]. Try to extrapolate this to your case.

Or use viaMat with KillSwitches. As the Scaladoc puts it: "Creates a new [[Graph]] of [[FlowShape]] that materializes to an external switch that allows external completion of that unique materialization. Different materializations result in different, independent switches."

  def build(config: StreamConfig): Flow[Item, OtherItem, UniqueKillSwitch] = {
    Flow[Item]
      .map(item => doConversion(item))
      .filter(_.isDefined)
      .map(_.get)
      .viaMat(KillSwitches.single)(Keep.right)
  }


  val stream = 
    Source.fromIterator(() => List(Item(Some(1))).toIterator)
    .viaMat(build(StreamConfig(1)))(Keep.right)
    .toMat(Sink.ignore)(Keep.both).run

  // This stops the stream
  stream._1.shutdown()

  // When it finishes
  stream._2 onComplete(_ => println("Done"))
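
To connect this back to the question and to the comment about mapMaterializedValue, here is a minimal sketch of how the materialized UniqueKillSwitch could be wrapped in a custom control type. StopHandle, the Item/OtherItem case classes and doConversion are hypothetical stand-ins, not the real akka.kafka.scaladsl.Consumer.Control or the asker's types; adapting this to Consumer.Control would mean implementing that trait's own methods, which is not shown here. The sketch assumes Akka 2.5 (hence ActorMaterializer).

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MaterializedValueSketch extends App {
  // Hypothetical stand-ins for the types used in the question.
  final case class Item(value: Option[Int])
  final case class OtherItem(value: Int)
  def doConversion(item: Item): Option[OtherItem] = item.value.map(OtherItem(_))

  // Hypothetical control handle; this is NOT akka.kafka's Consumer.Control.
  trait StopHandle { def stop(): Unit }

  def build(): Flow[Item, OtherItem, StopHandle] =
    Flow[Item]
      .map(doConversion)
      .collect { case Some(other) => other } // replaces filter(_.isDefined).map(_.get)
      .viaMat(KillSwitches.single)(Keep.right) // materializes a UniqueKillSwitch
      .mapMaterializedValue { ks =>
        // Wrap the kill switch in the custom handle type.
        new StopHandle { def stop(): Unit = ks.shutdown() }
      }

  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  val (handle, done) =
    Source.repeat(Item(Some(1)))
      .viaMat(build())(Keep.right)
      .toMat(Sink.ignore)(Keep.both)
      .run()

  handle.stop()                  // completes the otherwise endless stream
  Await.result(done, 5.seconds)  // wait until Sink.ignore has finished
  system.terminate()
}
```

The design point is only that mapMaterializedValue changes the third type parameter of the Flow without touching the elements flowing through it, so the conversion logic and the control handle can be swapped independently.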