0
votes

I am trying to execute the below snowflake stored procedure:

data is initially loaded into a table in staging database. From the staging table, I am loading data into a temp table before swapping the main table with the temp table & this could be seen in the method: temp_table_insert_sql

Once this is successfully completed, the swapping is performed in the method: getSwapTableStmt.

Below is the stored procedure I am trying to execute.

CREATE OR REPLACE PROCEDURE "DBNAME"."SCHEMA"."FULL_LOAD_WITH_RETRY_FACILITY"(RETRY_CNT FLOAT, MIN_WAIT_SECOND FLOAT, MAX_WAIT_SECOND FLOAT, TABLE_NAME VARCHAR, TEMPDB VARCHAR, TEMPSCHEMA VARCHAR, TARGETDB VARCHAR, TARGETSCHEMA VARCHAR)
RETURNS VARCHAR(16777216)
LANGUAGE JAVASCRIPT
EXECUTE AS OWNER
AS $$

    function getRandom(min,max){
        return Math.floor(Math.random()*(max-min+1))+min;
    }

    function create_temp_table(table_name) {
        var create_temp_table = `CREATE OR REPLACE TABLE ${TEMPDB}.${TEMPSCHEMA}.${table_name}_TEMP like ${TARGETDB}.${TARGETSCHEMA}.${table_name};`;
        return create_temp_table;
    }

    function retry(MIN_WAIT_SECOND, MAX_WAIT_SECOND, retry_cnt, stmt) {
        for (cnt=0; cnt<retry_cnt; cnt++) {
            try {
                var result = stmt.execute();
                break;
            } catch (err) {
                var get_error_queryid_result = snowflake.createStatement( { sqlText:`SELECT LAST_QUERY_ID();` } ).execute();
                get_error_queryid_result.next();
                var queryId = String( get_error_queryid_result.getColumnValue(1) ).trim();
                snowflake.createStatement( { sqlText: `SELECT SYSTEM$WAIT(${getRandom(MIN_WAIT_SECOND,MAX_WAIT_SECOND)});` } ).execute();
                var err_msg = err.message;
                if(!(err_msg.includes("locked")) | (cnt==(retry_cnt-1)) ){
                    err.stackTraceTxt += ` QueryId: ${queryId}`;
                    throw err;
                }
            }
        }
        return result;
    }

    function temp_table_insert_sql(table_name) {
        var select_columns = "";
        var insert_columns = "";
        var sql = `select column_name from ${TARGETDB}.INFORMATION_SCHEMA.COLUMNS where true and table_schema = '${TARGETSCHEMA}' and table_name = '${table_name}' order by ordinal_position;`;
        var result = snowflake.createStatement( { sqlText: sql } ).execute();
        if (result.next()) {
            insert_columns = `( ${String(result.getColumnValue(1).trim)}`;
            select_columns = `select ${String(result.getColumnValue(1).trim)}`;
            while(result.next) {
                insert_columns += `, ${String(result.getColumnValue(1).trim)}`;
                select_columns += `, ${String(result.getColumnValue(1).trim)}`;
            }
            insert_columns += `)`;
        } else {
            insert_columns = "";
        }
        return `CREATE OR REPLACE TABLE ${TEMPDB}.${TEMPSCHEMA}.${table_name}_TEMP AS ${select_columns} from ${TEMPDB}.${TEMPSCHEMA}.${table_name}_STG`;
    }

    function get_row_count(db, SCHEMA, table_name) {
        return `select count(*) from ${db}.${schema}.${table_name};`;
    }

    function getSwapTableStmt(table_name){
        return `alter table if exists ${TARGETDB}.${TARGETSCHEMA}.${table_name} swap with ${TEMPDB}.${TEMPSCHEMA}.${table_name}_TEMP;`;
    }
    
    try {
        var timest = new Date().getTime();
        var temp_row_count = 0;
        var stg_row_count = 0;
        var apptrace = "";
        var logs = "";
        var result = "";
        var sql = "";

        var create_temp_ddl = create_temp_table(TABLE_NAME);
        logs = create_temp_ddl;
        result = retry(MIN_WAIT_SECOND, MAX_WAIT_SECOND, RETRY_CNT, snowflake.createStatement({sqlText: create_temp_ddl}));

        var insert_into_temp_table = temp_table_insert_sql(TABLE_NAME);
        result = retry(MIN_WAIT_SECOND, MAX_WAIT_SECOND, RETRY_CNT, snowflake.createStatement({ sqlText: insert_into_temp_table }));
        
        var temprows = get_row_count(TEMPDB, TABLE_NAME + "_TEMP");
        result = retry(MIN_WAIT_SECOND, MAX_WAIT_SECOND, RETRY_CNT, snowflake.createStatement({ sqlText: temprows }));
        if (result.next()) {
            temp_row_count;
        } else {
            var err = new Error("rows inserted is zero");
            throw err;
        }

        var stgrows = get_row_count(TEMPDB, TABLE_NAME + "_STG");
        result = retry(MIN_WAIT_SECOND, MAX_WAIT_SECOND, RETRY_CNT, snowflake.createStatement({ sqlText: stgrows }));
        if (result.next()) {
            stg_row_count = parseInt(result.getColumnValue(1));
        } else {
            var err = new Error("rows inserted is zero");
            throw err;
        }

        logs = `SWAP ${TABLE_NAME}_TEMP ${TABLE_NAME};`;
        if(stg_row_count == temp_row_count) {
            sql = getSwapTableStmt(table_name);
        } else {
            var err = new Error(`rows of ${TABLE_NAME}_STG & ${TABLE_NAME}_TEMP are different`);
            throw err;
        }
        var duration = (new Date().getTime() - timest) / 1000;
        apptrace = `{
            "application_name": "FULL_LOAD"
            ,"feature_name": "exchange|3.0|SESSION|FILE|${TABLE_NAME}_STG|${TABLE_NAME}"
            ,"event_subtype": "Metric"
            ,"metrics": [
                {
                    "metric":"Execution_Result"
                    ,"measurement":1
                    ,"unit_of_measure":"Boolean"
                }
                ,{
                    "metric": "Execution_Duration"
                    ,"measurement": ${duration}
                    ,"unit_of_measure": "Seconds"
                }
                ,{
                    "metric": "Rows"
                    ,"measurement": ${temp_row_count}
                    ,"unit_of_measure": "Rows"
                }
            ]
        }`;
    } catch (err)  {
    logs += "  Code: " + err.code + "  State: " + err.state;
    logs += "  Message: " + err.message;
    apptrace = `{
        "application_name": "FULL_LOAD"
        ,"feature_name": "exchange|3.0|SESSION|FILE|${TABLE_NAME}_STG|${TABLE_NAME}|${logs}"
        ,"event_subtype": "Metric"
        ,"metrics": [
            {
                "metric":"Execution_Result"
                ,"measurement":0
                ,"unit_of_measure":"Boolean"
            }
        ]
    }`;
    }
    // COMMIT
    snowflake.execute (
        {sqlText: `COMMIT;`}
    );
    return apptrace;
$$
;

This is how I am calling the stored procedure:

CALL DBNAME.SCHEMANAME.PROCNAME(2, 1000, 5000, 'TABLENAME', 'TEMPDB', 'TEMPSCHEMA', 'TARGETDB ', 'TARGETSCHEMA');

Once I submit the above statement, I get an error:

org.jkiss.dbeaver.model.sql.DBSQLException: SQL Error [100176] [P0000]: JavaScript out of memory error: UDF thread memory limit exceeded

at org.jkiss.dbeaver.model.impl.jdbc.exec.JDBCPreparedStatementImpl.executeStatement(JDBCPreparedStatementImpl.java:208)
at org.jkiss.dbeaver.ui.editors.sql.execute.SQLQueryJob.executeStatement(SQLQueryJob.java:492)
at org.jkiss.dbeaver.ui.editors.sql.execute.SQLQueryJob.lambda$0(SQLQueryJob.java:427)
at org.jkiss.dbeaver.model.exec.DBExecUtils.tryExecuteRecover(DBExecUtils.java:170)
at org.jkiss.dbeaver.ui.editors.sql.execute.SQLQueryJob.executeSingleQuery(SQLQueryJob.java:419)
at org.jkiss.dbeaver.ui.editors.sql.execute.SQLQueryJob.extractData(SQLQueryJob.java:779)
at org.jkiss.dbeaver.ui.editors.sql.SQLEditor$QueryResultsContainer.readData(SQLEditor.java:2973)
at org.jkiss.dbeaver.ui.controls.resultset.ResultSetJobDataRead.lambda$0(ResultSetJobDataRead.java:111)
at org.jkiss.dbeaver.model.exec.DBExecUtils.tryExecuteRecover(DBExecUtils.java:170)
at org.jkiss.dbeaver.ui.controls.resultset.ResultSetJobDataRead.run(ResultSetJobDataRead.java:109)
at org.jkiss.dbeaver.ui.controls.resultset.ResultSetViewer$17.run(ResultSetViewer.java:3584)
at org.jkiss.dbeaver.model.runtime.AbstractJob.run(AbstractJob.java:104)
at org.eclipse.core.internal.jobs.Worker.run(Worker.java:63)

Caused by: net.snowflake.client.jdbc.SnowflakeSQLException: JavaScript out of memory error: UDF thread memory limit exceeded

at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowExceptionSub(SnowflakeUtil.java:153)
at net.snowflake.client.jdbc.SnowflakeUtil.checkErrorAndThrowException(SnowflakeUtil.java:77)
at net.snowflake.client.core.StmtUtil.pollForOutput(StmtUtil.java:503)
at net.snowflake.client.core.StmtUtil.execute(StmtUtil.java:380)
at net.snowflake.client.core.SFStatement.executeHelper(SFStatement.java:582)
at net.snowflake.client.core.SFStatement.executeQueryInternal(SFStatement.java:266)
at net.snowflake.client.core.SFStatement.executeQuery(SFStatement.java:202)
at net.snowflake.client.core.SFStatement.execute(SFStatement.java:877)
at net.snowflake.client.jdbc.SnowflakeStatementV1.executeInternal(SnowflakeStatementV1.java:331)
at net.snowflake.client.jdbc.SnowflakePreparedStatementV1.execute(SnowflakePreparedStatementV1.java:535)
at org.jkiss.dbeaver.model.impl.jdbc.exec.JDBCPreparedStatementImpl.execute(JDBCPreparedStatementImpl.java:261)
at org.jkiss.dbeaver.model.impl.jdbc.exec.JDBCPreparedStatementImpl.executeStatement(JDBCPreparedStatementImpl.java:205)
... 12 more

The database I am running this stored procedure is Snowflake.

This is the first time I am working on a stored procedure and on java script. I don't understand what is causing this error and how to even analyze this error. Could anyone let me know how can I fix this issue ? Any help is really appreciated.

2

2 Answers

1
votes

There is a limit on the memory that can be consumed by JavaScript UDFS in Snowflake: https://docs.snowflake.com/en/sql-reference/udf-js.html#memory

If you try to simplify your query or in general split one transaction into several, the error probably won't appear. (This helped me in one case)

Also maybe this parameter may help you: https://docs.snowflake.com/en/sql-reference/parameters.html#client-memory-limit

0
votes

After debugging the issue, I found out that the while loop in the code is going infinite and resulting in Out Of Memory Exception. After I fixed the loop, the error is resolved.