Given the following code:
def createKafkaStream(ssc: StreamingContext,
kafkaTopics: String, brokers: String): DStream[(String, String)] = {
// some configs here
KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, props, topicsSet)
}
def consumerHandler(): StreamingContext = {
val ssc = new StreamingContext(sc, Seconds(10))
createKafkaStream(ssc, "someTopic", "my-kafka-ip:9092").foreachRDD(rdd => {
rdd.foreach { msg =>
// Now do some DataFrame-intensive work.
// As I understand things, DataFrame ops must be run
// on Workers as well as streaming consumers.
}
})
ssc
}
StreamingContext.getActive.foreach {
_.stop(stopSparkContext = false)
}
val ssc = StreamingContext.getActiveOrCreate(consumerHandler)
ssc.start()
ssc.awaitTermination()
My understanding is that Spark and Kafka will automagically work together to figure out how many consumer threads to deploy to available Worker Nodes, which likely results in parallel processing of messages off a Kafka topic.
But what if I don't want multiple, parallel consumers? What if want 1-and-only-1 consumer reading the next message from a topic, processing it completely, and then starting back over again and polling for the next message.
Also, when I call:
val ssc = new StreamingContext(sc, Seconds(10))
Does this mean:
- That a single consumer thread will receive all messages that were published to the topic in the last 10 seconds; or
- That a single consumer thread will receive the next (single) message from the topic, and that it will poll for the next message every 10 seconds?