
I am pretty sure that there is no simple way of doing this, but here is my use case:

I have a Spark Streaming job (version 2.1.0) with a 5-second duration for each micro-batch.

My goal is to consume data from 1 different topic at every micro-batch interval, out of a total of 250 Kafka topics. You can take the code below as a simple example:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val groupId: String = "first_group"
val kafka_servers: String = "datanode1:9092,datanode2:9092,datanode3:9092"

val ss: SparkSession = SparkSession.builder()
  .config("spark.streaming.unpersist", "true")
  .appName("ConsumerStream_test")
  .getOrCreate()
val ssc: StreamingContext = new StreamingContext(ss.sparkContext, Duration(5000))

val kafka_parameters: Map[String, Object] = Map(
  "bootstrap.servers"     -> kafka_servers,
  "key.deserializer"      -> classOf[StringDeserializer],
  "value.deserializer"    -> classOf[ByteArrayDeserializer],
  "heartbeat.interval.ms" -> (1000: Integer),
  "max.poll.interval.ms"  -> (100: Integer),
  "enable.auto.commit"    -> (false: java.lang.Boolean),
  "auto.offset.reset"     -> "earliest",
  //"connections.max.idle.ms" -> (5000: Integer),
  "group.id"              -> groupId
)

// Pick one of the 250 topics at random for this run.
val r = scala.util.Random
val kafka_list_one_topic = List("topic_" + r.nextInt(250))

val consumer: DStream[ConsumerRecord[String, Array[Byte]]] =
  KafkaUtils.createDirectStream(
    ssc,
    LocationStrategies.PreferBrokers,
    ConsumerStrategies.Subscribe[String, Array[Byte]](kafka_list_one_topic, kafka_parameters))

consumer.foreachRDD(eachRDD => {
  // DOING SOMETHING WITH THE DATA...
})
ssc.start()
ssc.awaitTermination()

But the issue with this approach is that Spark runs the initial code (everything before the foreachRDD call) only once, in order to create the Kafka consumer DStream; in the following micro-batches, it only runs the foreachRDD statement.

As an example, let's say that r.nextInt(250) returned 40. The Spark Streaming job will connect to topic_40 and process its data. But in the next micro-batches, it will still connect to topic_40 and ignore all the commands before the foreachRDD statement.

I guess this is expected, since the code before the foreachRDD statement runs only once, on the Spark driver.

My question is: is there a way to do this without having to relaunch the Spark application every 5 seconds?

Thank you.

Can't you create a list of all the topics and consume the data? - koiralo
@ShankarKoirala, that approach would be fine if I had 5 to 10 topics to consume from. But with 250, it takes a long time to connect to the Kafka brokers, mainly because the connection to each topic is not parallel but sequential (you can find more about it here: stackoverflow.com/a/34448362/4135691) - manuel mourato
@ShankarKoirala each topic has one partition and receives data from a different data source. It is a common practice to keep data separated in this way, both for organization and scalability purposes. - manuel mourato
I would love to know why you want to do this. What's the design pattern you're trying to implement here? - Robin Moffatt
@RobinMoffatt, here is the use case: I have 250 data sources that each send data to a Kafka topic via a Kafka producer. A Spark Streaming job then consumes from those topics every 5 seconds to process that data. However, not all topics have new data in a given micro-batch. So, based on the topics' offsets, I determine which topics I should connect to in that micro-batch. I do this because trying to connect to all 250 Kafka topics at the same time takes way too long for the processing time that I need to achieve. - manuel mourato
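For reference, a minimal sketch of such a driver-side offset check with a plain KafkaConsumer might look like the following. This is a hypothetical helper, not code from the question; it assumes topics named topic_0 through topic_249 with a single partition each, as described in the comments above.

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

// Hypothetical helper: returns the topics whose end offset moved since the last batch.
def topicsWithNewData(bootstrapServers: String,
                      lastSeenOffsets: Map[TopicPartition, Long]): List[String] = {
  val props = new Properties()
  props.put("bootstrap.servers", bootstrapServers)
  props.put("key.deserializer", classOf[StringDeserializer].getName)
  props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)
  val probe = new KafkaConsumer[String, Array[Byte]](props)
  try {
    // One partition per topic, as described above.
    val partitions = (0 until 250).map(i => new TopicPartition("topic_" + i, 0))
    val endOffsets = probe.endOffsets(partitions.asJava).asScala
    endOffsets.collect {
      case (tp, end) if end.longValue > lastSeenOffsets.getOrElse(tp, 0L) => tp.topic()
    }.toList
  } finally {
    probe.close()
  }
}

The returned topic list could then feed whatever subscription logic the job uses for the next micro-batch.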

1 Answer


My approach would be really simple: if you want the topic to be really random and don't care about any other consequences, make kafka_list_one_topic a mutable variable and change it in the streaming code.

val r = scala.util.Random
var kafka_list_one_topic = List("topic_" + r.nextInt(250))

val consumer: DStream[ConsumerRecord[String, Array[Byte]]] =
  KafkaUtils.createDirectStream(
    ssc,
    LocationStrategies.PreferBrokers,
    ConsumerStrategies.Subscribe[String, Array[Byte]](kafka_list_one_topic, kafka_parameters))

consumer.foreachRDD(eachRDD => {
  // DOING SOMETHING WITH THE DATA...
  // Pick a new random topic for the next batch.
  kafka_list_one_topic = List("topic_" + r.nextInt(250))
})
ssc.start()
ssc.awaitTermination()