I am looking to replace one part of my Akka Streams-based stream processor with Flink. Is it currently possible to use Akka Streams as a source for Flink, and then Flink as a source for Akka Streams in the same codebase?

Current flow with Akka Streams is as follows:

  // Kafka Source -> Read Flow involving Azure Cosmos DB -> Evaluate Flow -> Write Flow -> Logger Flow -> Sink
  private lazy val converterGraph: Graph[ClosedShape.type, Future[Done]] =
    GraphDSL.create(sink) { implicit builder => out =>
      import GraphDSL.Implicits._
      source.watchTermination()(Keep.none) ~> prepareFlow ~> readFlow ~> evaluateFlow ~> writeFlow ~> loggerFlow ~> out
      ClosedShape
    }

The flows above are defined like this:

def prepareFlow: Flow[FromSource, ToRead, NotUsed]

def readFlow: Flow[ToRead, ToEvaluate, NotUsed]

Now, instead of readFlow being an Akka Streams flow, I would like to replace it with a Flink stream processor, so that the output of prepareFlow becomes the input to the Flink-based readFlow, and its output becomes the input to evaluateFlow.

Basically, is it possible to do something like this:

  prepareFlow ~> [Flink source -> read -> result] ~> evaluateFlow ~> writeFlow ~> loggerFlow ~> out

I see that there is a Flink Akka connector (a sink) in Apache Bahir, but I am not sure whether it can be used only with plain Akka actors or with Akka Streams as well.

1 Answer

You can wrap your prepareFlow (everything upstream of the Cosmos DB read) as a custom Flink source by extending SourceFunction, and wrap the whole evaluate-write-logger flow as a custom SinkFunction.
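
A minimal sketch of that sink side, assuming the ToEvaluate type and the evaluateFlow/writeFlow/loggerFlow definitions from the question are in scope; the AkkaStreamSink name, the buffer size, and the Akka 2.5-style ActorMaterializer are illustrative choices, not a fixed recipe:

    import scala.concurrent.Await
    import scala.concurrent.duration._

    import akka.actor.ActorSystem
    import akka.stream.{ActorMaterializer, OverflowStrategy}
    import akka.stream.scaladsl.{Sink, Source, SourceQueueWithComplete}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

    // Materializes the downstream Akka flows once per Flink subtask in open()
    // and offers every incoming record to the stream's input queue.
    class AkkaStreamSink extends RichSinkFunction[ToEvaluate] {

      @transient private var system: ActorSystem = _
      @transient private var queue: SourceQueueWithComplete[ToEvaluate] = _

      override def open(parameters: Configuration): Unit = {
        implicit val sys: ActorSystem = ActorSystem("flink-akka-bridge") // illustrative name
        implicit val mat: ActorMaterializer = ActorMaterializer()
        system = sys
        queue = Source
          .queue[ToEvaluate](bufferSize = 256, OverflowStrategy.backpressure)
          .via(evaluateFlow)
          .via(writeFlow)
          .via(loggerFlow)
          .to(Sink.ignore)
          .run()
      }

      // Awaiting the offer result is what carries the Akka stream's
      // back-pressure into Flink's otherwise fire-and-forget sink call.
      override def invoke(value: ToEvaluate): Unit =
        Await.result(queue.offer(value), 30.seconds)

      override def close(): Unit = {
        if (queue != null) queue.complete()
        if (system != null) system.terminate()
      }
    }

Source.queue with OverflowStrategy.backpressure plus the Await is the simplest way to connect the two back-pressure models, at the cost of blocking the sink thread.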

As Flink itself is distributed, you would integrate an Akka stream into a Flink job rather than the other way around. The main problem I see with this approach is that Akka Streams has back-pressure out of the box, while Flink's source API is mostly blocking: the SourceFunction.run() method is expected to contain an internal infinite loop that produces a message on every iteration, so you have to block inside it while waiting for the Akka stream to produce the next message.
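
To make that blocking bridge concrete, here is a rough sketch of the source side; the AkkaStreamSource name and wiring are illustrative, the ToRead type is the one from the question, and this only works while the Akka stream and the Flink task share a JVM (e.g. a local execution environment), because the queue instance cannot be shared across a real cluster:

    import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}

    import org.apache.flink.streaming.api.functions.source.SourceFunction

    // The Akka side pushes prepareFlow's output into a bounded queue, e.g.
    //   source.via(prepareFlow).runWith(Sink.foreach(queue.put))
    // queue.put blocks when the queue is full, which crudely re-creates
    // Akka's back-pressure on this side of the bridge.
    class AkkaStreamSource(queue: BlockingQueue[ToRead]) extends SourceFunction[ToRead] {

      @volatile private var running = true

      // run() is the blocking producer loop described above: it waits on the
      // queue until the Akka stream hands over the next element.
      override def run(ctx: SourceFunction.SourceContext[ToRead]): Unit =
        while (running) {
          val next = queue.poll(100, TimeUnit.MILLISECONDS) // block, but stay cancellable
          if (next != null) {
            ctx.getCheckpointLock.synchronized {
              ctx.collect(next) // hand the element to the Flink read step
            }
          }
        }

      override def cancel(): Unit = running = false
    }

    // Wiring it up (illustrative):
    // val queue = new ArrayBlockingQueue[ToRead](256)
    // env.addSource(new AkkaStreamSource(queue))  // Flink side of the bridge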