0
votes

I am trying to write a tool that loads CSV files into multiple databases. While searching online for how to use the COPY command with Snowflake, I couldn't find any information on how to do it in Java. This is how I do it with PostgreSQL:

/**
 * Opens a JDBC connection from the details carried by {@code message} and loads the
 * requested data through {@link #loadDataWithMode}.
 *
 * <p>The connection is closed automatically when the load finishes or fails; any
 * failure propagates to the caller unchanged.
 *
 * @param message load request supplying the JDBC URL, user and password
 * @throws Exception if the connection cannot be opened or the load fails
 */
public void loadData(Message message) throws Exception {
    try (Connection connection = DriverManager.getConnection(message.getJdbcUrl(),
            message.getUser(), message.password)) {
        // Pass the request we were given; the cast is needed because CopyManager
        // is constructed from the PostgreSQL driver's BaseConnection.
        loadDataWithMode(message, (BaseConnection) connection);
    }
}


/**
 * Copies a file into a table through a {@code CopyManager} built on the given connection.
 *
 * <p>The COPY target is composed as {@code tableName(fields)} from {@code message}; the
 * file is opened through {@code fileService} and wrapped in a UTF-8 reader.
 *
 * <p>NOTE(review): the {@code copyIn(...)} statement below is cut off in this listing
 * (the string literal is never closed and no reader argument is visible), so the COPY
 * options that followed {@code STDIN} are unknown here.
 *
 * <p>NOTE(review): {@code loadRequest} is not declared in this method; presumably
 * {@code message} was intended - confirm.
 *
 * @param message    load request supplying the table name and field list
 * @param connection connection handed to the {@code CopyManager} constructor
 * @throws Exception declared by the signature; propagated from the load
 */
public void loadDataWithMode(Message message, BaseConnection connection) throws Exception {
    CopyManager copyManager = new CopyManager(connection);
    String fields = message.getFields();
    // COPY target in the form table(col1, col2, ...).
    final String targetTableWithFields = message.getTableName() + "(" + fields + ")";
        // Both resources are closed automatically, reader first, then the stream.
        try (InputStream input = fileService.load(loadRequest.getFilePath())) {
            try (InputStreamReader reader = new InputStreamReader(input, "UTF-8")) {
                // NOTE(review): statement is truncated in the source; the remainder is missing.
                copyManager.copyIn("COPY " + targetTableWithFields + " from STDIN 
            }
        }
 }

I'm not familiar with Snowflake and don't know how to do this there; any help will be appreciated.

1
As @SrinathMenon says, this seems to be the answer you want: stackoverflow.com/a/62119754/132438 – Felipe Hoffa

1 Answer

0
votes
/**
 * Loads a CSV file into a Snowflake table by uploading it to a temporary stage and
 * running {@code copy into} against the staged file.
 *
 * <p>Steps, in order: (re)create the temporary stage {@code COPYIN_STAGE}, upload the
 * local file to it, select the warehouse, run the copy, then commit.
 *
 * @param message    load request supplying the table name, field list, file path,
 *                   warehouse and load mode
 * @param connection open JDBC connection; it is cast to {@code SnowflakeConnectionV1},
 *                   so any other implementation fails with a {@code ClassCastException}
 * @throws Exception if the file cannot be opened or any statement fails
 */
public void loadDataWithMode(Message message, Connection connection) throws Exception {
    final String fields = message.getFields();
    final String targetTableWithFields = message.getTableName() + "(" + fields + ")";
    LOG.info("about to copy data into table: {}", targetTableWithFields);

    final SnowflakeConnectionV1 snowflakeConnectionV1 = (SnowflakeConnectionV1) connection;
    final File tempFile = fileSystemService.asLocalFile(message.getFilePath());
    try (Statement stmt = snowflakeConnectionV1.createStatement();
         InputStream inputStream = new FileInputStream(tempFile)) {
        final String createStage = buildCreateStageStatement();
        LOG.info("Executing sql:{}", createStage);
        stmt.execute(createStage);
        LOG.info("Create stage was successfully executed");

        // Upload to the stage root (empty prefix) under the file's own name, uncompressed.
        snowflakeConnectionV1.uploadStream("COPYIN_STAGE", "", inputStream, tempFile.getName(), false);
        LOG.info("Upload stream was successfully executed");

        final String warehouse = message.getExportConnectionDetails().getWarehouse();
        stmt.execute("USE WAREHOUSE " + warehouse);
        LOG.info("Warehouse was successfully set to: {}", warehouse);

        // Staged file is purged after the copy for every load mode except INCREMENTAL.
        final boolean purgeData = message.getLoadMode() != LoadMode.INCREMENTAL;
        final String sql = String.format(
                "copy into %s(%s) from @COPYIN_STAGE/%s file_format = ( type='CSV', skip_header=1) purge=%s   ",
                message.getTableName(), fields, tempFile.getName(), purgeData);
        LOG.info("Executing sql:{}", sql);
        stmt.execute(sql);
    }
    connection.commit();
    LOG.info("data was successfully copied {}", targetTableWithFields);
}

/**
 * Builds the SQL that (re)creates the temporary stage {@code COPYIN_STAGE} with a
 * CSV file format.
 *
 * @return the {@code CREATE OR REPLACE TEMPORARY STAGE} statement text
 */
private String buildCreateStageStatement() {
    final String stageClause = "CREATE OR REPLACE TEMPORARY STAGE COPYIN_STAGE";
    final String fileFormatClause = "file_format = ( type ='CSV')";
    return String.join(" ", stageClause, fileFormatClause);
}