0 votes

I created a stream in Snowflake on a table and created a task to move the data to another table. Even after the task completes, the data in the stream is not purged. As a result, the task is never skipped and keeps reinserting the data from the stream into the target table, so the final table keeps growing. What could be the reason? It was working yesterday, but since today the stream is not purged even after the task runs a DML statement that uses the stream.

create or replace stream test_stream on table test_table_raw APPEND_ONLY = TRUE;
create or replace task test_task_task warehouse = test_warehouse
schedule = '1 minute'
when system$stream_has_data('test_stream') 
as insert into test_table
SELECT 
level1.FILE_NAME,
level1.FILE_ROWNUMBER,
GET(lvl, '@id')::string as app_id
FROM (SELECT FILE_NAME,FILE_ROWNUMBER,src:"$" as lvl FROM test_table_raw)  level1,
lateral FLATTEN(LVL:"$")  level2
where level2.value like '%<test %';

alter task test_task_task resume;

select 
(select count(*) from test_table) table_count,
(select count(*) from test_stream) stream_count;

TABLE_COUNT STREAM_COUNT
500             1
What SQL is the task running? It needs to include some SQL to insert into another table or create another table; it cannot simply be a select on the stream, from what I remember. (Simon D)
Hi Simon D, please find the SQL in the above post. (Kamalakannan Babuji)

2 Answers

0 votes

Is the transaction committing? That is, do you see the inserts (or whatever DML the task runs against the stream) actually happening? Any chance you can post the SQL?

The stream offset advances when a transaction in which the stream is used in a DML statement commits. There is really no "purge"; the offset just moves forward, so you don't see the same rows again.
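To make the offset behaviour concrete, here is a minimal sketch, assuming default autocommit and using hypothetical object names (demo_raw, demo_target, demo_stream) that are not from the original post. It contrasts a DML statement that reads the base table with one that reads the stream:

```sql
-- Hypothetical demo objects, for illustration only
create or replace table demo_raw (id int);
create or replace table demo_target (id int);
create or replace stream demo_stream on table demo_raw append_only = true;

insert into demo_raw values (1), (2);

-- DML that reads the base table: the stream is not involved,
-- so its offset should stay where it is
insert into demo_target select id from demo_raw;
select system$stream_has_data('demo_stream');  -- expected: still TRUE

-- DML that reads the stream: the offset should advance when this commits
insert into demo_target select id from demo_stream;
select system$stream_has_data('demo_stream');  -- expected: now FALSE
```

A plain SELECT on the stream, outside of any DML statement, also leaves the offset unchanged.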

Dinesh Kulkarni (PM, Snowflake)

0 votes

My bad! I was using the base table in the task instead of the stream. The task should select from test_stream rather than test_table_raw:

create or replace task test_task_task warehouse = test_warehouse
schedule = '1 minute'
when system$stream_has_data('test_stream') 
as insert into test_table
SELECT 
level1.FILE_NAME,
level1.FILE_ROWNUMBER,
GET(lvl, '@id')::string as app_id
FROM (SELECT FILE_NAME,FILE_ROWNUMBER,src:"$" as lvl FROM test_stream)  level1,
lateral FLATTEN(LVL:"$")  level2
where level2.value like '%<test %';
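
One way to check that the task now consumes the stream is sketched below. It assumes the object names from the post and that the task was created with an unquoted identifier, so its name is stored in upper case as TEST_TASK_TASK:

```sql
-- Should return FALSE once a task run has consumed the pending rows
select system$stream_has_data('test_stream');

-- Recent runs of the task; runs scheduled while the stream was empty
-- are expected to show state SKIPPED
select name, state, scheduled_time, error_message
from table(information_schema.task_history(task_name => 'TEST_TASK_TASK'))
order by scheduled_time desc
limit 10;
```

If the stream still reports data after a successful run, the statement in the task is most likely not reading from the stream.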