0
votes

Background: I have written a simple spark structured steaming app to move data from Kafka to S3. Found that in order to support exactly-once guarantee spark creates _spark_metadata folder, which ends up growing too large, when the streaming app runs for a long time the metadata folder grows so big that we start getting OOM errors. I want to get rid of metadata and checkpoint folders of Spark Structured Streaming and manage offsets myself.

How we managed offsets in Spark Streaming: I have used val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges to get offsets in Spark Structured Streaming. But want to know how to get the offsets and other metadata to manage checkpointing ourself using Spark Structured Streaming. Do you have any sample program that implements checkpointing?

How we managed offsets in Spark Structured Streaming?? Looking at this JIRA https://issues-test.apache.org/jira/browse/SPARK-18258. looks like offsets are not provided. How should we go about?

The issue is in 6 hours size of metadata increased to 45MB and it grows till it reaches nearly 13 GB. Driver memory allocated is 5GB. At that time system crashes with OOM. Wondering how to avoid making this meta data grow so large? How to make metadata not log so much information.

Code:

1. Reading records from Kafka topic
  Dataset<Row> inputDf = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("startingOffsets", "earliest") \
  .load()
2. Use from_json API from Spark to extract your data for further transformation in a dataset.
   Dataset<Row> dataDf = inputDf.select(from_json(col("value").cast("string"), EVENT_SCHEMA).alias("event"))
       ....withColumn("oem_id", col("metadata.oem_id"));
3. Construct a temp table of above dataset using SQLContext
   SQLContext sqlContext = new SQLContext(sparkSession);
   dataDf.createOrReplaceTempView("event");
4. Flatten events since Parquet does not support hierarchical data.
5. Store output in parquet format on S3
   StreamingQuery query = flatDf.writeStream().format("parquet")

enter image description here

Dataset dataDf = inputDf.select(from_json(col("value").cast("string"), EVENT_SCHEMA).alias("event")) .select("event.metadata", "event.data", "event.connection", "event.registration_event","event.version_event" ); SQLContext sqlContext = new SQLContext(sparkSession); dataDf.createOrReplaceTempView("event"); Dataset flatDf = sqlContext .sql("select " + " date, time, id, " + flattenSchema(EVENT_SCHEMA, "event") + " from event"); StreamingQuery query = flatDf .writeStream() .outputMode("append") .option("compression", "snappy") .format("parquet") .option("checkpointLocation", checkpointLocation) .option("path", outputPath) .partitionBy("date", "time", "id") .trigger(Trigger.ProcessingTime(triggerProcessingTime)) .start(); query.awaitTermination();

1
batch or non-batch? - thebluephantom
add a comment for your consideration - thebluephantom
add your code pls - thebluephantom
45mb is nothing - thebluephantom
not convinced things related, pls show error - thebluephantom

1 Answers

1
votes

For non-batch Spark Structured Streaming KAFKA integration:

Quote:

Structured Streaming ignores the offsets commits in Apache Kafka.

Instead, it relies on its own offsets management on the driver side which is responsible for distributing offsets to executors and for checkpointing them at the end of the processing round (epoch or micro-batch).

You need not worry if you follow the Spark KAFKA integration guides.

Excellent reference: https://www.waitingforcode.com/apache-spark-structured-streaming/apache-spark-structured-streaming-apache-kafka-offsets-management/read

For batch the situation is different, you need to manage that yourself and store the offsets.

UPDATE Based on the comments I suggest the question is slightly different and advise you look at Spark Structured Streaming Checkpoint Cleanup. In addition to your updated comments and the fact that there is no error, I suggest you consukt this on metadata for Spark Structured Streaming https://www.waitingforcode.com/apache-spark-structured-streaming/checkpoint-storage-structured-streaming/read. Looking at the code, different to my style, but cannot see any obvious error.