4
votes

I am trying to do Structured Streaming from Kafka and planning to store checkpoints in HDFS. I read a Cloudera blog recommending not to store checkpoints in HDFS for Spark Streaming: https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/. Does the same issue apply to Structured Streaming checkpoints?

In Structured Streaming, if my Spark program is down for a certain time, how do I get the latest offset from the checkpoint directory and load data after that offset? I am storing checkpoints in a directory as shown below.

df.writeStream \
    .format("text") \
    .option("path", '/files') \
    .option("checkpointLocation", 'checkpoints/chkpt') \
    .start()

Update:

This is my Structured Streaming program; it reads a Kafka message, decompresses it, and writes to HDFS.

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KafkaServer) \
    .option("subscribe", KafkaTopics) \
    .option("failOnDataLoss", "false") \
    .load()
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
Transaction_DF.printSchema()

# zip_extract is a UDF to decompress the stream
decomp = Transaction_DF.select(zip_extract("value").alias("decompress"))

query = decomp.writeStream \
    .format("text") \
    .option("path", "/Data_directory_inHDFS") \
    .option("checkpointLocation", "/path_in_HDFS") \
    .start()

query.awaitTermination()
4
Are you sure that the blog recommends not storing checkpoints on HDFS? That's pretty weird. Do you have the link? For the Structured Streaming question: just run the same code using the same checkpoint directory, and Structured Streaming will pick up the offsets from the last failure and restart from them. – zsxwing
@zsxwing This is the Cloudera blog link: blog.cloudera.com/blog/2017/06/… I manually killed my streaming program for a minute and started it again, and it processed only the messages it received after it came back up. It ignored the messages missed while it was down and did not process them again. – ranjith reddy
Could you take a look at the driver log and find the lines output by logInfo(s"GetBatch called with start = $start, end = $end")? It should tell you what the query processed. – zsxwing

4 Answers

5
votes

Storing checkpoints on long-term storage (HDFS, AWS S3, etc.) is the preferred approach. I would like to add one point: the property "failOnDataLoss" should not be set to false, as that is not best practice. Data loss is something no one can afford. Other than that, you are on the right path.
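
For illustration, a minimal sketch of the read stream with failOnDataLoss left at its default of true, so the query fails instead of silently skipping lost data (KafkaServer and KafkaTopics are the placeholders from the question):

# Sketch: leave failOnDataLoss at its default (true) so the query fails
# loudly on lost data instead of skipping it silently.
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KafkaServer) \
    .option("subscribe", KafkaTopics) \
    .load()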

4
votes

In Structured Streaming, if my Spark program is down for a certain time, how do I get the latest offset from the checkpoint directory and load data after that offset?

Under your checkpoint directory you will find a folder named 'offsets'. The 'offsets' folder maintains the next offsets to be requested from Kafka. Open the latest file (latest batch file) under the 'offsets' folder; the next expected offsets will be in the format below:

{"kafkatopicname":{"2":16810618,"1":16810853,"0":91332989}}

To load data after those offsets, set the property below on your Spark read stream:

 .option("startingOffsets", "{\""+topic+"\":{\"0\":91332989,\"1\":16810853,\"2\":16810618}}")

0, 1, and 2 are the partitions of the topic.
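
A minimal sketch of that idea, assuming the checkpoint sits on a locally readable path and that the last line of the newest batch file is the offsets JSON shown above (the file layout can vary by Spark version, so treat the parsing as an approximation):

import glob
import os

# Pick the newest batch file under <checkpoint>/offsets and reuse its Kafka
# offsets JSON as startingOffsets. 'checkpoints/chkpt' is a placeholder path.
batch_files = sorted(glob.glob(os.path.join("checkpoints/chkpt", "offsets", "*")),
                     key=os.path.getmtime)
offsets_json = open(batch_files[-1]).read().splitlines()[-1]

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KafkaServer) \
    .option("subscribe", KafkaTopics) \
    .option("startingOffsets", offsets_json) \
    .load()

Note that startingOffsets only applies when a query starts without an existing checkpoint; if you restart with the same checkpointLocation, Spark resumes from the checkpointed offsets automatically.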

0
votes

As I understood the article, it recommends maintaining offset management in one of HBase, Kafka, HDFS, or ZooKeeper:

"It is worth mentioning that you can also store offsets in a storage system like HDFS. Storing offsets in HDFS is a less popular approach compared to the above options as HDFS has a higher latency compared to other systems like ZooKeeper and HBase."

You can find how to restart a query from an existing checkpoint in the Spark documentation: http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
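
As a rough sketch of that recovery pattern (reusing the placeholder paths from the question), restarting the identical query with the same checkpointLocation is enough for Spark to resume from the last committed offsets:

# Restart the same query with the same checkpointLocation; Spark resumes
# from the offsets recorded in the checkpoint. Paths are placeholders.
query = decomp.writeStream \
    .format("text") \
    .option("path", "/Data_directory_inHDFS") \
    .option("checkpointLocation", "/path_in_HDFS") \
    .start()

query.awaitTermination()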

0
votes

In your query, apply a checkpoint while writing results to persistent storage such as HDFS, in a format such as Parquet. It worked well for me.
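
A minimal sketch of that suggestion, with hypothetical HDFS paths:

# Write the decompressed stream to HDFS as Parquet, with the checkpoint also
# on HDFS. Both paths are hypothetical.
query = decomp.writeStream \
    .format("parquet") \
    .option("path", "hdfs:///data/transactions") \
    .option("checkpointLocation", "hdfs:///checkpoints/transactions") \
    .start()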