0 votes

I am using the Confluent JDBC Sink Connector to capture all changes from a Kafka topic into a database. My messages are plain JSON without any attached schema. For example:

{ "key1": "value1", "key2": 100}

Here is my configuration:

name=sink-mysql-1
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=send_1
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
database.hostname=jdbc:mysql://0.0.0.0:3306/test_tbl
database.user=root
database.password=root
insert.mode=upsert
pk.mode=kafka
auto.create=true
auto.evolve=true

The issue I have is that, because of a legacy system, I cannot change the message format, so my messages are plain JSON objects without schema information. Does the library support mapping fields, for example mapping field A in the message to column B in the database?

Thanks


2 Answers

0 votes

You have to have a declared schema for your data to use the JDBC Sink. In practice this means that you need to either:

  • produce the data in a serialisation format that carries a schema, such as Avro with the Schema Registry, or
  • produce JSON with the schema embedded in each message (the schema/payload envelope) and set value.converter.schemas.enable=true on the connector (see the sketch below).
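
For illustration, this is roughly what the embedded-schema envelope would look like for the sample message above (a sketch; the field names and types are taken from that example):

{
  "schema": {
    "type": "struct",
    "optional": false,
    "fields": [
      { "field": "key1", "type": "string", "optional": true },
      { "field": "key2", "type": "int32",  "optional": true }
    ]
  },
  "payload": { "key1": "value1", "key2": 100 }
}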

If you don't have that option for when the data is produced into Kafka, you can build a stream processing stage that applies the schema. You can do this with something like Kafka Streams, or with KSQL. The output of this is a Kafka topic, which is then what you use as the source for Kafka Connect. An example of doing it in KSQL would be:

-- Declare the schema of the source JSON topic
CREATE STREAM send_1_src (KEY1 VARCHAR, 
                          KEY2 INT) 
  WITH (KAFKA_TOPIC='send_1', 
        VALUE_FORMAT='JSON');

-- Run a continuous query populating the target topic `SEND_1_AVRO` 
-- with the data from `send_1` reserialised into Avro
CREATE STREAM SEND_1_AVRO 
  WITH (VALUE_FORMAT='AVRO') AS 
  SELECT * 
    FROM send_1_src;

  • To learn more about KSQL see here.
  • You can find some great examples of patterns of stream processing with raw Kafka consumers vs Kafka Streams vs KSQL in Kafka Tutorials here.
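
With the data reserialised into Avro, the sink connector then reads the new topic using the Avro converter. A minimal sketch of the relevant settings, assuming a local Schema Registry on port 8081 (note also that the JDBC Sink takes connection.url / connection.user / connection.password rather than the database.* properties shown in the question):

name=sink-mysql-avro
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=SEND_1_AVRO
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
connection.url=jdbc:mysql://localhost:3306/test_tbl
connection.user=root
connection.password=root
insert.mode=upsert
pk.mode=kafka
auto.create=true
auto.evolve=true
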
-1 votes

There is another option: write a consumer interceptor and attach the schema to the value before it is consumed by the JDBC sink connector.

I tried it and it worked!
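
For completeness, such an interceptor is registered through the sink's consumer configuration; here is a sketch, assuming a hypothetical class com.example.SchemaAttachingInterceptor that wraps each raw JSON value in the schema/payload envelope before the JsonConverter (with schemas.enable=true) deserialises it:

# In the Connect worker properties (applies to every sink task on the worker)
consumer.interceptor.classes=com.example.SchemaAttachingInterceptor

# Or per connector, if the worker permits client config overrides
# (connector.client.config.override.policy=All)
consumer.override.interceptor.classes=com.example.SchemaAttachingInterceptor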