0
votes

I configured a Kafka JDBC Source connector to push changed records (inserts or updates) from a PostgreSQL database to a Kafka topic. I use "timestamp+incrementing" mode, and it seems to work fine. I didn't configure the JDBC Sink connector because I'm using a Kafka Consumer that listens on the topic.

The message on the topic is JSON. This is an example:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "id"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "entity_create_date"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "entity_modify_date"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "entity_version"
      },
      {
        "type": "string",
        "optional": true,
        "field": "firstname"
      },
      {
        "type": "string",
        "optional": true,
        "field": "lastname"
      }
    ],
    "optional": false,
    "name": "author"
  },
  "payload": {
    "id": 1,
    "entity_create_date": 1600287236682,
    "entity_modify_date": 1600287236682,
    "entity_version": 1,
    "firstname": "George",
    "lastname": "Orwell"
  }
}

As you can see, nothing in the message says whether the Source connector captured this change because of an insert or an update. I need this information. How can I get it?

2 Answers

0
votes

You can't get that information using the JDBC Source connector, unless you do something bespoke in the source schema and triggers.

This is one of the reasons why log-based CDC is generally a better way to get events from the source database. Other reasons include:

  • capturing deletes
  • capturing the type of operation
  • capturing all events, not just what's there at the time when the connector polls.

For more details on the nuances of this see this blog or a talk based on the same.

0
votes

Using a CDC-based approach as suggested by @Robin Moffatt may be the proper way to handle your requirement. Check out https://debezium.io/
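To illustrate what that buys you on the consumer side, here is a rough sketch. It assumes a Debezium-style change event whose envelope carries an `op` field ("c" for create, "u" for update, "d" for delete, "r" for a snapshot read), wrapped in "payload" when the JSON converter has schemas enabled. The function name `operation_of` and the sample event are made up for the example; check the envelope your connector version actually produces.

```python
import json

# Debezium-style operation codes (assumed from its documented event envelope).
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def operation_of(raw_message):
    """Return a readable operation name for one change event (hypothetical helper)."""
    event = json.loads(raw_message)
    # With schemas enabled the envelope sits under "payload"; otherwise it is the top level.
    envelope = event.get("payload", event)
    return OPERATIONS.get(envelope.get("op"), "unknown")


# Made-up sample event, trimmed to the fields used here.
cdc_sample = json.dumps({
    "payload": {
        "before": None,
        "after": {"id": 1, "firstname": "George", "lastname": "Orwell"},
        "op": "c",
        "ts_ms": 1600287236682,
    }
})

print(operation_of(cdc_sample))  # prints "insert"
```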

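However, looking at your table data, you could use "entity_create_date" and "entity_modify_date" in your consumer to determine whether the message is an insert or an update. If "entity_create_date" = "entity_modify_date" then it's an insert, otherwise it's an update. This only works if your application sets both columns to the same value on insert, and a row that is inserted and then updated before the connector polls will show up only as an update.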
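A minimal sketch of that check, assuming the message has the schema-plus-payload shape shown in the question. The function name `classify_change` is made up for the example, and how you receive the raw message depends on your consumer.

```python
import json


def classify_change(raw_message):
    """Guess insert vs. update by comparing the two timestamp columns (hypothetical helper)."""
    payload = json.loads(raw_message)["payload"]
    created = payload.get("entity_create_date")
    modified = payload.get("entity_modify_date")
    # Both columns are optional in the schema, so either may be missing or null.
    if created is None or modified is None:
        return "unknown"
    return "insert" if created == modified else "update"


# Payload values copied from the question; the schema part is left out for brevity.
jdbc_sample = json.dumps({
    "payload": {
        "id": 1,
        "entity_create_date": 1600287236682,
        "entity_modify_date": 1600287236682,
        "entity_version": 1,
        "firstname": "George",
        "lastname": "Orwell",
    }
})

print(classify_change(jdbc_sample))  # prints "insert"
```

Deletes are still invisible with this approach, since the JDBC Source connector never emits a message for a removed row.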