5
votes

I used Structured Streaming to load messages from Kafka, do some aggregation, and then write the results to Parquet files. The problem is that so many Parquet files are created (800 files) for only 100 messages from Kafka.

The aggregation part is:

return model
            .withColumn("timeStamp", col("timeStamp").cast("timestamp"))
            .withWatermark("timeStamp", "30 seconds")
            .groupBy(window(col("timeStamp"), "5 minutes"))
            .agg(
                count("*").alias("total"));

The query:

StreamingQuery query = result //.orderBy("window")
            .writeStream()
            .outputMode(OutputMode.Append())
            .format("parquet")
            .option("checkpointLocation", "c:\\bigdata\\checkpoints")
            .start("c:\\bigdata\\parquet");

When loading one of the Parquet files using Spark, it shows as empty:

+------+-----+
|window|total|
+------+-----+
+------+-----+

How can I save the dataset to only one Parquet file? Thanks

You can use result.repartition(1), but it may lead to an OOM exception. You can pass a sensible number to repartition() to avoid the OOM exception (see the sketch after these comments). - mrsrinivas
It still creates empty Parquet files. It seems that every time the query processes a batch, it writes the results to a separate Parquet file. How can I specify the name of the file and limit the query to writing and updating only that file? - taniGroup
What did you do to solve the issue? - ArunK
I met the same issue, how did you resolve it? Thanks! - Casel Chen
Did nobody solve this? I'm lost. - nullmari
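
For reference, a minimal sketch of the repartition suggestion from the comments, applied to the query from the question. It assumes the same result dataset and paths as above; the partition count of 4 is only an example value, and each micro-batch still writes its own file(s).

StreamingQuery query = result
            .repartition(4)   // example partition count; fewer partitions -> fewer (larger) files per batch
            .writeStream()
            .outputMode(OutputMode.Append())
            .format("parquet")
            .option("checkpointLocation", "c:\\bigdata\\checkpoints")
            .start("c:\\bigdata\\parquet");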

1 Answer

2
votes

My idea was to use Spark Structured Streaming to consume events from Azure Event Hubs and then store them in storage in Parquet format.

I finally figured out how to deal with the many small files being created. Spark version: 2.4.0.

This is how my query looks:

from pyspark.sql.functions import col

query = (
    dfInput
      # single partition (keyed by column_name) -> one output file per micro-batch
      .repartition(1, col('column_name'))
      .select("*")
      .writeStream
      .format("parquet")
      .option("path", "adl://storage_name.azuredatalakestore.net/streaming")
      .option("checkpointLocation", "adl://storage_name.azuredatalakestore.net/streaming_checkpoint")
      # trigger a micro-batch every 480 seconds (the batch interval)
      .trigger(processingTime='480 seconds')
      .start()
)

As a result, I have one file created in the storage location every 480 seconds. To find the balance between file size and the number of files, and to avoid OOM errors, just play with two parameters: the number of partitions and processingTime, which is the batch interval.
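
Since the question uses the Java API, here is a hedged sketch of the same idea translated to Java. The result dataset and paths are the ones from the question, Trigger is org.apache.spark.sql.streaming.Trigger, and the 480-second interval is just the value used above; adjust both knobs to your data volume.

StreamingQuery query = result
            .repartition(1)                                    // one part file per micro-batch
            .writeStream()
            .outputMode(OutputMode.Append())
            .format("parquet")
            .option("path", "c:\\bigdata\\parquet")
            .option("checkpointLocation", "c:\\bigdata\\checkpoints")
            .trigger(Trigger.ProcessingTime("480 seconds"))    // batch interval, as in the answer
            .start();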

I hope you can adjust the solution to your use case.