I am trying to do Structured Streaming from Kafka and plan to store checkpoints in HDFS. I read a Cloudera blog recommending against storing checkpoints in HDFS for Spark Streaming: https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/. Does the same issue apply to Structured Streaming checkpoints?
In Structured Streaming, if my Spark program is down for some time, how do I get the latest offset from the checkpoint directory and load data after that offset? I am storing checkpoints in a directory as shown below.
df.writeStream \
    .format("text") \
    .option("path", "/files") \
    .option("checkpointLocation", "checkpoints/chkpt") \
    .start()
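From what I have read, I should not need to read the offset back myself: restarting the query with the same checkpointLocation should make Structured Streaming resume from the last committed offsets automatically. If I just want to inspect what was committed, something like the sketch below might work. It assumes the Spark 2.x checkpoint layout, where offsets/ holds one file per batch named by batch id, and with a single source the last line of a file is that source's offset JSON; my checkpoint is actually in HDFS, so the files would first have to be fetched (e.g. via hdfs dfs -cat) rather than read from the local filesystem as here.

# Sketch: inspect the latest committed Kafka offsets in a checkpoint.
# Assumes the Spark 2.x layout: checkpoints/chkpt/offsets/ contains one
# file per batch (named 0, 1, 2, ...); the first line is a version tag,
# the second is batch metadata, and with a single source the last line
# is that source's offset JSON (for Kafka: topic -> partition -> offset).
import os
import json

checkpoint_dir = "checkpoints/chkpt"   # same path as checkpointLocation
offsets_dir = os.path.join(checkpoint_dir, "offsets")

# Batch files are named by batch id; the highest id is the latest batch.
batch_ids = [int(f) for f in os.listdir(offsets_dir) if f.isdigit()]
latest = os.path.join(offsets_dir, str(max(batch_ids)))

with open(latest) as f:
    last_line = f.readlines()[-1]

print(json.loads(last_line))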
Update:
This is my Structured Streaming program; it reads a Kafka message, decompresses it, and writes to HDFS.
# Read the raw Kafka stream; failOnDataLoss=false keeps the query running
# when expected offsets are no longer available on the brokers
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KafkaServer) \
    .option("subscribe", KafkaTopics) \
    .option("failOnDataLoss", "false") \
    .load()
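For completeness: if I ever need to start a fresh query from specific offsets rather than resuming from a checkpoint, the Kafka source takes a startingOffsets option. A minimal sketch; the topic name, partitions, and offset values are placeholders:

# Sketch: pin explicit starting offsets for a brand-new query.
# Note: startingOffsets only applies when the query starts without an
# existing checkpoint; on restart, the checkpointed offsets win.
# -1 means "latest" and -2 means "earliest" for a partition.
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KafkaServer) \
    .option("subscribe", KafkaTopics) \
    .option("startingOffsets", """{"mytopic":{"0":1500,"1":-1}}""") \
    .load()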
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
Transaction_DF.printSchema()

# zip_extract is a UDF to decompress the stream
decomp = Transaction_DF.select(zip_extract("value").alias("decompress"))
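For context, zip_extract looks roughly like the hypothetical sketch below. The gzip codec is a guess (the real one may differ), and it should probably be applied to the raw binary value column from Kafka rather than to the string cast above, since casting compressed bytes to a string can corrupt them:

# Hypothetical sketch of zip_extract, assuming gzip-compressed payloads.
# Apply it to the binary `value` column, e.g.:
#   decomp = df.select(zip_extract(df["value"]).alias("decompress"))
import gzip
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def zip_extract(value):
    # `value` arrives as a bytearray when applied to the binary Kafka column
    if value is None:
        return None
    return gzip.decompress(bytes(value)).decode("utf-8")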
query = decomp.writeStream \
    .format("text") \
    .option("path", "/Data_directory_inHDFS") \
    .option("checkpointLocation", "/pathinHDFS") \
    .start()

query.awaitTermination()
Comment: Do you see the driver log line logInfo(s"GetBatch called with start = $start, end = $end")? It should tell you what the query processed. – zsxwing
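Following up on that comment: besides the driver log, the query object itself reports the offset range of each micro-batch. A minimal sketch, polling lastProgress before blocking on awaitTermination() (the poll count and interval are arbitrary):

# Sketch: each StreamingQueryProgress lists startOffset/endOffset per
# source, which matches what the GetBatch log line reports. Run this
# between start() and awaitTermination(), or from a separate thread.
import json
import time

for _ in range(5):        # arbitrary number of polls
    time.sleep(10)        # arbitrary interval between polls
    progress = query.lastProgress
    if progress is not None:
        print(json.dumps(progress["sources"], indent=2))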