
TLDR: I have a stream that gets consumed when I, or a task I created directly, run a DML statement against it. But when the task is created by a stored procedure, the stream does not get consumed.

I have a stream which behaves as expected, and I can see it has data when I query it: SELECT SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name');

Using the same role that created it I consume the stream:

INSERT INTO ANALYTICS_DB.schema.table
(stuff, last_checked, column_max)
SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(RECORD_MODIFIED_AT)
FROM (SELECT * FROM ANALYTICS_DB.schema.stream_name);

I SELECT SYSTEM$STREAM_HAS_DATA again; all good, the stream has been consumed.
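(As an extra sanity check, a quick sketch assuming the role can see the stream: SHOW STREAMS and DESCRIBE STREAM expose the stream's metadata, including staleness.)

SHOW STREAMS LIKE 'stream_name' IN SCHEMA ANALYTICS_DB.schema;
DESCRIBE STREAM ANALYTICS_DB.schema.stream_name;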

Now, I am bundling that into a task:

CREATE TASK IF NOT EXISTS ANALYTICS_DB.schema.table_test
            WAREHOUSE = wh
            SCHEDULE = 'USING CRON * * * * * Etc/UTC'
            COMMENT = 'Checking when was the last time tables got updated'
        WHEN -- conditional check if the stream has new data
            SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name')
        AS -- same previous query
    INSERT INTO ANALYTICS_DB.schema.table
    (stuff, last_checked, column_max)
    SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(RECORD_MODIFIED_AT)
    FROM (SELECT * FROM ANALYTICS_DB.schema.stream_name);
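(Newly created tasks start in a suspended state, so the task also needs a resume before it will fire on schedule:)

ALTER TASK ANALYTICS_DB.schema.table_test RESUME;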

After a minute or so I check my stream: again, all good, the task consumes the stream when running on the schedule.
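(To confirm the scheduled runs, the task history can be checked; a minimal sketch, assuming the task name TABLE_TEST from above:)

SELECT name, state, scheduled_time, query_id
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'TABLE_TEST'))
ORDER BY scheduled_time DESC;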

Now, the stored procedure that creates the tasks.

My SQL part:

CREATE PROCEDURE IF NOT EXISTS ANALYTICS_DB.schema.create_tasks()
    RETURNS STRING
    LANGUAGE JAVASCRIPT
    EXECUTE AS CALLER
AS

and the JavaScript part (trimmed to the important bits for the sake of readers). It runs fine, creates the tasks, the tasks run on schedule, the queries are issued, but the stream is not consumed. Therefore my max() calculation is done on an ever-growing table.

$$
// trimmed some stuff here getting the data

    while (result_set.next())
    {
        var lagschema = result_set.getColumnValue(1);
        var lagtable = result_set.getColumnValue(2);
        var lagcolumn = result_set.getColumnValue(3);

        var sql_task = `CREATE TASK IF NOT EXISTS schema.ppw_freshness_schema.stream_name
            WAREHOUSE = wh
            SCHEDULE = 'USING CRON */5 * * * * Etc/UTC'
            COMMENT = 'Checking when was the last update'
        WHEN 
            SYSTEM$STREAM_HAS_DATA('ANALYTICS_DB.schema.stream_name')
        AS
                INSERT INTO ANALYTICS_DB.schema.table
    (stuff, last_checked, column_max)
    SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(RECORD_MODIFIED_AT)
    FROM (SELECT * FROM ANALYTICS_DB.schema.stream_name);`;

        var create_task = snowflake.createStatement({sqlText: sql_task});
        create_task.execute();
        var start_task = snowflake.createStatement({sqlText: `ALTER TASK IF EXISTS schema.ppw_freshness_schema.stream_name RESUME;`});
        start_task.execute();
    }
// error handling
    $$;
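(In the untrimmed code the fetched lagschema / lagtable / lagcolumn values are presumably interpolated into the SQL template. A hypothetical sketch of that interpolation, with illustrative identifiers only, building the names once so the task name, the WHEN clause, and the FROM clause all reference the same fully qualified stream:)

// hypothetical sketch, illustrative identifiers only: build the stream name
// once and reuse it everywhere so the condition and the DML cannot diverge
var stream_fqn = `ANALYTICS_DB.${lagschema}.${lagtable}_stream`;
var sql_task = `CREATE TASK IF NOT EXISTS ANALYTICS_DB.${lagschema}.${lagtable}_freshness
        WAREHOUSE = wh
        SCHEDULE = 'USING CRON */5 * * * * Etc/UTC'
    WHEN
        SYSTEM$STREAM_HAS_DATA('${stream_fqn}')
    AS
        INSERT INTO ANALYTICS_DB.schema.table (stuff, last_checked, column_max)
        SELECT '${lagcolumn}', current_timestamp, max(${lagcolumn})
        FROM (SELECT * FROM ${stream_fqn});`;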

See below how the task I created via a stored procedure runs every single time, because it never empties the stream. As soon as I manually create the same task, it empties the stream and finally skips runs when there is no new data (which is the desired behavior).

[screenshot: task_runs, the task run history]

Mike Walton: Is data being inserted from the task that was generated from the SP, or is it simply executing successfully without error but no data being inserted?

Pedro MS: No error, data is being inserted in all cases. Only difference: when I run the insert myself it consumes the stream; when it runs in a task created by a stored procedure, it doesn't consume the stream (it reads from it fine, it just doesn't move the stream position). This is problematic because it means I am performing a few operations on an ever-growing stream instead of the more limited data I would have if it had been consumed, so it gets costlier...

Mike Walton: I understand why it's a problem. I was just trying to think of things that might be happening that you were unaware of to explain the issue. Might be worth opening a support ticket with Snowflake (provide a query_id that they can review).

Mike Walton: Out of curiosity, why are you using a sub-select in your INSERT statement? INSERT INTO ANALYTICS_DB.schema.table (stuff, last_checked, column_max) SELECT 'RECORD_MODIFIED_AT', current_timestamp, max(RECORD_MODIFIED_AT) FROM ANALYTICS_DB.schema.stream_name; should work just as well, and I wonder if there is something to it.

Pedro MS: Thanks for the suggestion, will do and report back. As for the sub-select, I had a theory that for the stream to be properly consumed, I'd need to explicitly select all rows before inserting. I'll need to go back tomorrow and validate that again; it doesn't make much sense now...

1 Answer


Nothing could have given a clue about the problem here, as it lay in the naming of the stream itself. So, entirely my mistake. On top of this, the tests I was running used a very active table, which masked the fact that the stream was actually performing as expected.
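(For anyone debugging something similar, one way to catch a naming mix-up like this is to compare the stream each task actually references against the stream you think it references; a minimal sketch, assuming access to the schema:)

SHOW TASKS IN SCHEMA ANALYTICS_DB.schema;
SELECT "name", "condition", "definition"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));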