2
votes

I am trying to use CockroachDB (v2.0.6) as a sink for one of my Kafka topics.

I wasn't able to find any Kafka connector specifically for CockroachDB so I decided to use the jdbc sink connector from Confluent since CockroachDB supports the postgreSQL syntax.

The connection string that I use on Kafka Connect is the following

"connection.url": "jdbc:postgresql://roach1:26257/mydb?sslmode=disable"

which basically is the only thing I changed on an existing working Postgres sink connector.

Unfortunately I was unable to make it work since the connector fails with an error

Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: org.postgresql.util.PSQLException: ERROR: syntax error at or near "."
  Detail: source SQL:
SELECT NULL AS TABLE_CAT, n.nspname AS TABLE_SCHEM,   ct.relname AS TABLE_NAME, a.attname AS COLUMN_NAME,   (i.keys).n AS KEY_SEQ, ci.relname AS PK_NAME FROM pg_catalog.pg_class ct   JOIN pg_catalog.pg_attribute a ON (ct.oid = a.attrelid)   JOIN pg_catalog.pg_namespace n ON (ct.relnamespace = n.oid)   JOIN (SELECT i.indexrelid, i.indrelid, i.indisprimary,              information_schema._pg_expandarray(i.indkey) AS keys         FROM pg_catalog.pg_index i) i     ON (a.attnum = (i.keys).x AND a.attrelid = i.indrelid)   JOIN pg_catalog.pg_class ci ON (ci.oid = i.indexrelid) WHERE true  AND ct.relname = 'my_topic' AND i.indisprimary  ORDER BY table_name, pk_name, key_seq
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:88)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
    ... 10 more
Caused by: java.sql.SQLException: org.postgresql.util.PSQLException: ERROR: syntax error at or near "."
  Detail: source SQL:
SELECT NULL AS TABLE_CAT, n.nspname AS TABLE_SCHEM,   ct.relname AS TABLE_NAME, a.attname AS COLUMN_NAME,   (i.keys).n AS KEY_SEQ, ci.relname AS PK_NAME FROM pg_catalog.pg_class ct   JOIN pg_catalog.pg_attribute a ON (ct.oid = a.attrelid)   JOIN pg_catalog.pg_namespace n ON (ct.relnamespace = n.oid)   JOIN (SELECT i.indexrelid, i.indrelid, i.indisprimary,              information_schema._pg_expandarray(i.indkey) AS keys         FROM pg_catalog.pg_index i) i     ON (a.attnum = (i.keys).x AND a.attrelid = i.indrelid)   JOIN pg_catalog.pg_class ci ON (ci.oid = i.indexrelid) WHERE true  AND ct.relname = 'collect_flow_tracking' AND i.indisprimary  ORDER BY table_name, pk_name, key_seq 

So my question is, has anyone used Kafka Connect with CockroachDB successfully ? Also does anyone have any pointers on this error (what causes it) and how to circumvent it and make this work ?

1

1 Answers

2
votes

CockroachDB PM here. It looks like the problem is an unsupported database introspection query performed by the Kafka Connect Postgres connector. The good news is that this particular query does appear to be supported by CockroachDB 2.1. Can you try again using the latest CockroachDB beta?