3
votes

I set up a database with a table that has a timestamp column. I am trying to use timestamp mode to capture incremental changes in the DB.

But kafka-connect-jdbc is not reading any data from the table. Here is what I have done.

Created a table.

sqlite> CREATE TABLE test_timestamp(id integer primary key not null,
   ...>                   payment_type text not null,
   ...>                   Timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
   ...>                   user_id int not null);
sqlite> INSERT INTO test_timestamp (ID, PAYMENT_TYPE, USER_ID) VALUES (3,'FOO',1);
sqlite> select * from test_timestamp;
3|FOO|2019-06-18 05:31:22|1

My jdbc-source connector configuration is as follows:

$ curl -s "http://localhost:8083/connectors/jdbc-source/config"|jq '.'
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "mode": "timestamp",
  "timestamp.column.name": "timestamp",
  "topic.prefix": "testdb-",
  "validate.non.null": "false",
  "tasks.max": "1",
  "name": "jdbc-source",
  "connection.url": "jdbc:sqlite:/tmp/test.db"
}

The jdbc-source connector loads successfully and the topic is created:

$ kafka-topics --list --bootstrap-server localhost:9092
..
testdb-test_timestamp

But no data appears in the topic.

Any help?

Thanks in advance.

2
Can you try to include "query": "select * from test_timestamp" in the configuration? - Giorgos Myrianthous
Yes. I added this to the configuration file but had no luck. - kurian
It looks like this could help you: stackoverflow.com/questions/54518763/… Also, your timestamp column name is "Timestamp" and not "timestamp"; try changing that. - suraj_fale

2 Answers

0
votes

You are hitting a known issue, detailed here: https://github.com/confluentinc/kafka-connect-jdbc/issues/219

Steps to reproduce:

  1. Create database:

    $ echo "DROP TABLE IF EXISTS test_timestamp; CREATE TABLE test_timestamp(id integer primary key not null,
                     payment_type text not null,
                     Timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                     user_id int not null);
    INSERT INTO test_timestamp (ID, PAYMENT_TYPE, USER_ID) VALUES (3,'FOO',1);
    select * from test_timestamp;" | sqlite3 /tmp/test.db
    3|FOO|2019-07-03 08:28:43|1
    
  2. Create connector

    curl -X PUT "http://localhost:8083/connectors/jdbc-source/config" -H  "Content-Type:application/json"  -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "mode": "timestamp",
    "timestamp.column.name": "timestamp",
    "topic.prefix": "testdb-",
    "validate.non.null": "false",
    "tasks.max": "1",
    "name": "jdbc-source",
    "connection.url": "jdbc:sqlite:/tmp/test.db"
    }'
    
  3. Check status of connector

    $ curl -s "http://localhost:8083/connectors"| \
    jq '.[]'| \
    xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| \
    jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| \
    column -s : -t| sed 's/\"//g'| sort
    jdbc-source  |  RUNNING  |  RUNNING
    
  4. Check Kafka Connect worker log, observe error parsing date, per issue 219

    [2019-07-03 10:40:58,260] ERROR Failed to run query for table TimestampIncrementingTableQuerier{table="test_timestamp", query='null', topicPrefix='testdb-', incrementingColumn='', timestampColumns=[timestamp]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:332)
    java.sql.SQLException: Error parsing time stamp
        at org.sqlite.jdbc3.JDBC3ResultSet.getTimestamp(JDBC3ResultSet.java:576)
        at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.currentTimeOnDB(GenericDatabaseDialect.java:484)
        at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.endTimetampValue(TimestampIncrementingTableQuerier.java:203)
        at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.setQueryParametersTimestamp(TimestampIncrementingCriteria.java:164)
        at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.setQueryParameters(TimestampIncrementingCriteria.java:126)
        at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:176)
        at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:92)
        at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:60)
        at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:310)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:245)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.text.ParseException: Unparseable date: "2019-07-03 08:40:58" does not match (\p{Nd}++)\Q-\E(\p{Nd}++)\Q-\E(\p{Nd}++)\Q \E(\p{Nd}++)\Q:\E(\p{Nd}++)\Q:\E(\p{Nd}++)\Q.\E(\p{Nd}++)
        at org.sqlite.date.FastDateParser.parse(FastDateParser.java:299)
        at org.sqlite.date.FastDateFormat.parse(FastDateFormat.java:490)
        at org.sqlite.jdbc3.JDBC3ResultSet.getTimestamp(JDBC3ResultSet.java:573)
        ... 17 more
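
Note that the unparseable value in the exception (08:40:58) is not the row inserted at 08:28:43. It looks like the database's current time, which fits the currentTimeOnDB frame in the trace. The pattern in the message expects a "." followed by digits after the seconds, and SQLite's CURRENT_TIMESTAMP does not produce one. A minimal Python sketch of that mismatch, assuming only the standard sqlite3 module; the regex is my simplified stand-in for the driver's pattern, and has_fractional_seconds is a hypothetical helper, not part of any library:

```python
import re
import sqlite3

# Simplified stand-in (an assumption) for the pattern quoted in the
# ParseException: it requires '.' plus digits after the seconds field.
WITH_FRACTION = re.compile(r"\d+-\d+-\d+ \d+:\d+:\d+\.\d+")


def has_fractional_seconds(value: str) -> bool:
    """Return True if the timestamp string ends in fractional seconds."""
    return WITH_FRACTION.fullmatch(value) is not None


# Ask an in-memory SQLite database for the current time in two forms:
# the CURRENT_TIMESTAMP keyword and strftime with the %f specifier,
# which renders seconds as SS.SSS.
conn = sqlite3.connect(":memory:")
default_ts, millis_ts = conn.execute(
    "SELECT CURRENT_TIMESTAMP, strftime('%Y-%m-%d %H:%M:%f', 'now')"
).fetchone()
conn.close()

print(default_ts, has_fractional_seconds(default_ts))
print(millis_ts, has_fractional_seconds(millis_ts))
```

This only shows that the string format differs from what the parser expects. It does not exercise the connector or the JDBC driver.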
    
0
votes

I stumbled across a similar issue. In my case, even the topic was not created. I found out that the time zone of the Connect worker must be the same as that of the DB. Setting the db.timezone property in the Connect worker's property file correctly, based on this list (under the SHORT_IDS section), made it work:

db.timezone=Asia/Kolkata
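
To illustrate why the zone matters: the stored value has no zone attached, so the instant it refers to depends on which zone the reader assumes. A rough Python sketch, using the timestamp from the question and a fixed +05:30 offset for Asia/Kolkata; the interpret helper is hypothetical, and this does not show how the connector itself applies db.timezone:

```python
from datetime import datetime, timedelta, timezone

UTC = timezone.utc
# Asia/Kolkata is UTC+05:30 with no daylight saving, so a fixed offset
# is enough for this illustration.
IST = timezone(timedelta(hours=5, minutes=30))


def interpret(stored: str, tz: timezone) -> datetime:
    """Attach a zone to a zone-less 'YYYY-MM-DD HH:MM:SS' string."""
    return datetime.strptime(stored, "%Y-%m-%d %H:%M:%S").replace(tzinfo=tz)


stored = "2019-06-18 05:31:22"  # value shown in the question's table

as_utc = interpret(stored, UTC)
as_ist = interpret(stored, IST)

# The same string names two instants that are five and a half hours apart.
print(as_utc - as_ist)  # 5:30:00
```

If the writer and the reader disagree about the zone, a freshly written row can look hours older or newer than it is, which is the kind of mismatch a time-based comparison would be sensitive to.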