0 votes

The Kafka 0.8 official documentation describes the Kafka consumer as follows:

"Consumers label themselves with a consumer group name, and each message published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines. If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers."

I set up a Kafka cluster with Kafka 0.8.1.1 and use a Spark Streaming job (Spark 1.3) to pull data from its topics. The Spark Streaming code is as follows:

    ... ...

    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokerList);
    kafkaParams.put("group.id", groupId);

    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );

    messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {

        @Override
        public Void call(JavaPairRDD<String, String> rdd) throws Exception {
            long msgNum = rdd.count(); // count the messages in this batch
            System.out.println("There are " + msgNum + " messages read from Kafka.");

            ... ...

            return null;
        }
    });

I then submitted two Spark Streaming jobs that access the same topic with the same group id. I assumed that when I send 100 messages to the topic, the two jobs would get 100 messages in total (e.g., job1 gets 50 and job2 gets 50, or job1 gets 100 and job2 gets 0). However, each job gets all 100 messages. This result seems to contradict what the Kafka doc says.

Is there anything wrong with my code? Did I set the group id config correctly? Is this a bug or by design in createDirectStream()?

Test Env: Kafka 0.8.1.1 + Spark 1.3.1

Comment: Got it. I need to use createStream instead of createDirectStream to share the messages in a topic among consumers with a single group id. - JuliaLi
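
For reference, a minimal sketch of the receiver-based createStream call; the ZooKeeper address and topic name below are placeholders, not values from the question:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    // Receiver-based stream: consumers register in ZooKeeper under the given
    // group id, so two jobs with the same group id split the messages.
    Map<String, Integer> topicThreads = new HashMap<String, Integer>();
    topicThreads.put("my-topic", 1); // placeholder topic, 1 receiver thread

    JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(
            jssc,            // the existing JavaStreamingContext
            "zkhost:2181",   // placeholder ZooKeeper quorum
            groupId,         // the same group id as above
            topicThreads);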

2 Answers

0 votes

Consumer groups are a feature of Kafka's high-level consumer API before version 0.9; they are not available in the simple consumer API. createDirectStream uses the simple consumer API.

Some tips:

  1. The main reason to use a SimpleConsumer implementation is that you want greater control over partition consumption than consumer groups give you (e.g., reading a message multiple times).

  2. createDirectStream: instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch; see the sketch after this list.
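
For illustration, a minimal sketch of reading those offset ranges inside the foreachRDD call from the question, using the HasOffsetRanges interface from the integration guide (rdd is the batch RDD):

    import org.apache.spark.streaming.kafka.HasOffsetRanges;
    import org.apache.spark.streaming.kafka.OffsetRange;

    // Each batch RDD produced by createDirectStream carries the exact Kafka
    // offset range it covers; cast the underlying RDD to HasOffsetRanges.
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    for (OffsetRange o : offsetRanges) {
        System.out.println(o.topic() + " partition " + o.partition()
                + " offsets [" + o.fromOffset() + ", " + o.untilOffset() + ")");
    }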

References:

  1. Spark Streaming + Kafka Integration Guide
  2. 0.8.0 SimpleConsumer Example

The Kafka 0.9.0 release added a new Java consumer to replace the existing high-level ZooKeeper-based consumer and low-level consumer APIs. With it you can use consumer groups and commit offsets manually at the same time.
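
A minimal sketch of the new 0.9 consumer with a consumer group and manual offset commits; the broker address, group id, and topic name are placeholders:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker:9092"); // placeholder broker
    props.put("group.id", "my-group");             // group-managed consumption
    props.put("enable.auto.commit", "false");      // we commit offsets ourselves
    props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Arrays.asList("my-topic")); // placeholder topic
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.offset() + ": " + record.value());
        }
        consumer.commitSync(); // manual offset commit after processing
    }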

0 votes

Creating two different Spark apps to do the same thing with the same messages doesn't make sense. Use one app with more executors.
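
As a sketch, one app can be given more executors through configuration; the values here are illustrative, and with the direct stream, read parallelism follows the number of Kafka partitions:

    import org.apache.spark.SparkConf;

    SparkConf conf = new SparkConf()
            .setAppName("kafka-streaming-app")     // hypothetical app name
            .set("spark.executor.instances", "4")  // more executors, one app
            .set("spark.executor.cores", "2");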