Given the following code:

def createKafkaStream(ssc: StreamingContext, 
                      kafkaTopics: String, brokers: String): DStream[(String, String)] = {
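    // some configs here; at minimum (assumed) the topic set and the broker list:
    val topicsSet = kafkaTopics.split(",").toSet
    val props = Map[String, String]("metadata.broker.list" -> brokers)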
    KafkaUtils.createDirectStream[String, String, StringDecoder,
        StringDecoder](ssc, props, topicsSet)
}

def consumerHandler(): StreamingContext = {
    val ssc = new StreamingContext(sc, Seconds(10))

    createKafkaStream(ssc, "someTopic", "my-kafka-ip:9092").foreachRDD(rdd => {
        rdd.foreach { msg =>
            // Now do some DataFrame-intensive work.
            // As I understand things, DataFrame ops must be run
            // on Workers as well as streaming consumers.
        }
    })

    ssc
}

StreamingContext.getActive.foreach {
    _.stop(stopSparkContext = false)
}

val ssc = StreamingContext.getActiveOrCreate(consumerHandler)
ssc.start()
ssc.awaitTermination()

My understanding is that Spark and Kafka will automagically work together to figure out how many consumer threads to deploy to available Worker Nodes, which likely results in parallel processing of messages off a Kafka topic.

But what if I don't want multiple, parallel consumers? What if I want 1-and-only-1 consumer reading the next message from a topic, processing it completely, and then starting over again and polling for the next message?

Also, when I call:

val ssc = new StreamingContext(sc, Seconds(10))

Does this mean:

  • That a single consumer thread will receive all messages that were published to the topic in the last 10 seconds; or
  • That a single consumer thread will receive the next (single) message from the topic, and that it will poll for the next message every 10 seconds?
Thanks for the feedback, a few things you said: "For starters pseudo-code you've shown cannot result in a valid Spark application (you cannot execute DataFrame-intensive work inside an action)." (1) What do you mean cannot execute? I have been running actions that do invoke DataFrame operations, so are you just saying that I shouldn't be doing this? Or do you mean that it should literally not be possible to do what I have been doing for a few weeks now?!? - smeeb

1 Answer

But what if I don't want multiple, parallel consumers? What if I want 1-and-only-1 consumer reading the next message from a topic, processing it completely, and then starting over again and polling for the next message?

If that is your use-case, I'd say why use Spark at all? Its entire advantage is that you can read in parallel. The only hacky workaround I can think of is creating a Kafka topic with a single partition, which would make Spark assign the entire offset range to a single worker, but that is ugly.
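To make the single-partition workaround above a bit more concrete, here is a rough sketch, assuming the spark-streaming-kafka (Kafka 0.8) artifact is on the classpath and that the topic really has one partition. The names handleInOrder and processSerially are made up for illustration; only foreachRDD, foreachPartition and rdd.partitions are Spark's own.

```scala
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: handles the records of one partition strictly in order
// and returns how many records it handled.
def handleInOrder(records: Iterator[(String, String)])(handle: String => Unit): Int = {
  var handled = 0
  records.foreach { case (_, value) =>
    handle(value)   // process one message completely before moving to the next
    handled += 1
  }
  handled
}

// Hypothetical wiring: pass in the DStream returned by createKafkaStream.
def processSerially(stream: DStream[(String, String)]): Unit =
  stream.foreachRDD { rdd =>
    // Runs on the driver. With a single-partition topic this should print 1.
    println(s"Spark partitions in this batch: ${rdd.partitions.length}")
    rdd.foreachPartition { records =>
      // Runs on an executor; println output ends up in the executor's stdout.
      handleInOrder(records)(value => println(value))
      ()
    }
  }
```

Note that this is still micro-batching: the single task receives every message that arrived during the batch interval and walks through them one by one, so it is not a "poll one message at a time" consumer.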

Does that mean that a single consumer thread will receive all messages that were published to the topic in the last 10 seconds or that a single consumer thread will receive the next (single) message from the topic, and that it will poll for the next message every 10 seconds?

Neither. Since you're using the direct (receiverless) stream approach, every 10 seconds your driver asks Kafka for the latest offsets of each partition of the topic and works out the offset range that has accumulated in each partition since the last batch. Spark then takes each such offset range and sends it to one of the workers, which consumes that range directly from Kafka. This means that with the direct stream approach, there is a 1:1 correspondence between Kafka partitions and Spark partitions.
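The per-batch planning described above can be modelled in a few lines of plain Scala. This is only a toy model to illustrate the idea, not Spark's implementation: BatchRange and planBatch are hypothetical names, and the offsets are invented numbers.

```scala
// Toy model: one range per Kafka partition per batch (hypothetical type).
final case class BatchRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  def count: Long = untilOffset - fromOffset
}

// consumed: next offset to read per partition, as left by the previous batch
// latest:   latest offset per partition, as reported by Kafka at batch time
def planBatch(topic: String, consumed: Map[Int, Long], latest: Map[Int, Long]): Seq[BatchRange] =
  latest.toSeq.sortBy(_._1).map { case (partition, until) =>
    BatchRange(topic, partition, consumed.getOrElse(partition, 0L), until)
  }

// Invented example: a topic with three partitions.
val consumed = Map(0 -> 100L, 1 -> 250L, 2 -> 40L)
val latest   = Map(0 -> 130L, 1 -> 250L, 2 -> 75L)

// One range per Kafka partition, hence one Spark partition (and one task) each.
val ranges = planBatch("someTopic", consumed, latest)
ranges.foreach { r =>
  println(s"partition ${r.partition}: [${r.fromOffset}, ${r.untilOffset}) -> ${r.count} messages")
}
```

In a real job, the ranges Spark actually used for a batch can be inspected inside foreachRDD by casting the RDD: rdd.asInstanceOf[HasOffsetRanges].offsetRanges (from org.apache.spark.streaming.kafka). The cast only works on the RDD that comes straight out of the direct stream, before any transformation.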