
I have a MariaDB table with a column "modified" of type "bigint(10)" that holds a timestamp, I believe in Unix time format. When I run a Kafka Connect JDBC source connector with mode "timestamp" or "timestamp+incrementing", no events are pushed to the topic. If I run it with mode "incrementing" only, new entries are pushed to the topic. Can someone point out where I misconfigured the connector? Or does the connector not recognize timestamps in Unix time format?

I tried to run a connector (retrieval based only on timestamp) with the following properties:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name":"only_ts",
        "config": {
            "numeric.mapping": "best_fit",
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "connection.url": "jdbc:mysql://mariadb/moodle",
            "connection.user": "user",
            "connection.password": "",
            "topic.prefix": "only_ts_",
            "mode": "timestamp", 
            "timestamp.column.name":"modified", 
            "table.whitelist":"mdl_forum_posts",
            "poll.intervals.ms": 10000
        }
  }'

I would expect entries from "mdl_forum_posts" to be pushed to the Kafka topic "only_ts_mdl_forum_posts" whenever I create or update an entry. However, with this connector nothing happens. With mode "incrementing" alone it works as expected, but to capture DB updates as well I need timestamp mode.

Output of "describe mdl_forum_posts":

+---------------+--------------+------+-----+---------+----------------+
| Field         | Type         | Null | Key | Default | Extra          |
+---------------+--------------+------+-----+---------+----------------+
| id            | bigint(10)   | NO   | PRI | NULL    | auto_increment |
| discussion    | bigint(10)   | NO   | MUL | 0       |                |
| parent        | bigint(10)   | NO   | MUL | 0       |                |
| userid        | bigint(10)   | NO   | MUL | 0       |                |
| created       | bigint(10)   | NO   | MUL | 0       |                |
| modified      | bigint(10)   | NO   |     | 0       |                |
| mailed        | tinyint(2)   | NO   | MUL | 0       |                |
| subject       | varchar(255) | NO   |     |         |                |
| message       | longtext     | NO   |     | NULL    |                |
| messageformat | tinyint(2)   | NO   |     | 0       |                |
| messagetrust  | tinyint(2)   | NO   |     | 0       |                |
| attachment    | varchar(100) | NO   |     |         |                |
| totalscore    | smallint(4)  | NO   |     | 0       |                |
| mailnow       | bigint(10)   | NO   |     | 0       |                |
| deleted       | tinyint(1)   | NO   |     | 0       |                |
+---------------+--------------+------+-----+---------+----------------+

And output for "show create table moodle.mdl_forum_posts;":

| mdl_forum_posts | CREATE TABLE mdl_forum_posts (
  id bigint(10) NOT NULL AUTO_INCREMENT,
  discussion bigint(10) NOT NULL DEFAULT '0',
  parent bigint(10) NOT NULL DEFAULT '0',
  userid bigint(10) NOT NULL DEFAULT '0',
  created bigint(10) NOT NULL DEFAULT '0',
  modified bigint(10) NOT NULL DEFAULT '0',
  mailed tinyint(2) NOT NULL DEFAULT '0',
  subject varchar(255) NOT NULL DEFAULT '',
  message longtext NOT NULL,
  messageformat tinyint(2) NOT NULL DEFAULT '0',
  messagetrust tinyint(2) NOT NULL DEFAULT '0',
  attachment varchar(100) NOT NULL DEFAULT '',
  totalscore smallint(4) NOT NULL DEFAULT '0',
  mailnow bigint(10) NOT NULL DEFAULT '0',
  deleted tinyint(1) NOT NULL DEFAULT '0',
  PRIMARY KEY (id),
  KEY mdl_forupost_use_ix (userid),
  KEY mdl_forupost_cre_ix (created),
  KEY mdl_forupost_mai_ix (mailed),
  KEY mdl_forupost_dis_ix (discussion),
  KEY mdl_forupost_par_ix (parent)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COMMENT='All posts are stored in this table' |

An example entry in the column "modified" is:

select modified from mdl_forum_posts;
1557487199

It is a timestamp in Unix time, as the following seems to show:

select from_unixtime(modified) from mdl_forum_posts;
2019-05-10 11:19:59
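
As an independent cross-check of the conversion above, here is a minimal Python sketch that interprets the sample value as epoch seconds in UTC. The helper name `epoch_to_utc` and the seconds-versus-milliseconds threshold are assumptions made for illustration; they are not part of the question or of Kafka Connect.

```python
from datetime import datetime, timezone


def epoch_to_utc(value: int) -> datetime:
    """Interpret an integer epoch value as a UTC datetime."""
    # Heuristic (assumption): values of 10**11 or more are treated as
    # milliseconds, smaller values as seconds. Present-day epoch seconds
    # have 10 digits, epoch milliseconds have 13.
    if value >= 10**11:
        return datetime.fromtimestamp(value / 1000, tz=timezone.utc)
    return datetime.fromtimestamp(value, tz=timezone.utc)


# Sample value from the "modified" column shown in the question.
print(epoch_to_utc(1557487199).isoformat(sep=" "))
# prints "2019-05-10 11:19:59+00:00"
```

The result agrees with the from_unixtime output above if the database session runs in UTC, which suggests the column stores seconds rather than milliseconds.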

The log for this connector (timestamp only) seems to show the generated query:

kafka-connect_1    | [2019-05-10 11:48:47,434] DEBUG TimestampIncrementingTableQuerier{table="moodle"."mdl_forum_posts", query='null', topicPrefix='only_ts_', incrementingColumn='', timestampColumns=[modified]} prepared SQL query: SELECT * FROM `moodle`.`mdl_forum_posts` WHERE `moodle`.`mdl_forum_posts`.`modified` > ? AND `moodle`.`mdl_forum_posts`.`modified` < ? ORDER BY `moodle`.`mdl_forum_posts`.`modified` ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
kafka-connect_1    | [2019-05-10 11:48:47,435] DEBUG Resetting querier TimestampIncrementingTableQuerier{table="moodle"."mdl_forum_posts", query='null', topicPrefix='only_ts_', incrementingColumn='', timestampColumns=[modified]} (io.confluent.connect.jdbc.source.JdbcSourceTask)
Does it work if you use just timestamp? Do you get an error in the Kafka Connect worker log? - Robin Moffatt
Good point, and no, it is not working with just timestamp. If I start an "incrementing only" connector in parallel, that one works. The log, as far as I can see, does not show any exceptions. The connector's state is "Running". Thanks! - toschio
Can you share your config when you tried it with timestamp only? And your table DDL? - Robin Moffatt
Hi, I added the connector config as I sent it as well as the DDL. Thanks for your help! - toschio
@toscio: Any luck finding a solution? I'm getting sporadic results personally. - paiego

1 Answer


I had the same problem. The only workaround that worked for me is the one described here: https://github.com/confluentinc/kafka-connect-jdbc/issues/566. Timestamp mode can be used with a Unix timestamp (bigint) column if you combine it with a custom query and supply your own WHERE clause. In your case, it could be something like:

SELECT id 
FROM mdl_forum_posts
WHERE to_timestamp(modified/1000) > ? AND to_timestamp(modified/1000) < ? ORDER BY modified ASC
--

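Here to_timestamp stands for the date conversion function of your DB dialect; in MariaDB/MySQL the equivalent is FROM_UNIXTIME. The division by 1000 applies only if the column stores milliseconds; the sample value in the question (1557487199) is in seconds. Note the trailing --: it comments out the WHERE clause that the connector generates automatically.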
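
To make the adaptation to the table in the question concrete, here is a small Python sketch that assembles the custom query text for a MariaDB/MySQL column holding epoch seconds. The helper `build_query` and its parameters are hypothetical names used only for illustration. It assumes the workaround from the linked issue behaves as described in this answer, and it has not been verified against a running connector.

```python
def build_query(table: str, column: str, in_millis: bool = False,
                select: str = "*") -> str:
    """Build a custom query whose own WHERE clause compares real timestamps."""
    # FROM_UNIXTIME is the MariaDB/MySQL function that converts epoch
    # seconds to a datetime; divide by 1000 only for millisecond columns.
    expr = f"FROM_UNIXTIME({column} / 1000)" if in_millis else f"FROM_UNIXTIME({column})"
    # The two "?" placeholders are meant to receive the lower and upper
    # timestamp bounds. The trailing "--" is the trick from the answer:
    # it is intended to comment out the clause the connector adds itself.
    return (
        f"SELECT {select} FROM {table} "
        f"WHERE {expr} > ? AND {expr} < ? "
        f"ORDER BY {column} ASC --"
    )


print(build_query("mdl_forum_posts", "modified"))
```

The resulting string would go into the connector's query property in place of table.whitelist, with mode and timestamp.column.name left as in the question.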