
I am running a PySpark job that streams data from Kafka. On my Windows system, I am trying to reproduce what happens when the consumer goes down while data is still being fed into Kafka.

Here is what I expect:

  • The producer starts and produces messages 1, 2 and 3.
  • The consumer is online and consumes messages 1, 2 and 3.
  • The consumer then goes down for some reason while the producer keeps producing messages 4, 5, 6 and so on.
  • When the consumer comes back up, I expect it to resume where it left off, that is, to read messages 4, 5, 6 and so on.

My PySpark application does not behave this way. Here is how I set up the streaming read from the Spark session:

session.readStream.format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "clickapijson") \
  .option("startingOffsets", "latest") \
  .load()

I googled and gathered quite a bit of information. It seems that the group ID is relevant here. Kafka keeps track of the offsets read by each consumer in a particular group ID. If a consumer subscribes to a topic with a group ID, say G1, Kafka registers the group and the consumer ID and tracks the offsets against them. If the consumer goes down for some reason and restarts with the same group ID, Kafka still has the offsets that were already read, so the consumer resumes from where it left off.

This is exactly what happens when I use the following command to start a consumer from the CLI:

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic "clickapijson" --consumer-property group.id=test 

When my producer produces messages 1, 2 and 3, the consumer consumes them. I killed the running consumer job (the CLI .bat file) after the third message was read, while my producer went on to produce messages 4, 5, 6 and so on. When I bring the consumer job (the CLI .bat file) back up, it reads the data from where it left off (from message 4). This behaves as I expect.

I am unable to do the same thing in PySpark.

When I include option("group.id", "test"), it throws an error saying that the Kafka option group.id is not supported, as user-specified consumer groups are not used to track offsets.

Looking at the console output, each time my PySpark consumer job starts, it creates a new group ID. If the job previously ran with one group ID and failed, it does not pick up the same group ID when it is restarted; it gets a new, random one. Kafka has the offset information for the previous group ID but not for the newly generated one. Hence my PySpark application cannot read the data that was fed into Kafka while it was down.

If this is the case, won't I lose data whenever the consumer job goes down due to some failure?

How can I give my own group ID to the PySpark application, or how can I restart my PySpark application with the same old group ID?

Yes, Mike, it did answer my question. Thanks for your help. (shankar)

1 Answer


In the current Spark version (2.4.5) it is not possible to provide your own group.id, as Spark creates it automatically (as you already observed). The full details on offset management when Spark reads from Kafka are given in the Structured Streaming + Kafka Integration Guide and are summarised below:

Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:

group.id: Kafka source will create a unique group id for each query automatically.

auto.offset.reset: Set the source option startingOffsets to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that startingOffsets only applies when a new streaming query is started, and that resuming will always pick up from where the query left off.

enable.auto.commit: Kafka source doesn’t commit any offset.
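
The startingOffsets rule quoted above can be illustrated with a toy model. This is only a sketch in plain Python, not Spark's implementation: the single-partition log, the JSON progress file and the names run_query and demo are all assumptions made up for the illustration. It shows that the starting option is consulted only when no saved progress exists, and that a restarted query resumes from the saved offset instead.

```python
import json
import os
import tempfile


def run_query(log, checkpoint_path, starting_offsets="latest"):
    """Toy stand-in for one run of a streaming query over a single partition."""
    if os.path.exists(checkpoint_path):
        # Resuming: saved progress wins, starting_offsets is ignored.
        with open(checkpoint_path) as f:
            start = json.load(f)["next_offset"]
    elif starting_offsets == "earliest":
        start = 0
    else:
        # "latest": a brand-new query skips everything already in the log.
        start = len(log)
    batch = log[start:]
    # Record where the next run should begin.
    with open(checkpoint_path, "w") as f:
        json.dump({"next_offset": len(log)}, f)
    return batch


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        checkpoint = os.path.join(tmp, "offsets.json")
        log = ["m1", "m2", "m3"]
        first = run_query(log, checkpoint, "earliest")  # new query
        log += ["m4", "m5", "m6"]  # produced while the consumer is down
        second = run_query(log, checkpoint, "latest")  # restart with same checkpoint
        return first, second
```

Calling demo() returns the first three messages for the first run and the three messages produced during the outage for the second run, even though the restart asked for "latest".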

For Spark to remember where it left off reading from Kafka, you need to enable checkpointing and provide a path where the checkpoint files are stored. In Python this would look like:

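# aggDF is the aggregated streaming DataFrame; the memory sink needs a query name
# ("aggregates" is a placeholder).
aggDF \
    .writeStream \
    .queryName("aggregates") \
    .outputMode("complete") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .format("memory") \
    .start()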

More details on checkpointing are given in the Spark docs on Recovering from Failures with Checkpointing.
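
Applied to the setup in the question, a minimal sketch could look like the following. The helper names kafka_source_options and start_query, the Parquet file sink and both directory arguments are assumptions chosen for illustration; the topic and broker address are taken from the question. It also assumes the spark-sql-kafka package matching your Spark version is on the classpath.

```python
def kafka_source_options(bootstrap_servers, topic):
    """Options for the Kafka source. Note that no group.id is set."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        # Only consulted when the query starts with an empty checkpoint.
        "startingOffsets": "latest",
    }


def start_query(spark, output_dir, checkpoint_dir):
    """Start a streaming query that copies Kafka message values to Parquet files.

    `spark` is an existing SparkSession; `output_dir` and `checkpoint_dir`
    are placeholder paths supplied by the caller.
    """
    stream_df = (
        spark.readStream.format("kafka")
        .options(**kafka_source_options("localhost:9092", "clickapijson"))
        .load()
    )
    # Kafka delivers the value as binary, so cast it to a string.
    messages = stream_df.selectExpr("CAST(value AS STRING) AS value")
    return (
        messages.writeStream
        .format("parquet")
        .option("path", output_dir)
        # Reuse the same directory on every restart so offsets are recovered.
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )
```

As long as every restart passes the same checkpoint_dir, the query should continue from the offsets recorded there, so messages produced while the job was down are read on the next start.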