1 vote

I am working in a Microsoft Azure Databricks environment using Spark SQL and PySpark. I have a Delta table on a lake where data is partitioned by, say, file_date. Every partition contains files storing millions of records per day, with no primary/unique key. All of these records have a "status" column which is either NULL (if everything looks good on that specific record) or not null (say, if a particular lookup mapping for a particular column is not found). Additionally, my process has another folder called "mapping", refreshed on a periodic basis (let's say nightly to keep it simple), from which the mappings are read.

On a daily basis, there is a good chance that about 100-200 rows get errored out (the status column containing not null values). From these files, on a daily basis (hence the partitioning by file_date), a downstream job pulls all the valid records and sends them for further processing, ignoring those 100-200 errored records while waiting for the correct mapping file to be received. In addition to the valid-status records, the downstream job should also check whether a mapping is now available for the errored records and, if so, take those down further as well (after, of course, updating the data lake with the appropriate mapping and status).

What is the best way to go? Ideally, I would first update the Delta table/lake with the correct mapping and set the status column to, say, "available_for_reprocessing"; my downstream job would then pull the valid data for the day plus the "available_for_reprocessing" data and, after processing, update the status back to "processed". But this seems to be super difficult using Delta.
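To make that concrete, the daily downstream pull I have in mind would look roughly like this in PySpark (the table name lake_db.events and the column names are just placeholders I made up):

# Hypothetical table/column names, only to illustrate the intended daily pull.
daily = (spark.table("lake_db.events")
              .where("(file_date = current_date() AND status IS NULL) "
                     "OR status = 'available_for_reprocessing'"))
# ... send `daily` downstream, then set status = 'processed' on the reprocessed rows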

I was looking at "https://docs.databricks.com/delta/delta-update.html", and the update example there only shows a simple update with constant values, not an update driven by another table.

The other, but most inefficient, option is to pull ALL the data (both processed and errored) for, say, the last 30 days, get the mapping for the errored records, and write the dataframe back into the delta lake using the replaceWhere option. This is super inefficient, as we are reading everything (hundreds of millions of records) and writing everything back just to process, at most, around 1000 records. If you search for deltaTable = DeltaTable.forPath(spark, "/data/events/") at "https://docs.databricks.com/delta/delta-update.html", the example provided there is also only for very simple updates. Without a unique key, it seems impossible to update specific records as well. Can someone please help?
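For reference, a rough sketch of this inefficient approach (the paths and the lookup_key / mapped_value columns are made-up placeholders):

from pyspark.sql import functions as F

# Read the whole 30-day window and the latest mapping (placeholder paths).
events = (spark.read.format("delta").load("/mnt/lake/events")
               .where("file_date >= date_sub(current_date(), 30)"))
mapping = spark.read.format("delta").load("/mnt/lake/mapping")

# Re-resolve the mapping for every row in the window, not just the 100-200 errored ones,
# and clear the error status where a mapping is now found.
fixed = (events.join(mapping, on="lookup_key", how="left")
               .withColumn("status",
                           F.when(F.col("status").isNotNull() & F.col("mapped_value").isNotNull(),
                                  F.lit(None).cast("string"))
                            .otherwise(F.col("status")))
               .select(*events.columns))   # keep the original table schema

# Rewrite the entire 30-day window just to repair a handful of rows.
(fixed.write.format("delta")
      .mode("overwrite")
      .option("replaceWhere", "file_date >= date_sub(current_date(), 30)")
      .save("/mnt/lake/events"))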

I can use PySpark or Spark SQL, but I am lost.


2 Answers

1 vote

If you want to update one column ('status') on the condition that all lookups are now correct for rows where they weren't correct before (i.e. where 'status' is currently incorrect), I think the UPDATE command along with EXISTS can help you solve this. It isn't mentioned in the update documentation, but it works for both DELETE and UPDATE operations, effectively allowing you to update/delete records based on joins.

For your scenario, I believe the SQL command would look something like this:

UPDATE your_db.table_name AS a
SET status = 'correct'
WHERE EXISTS
(
  SELECT *
  FROM your_db.table_name AS b
  JOIN lookup_table_1 AS t1 ON t1.lookup_column_a = b.lookup_column_a
  JOIN lookup_table_2 AS t2 ON t2.lookup_column_b = b.lookup_column_b
  -- ... add further lookups if needed
  WHERE
    b.status = 'incorrect' AND
    a.lookup_column_a = b.lookup_column_a AND
    a.lookup_column_b = b.lookup_column_b
)
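
If you are working from PySpark rather than a SQL cell, a minimal option is to run the same statement through spark.sql (same placeholder table and column names as above):

spark.sql("""
    UPDATE your_db.table_name AS a
    SET status = 'correct'
    WHERE EXISTS (
        SELECT 1
        FROM your_db.table_name AS b
        JOIN lookup_table_1 AS t1 ON t1.lookup_column_a = b.lookup_column_a
        JOIN lookup_table_2 AS t2 ON t2.lookup_column_b = b.lookup_column_b
        WHERE b.status = 'incorrect'
          AND a.lookup_column_a = b.lookup_column_a
          AND a.lookup_column_b = b.lookup_column_b
    )
""")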
0 votes

Merge did the trick...

MERGE INTO deptdelta AS maindept
USING updated_dept_location AS upddept
ON upddept.dno = maindept.dno
WHEN MATCHED THEN UPDATE SET
  maindept.dname = upddept.updated_name,
  maindept.location = upddept.updated_location
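
If you would rather stay in PySpark, the same merge can be expressed through the Delta Lake Python API; a sketch, assuming deptdelta is a registered table and updated_dept_location is available as a table or DataFrame:

from delta.tables import DeltaTable

# Target Delta table and the DataFrame holding the refreshed values.
maindept = DeltaTable.forName(spark, "deptdelta")   # or DeltaTable.forPath(spark, "/path/to/deptdelta")
upddept = spark.table("updated_dept_location")

(maindept.alias("maindept")
         .merge(upddept.alias("upddept"), "upddept.dno = maindept.dno")
         .whenMatchedUpdate(set={
             "dname": "upddept.updated_name",
             "location": "upddept.updated_location",
         })
         .execute())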