1 vote

I'm trying to integrate Spark and Kafka to consume messages from Kafka. I have producer code to send messages to the "temp" topic, and I'm also using Kafka's console producer to produce messages on the same "temp" topic.

I have written the code below to consume messages from that "temp" topic, but it does not receive a single message.

Program:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.apache.commons.lang3.StringUtils.SPACE;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

public class ConsumerDemo {

    public void main() {
        String zkGroup = "localhost:2181";
        String group = "test";
        String[] topics = {"temp"};
        int numThreads = 1;

        SparkConf sparkConf = new SparkConf()
                .setAppName("JavaKafkaWordCount")
                .setMaster("local[4]")
                .set("spark.ui.port", "7077")
                .set("spark.executor.memory", "1g");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
        Map<String, Integer> topicMap = new HashMap<>();
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }
        System.out.println("topics : " + Arrays.toString(topics));
        JavaPairReceiverInputDStream<String, String> messages
                = KafkaUtils.createStream(jssc, zkGroup, group, topicMap);

        messages.print();

        JavaDStream<String> lines = messages.map(Tuple2::_2);

        //lines.print();
        // Split each line on spaces. Note the argument order: SPACE.split(x)
        // would split the one-character string " " using x as the regex.
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(SPACE)).iterator());

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
                .reduceByKey((i1, i2) -> i1 + i2);

        //wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }

    public static void main(String[] args) {
        System.out.println("Started...");
        new ConsumerDemo().main();
        System.out.println("Ended...");
    }
}

I added the following dependencies to the pom.xml file:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.9.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>0.9.0-incubating</version>
        <type>jar</type>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.3</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.3</version>
        <type>jar</type>
    </dependency>

    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>

    <dependency>
        <groupId>org.anarres.lzo</groupId>
        <artifactId>lzo-core</artifactId>
        <version>1.0.5</version>
        <type>jar</type>
    </dependency>

    <dependency> 
        <groupId>com.fasterxml.jackson.core</groupId> 
        <artifactId>jackson-databind</artifactId> 
        <version>2.8.2</version> 
    </dependency> 
    <dependency> 
        <groupId>com.fasterxml.jackson.module</groupId> 
        <artifactId>jackson-module-scala_2.10</artifactId> 
        <version>2.8.2</version> 
    </dependency>
    <dependency>
        <groupId>com.msiops.footing</groupId>
        <artifactId>footing-tuple</artifactId>
        <version>0.2</version>
    </dependency>

Am I missing some dependency, or is the issue in the code? Why does this code not receive any messages?

Are you able to consume messages using a console-based consumer? If not, there might be an issue with the producer. Also, check whether your port number is correct. I don't think there is any issue in the POM; if there were, it would not allow you to build/compile the project. – Nilesh Pharate
@NileshPharate Yes, I'm able to consume messages using Kafka's console consumer, so we can say the issue is not related to Kafka or ZooKeeper; I'm using the same IP and port as in the console approach. – ketan

2 Answers

0 votes

You are not calling the method that contains the code to connect to Kafka and consume messages. Either write that logic in public static void main(String[] args) or call the method where you have written it, as in the sketch below.
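For illustration, here is a minimal sketch of the second option; consumeFromKafka is a hypothetical name standing in for wherever the consumption logic actually lives:

    public class ConsumerDemo {

        // Instance method holding the Kafka/Spark consumption logic.
        public void consumeFromKafka() {
            // ... build the JavaStreamingContext, create the Kafka DStream,
            // then call jssc.start() and jssc.awaitTermination(), as in the question ...
        }

        public static void main(String[] args) {
            // The JVM entry point must actually invoke the consumer logic;
            // otherwise the streaming job never starts.
            new ConsumerDemo().consumeFromKafka();
        }
    }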

0 votes

When using a Kafka consumer, especially while testing and debugging in a development environment, the producer may not be pushing messages to Kafka continuously. In this scenario we need to take care of the Kafka consumer parameter auto.offset.reset, which determines whether the consumer reads only new messages written to the topic after it starts running, or reads from the beginning of the topic.

Here is the official explanation given in the Kafka documentation:

auto.offset.reset
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

  1. earliest: automatically reset the offset to the earliest offset
  2. latest: automatically reset the offset to the latest offset
  3. none: throw exception to the consumer if no previous offset is found for the consumer's group
  4. anything else: throw exception to the consumer.

A sample code snippet on how to create a Kafka DStream using kafkaParams is below. Note that the receiver-based KafkaUtils.createStream goes through Kafka's old ZooKeeper-based high-level consumer, which expects the values "smallest"/"largest" for auto.offset.reset rather than the new consumer's "earliest"/"latest":

    // Imports required by this snippet, in addition to the question's:
    import java.util.HashMap;
    import java.util.Map;

    import kafka.serializer.StringDecoder;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("zookeeper.connect", "localhost:2181");
    // While testing in a development system, change this group.id each time you
    // run the consumer, so previously committed offsets do not hide old messages.
    kafkaParams.put("group.id", "test02");
    // Old high-level consumer values: "smallest" (read from the beginning)
    // or "largest" (read only new messages, the default).
    kafkaParams.put("auto.offset.reset", "smallest");
    kafkaParams.put("metadata.broker.list", "localhost:9092");
    kafkaParams.put("bootstrap.servers", "localhost:9092");

    Map<String, Integer> topics = new HashMap<>();
    topics.put("temp", 1); // topic name -> number of receiver threads

    StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();

    JavaPairDStream<String, String> messages =
            KafkaUtils.createStream(jssc,
                    String.class,
                    String.class,
                    StringDecoder.class,
                    StringDecoder.class,
                    kafkaParams,
                    topics,
                    storageLevel);
    messages.print();
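
This snippet assumes a JavaStreamingContext named jssc already exists. For completeness, a minimal sketch of the surrounding setup, mirroring the question's own code (StreamingSetup is a hypothetical class name):

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class StreamingSetup {
        public static void main(String[] args) throws InterruptedException {
            SparkConf sparkConf = new SparkConf()
                    .setAppName("JavaKafkaWordCount")
                    .setMaster("local[4]");
            // 2-second batch interval, as in the question
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

            // ... create the Kafka DStream with KafkaUtils.createStream as shown above,
            // then define the transformations and output operations ...

            jssc.start();            // start the streaming computation
            jssc.awaitTermination(); // block until the job is stopped or fails
        }
    }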