I have a MariaDB table with a column "modified" of type "bigint(10)" that holds a timestamp, I believe in Unix time format. When I run a Kafka Connect JDBC source connector in mode "timestamp" or "timestamp+incrementing", no events are pushed to the topic. If I run it in mode "incrementing" only, new entries are pushed to the topic. Can someone point out where I have misconfigured the connector? Or does the connector not recognize timestamps in Unix time format?
I tried to run a connector (retrieval based only on timestamp) with the following properties:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "only_ts",
  "config": {
    "numeric.mapping": "best_fit",
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://mariadb/moodle",
    "connection.user": "user",
    "connection.password": "",
    "topic.prefix": "only_ts_",
    "mode": "timestamp",
    "timestamp.column.name": "modified",
    "table.whitelist": "mdl_forum_posts",
    "poll.interval.ms": 10000
  }
}'
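To rule out a silently failed task, the connector's state can be read from the Kafka Connect REST API on the same worker. The following is only a minimal sketch: the helper names `status_url`, `failed_tasks` and `fetch_status` are made up for illustration, and it assumes the worker is reachable at the address used in the curl call above.

```python
import json
from urllib.request import urlopen

# Assumption: same worker address as in the curl call above.
CONNECT_URL = "http://localhost:8083"


def status_url(name: str, base: str = CONNECT_URL) -> str:
    """Build the URL of the Connect REST status endpoint for one connector."""
    return f"{base}/connectors/{name}/status"


def failed_tasks(status: dict) -> list:
    """Return (task id, trace) pairs for every task that is not RUNNING."""
    return [
        (task["id"], task.get("trace", ""))
        for task in status.get("tasks", [])
        if task.get("state") != "RUNNING"
    ]


def fetch_status(name: str) -> dict:
    """Fetch the status document; needs a reachable Connect worker."""
    with urlopen(status_url(name)) as response:
        return json.load(response)
```

Calling `failed_tasks(fetch_status("only_ts"))` against a live worker should return an empty list if all tasks are running; any entry it returns carries the stack trace reported for that task, if one is present.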
I would expect entries from "mdl_forum_posts" to be pushed to the Kafka topic "only_ts_mdl_forum_posts" whenever I create or update an entry. However, with this connector nothing happens. With mode "incrementing" alone, this works as expected, but to capture UPDATEs as well I need the timestamp mode.
Output for "describe mdl_forum_posts":
+---------------+--------------+------+-----+---------+----------------+
| Field         | Type         | Null | Key | Default | Extra          |
+---------------+--------------+------+-----+---------+----------------+
| id            | bigint(10)   | NO   | PRI | NULL    | auto_increment |
| discussion    | bigint(10)   | NO   | MUL | 0       |                |
| parent        | bigint(10)   | NO   | MUL | 0       |                |
| userid        | bigint(10)   | NO   | MUL | 0       |                |
| created       | bigint(10)   | NO   | MUL | 0       |                |
| modified      | bigint(10)   | NO   |     | 0       |                |
| mailed        | tinyint(2)   | NO   | MUL | 0       |                |
| subject       | varchar(255) | NO   |     |         |                |
| message       | longtext     | NO   |     | NULL    |                |
| messageformat | tinyint(2)   | NO   |     | 0       |                |
| messagetrust  | tinyint(2)   | NO   |     | 0       |                |
| attachment    | varchar(100) | NO   |     |         |                |
| totalscore    | smallint(4)  | NO   |     | 0       |                |
| mailnow       | bigint(10)   | NO   |     | 0       |                |
| deleted       | tinyint(1)   | NO   |     | 0       |                |
+---------------+--------------+------+-----+---------+----------------+
And output for "show create table moodle.mdl_forum_posts;":
| mdl_forum_posts | CREATE TABLE mdl_forum_posts (
id bigint(10) NOT NULL AUTO_INCREMENT,
discussion bigint(10) NOT NULL DEFAULT '0',
parent bigint(10) NOT NULL DEFAULT '0',
userid bigint(10) NOT NULL DEFAULT '0',
created bigint(10) NOT NULL DEFAULT '0',
modified bigint(10) NOT NULL DEFAULT '0',
mailed tinyint(2) NOT NULL DEFAULT '0',
subject varchar(255) NOT NULL DEFAULT '',
message longtext NOT NULL,
messageformat tinyint(2) NOT NULL DEFAULT '0',
messagetrust tinyint(2) NOT NULL DEFAULT '0',
attachment varchar(100) NOT NULL DEFAULT '',
totalscore smallint(4) NOT NULL DEFAULT '0',
mailnow bigint(10) NOT NULL DEFAULT '0',
deleted tinyint(1) NOT NULL DEFAULT '0',
PRIMARY KEY (id),
KEY mdl_forupost_use_ix (userid),
KEY mdl_forupost_cre_ix (created),
KEY mdl_forupost_mai_ix (mailed),
KEY mdl_forupost_dis_ix (discussion),
KEY mdl_forupost_par_ix (parent)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COMMENT='All posts are stored in this table' |
An example entry in the column "modified" is:
select modified from mdl_forum_posts;
1557487199
It is a Unix timestamp, as the following seems to show:
select from_unixtime(modified) from mdl_forum_posts;
2019-05-10 11:19:59
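The same conversion can be cross-checked outside the database. This is a small sketch, assuming the value is in seconds (it has 10 digits) and that the database session time zone is UTC, since FROM_UNIXTIME formats its result in the session time zone; `from_unixtime_utc` is a name chosen here for illustration.

```python
from datetime import datetime, timezone


def from_unixtime_utc(seconds: int) -> str:
    """Format Unix seconds as 'YYYY-MM-DD HH:MM:SS' in UTC."""
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S")


# The sample value from the "modified" column above.
print(from_unixtime_utc(1557487199))  # 2019-05-10 11:19:59
```

The result matches the FROM_UNIXTIME output, which supports reading the column as Unix seconds stored in a plain integer rather than in a native TIMESTAMP or DATETIME type.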
The log output for this connector (timestamp only) shows the query being prepared:
kafka-connect_1 | [2019-05-10 11:48:47,434] DEBUG TimestampIncrementingTableQuerier{table="moodle"."mdl_forum_posts", query='null', topicPrefix='only_ts_', incrementingColumn='', timestampColumns=[modified]} prepared SQL query: SELECT * FROM `moodle`.`mdl_forum_posts` WHERE `moodle`.`mdl_forum_posts`.`modified` > ? AND `moodle`.`mdl_forum_posts`.`modified` < ? ORDER BY `moodle`.`mdl_forum_posts`.`modified` ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
kafka-connect_1 | [2019-05-10 11:48:47,435] DEBUG Resetting querier TimestampIncrementingTableQuerier{table="moodle"."mdl_forum_posts", query='null', topicPrefix='only_ts_', incrementingColumn='', timestampColumns=[modified]} (io.confluent.connect.jdbc.source.JdbcSourceTask)
timestamp? Do you get an error in the Kafka Connect worker log? - Robin Moffatt
timestamp only? And your table DDL? - Robin Moffatt