
How can I delete old data created by Spark Structured Streaming (Spark 2.4.5)?

I have data on HDFS in Parquet/Avro format (not Delta), created by Spark Structured Streaming and partitioned by time (year, month, day of month, hour).

The data is written as follows:

query = df.writeStream \
    .format("avro") \
    .partitionBy("year", "month", "day", "hour") \
    .outputMode("append") \
    .option("checkpointLocation", "/data/avro.cp") \
    .start("/data/avro")

As a result I have the following partition folder layout:

./year=2020/month=3/day=13/hour=12
./year=2020/month=3/day=13/hour=13
./year=2020/month=3/day=13/hour=14
./year=2020/month=3/day=13/hour=15
./year=2020/month=3/day=13/hour=16

How can I delete old data, for example everything older than year=2020, month=3, day=13, hour=14?

Just deleting the relevant folders

./year=2020/month=3/day=13/hour=12
./year=2020/month=3/day=13/hour=13

throws an exception when a batch DataFrame is later read from the file system:

df = spark.read.format("avro").load("/data/avro")
java.io.FileNotFoundException: File file:/data/avro/year=2020/month=3/day=13/hour=12/part-00000-0cc84e65-3f49-4686-85e3-1ecf48952794.c000.avro does not exist

As far as I've figured out, this is somehow related to the _spark_metadata folder that is used by the checkpointing.
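
For illustration, here is a minimal Scala sketch (using Spark's internal FileStreamSinkLog class, so treat it as an assumption about the right way to inspect the metadata) that lists the files the sink log still references; the hand-deleted partition files still show up there, which matches the FileNotFoundException above:

    import org.apache.spark.sql.execution.streaming.FileStreamSinkLog

    // List the files the sink's _spark_metadata log still references.
    // Partition files that were deleted by hand keep appearing here.
    val sinkLog = new FileStreamSinkLog(1, spark, "/data/avro/_spark_metadata")
    sinkLog.allFiles().map(_.path).foreach(println)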

I'd appreciate your help.


3 Answers

1 vote

You can't delete that folder unless you delete its corresponding checkpoint folders too. You are trying to delete the folder while the checkpoint still has knowledge of it, which is why the error occurs.

However, I really wouldn't recommend messing with the checkpoint folder unless necessary. If it's possible in your situation, I'd suggest instead moving your old data to a different storage class, such as from AWS S3 Standard to Glacier.

4 votes

It seems that I found a solution/workaround. The key concept is to use FileStreamSinkLog and update it with SinkFileStatus entries whose action is set to delete (a sketch of steps 3-5 follows the list):

  1. Load the FileStreamSinkLog

     sinkLog = new FileStreamSinkLog(1, spark, full-path-to-spark-metadata-dir);
    
  2. Get the latest SinkFileStatus

     Option<Tuple2<Object, SinkFileStatus[]>> latest = sinkLog.getLatest();
     long batchId = (long)latest.get()._1;
     SinkFileStatus[] fileStatuses = latest.get()._2;
    
  3. Delete the old files

  4. Add new entries with the delete action to the fileStatuses array

  5. Write the batchId log file back with the updated fileStatuses
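
For illustration, here is a minimal Scala sketch of steps 3-5, assuming `sinkLog`, `batchId` and `fileStatuses` from steps 1-2, an active `spark` session, and a hypothetical selection rule for the files to expire (the `hour=12` filter is only an example). It records the deletions as a new batch (`batchId + 1`), the same way the snippet in the answer below does, rather than rewriting the existing batch file:

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.execution.streaming.FileStreamSinkLog

    // Step 3: physically delete the old files listed in the sink log.
    // The selection criterion here is hypothetical; adapt it to your retention rule.
    val oldStatuses = fileStatuses.filter(_.path.contains("/hour=12/"))
    oldStatuses.foreach { s =>
      val p = new Path(s.path)
      p.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(p, false)
    }

    // Step 4: build entries for the same files with the action set to delete.
    val deleteEntries = oldStatuses.map(_.copy(action = FileStreamSinkLog.DELETE_ACTION))

    // Step 5: write them back to the sink log as the next batch.
    sinkLog.add(batchId + 1, deleteEntries)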

However, this requires that the Structured Streaming job be stopped. So there is no way to delete old files written by Spark Structured Streaming without stopping it.

0 votes

For ease of copy/paste, here's a working Scala snippet as of Spark 3.0.1. It deletes one file and writes a new batch:

import org.apache.spark.sql.execution.streaming.FileStreamSinkLog

import scala.language.postfixOps
import scala.sys.process._
import scala.util.Try

// `spark` (an active SparkSession) and SPARK_METADATA_ROOT (the full path to the
// sink's _spark_metadata directory) are assumed to be defined in the surrounding scope.
val sinkLog = new FileStreamSinkLog(
  1,                 // metadata log version
  spark,
  SPARK_METADATA_ROOT
)

// Take the first file tracked by the sink log as the one to delete.
val head = sinkLog.allFiles().head

// Physically remove the file from HDFS, logging the command's output.
val processLogger = ProcessLogger(out => println(out), err => println(err))
val deleteCommand = s"hadoop fs -rm ${head.path}"
println(Try(deleteCommand ! processLogger).getOrElse(s""""$deleteCommand" failed"""))

// Record the deletion by writing the next batch to the sink log, containing
// the same entry with its action set to "delete".
val latestBatch = sinkLog.getLatest()
sinkLog.add(
  latestBatch.get._1 + 1,
  Array(head.copy(action = FileStreamSinkLog.DELETE_ACTION))
)