In our data pipelines, we ingest CDC events from data sources and write these changes into an "incremental data" folder in AVRO format.
Then periodically, we run Spark jobs to merge this "incremental data" with our current version of the "snapshot table" (ORC format) to get the latest version of the upstream snapshot.
The merge logic works as follows:
1) we load the "incremental data" as a DataFrame df1
2) we load the current "snapshot table" as a DataFrame df2
3) we merge df1 and df2, de-duplicating ids and keeping the latest version of each row (using the update_timestamp column)
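Step 3 can be sketched roughly as below. This is only an illustration of the approach, not our exact job: the helper name `mergeLatest` is hypothetical, and it assumes both DataFrames share the same schema with the key column named `id` and the version column named `update_timestamp`.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Hypothetical helper: union both inputs and keep only the newest row per id.
// Assumes identical schemas containing "id" and "update_timestamp".
def mergeLatest(incremental: DataFrame, snapshot: DataFrame): DataFrame = {
  // Rank the rows of each id from newest to oldest.
  val newestFirst = Window
    .partitionBy(col("id"))
    .orderBy(col("update_timestamp").desc)

  snapshot
    .unionByName(incremental)                        // match columns by name, not position
    .withColumn("rn", row_number().over(newestFirst))
    .filter(col("rn") === 1)                         // keep the latest version only
    .drop("rn")
}
```

With df1 and df2 from steps 1 and 2, the call would be `mergeLatest(df1, df2)`. If two rows for the same id carry the same update_timestamp, this sketch picks one of them arbitrarily.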
This logic loads the entire data for both the "incremental data" and the current "snapshot table" into Spark memory, which can be quite large depending on the database.
I noticed that in Delta Lake, a similar operation is done using the following code:
    import io.delta.tables._
    import org.apache.spark.sql.functions._

    val updatesDF = ... // define the updates DataFrame[date, eventId, data]

    DeltaTable.forPath(spark, "/data/events/")
      .as("events")
      .merge(
        updatesDF.as("updates"),
        "events.eventId = updates.eventId")
      .whenMatched
      .updateExpr(
        Map("data" -> "updates.data"))
      .whenNotMatched
      .insertExpr(
        Map(
          "date" -> "updates.date",
          "eventId" -> "updates.eventId",
          "data" -> "updates.data"))
      .execute()
Here, "updatesDF" can be considered our "incremental data", which comes from a CDC source.
My questions:
1) How does merge/upsert work internally? Does it load the entire "updatesDF" and "/data/events/" into Spark memory?
2) If not, does it apply the delta changes in a way similar to Apache Hudi?
3) During deduplication, how does the upsert logic know to take the latest version of a record? I don't see any setting to specify the "update timestamp" column.