I'm working with Kafka and Spark Streaming, and I can read and process the data sent by the producer. My scenario: the producer keeps producing messages while the consumer is shut down for a while and then switched back on. After the restart, the consumer reads only live data. Instead, it should resume from where it stopped reading. Here is the pom.xml I have been using:

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>2.0.1</spark.version>
        <kafka.version>0.8.2.2</kafka.version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.6.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.json/json -->
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20160810</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.json4s/json4s-ast_2.11 -->
        <dependency>
            <groupId>org.json4s</groupId>
            <artifactId>json4s-ast_2.11</artifactId>
            <version>3.2.11</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.2.0</version>
        </dependency>
    </dependencies>

I have tried the Kafka v0.10.1.0 producer and consumer on their own. The behaviour is as expected (the consumer reads data from where it left off), so in this version the offset is picked up correctly.

I have also tried using the same Kafka version in the pom.xml above, but it failed with java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker.

I understand that the versions have to be compatible, but I also need a continuous stream.

Did you take a look at my answer? Have the problems been resolved? - oh54

1 Answer

The different behavior likely arises from the fact that Kafka underwent some rather large changes between versions 0.8 and 0.10.

Unless you absolutely have to use the old version, I suggest switching to newer ones.

Take a look at this link:

https://spark.apache.org/docs/latest/streaming-kafka-integration.html

The Kafka project introduced a new consumer API between versions 0.8 and 0.10, so there are two separate corresponding Spark Streaming packages available.

If you want to use Kafka v0.10.1.0, you must therefore depend on the matching Kafka integration artifact for Spark Streaming, spark-streaming-kafka-0-10, listed at https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11.

Something like this, for example:

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.1.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
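
The ClassCastException in the question is likely a symptom of mixed versions (an integration artifact built for Kafka 0.8 running against Kafka 0.10 jars), so it may help to drive all Spark artifacts from shared properties. Here is a minimal sketch of the same setup, assuming Spark 2.1.0 and Scala 2.11 as in the example above. The property name `scala.binary.version` is only an illustrative choice, and the `provided` scope is carried over from the question's pom on the assumption that the cluster supplies Spark at runtime. This variant also leaves out the explicit org.apache.kafka artifacts, assuming the Kafka client pulled in transitively by the integration artifact is sufficient; the Spark integration guide advises against adding those manually.

```xml
<properties>
    <!-- Assumed versions; adjust them to match your cluster -->
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.1.0</spark.version>
</properties>

<dependencies>
    <!-- Supplied by the Spark runtime, hence "provided" (assumption) -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- Not part of the Spark distribution, so it must be packaged with the application;
         it brings in a matching Kafka client transitively -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
```

With this layout, upgrading Spark means changing a single property, and the integration artifact can no longer drift to a different release line than spark-streaming itself.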

Additional note: you're using Hadoop 2.2.0, which was released in October 2013 and is thus ancient in Hadoop terms. You should consider changing it to a newer version.

Let me know if this helps.