12
votes

There is a data lake of CSV files that's updated throughout the day. I'm trying to create a Spark Structured Streaming job with the Trigger.Once feature outlined in this blog post to periodically write the new data that's been added to the CSV data lake into a Parquet data lake.

Here's what I have:

val df = spark
  .readStream
  .schema(s)
  .csv("s3a://csv-data-lake-files")

The following command wrote all the data to the Parquet lake, but didn't stop after all the data was written (I had to manually cancel the job).

processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

The following job also worked, but didn't stop after all the data was written either (I had to manually cancel the job):

val query = processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

query.awaitTermination()

The following command stopped the query before any data got written.

val query = processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

query.stop()

How can I configure the writeStream query to wait until all the incremental data has been written to Parquet files and then stop?

2
What do you mean by "didn't stop"? This is a streaming job, it isn't supposed to stop, just be triggered once a day. - Yuval Itzchakov
@YuvalItzchakov - I would like to spin up a cluster, write the new data in the CSV lake to the Parquet lake, and then shut down the cluster. I was assuming the writeStream process would stop. In the Databricks blog post (databricks.com/blog/2017/05/22/…), the "Scheduling Runs with Databricks" section has an image that shows jobs with a set duration and a status of succeeded. If the writeStream job keeps running, then the cluster won't shut down. I think I must be missing something. - Powers
You're not missing anything, I looked at the code and it does seem that the query should terminate after executing the single job. - Yuval Itzchakov
I cannot reproduce it. Could you create a reproducer using the local file system? - zsxwing
@Powers I am also facing the same issue. Ideally, the streaming query should stop after the job is done. But it doesn't stop. In fact, it keeps running and also keeps the connections open. - himanshuIIITian

2 Answers

3
votes

I got Structured Streaming + Trigger.Once to work properly on a Parquet data lake.

I don't think it was working with the CSV data lake because the CSV data lake had a ton of small files in nested directories. Spark does not like working with small CSV files (I think it needs to open them all to read the headers) and really hates when it needs to glob S3 directories.

So I think the Spark Structured Streaming + Trigger.Once code is good - the Spark developers just need to make the CSV reader handle lots of small, nested files better.
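For reference, here's a minimal sketch of the Parquet-source variant that worked for me (the paths and schema are placeholders, not the exact code I ran):

import org.apache.spark.sql.streaming.Trigger

// Stream from a Parquet data lake instead of the small-file CSV lake
val parquetDf = spark
  .readStream
  .schema(s)
  .parquet("s3a://parquet-data-lake-files")

val query = parquetDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-output-lake")

query.awaitTermination()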

2
votes

The main purpose of Structured Streaming is to process data continuously, without needing to start and stop streams whenever new data arrives. Read this for more details.

Starting from Spark 2.0.0, StreamingQuery has a processAllAvailable method that waits for all source data to be processed and committed to the sink. Please note that the Scala docs state this method is intended for testing purposes only.

Therefore the code should look like this (if you still want it):

query.processAllAvailable()
query.stop()
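Put together with the writeStream setup from the question, the ordering that matters is: start the query, wait for processAllAvailable to return, then stop. A sketch (same placeholder paths as in the question):

import org.apache.spark.sql.streaming.Trigger

val query = processedDf
  .writeStream
  .trigger(Trigger.Once)
  .format("parquet")
  .option("checkpointLocation", "s3-path-to-checkpoint")
  .start("s3-path-to-parquet-lake")

query.processAllAvailable() // blocks until all available source data is committed to the sink
query.stop()                // only then shut the query down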