11
votes

Need an elegant way to roll back Delta Lake to a previous version.

My current approach is listed below:

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, testFolder)

spark.read.format("delta")
  .option("versionAsOf", 0)   // time travel to version 0
  .load(testFolder)
  .write                      // overwrite the current table with that snapshot
  .mode("overwrite")
  .format("delta")
  .save(testFolder)

This is ugly, though, as the whole data set needs to be rewritten. It seems that a metadata update alone should be sufficient and no data I/O should be necessary. Does anyone know a better approach for this?

5
I agree this is not an ideal solution, but given that overwriting a large data set with partitions could be expensive, this easy solution could be helpful. – Fang Zhang

5 Answers

4
votes

Here is a brute-force solution. It is not ideal, but given that overwriting a large data set with partitions could be expensive, this easy solution could be helpful.

If you are not very sensitive to updates made after the desired rollback time, simply remove all version files in _delta_log that are later than the rollback version. The data files that are no longer referenced can be released later using vacuum.
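Something like the following untested sketch (tablePath and rollbackVersion are placeholders, and if checkpoint files or a _last_checkpoint newer than the rollback version exist, those would need removing too):

import org.apache.hadoop.fs.Path

// Placeholders -- adjust to your table and target version.
val tablePath = "/path/to/delta-table"
val rollbackVersion = 0L

val logDir = new Path(tablePath, "_delta_log")
val fs = logDir.getFileSystem(spark.sparkContext.hadoopConfiguration)

// Commit files are named like 00000000000000000003.json; the number is the version.
fs.listStatus(logDir)
  .map(_.getPath)
  .filter(_.getName.endsWith(".json"))
  .filter(p => p.getName.stripSuffix(".json").toLong > rollbackVersion)
  .foreach(p => fs.delete(p, false))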

Another solution that preserves the full history is to 1) call deltaTable.delete, then 2) copy all logs up to the rollback point, sequentially and with increasing version numbers, after the delete's log file. This mimics recreating the table's state as of the rollback date. But it is surely not pretty.

4
votes

As of Delta Lake 0.7.0, you can roll back to an earlier version of your Delta Lake table using the RESTORE command. This is a much simpler way to use time travel to roll back your tables.

Scala:

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")

deltaTable.restoreToVersion(0)

Python:

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")

deltaTable.restoreToVersion(0)

SQL:

RESTORE TABLE delta.`/path/to/delta-table` TO VERSION AS OF 0

You can also use the restoreToTimestamp command if you'd prefer to roll back to a point in time instead. Read the documentation for more details.
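For example, in Scala (the timestamp value here is just illustrative):

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")

// Roll the table back to its state as of the given timestamp.
deltaTable.restoreToTimestamp("2020-12-01 00:00:00")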

2
votes

If your goal is to fix wrong data and you are not very sensitive to updates, you can replace an interval of time using replaceWhere:

df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
  .save("/delta/events")
1
vote

You should use the time travel feature: https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html

You read the data as of a timestamp:

val inputPath = "/path/to/my/table@20190101000000000"

And then overwrite the existing data with the "rolled back" version.

As for it being ugly, I'm not sure I can help. You could limit the data using partitioning, or you could work out which records have changed and only overwrite them.
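Roughly, the rollback could look like the sketch below. It uses the timestampAsOf read option instead of the @-path syntax; the path and timestamp are placeholders:

val tablePath = "/path/to/my/table"

// Read the table as of the rollback timestamp...
val rolledBack = spark.read
  .format("delta")
  .option("timestampAsOf", "2019-01-01 00:00:00")
  .load(tablePath)

// ...and overwrite the current contents with that snapshot.
rolledBack.write
  .format("delta")
  .mode("overwrite")
  .save(tablePath)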

1
vote

I faced a similar kind of problem with Delta, where I was calling multiple DML operations in one transaction, e.g. I had a requirement to call merge and then delete in a single transaction. In this case, either both of them have to succeed together, or the state has to be rolled back if either of them fails.

To solve the problem, I took a backup of the _delta_log directory (let's call it the stable state) before the transaction started. If both DML operations in the transaction succeed, discard the previous stable state and use the new state committed in _delta_log. If either DML operation fails, just replace the _delta_log directory with the stable state you backed up before starting the transaction. Once the stable state is back in place, run vacuum to remove any stale files that may have been created during the transaction.
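A rough sketch of that backup-and-restore dance (untested; it uses Hadoop's FileUtil to copy the log directory, and the paths and the operations inside the try block are placeholders):

import org.apache.hadoop.fs.{FileUtil, Path}

// Placeholder paths -- adjust to your table layout.
val tablePath = "/path/to/delta-table"
val logDir    = new Path(tablePath, "_delta_log")
val backupDir = new Path(tablePath, "_delta_log_backup")

val conf = spark.sparkContext.hadoopConfiguration
val fs   = logDir.getFileSystem(conf)

// 1) Back up the current (stable) state of the transaction log.
FileUtil.copy(fs, logDir, fs, backupDir, false, true, conf)

try {
  // 2) Run the DML operations that must succeed or fail together, e.g.
  //    deltaTable.merge(...)...execute() followed by deltaTable.delete(...).
  // 3) Both succeeded: discard the backup and keep the new state.
  fs.delete(backupDir, true)
} catch {
  case e: Exception =>
    // 4) Something failed: put the stable log back and rethrow.
    fs.delete(logDir, true)
    FileUtil.copy(fs, backupDir, fs, logDir, false, true, conf)
    fs.delete(backupDir, true)
    throw e
}

// Stale data files written during the failed transaction can be cleaned up later with vacuum.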