
In our data pipelines, we ingest CDC events from data sources and write these changes into an "incremental data" folder in Avro format.

Then, periodically, we run Spark jobs to merge this "incremental data" with the current version of the "snapshot table" (ORC format) to produce the latest version of the upstream snapshot.

The merge logic is as follows:

1) we load the "incremental data" as a DataFrame df1

2) load the current "snapshot table" as a DataFrame df2

3) merge df1 and df2, de-duplicating IDs and keeping the latest version of each row (using the update_timestamp column)

This logic (sketched below) loads the entire data for both the "incremental data" and the current "snapshot table" into Spark memory, which can be quite large depending on the database.
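For reference, a minimal sketch of this merge in plain Spark (the id column name, matching schemas, and the paths are assumptions; update_timestamp is the column mentioned above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("snapshot-merge").getOrCreate()

// 1) load the "incremental data" (CDC events in Avro; needs the spark-avro package)
val df1 = spark.read.format("avro").load("/data/incremental/")

// 2) load the current "snapshot table" (ORC)
val df2 = spark.read.orc("/data/snapshot/")

// 3) union both sides and keep only the latest row per id by update_timestamp
val byLatest = Window.partitionBy("id").orderBy(col("update_timestamp").desc)

val merged = df1.unionByName(df2)
  .withColumn("rn", row_number().over(byLatest))
  .filter(col("rn") === 1)
  .drop("rn")

// write the new snapshot to a fresh location, since /data/snapshot/ is still being read
merged.write.mode("overwrite").orc("/data/snapshot_v2/")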

I noticed that in Delta Lake, a similar operation is done with the following code:

import io.delta.tables._
import org.apache.spark.sql.functions._

val updatesDF = ...  // define the updates DataFrame[date, eventId, data]

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched
  .updateExpr(
    Map("data" -> "updates.data"))
  .whenNotMatched
  .insertExpr(
    Map(
      "date" -> "updates.date",
      "eventId" -> "updates.eventId",
      "data" -> "updates.data"))
  .execute()

Here, the "updatesDF" can be considered our "incremental data" which coming from a CDC source.

My questions:

1) How does merge/upsert work internally? Does it load the entire "updatesDF" and "/data/events/" into Spark memory?

2) If not, does it apply the delta changes in a way similar to Apache Hudi?

3) During deduplication, how does this upsert logic know to take the latest version of a record? I don't see any setting to specify the "update timestamp" column.


1 Answer

   1) How does merge/upsert work internally? Does it load the entire "updatesDF" and
   "/data/events/" into Spark memory?

Nope, Spark does not need to load the entire Delta table it needs to update into memory. It wouldn't be scalable otherwise. The approach it takes is very similar to other Spark jobs - the whole table is transparently split into multiple partitions if the dataset is large enough (or you could create explicit partitioning). Each partition is then assigned a single task, and these tasks make up your merge job. Tasks can run on different Spark executors, etc.
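For example (a sketch, assuming an initial full load eventsDF with the date column from the question's schema), writing the Delta table with explicit partitioning means a merge whose condition also constrains the partition column only rewrites files in the affected partitions:

// eventsDF is assumed to be the initial full load of the events data
eventsDF.write
  .format("delta")
  .partitionBy("date")        // explicit partitioning column
  .mode("overwrite")
  .save("/data/events/")

Including the partition column in the merge condition, e.g. "events.date = updates.date AND events.eventId = updates.eventId", then lets Delta skip partitions the updates don't touch.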

   2) If not, does it apply the delta changes in a way similar to Apache Hudi?

I have heard about Apache Hudi, but haven't looked at it. Internally, Delta looks like versioned Parquet files. Changes to the table are stored as ordered, atomic units called commits. When you save a table, look at what files it has - under its _delta_log directory there will be files like 000000.json, 000001.json, etc., and each of them references a set of operations on the underlying Parquet files in the subdirectories. For example, 000000.json will say that this version in time references Parquet files 001 and 002, and 000001.json will say that this version in time should no longer reference those two older Parquet files and should only use Parquet file 003.
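You can inspect this yourself - a small sketch, assuming the "/data/events/" table from the question and that the spark session is already available:

import io.delta.tables._

// Each commit file in _delta_log records "add"/"remove" actions over Parquet files
val log = spark.read.json("/data/events/_delta_log/*.json")
log.printSchema()            // shows the add / remove / commitInfo actions
log.show(truncate = false)

// The same commits, through Delta's own history API
DeltaTable.forPath(spark, "/data/events/").history().show(truncate = false)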

   3) During deduplication, how does this upsert logic know to take the latest version of a record?
   I don't see any setting to specify the "update timestamp" column.

By default it references the most recent changeset. Timestamping is internal to how this versioning is implemented in Delta. You can reference an older snapshot, though, through the AS OF syntax - see https://docs.databricks.com/delta/delta-batch.html#syntax
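In the DataFrame reader API the same snapshot selection is exposed through the versionAsOf / timestampAsOf options; a minimal sketch (the version number and timestamp below are placeholders):

// Latest snapshot of the table (what merge operates on by default)
val latest = spark.read.format("delta").load("/data/events/")

// Older snapshot by version number (placeholder version "0")
val v0 = spark.read.format("delta")
  .option("versionAsOf", "0")
  .load("/data/events/")

// Older snapshot by timestamp (placeholder timestamp)
val asOf = spark.read.format("delta")
  .option("timestampAsOf", "2019-01-01")
  .load("/data/events/")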