I would like to have each message in the flink consumer stream produce multiple messages, each via a seperate thread, to some topic in kafka using flink kafka producer.Im writing the program in Scala but answers in Java will do
Something like this:
def thread(x:String): Thread =
{
val thread_ = new Thread {
override def run {
val str = some_processing(x)
flink_producer(str)
}
}
return thread_
}
val stream = flink_consumer()
stream.map(x =>{
var i = 0
while(i < 10){
val th = thread(x)
th.start()
i = i+1
}
})
So for each input in flink consumer I would like to produce 10 messages to some other queue using multi-threading.