0
votes

We are using Kafka Connect JDBC to sync tables between to databases (Debezium would be perfect for this but is out of the question).

The Sync in general works fine but it seems there are 3x the number of events / messages stored in the topic than expected.

What could be the reason for this?

Some additional information

The target database contains the exact number of messages (count of messages in the topics / 3).

Most of the topics are split into 3 partitions (Key is set via SMT, DefaultPartitioner is used).

JDBC Source Connector

{
  "name": "oracle_source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:oracle:thin:@dbdis01.allesklar.de:1521:stg_cdb",
    "connection.user": "****",
    "connection.password": "****",
    "schema.pattern": "BBUCH",
    "topic.prefix": "oracle_",
    "table.whitelist": "cdc_companies, cdc_partners, cdc_categories, cdc_additional_details, cdc_claiming_history, cdc_company_categories, cdc_company_custom_fields, cdc_premium_custom_field_types, cdc_premium_custom_fields, cdc_premiums, cdc, cdc_premium_redirects, intermediate_oz_data, intermediate_oz_mapping",
    "table.types": "VIEW",
    "mode": "timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "ts",
    "key.converter": "org.apache.kafka.connect.converters.IntegerConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "validate.non.null": false,
    "numeric.mapping": "best_fit",
    "db.timezone": "Europe/Berlin",
    "transforms":"createKey, extractId, dropTimestamp, deleteTransform",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "id",
    "transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractId.field": "id",
    "transforms.dropTimestamp.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.dropTimestamp.blacklist": "ts",
    "transforms.deleteTransform.type": "de.meinestadt.kafka.DeleteTransformation"
  }
}

JDBC Sink Connector

{
  "name": "postgres_sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://writer.branchenbuch.psql.integration.meinestadt.de:5432/branchenbuch",
    "connection.user": "****",
    "connection.password": "****",
    "key.converter": "org.apache.kafka.connect.converters.IntegerConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.schemas.enable": true,
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "delete.enabled": true,
    "auto.create": true,
    "auto.evolve": true,
    "topics.regex": "oracle_cdc_.*",
    "transforms": "dropPrefix",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.regex": "oracle_cdc_(.*)",
    "transforms.dropPrefix.replacement": "$1"
  }
}

Strange Topic Count

Topic Count

1
Why are you using mode timestamp+incrementing why not just timestamp? - Nitin
Correct - timestamp+incrementing is a good option - Robin Moffatt
I can recommend KSQL for this :) You could then run a COUNT…GROUP BY against the data. - Robin Moffatt
Can you edit your question to include your source and sink connector config in plain text please? - Robin Moffatt
Is it all topics that have the duplicates or just some of them? - Robin Moffatt

1 Answers

4
votes

This isn't an answer per-se but it's easier to format here than in the comments box.

It's not clear why you'd be getting duplicates. Some possibilities would be:

  1. You have more than one instance of the connector running
  2. You have on instance of the connector running but have previously run other instances which loaded the same data to the topic
  3. Data's coming from multiple tables and being merged into one topic (not possible here based on your config, but if you were using Single Message Transform to modify target-topic name could be a possibility)

In terms of investigation I would suggest:

  1. Isolate the problem by splitting the connector into one connector per table.
  2. Examine each topic and locate examples of the duplicate messages. See if there is a pattern to which topics have duplicates. KSQL will be useful here:

    SELECT ROWKEY, COUNT(*) FROM source GROUP BY ROWKEY HAVING COUNT(*) > 1
    

    I'm guessing at ROWKEY (the key of the Kafka message) - you'll know your data and which columns should be unique and can be used to detect duplicates.

  3. Once you've found a duplicate message, use kafkacat to examine the duplicate instances. Do they have the exact same Kafka message timestamp?

For more back and forth, StackOverflow isn't such an appropriate platform - I'd recommend heading to http://cnfl.io/slack and the #connect channel.