
I'm investigating taking data from Kafka -> Snowflake Kafka connector -> Snowflake. Unfortunately, the connector uses just two columns and puts the entire JSON payload into a single column. So I created a stream/task to periodically copy data from the landing table to the destination table (using INSERT). Everything works beautifully except for deleting data in the landing table once it has landed in the destination table. Using streams, I know what has landed, but how do I then delete that data from the landing table? TRUNCATE seems so much faster. Do I just periodically run a task that deletes these entries? I'm also concerned about the warehouse time spent performing these deletes. Thanks
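For context, a minimal sketch of the copy step being described (the table, stream, task, warehouse, and column names here are illustrative assumptions, not taken from the question) might look like this:

-- Stream on the connector's landing table; names are assumptions.
create or replace stream landing_stream on table kafka_landing;

-- Task that periodically copies new change records into the destination table.
create or replace task copy_to_destination
  warehouse = my_wh
  schedule = '5 minute'
  when system$stream_has_data('LANDING_STREAM')
as
  insert into destination_table (payload)
  select record_content
  from landing_stream
  where metadata$action = 'INSERT';

alter task copy_to_destination resume;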

Did you ever find a solution that worked for you? If so, can you share? – Looter

1 Answer


For a use case where multiple statements (INSERT, DELETE, MERGE, etc.) need to access the same change records, surround them with an explicit transaction (BEGIN ... COMMIT). The transaction locks the stream, so every statement inside it sees the same set of change records, and the stream offset only advances on commit.

Concretely: add an extra column such as a FLAG to the staging table, lock the stream by opening a transaction with BEGIN, use the stream to insert from the staging table into the target table, then use the stream a second time in a MERGE against the staging table to set the FLAG column. Once the transaction commits, the flagged rows can be deleted from the staging table.

https://docs.snowflake.com/en/user-guide/streams.html#examples
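As a sketch of the setup this assumes (the table, stream, and column names below are illustrative, not prescribed by the docs):

-- Flag column used to mark staging rows that have already been copied.
alter table staging_table add column flag number default 0;

-- Stream that tracks change records on the staging table.
create or replace stream staging_stream on table staging_table;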

begin;

-- Optional: inspect the change records currently in the stream.
select * from staging_stream;

-- Both statements below see the same change records; the stream offset
-- only advances when the transaction commits.
insert into target_table
select record_content  -- illustrative column list
from staging_stream
where metadata$action = 'INSERT' and flag = 0;

merge into staging_table st
using (
  select id  -- illustrative key column
  from staging_stream
  where flag = 0
) sc
on st.id = sc.id
when matched then update set st.flag = 1;

commit;

delete from staging_table where flag = 1;
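
To address the "do I just periodically run a delete task" part of the question: the final DELETE can itself be wrapped in a scheduled task, along these lines (the warehouse, schedule, and task name are assumptions):

-- Periodically purge staging rows that have already been copied.
create or replace task purge_staging
  warehouse = my_wh
  schedule = '60 minute'
as
  delete from staging_table where flag = 1;

alter task purge_staging resume;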