
I have a spark streaming application which produces a dataset for every minute. I need to save/overwrite the results of the processed data.

When I tried to overwrite the dataset org.apache.hadoop.mapred.FileAlreadyExistsException stops the execution.

I set the Spark property set("spark.files.overwrite","true") , but there is no luck.

How to overwrite or Predelete the files from spark?

set("spark.files.overwrite","true") works only for files added throught spark.addFile()aiman

UPDATE: Suggest using Dataframes, plus something like ... .write.mode(SaveMode.Overwrite) ....

implicit class PimpedStringRDD(rdd: RDD[String]) {
    def write(p: String)(implicit ss: SparkSession): Unit = {
      import ss.implicits._

For older versions try

yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = SparkContext(yourSparkConf)

In 1.1.0 you can set conf settings using the spark-submit script with the --conf flag.

WARNING (older versions): According to @piggybox there is a bug in Spark where it will only overwrite files it needs to to write it's part- files, any other files will be left unremoved.


since df.save(path, source, mode) is deprecated, (http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame)

use df.write.format(source).mode("overwrite").save(path)
where df.write is DataFrameWriter

'source' can be ("com.databricks.spark.avro" | "parquet" | "json")


The documentation for the parameter spark.files.overwrite says this: "Whether to overwrite files added through SparkContext.addFile() when the target file exists and its contents do not match those of the source." So it has no effect on saveAsTextFiles method.

You could do this before saving the file:

val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } }

From the pyspark.sql.DataFrame.save documentation (currently at 1.3.1), you can specify mode='overwrite' when saving a DataFrame:

myDataFrame.save(path='myPath', source='parquet', mode='overwrite')

I've verified that this will even remove left over partition files. So if you had say 10 partitions/files originally, but then overwrote the folder with a DataFrame that only had 6 partitions, the resulting folder will have the 6 partitions/files.

See the Spark SQL documentation for more information about the mode options.


df.write.mode('overwrite').parquet("/output/folder/path") works if you want to overwrite a parquet file using python. This is in spark 1.6.2. API may be different in later versions

  val jobName = "WordCount";
  //overwrite the output directory in spark  set("spark.hadoop.validateOutputSpecs", "false")
  val conf = new 
  SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false");
  val sc = new SparkContext(conf)

This overloaded version of the save function works for me:

yourDF.save(outputPath, org.apache.spark.sql.SaveMode.valueOf("Overwrite"))

The example above would overwrite an existing folder. The savemode can take these parameters as well (https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html):

Append: Append mode means that when saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.

ErrorIfExists: ErrorIfExists mode means that when saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.

Ignore: Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data.


If you are willing to use your own custom output format, you would be able to get the desired behaviour with RDD as well.

Have a look at the following classes: FileOutputFormat, FileOutputCommitter

In file output format you have a method named checkOutputSpecs, which is checking whether the output directory exists. In FileOutputCommitter you have the commitJob which is usually transferring data from the temporary directory to its final place.

I wasn't able to verify it yet (would do it, as soon as I have few free minutes) but theoretically: If I extend FileOutputFormat and override checkOutputSpecs to a method that doesn't throw exception on directory already exists, and adjust the commitJob method of my custom output committer to perform which ever logic that I want (e.g. Override some of the files, append others) than I may be able to achieve the desired behaviour with RDDs as well.

The output format is passed to: saveAsNewAPIHadoopFile (which is the method saveAsTextFile called as well to actually save the files). And the Output committer is configured at the application level.


Spark – Overwrite the output directory:

Spark by default doesn’t overwrite the output directory on S3, HDFS, and any other file systems, when you try to write the DataFrame contents to an existing directory, Spark returns runtime error hence. To overcome this Spark provides an enumeration org.apache.spark.sql.SaveMode.Overwrite to overwrite the existing folder.

We need to use this Overwrite as an argument to mode() function of the DataFrameWrite class, for example.

df. write.mode(SaveMode.Overwrite).csv("/tmp/out/foldername")

or you can use the overwrite string.


Besides Overwrite, SaveMode also offers other modes like SaveMode.Append, SaveMode.ErrorIfExists and SaveMode.Ignore

For older versions of Spark, you can use the following to overwrite the output directory with the RDD contents.

sparkConf.set("spark.hadoop.validateOutputSpecs", "false") val sparkContext = SparkContext(sparkConf)