
What is the depth to which schema evolution works while merging?

Automatic schema evolution does not work while merging in the following case.

import json
d1 = {'a':'b','b':{'c':{'1':1}}}
d2 = {'a':'s','b':{'c':{'1':2,'2':2}}}  # adds nested field b.c.2
d3 = {'a':'v','b':{'c':{'1':4}}}        # nested field b.c.2 is missing again

df1 = spark.read.json(spark.sparkContext.parallelize([json.dumps(d1)]))

#passes
df1.write.saveAsTable('test_table4', format='delta', mode='overwrite', path="hdfs://hdmaster:9000/dest/test_table4")


df2 = spark.read.json(spark.sparkContext.parallelize([json.dumps(d2)]))
df2.createOrReplaceTempView('updates')

query = """
MERGE INTO test_table4 existing_records 
        USING updates updates 
        ON existing_records.a=updates.a
        WHEN MATCHED THEN UPDATE SET * 
        WHEN NOT MATCHED THEN INSERT *
"""
spark.sql("set spark.databricks.delta.schema.autoMerge.enabled=true")
spark.sql(query) #passes



df3 = spark.read.json(spark.sparkContext.parallelize([json.dumps(d3)]))

df3.createOrReplaceTempView('updates')
query = """
MERGE INTO test_table4 existing_records 
        USING updates updates 
        ON existing_records.a=updates.a
        WHEN MATCHED THEN UPDATE SET * 
        WHEN NOT MATCHED THEN INSERT *
"""
spark.sql("set spark.databricks.delta.schema.autoMerge.enabled=true")
spark.sql(query) #FAILS

This seems to fail when the nesting depth is more than 2 and the incoming DataFrame is missing columns that already exist in the target table. Is this intentional? A plain append handles this schema change fine with option("mergeSchema", "true"), but I want to UPSERT the data, and MERGE is not able to handle the change.
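For reference, this is roughly the append that does accept the same schema change (a sketch only, reusing the table path from above):

# append works with mergeSchema even though df3 is missing the nested field b.c.2
df3.write.format('delta') \
    .mode('append') \
    .option('mergeSchema', 'true') \
    .save("hdfs://hdmaster:9000/dest/test_table4")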

Using Delta Lake version 0.8.0


1 Answer


In Delta 0.8, this should be controlled by setting spark.databricks.delta.schema.autoMerge.enabled to true, in addition to mergeSchema, which is intended more for append mode.

See the Delta 0.8 announcement blog post for more details on this feature.
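As a minimal sketch (table and view names taken from the question), the flag can also be set on the session configuration before issuing the MERGE:

# enable automatic schema evolution for MERGE on this session
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

spark.sql("""
    MERGE INTO test_table4 existing_records
    USING updates updates
    ON existing_records.a = updates.a
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")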