I would like to have each message in the Flink consumer stream produce multiple messages, each via a separate thread, to some topic in Kafka using the Flink Kafka producer. I'm writing the program in Scala, but answers in Java will do.

Something like this:

def thread(x: String): Thread =
  new Thread {
    override def run(): Unit = {
      val str = some_processing(x)
      flink_producer(str)
    }
  }

val stream = flink_consumer()

stream.map { x =>
  var i = 0
  while (i < 10) {
    val th = thread(x)
    th.start()
    i = i + 1
  }
}

So for each input in the Flink consumer I would like to produce 10 messages to some other queue using multithreading.

1 Answer

Most Flink operators are parallel operators, so there is no reason to create any kind of thread in your data pipeline. Flink manages how many parallel instances of each operator exist, and if you want to set that value yourself, you should use the following API method:

.setParallelism(N) // N is 10 in your case

You can get more info in the Flink documentation.
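For example, parallelism can be set per operator. A minimal sketch (the local environment and the sample data here are assumptions for illustration):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.fromElements("a", "b", "c")
   .map(_.toUpperCase)
   .setParallelism(10) // this map alone runs with 10 parallel instances
   .print()

env.execute("per-operator-parallelism")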

You should do something like this:

  1. Add more task manager slots to your cluster config.
  2. Instead of map, use a flatMap that generates those 10 messages.
  3. Increase the parallelism of the flatMap operator to 10.

Your code should look like this:

val stream = flink_consumer()

stream.flatMap { (x: String, out: Collector[String]) => // requires org.apache.flink.util.Collector
    var i = 0
    while (i < 10) {
      val valueToCollect = process(x, i)
      out.collect(valueToCollect)
      i = i + 1 // without the increment, the loop never terminates
    }
  }
  .setParallelism(10)
  .map(doSomethingWithGeneratedValues)
  .addSink(sinkThatSendsDataToYourDesiredSystem)
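Putting the pieces together with Flink's Kafka connector, the pipeline could look like the sketch below. It assumes the flink-connector-kafka dependency with the FlinkKafkaConsumer/FlinkKafkaProducer classes; the topic names, broker address, and process function are illustrative assumptions, not part of the question:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092") // assumed broker address

val consumer = new FlinkKafkaConsumer[String]("input-topic", new SimpleStringSchema(), props)
val producer = new FlinkKafkaProducer[String]("output-topic", new SimpleStringSchema(), props)

def process(x: String, i: Int): String = s"$x-$i" // placeholder for your processing

env.addSource(consumer)
   .flatMap { (x: String, out: Collector[String]) =>
     var i = 0
     while (i < 10) { // emit 10 messages per input
       out.collect(process(x, i))
       i = i + 1
     }
   }
   .setParallelism(10)
   .addSink(producer)

env.execute("fan-out-to-kafka")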

Another approach, if you know how many parallel tasks you want, is to replicate the stream with union:

val stream = flink_consumer()

val resultStream = stream.map(process)
val sinkStream = resultStream.union(resultStream, resultStream, resultStream, ...) // merges N copies of resultStream, so every element is emitted N times
sinkStream.addSink(sinkThatSendsDataToYourDesiredSystem)

Finally, you can also attach multiple sinks to a DataStream:

val stream = flink_consumer()

val resultStream = stream.map(process)
resultStream.addSink(sinkThatSendsDataToYourDesiredSystem)
resultStream.addSink(sinkThatSendsDataToYourDesiredSystem)
resultStream.addSink(sinkThatSendsDataToYourDesiredSystem)
// ... N sinks in total ...
resultStream.addSink(sinkThatSendsDataToYourDesiredSystem)

If you want to do parallel writes to your data sink, you must ensure that the sink you use supports that kind of write operation.
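Flink's Kafka producer, for example, supports parallel writes, so you can raise the sink's parallelism explicitly. A sketch, reusing the producer from the example above:

resultStream
  .addSink(producer)   // FlinkKafkaProducer instances write to Kafka in parallel
  .setParallelism(4)   // assumes your cluster has at least 4 free task slots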