
I am trying to read from a Kafka broker with Spark Streaming, but I am facing some issues.

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def spark_streaming_from_STABLE_kafka_topic():
    conf = SparkConf().setMaster("spark://antonis-dell:7077").setAppName("Kafka_Spark")
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 2)  # 2-second batch interval

    topic = "stable_topic"
    kvs = KafkaUtils.createDirectStream(ssc,
                                        [topic],
                                        {"metadata.broker.list": "my-broker",
                                         "auto.offset.reset": "smallest"},
                                        keyDecoder=lambda x: x,
                                        valueDecoder=lambda x: x)

    lines = kvs.window(2, 2).map(lambda x: x[1])
    lines.pprint()
    return ssc


if __name__ == "__main__":
    ssc = StreamingContext.getOrCreate('/home/antonis/Desktop/tmp/checkpoint_v06', lambda: spark_streaming_from_STABLE_kafka_topic())
    ssc.start()
    ssc.awaitTermination()

The code above does not fetch anything; it prints only empty batches:

-------------------------------------------
Time: 2020-05-29 09:32:38
-------------------------------------------

-------------------------------------------
Time: 2020-05-29 09:32:40
-------------------------------------------

The topic stable_topic contains a fixed amount of data; it does not change. I have another topic that receives data every second. If I use that topic instead of stable_topic and remove "auto.offset.reset": "smallest", the code fetches data.

I assume something is wrong with {"auto.offset.reset": "smallest"}, but I cannot figure out what.

Does anyone know what I am doing wrong?


1 Answer


In later Kafka versions, smallest was replaced by earliest. Make sure to check the documentation for the version you are using.

Also, the auto.offset.reset configuration will not take effect if the consumer group has already committed offsets for the topic stable_topic, because the stored offsets are used instead. Therefore, you might consider changing the group.id in your streaming job.

If you assign a new group.id, make sure to set auto.offset.reset to smallest (or earliest in newer versions).
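For example, the Kafka parameter dict passed to createDirectStream might look like the sketch below. The value "my-fresh-group" is a made-up placeholder; use any group.id that has never consumed from stable_topic, so there are no stored offsets and auto.offset.reset actually takes effect:

```python
# Kafka parameters for KafkaUtils.createDirectStream.
# "my-fresh-group" is a placeholder -- any group.id with no
# previously committed offsets for stable_topic will do.
kafka_params = {
    "metadata.broker.list": "my-broker",
    "group.id": "my-fresh-group",          # fresh group -> no stored offsets
    "auto.offset.reset": "smallest",       # "earliest" in newer Kafka versions
}
```

With no committed offsets for the group, the consumer falls back to auto.offset.reset and starts from the beginning of the topic, which is what you want for a topic with a fixed set of data.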