0
votes

I am trying to establish a JDBC (Postgres) Source to Snowflake (JDBC compliant) Sink pipeline. In order to integrate Snowflake I needed to patch the OOB JDBC Sink app according to the guide https://docs.spring.io/spring-cloud-stream-app-starters/docs/Einstein.SR6/reference/htmlsingle/#_patching_pre_built_applications Which I did and registered on my local SCDF stack.

However I do have "binding errors" at the Sink side of this simple equation.

The stacktrace is:

2020-06-22 13:19:15.762  INFO 17866 --- [container-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-3, groupId=poc-stream-CCABUxRjhfLYIJvcqdRl] Resetting offset for partition poc-stream-CCABUxRjhfLYIJvcqdRl.source-0 to offset 0.
2020-06-22 13:19:15.767  INFO 17866 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : partitions assigned: [poc-stream-CCABUxRjhfLYIJvcqdRl.source-0]
2020-06-22 13:19:15.774  INFO 17866 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 28782 (http) with context path ''
2020-06-22 13:19:15.775  INFO 17866 --- [           main] c.u.p.s.s.SnowflakeSinkApplication       : Started SnowflakeSinkApplication in 10.866 seconds (JVM running for 11.581)
2020-06-22 13:19:51.914  WARN 17866 --- [container-0-C-1] com.zaxxer.hikari.pool.ProxyConnection   : HikariPool-1 - Connection net.snowflake.client.jdbc.SnowflakeConnectionV1@5b8398d7 marked as broken because of SQLSTATE(0A000), ErrorCode(200018)
net.snowflake.client.jdbc.SnowflakeSQLException: **Data type not supported for binding: Object type: class [B.** at net.snowflake.client.jdbc.SnowflakePreparedStatementV1.setObject(SnowflakePreparedStatementV1.java:526) ~[snowflake-jdbc-3.12.7.jar!/:3.12.7]
at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.setObject(HikariProxyPreparedStatement.java) ~[HikariCP-3.2.0.jar!/:na]

Here is my Stream definition:

source: jdbc --trigger.time-unit=SECONDS --spring.datasource.username='' --trigger.fixed-delay=1 --spring.datasource.url='' --jdbc.query='SELECT * FROM public.source_table_1 WHERE active=false' --jdbc.update='UPDATE public.source_table_1 SET active=true WHERE id in (:id)' --spring.cloud.stream.bindings.output.contentType=text/plain --spring.datasource.password='*****' | snowflake-sink --jdbc.columns=id:payload,first_name:payload,last_name:payload --spring.datasource.driver-class-name=net.snowflake.client.jdbc.SnowflakeDriver --spring.cloud.stream.bindings.input.contentType=text/plain --jdbc.table-name=target_table_1*

Chaning the jdbc.columns as:

--jdbc.columns=id,first_name,last_name

did not help either.

My target table at SnowFlake:

CREATE TABLE target_table_1 ( id bigint NOT NULL, first_name varchar, last_name varchar, primary key (id) ) Source table DDL is the same

Any help/pointers are appreciated.

1

1 Answers

0
votes

Replying to myself might also be useful for the others in the universe:

Added contentType property for both source and sink. Also declared the jdbc.columns property as shown below (as opposed to the ref. guide - https://github.com/spring-cloud-stream-app-starters/jdbc/tree/master/spring-cloud-starter-stream-sink-jdbc

@Bean

public StreamApplication jdbcSource() {

    return new StreamApplication("source: jdbc")
            .addProperties(Map.of("jdbc.query", "'SELECT * FROM public.source_table_1 WHERE imported=false'"))
            .addProperties(Map.of("jdbc.update", "'UPDATE public.source_table_1 SET imported=true WHERE id in (:id)'"))
            .addProperties(Map.of("spring.datasource.username", "postgres"))
            .addProperties(Map.of("spring.datasource.password", "mysecretpassword"))
            .addProperties(Map.of("spring.datasource.url", "'jdbc:postgresql://localhost:5432/content_types'"))
            .addProperties(Map.of("trigger.time-unit", "SECONDS"))
            .addProperties(Map.of("trigger.fixed-delay", "1"))
            .addProperties(Map.of("spring.cloud.stream.bindings.output.contentType", "application/json"));
}

@Bean
public StreamApplication sfSink() {

    return new StreamApplication("snowflake-sink")
            .addProperties(Map.of("jdbc.table-name", "target_table_1"))
            .addProperties(Map.of("jdbc.initialize", "true"))
            .addProperties(Map.of("spring.cloud.stream.bindings.input.contentType", "application/json"))
            .addProperties(Map.of("jdbc.columns", "'id:new String(payload).id,first_name:new String(payload).first_name,last_name:new String(payload).last_name'"))
            .addProperties(Map.of("spring.datasource.driver-class-name", "net.snowflake.client.jdbc.SnowflakeDriver"));
}