I need to create a function with the following interface:
import akka.kafka.scaladsl.Consumer.Control
import akka.stream.scaladsl.Flow

object ItemConversionFlow {
  def build(config: StreamConfig): Flow[Item, OtherItem, Control] = {
    // Implementation goes here
    ???
  }
}
My problem is that I don't know how to define the flow so that it fits the interface above.
When I do something like this:
val flow = Flow[Item]
  .map(item => doConversion(item)) // doConversion returns an Option[OtherItem]
  .filter(_.isDefined)
  .map(_.get)
the resulting type is Flow[Item, OtherItem, NotUsed]. I haven't found anything about this in the Akka documentation so far. Also, the functions on akka.stream.scaladsl.Flow only seem to yield NotUsed instead of Control. It would be great if someone could point me in the right direction.
Some background: I need to set up several pipelines that differ only in the conversion part. These pipelines are sub-streams of a main stream, which might be stopped for some reason (a corresponding message arrives in some Kafka topic). Therefore I need the Control part. The idea is to create a graph template into which I just insert the mentioned flow as an argument (via a factory returning it). We have a working solution for one specific case. To generalize it, I need this kind of flow.
mapMaterializedValue) or in a custom GraphStage, or when using the GraphDSL – Viktor Klang
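A minimal sketch of what the comment points at, under stated assumptions: Item, OtherItem, StreamConfig and doConversion are hypothetical stand-ins for the types in the question, and the extra `control` parameter and the `PipelineTemplate.attach` helper are illustrative only, not part of the original interface. The first variant replaces NotUsed with an already existing Control through mapMaterializedValue. The second leaves the conversion flow as NotUsed and keeps the Control materialized by the upstream source with viaMat and Keep.left, which may be enough if the Control originates from the Kafka consumer source anyway.

```scala
import akka.NotUsed
import akka.kafka.scaladsl.Consumer.Control
import akka.stream.scaladsl.{Flow, Keep, Source}

// Hypothetical stand-ins for the types used in the question.
final case class Item(value: String)
final case class OtherItem(value: String)
final case class StreamConfig(name: String)

object ItemConversionFlow {
  // Hypothetical conversion: None means "drop this item".
  private def doConversion(item: Item): Option[OtherItem] =
    if (item.value.nonEmpty) Some(OtherItem(item.value)) else None

  // The plain conversion flow; its materialized value is NotUsed.
  def conversion(config: StreamConfig): Flow[Item, OtherItem, NotUsed] =
    Flow[Item]
      .map(doConversion)
      .collect { case Some(converted) => converted }

  // Variant 1: swap NotUsed for a Control that already exists.
  // Assumption: the caller is able to supply that Control.
  def build(config: StreamConfig, control: Control): Flow[Item, OtherItem, Control] =
    conversion(config).mapMaterializedValue(_ => control)
}

object PipelineTemplate {
  // Variant 2: keep the Control materialized by the upstream source
  // and discard the flow's NotUsed.
  def attach(source: Source[Item, Control], config: StreamConfig): Source[OtherItem, Control] =
    source.viaMat(ItemConversionFlow.conversion(config))(Keep.left)
}
```

With the second variant, the template only needs a factory returning Flow[Item, OtherItem, NotUsed]; the Control used to stop the pipeline still comes from whatever source feeds it.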