I have a bronze-level Delta Lake table (events_bronze) at location "/mnt/events-bronze", to which data is streamed from Kafka. Now I want to stream from this table and upsert into a silver table (events_silver) using "foreachBatch". This can be done by using the bronze table as a streaming source. However, on the initial run events_silver doesn't exist yet, so I keep getting an error saying the Delta table doesn't exist, which is expected. How do I create events_silver with the same schema as events_bronze? I couldn't find a DDL statement that does this.
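import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

// Upsert each micro-batch from bronze into the silver Delta table, matching rows on `id`
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
  DeltaTable.forPath(spark, "/mnt/events-silver").as("silver")
    .merge(
      microBatchOutputDF.as("bronze"),
      "silver.id = bronze.id")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Read the bronze Delta table as a stream. foreachBatch replaces the sink,
// so a .format("delta") call on the writer would be ignored and is omitted.
spark.readStream
  .format("delta")
  .load("/mnt/events-bronze")
  .writeStream
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()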
During the initial run, the problem is that no Delta Lake table exists at path "/mnt/events-silver". I'm not sure how to create it with the same schema as "/mnt/events-bronze" for the first run.
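One possible approach, sketched here under assumptions rather than offered as the definitive fix: before starting the stream, check whether a Delta table already exists at the silver path and, if not, write zero rows read from the bronze table there. That creates the Delta transaction log with the bronze column schema, so `DeltaTable.forPath` in the first micro-batch finds a table. `SilverTableSetup` and `createIfMissing` are hypothetical names, and `spark` is assumed to be a `SparkSession` with Delta Lake available. Only the column schema is copied; partitioning and table properties of the bronze table are not.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

object SilverTableSetup {
  // Hypothetical helper: if no Delta table exists at `silverPath`, create an empty one
  // whose columns are copied from the Delta table at `bronzePath`.
  def createIfMissing(spark: SparkSession, bronzePath: String, silverPath: String): Unit = {
    if (!DeltaTable.isDeltaTable(spark, silverPath)) {
      spark.read
        .format("delta")
        .load(bronzePath) // only the schema of the bronze table is used
        .limit(0)         // zero rows: the write still commits the table metadata
        .write
        .format("delta")
        .save(silverPath) // default mode (ErrorIfExists) will not overwrite an existing Delta table
    }
  }
}
```

It would be called once before `start()`, for example `SilverTableSetup.createIfMissing(spark, "/mnt/events-bronze", "/mnt/events-silver")`. On Delta Lake 1.0 or later, `DeltaTable.createIfNotExists(spark).location(...).addColumns(schema).execute()` should also work, passing the schema read from the bronze table.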