I am running a PySpark job that streams data from Kafka. I am trying to replicate a scenario on my Windows system to find out what happens when the consumer goes down while data is continuously being fed into Kafka.
Here is what I expect:
- The producer is started and produces messages 1, 2 and 3.
- The consumer is online and consumes messages 1, 2 and 3.
- The consumer goes down for some reason while the producer keeps producing messages 4, 5, 6 and so on.
- When the consumer comes back up, it should resume where it left off, so it should read messages 4, 5, 6 and so on.
My PySpark application does not behave this way. Here is how I set up the Kafka stream on my Spark session:
session.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "clickapijson") \
    .option("startingOffsets", "latest") \
    .load()
I googled and gathered quite a bit of information. It seems that the group ID is relevant here. Kafka keeps track of the offsets committed by each consumer group. If a consumer subscribes to a topic with a group ID, say G1, Kafka registers the group and records the offsets it has read. If the consumer goes down for some reason and restarts with the same group ID, Kafka still has the offsets that were already read, so the consumer resumes from where it left off.
This is exactly what happens when I start a consumer from the CLI with the following command:
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic "clickapijson" --consumer-property group.id=test
When my producer produces messages 1, 2 and 3, the consumer consumes them. I killed the running consumer job (the CLI .bat file) after the 3rd message was read, while my producer went on to produce messages 4, 5, 6 and so on. When I bring the consumer job (the CLI .bat file) back up, it reads the data from where it left off (from message 4). This behaves as I expect.
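To make sure I have the mechanism right, here is a toy model of what I believe Kafka is doing for the console consumer. It is plain Python, not real Kafka code: `ToyBroker` and its methods are hypothetical names made up for illustration, and the sketch assumes a single partition and that a group with no committed offset starts at the latest position.

```python
class ToyBroker:
    """Toy stand-in for one Kafka topic partition (illustration only)."""

    def __init__(self):
        self.log = []        # messages appended by the producer
        self.committed = {}  # group id -> next offset to read

    def produce(self, *messages):
        self.log.extend(messages)

    def poll(self, group_id):
        # Assumption: an unknown group starts at the end of the log ("latest").
        start = self.committed.get(group_id, len(self.log))
        batch = self.log[start:]
        # Commit the new position so a restart with the same group id resumes here.
        self.committed[group_id] = len(self.log)
        return batch


broker = ToyBroker()
broker.poll("test")                    # consumer comes online with group "test"
broker.produce(1, 2, 3)
first = broker.poll("test")            # consumer reads 1, 2, 3, then goes down
broker.produce(4, 5, 6)                # producer keeps going meanwhile
resumed = broker.poll("test")          # same group id: resumes at message 4
fresh = broker.poll("some-new-group")  # new group id: nothing to resume from
```

In this model the restarted consumer only sees messages 4, 5 and 6 because it reuses the group ID `test`; a consumer that shows up under a new group ID has no committed offset and misses them.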
I am unable to do the same thing in PySpark.
When I include option("group.id", "test"), it throws an error saying that Kafka option group.id is not supported, as user-specified consumer groups are not used to track offsets.
From the console output, I can see that each time my PySpark consumer job is started, it creates a new group ID. If the job previously ran with one group ID and failed, it does not pick up that old group ID when it is restarted; it gets a new, randomly generated one. Kafka has the offset information for the previous group ID but not for the newly generated one. Hence my PySpark application is not able to read the data that was fed into Kafka while it was down.
If this is the case, won't I lose data whenever the consumer job goes down due to some failure?
How can I give my own group ID to the PySpark application, or how can I restart it with the same old group ID?
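One thing that may be relevant: the error message suggests Spark tracks offsets itself rather than through a Kafka consumer group. As far as I understand, Structured Streaming records the offsets it has processed in the query's checkpoint location, and `startingOffsets` only applies when a query starts without an existing checkpoint. Below is a minimal sketch under that assumption. The function name `start_resumable_query` and the `checkpoint_dir` parameter are hypothetical, the console sink is just a placeholder, and I have not confirmed this resolves my scenario.

```python
def start_resumable_query(session, checkpoint_dir):
    """Start a Kafka-to-console query whose progress is saved in checkpoint_dir.

    `session` is an existing SparkSession; `checkpoint_dir` is a hypothetical
    path that must stay the same across restarts.
    """
    stream = (
        session.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "clickapijson")
        # Assumption: only consulted on the first run, before a checkpoint exists.
        .option("startingOffsets", "latest")
        .load()
    )
    return (
        stream.selectExpr("CAST(value AS STRING) AS value")
        .writeStream.format("console")
        # Processed offsets are recorded under this directory.
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )
```

If this assumption holds, restarting the job with the same `checkpoint_dir` should resume from the recorded offsets regardless of which group ID Spark generates for the run.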