0 votes

I have a bronze-level Delta Lake table (events_bronze) at the location "/mnt/events-bronze", to which data is streamed from Kafka. Now I want to stream from this table and, using "foreachBatch", upsert into a silver table (events_silver). This can be achieved using the bronze table as a source. However, during the initial run events_silver doesn't exist yet, so I keep getting an error saying the Delta table doesn't exist, which is expected. How do I go about creating events_silver with the same structure as events_bronze? I couldn't find a DDL to do that.

import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame

// Merge each micro-batch from the bronze stream into the silver table by id.
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
  DeltaTable.forPath(spark, "/mnt/events-silver").as("silver")
    .merge(
      microBatchOutputDF.as("bronze"),
      "silver.id = bronze.id")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

events_bronze
  .writeStream
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

During the initial run, the problem is that there is no Delta Lake table at the path "/mnt/events-silver". I'm not sure how to create it with the same structure as "/mnt/events-bronze" for the first run.

Hey @Vikas, did the solution below work for you? – Manish

4 Answers

4 votes

Before starting the streaming write/merge, check whether the table already exists. If not, create one from an empty DataFrame with the schema of events_bronze:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.{Row, SaveMode}

val exists = DeltaTable.isDeltaTable("/mnt/events-silver")

if (!exists) {
  // Build the silver schema from the existing bronze table so they match.
  val bronzeSchema = spark.read.format("delta").load("/mnt/events-bronze").schema
  val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], bronzeSchema)
  emptyDF
    .write
    .format("delta")
    .mode(SaveMode.Overwrite)
    .save("/mnt/events-silver")
}

The table (Delta Lake metadata) will be created only once, at the start, and only if it doesn't already exist. On job restarts it will already be present, so table creation is skipped.
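
Putting it together: run the existence check above before starting the stream, so the first micro-batch already has a merge target. A minimal sketch; the readStream line is an assumption about how events_bronze in the question is defined:

import org.apache.spark.sql.streaming.Trigger

// The create-if-missing check above must run first; then the stream starts
// and upsertToDelta from the question finds its merge target on batch 0.
val events_bronze = spark.readStream.format("delta").load("/mnt/events-bronze")

events_bronze
  .writeStream
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()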

1 vote

Here's a PySpark example:

from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from delta.tables import DeltaTable

basePath = 'abfss://stage2@your_storage_account_name.dfs.core.windows.net'
schema = StructType([
    StructField('SignalType', StringType()),
    StructField('StartTime', TimestampType())
])

# Write an empty DataFrame once to create the Delta table if it's missing.
if not DeltaTable.isDeltaTable(spark, basePath + '/tutorial_01/test1'):
    emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
    emptyDF.write.format('delta').mode('overwrite').save(basePath + '/tutorial_01/test1')
1 vote

As of Delta Lake release 1.0.0, the method DeltaTable.createIfNotExists() was added (an Evolving API).

In your example, DeltaTable.forPath(spark, "/mnt/events-silver") can be replaced with:

DeltaTable.createIfNotExists(spark)
  .location("/mnt/events-silver")
  .addColumns(microBatchOutputDF.schema)
  .execute()
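
With that, the merge function from the question no longer needs the table to pre-exist. A sketch, assuming Delta Lake 1.0.0+ and the question's id column:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
  // createIfNotExists returns the DeltaTable, creating it on the first
  // batch with the micro-batch's schema and reusing it afterwards.
  DeltaTable.createIfNotExists(spark)
    .location("/mnt/events-silver")
    .addColumns(microBatchOutputDF.schema)
    .execute()
    .as("silver")
    .merge(microBatchOutputDF.as("bronze"), "silver.id = bronze.id")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}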
0 votes

You can check the table using Spark SQL. First run the statement below, which will give the table definition of the bronze table:

spark.sql("show create table events_bronze").show

After getting the DDL, just change the location to the silver table's path and run that statement in Spark SQL.

Note: Use "CREATE TABLE IF NOT EXISTS ..." so it doesn't fail on concurrent runs.
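
The adjusted statement would look roughly like this. A sketch run through Spark SQL; the column list is hypothetical, since the real one comes from the SHOW CREATE TABLE output on events_bronze:

// Hypothetical columns for illustration; copy the real ones from the
// SHOW CREATE TABLE output and change only the table name and location.
spark.sql("""
  CREATE TABLE IF NOT EXISTS events_silver (
    id STRING,
    eventTime TIMESTAMP
  )
  USING DELTA
  LOCATION '/mnt/events-silver'
""")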