I am using below dependncies,
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-oracle</artifactId>
<version>${version.debezium}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.debezium/debezium-connector-mysql -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${version.debezium}</version>
</dependency>
<version.debezium>0.8.3.Final</version.debezium>
Below is my java method,
public void runMysqlParsser() {
Configuration config = Configuration.create()
/* begin engine properties */
.with("connector.class",
"io.debezium.connector.mysql.MySqlConnector")
.with("offset.storage",
"org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename",
"/home/mohit/tmp/offset.dat")
.with("offset.flush.interval.ms", 60000)
/* begin connector properties */
.with("name", "my-sql-connector")
.with("database.hostname", "localhost")
.with("database.port", 3306)
.with("database.user", "root")
.with("database.password", "root")
.with("server.id", 1)
.with("database.server.name", "my-app-connector")
.with("database.history",
"io.debezium.relational.history.FileDatabaseHistory")
.with("database.history.file.filename",
"/home/mohit/tmp/dbhistory.dat")
.with("database.whitelist", "mysql")
.with("table.whitelist", "mysql.customers")
.build();
EmbeddedEngine engine = EmbeddedEngine.create()
.using(config)
.notifying(this::handleEvent)
.build();
Executor executor = Executors.newSingleThreadExecutor();
executor.execute(engine);
}
private void handleEvent(SourceRecord sourceRecord) {
try {
LOG.info("Got record :" + sourceRecord.toString());
} catch (Exception ex) {
LOG.info("exception in handle event:" + ex);
}
My sql configurations, .
general_log_file = /var/log/mysql/mysql.log
general_log = 1
server-id = 1
log_bin = /var/log/mysql/mysql-bin.log
expire_logs_days = 10
max_binlog_size = 100M
binlog_format = row
binlog_row_image = full
binlog_rows_query_log_events = on
gtid_mode = on
enforce_gtid_consistency = on
When I am running this code, I am getting the offset for the history logs also the mysql.log file is getting offset added to it. However when I am executing any update statement to the table, it is not giving me any logs i.e. the handleEvent method is not getting called. Can anyone tell me what is wrong with the code or configuration ?
Below is the logs after running the java code,
$$ java -jar debezium-gcp-1.0-SNAPSHOT-jar-with-dependencies.jar
log4j:WARN No appenders could be found for logger (org.apache.kafka.connect.json.JsonConverterConfig). log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Nov 28, 2018 1:29:47 PM com.debezium.gcp.SampleMysqlEmbededDebezium handleEvent INFO: Got record :SourceRecord{sourcePartition={server=my-app-connector}, sourceOffset={file=mysql-bin.000002, pos=980, gtids=31b708c7-ee22-11e8-b8a3-080027fbf50e:1-17, snapshot=true}} ConnectRecord{topic='my-app-connector', kafkaPartition=0, key=Struct{databaseName=}, value=Struct{source=Struct{version=0.8.3.Final,name=my-app-connector,server_id=0,ts_sec=0,file=mysql-bin.000002,pos=980,row=0,snapshot=true},databaseName=,ddl=SET character_set_server=latin1, collation_server=latin1_swedish_ci;}, timestamp=null, headers=ConnectHeaders(headers=)} Nov 28, 2018 1:29:47 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect INFO: Connected to localhost:3306 at 31b708c7-ee22-11e8-b8a3-080027fbf50e:1-17 (sid:6326, cid:21)