
I'm trying to mirror a KSQL table in my Postgres DB using the JDBC sink connector but unfortunately I'm not able to make it work.

I am using Kafka (Confluent Platform 5.4.1) and I have two Debezium 1.0 topics, serialized with Avro, coming from my Postgres DB. This is the configuration of my Debezium connector:

    {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.dbname": "xxx",
        "tasks.max": "1",
        "database.history.kafka.bootstrap.servers": "kafka-svc:9092",
        "database.history.kafka.topic": "dbhistory.xxx",
        "database.server.name": "xxx",
        "database.port": "5432",
        "plugin.name": "decoderbufs",
        "table.whitelist": "public.a,public.b",
        "database.hostname": "app-db",
        "name": "connector",
        "connection.url": "jdbc:postgresql://app-db:5432/xxx",
        "database.whitelist": "xxx",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.add.source.fields": "table"
    }

I then use KSQL CLI to interact with my server and issue the following commands:

    CREATE STREAM a_dbz
    WITH (KAFKA_TOPIC='xxx.public.a', VALUE_FORMAT='AVRO');

    CREATE STREAM b_dbz
    WITH (KAFKA_TOPIC='xxx.public.b', VALUE_FORMAT='AVRO');

    CREATE STREAM a_by_b_id
    WITH (KAFKA_TOPIC='a_by_b_id', VALUE_FORMAT='avro', PARTITIONS=1)
    AS SELECT * FROM a_dbz PARTITION BY b_id;

    CREATE STREAM b_by_id
    WITH (KAFKA_TOPIC='b_by_id', VALUE_FORMAT='avro', PARTITIONS=1)
    AS SELECT * FROM b_dbz PARTITION BY id;

TL;DR: I create two streams from the Debezium topics and repartition them so they are ready for a JOIN. I then turn one of them (b_by_id) into a table because I don't want to use a windowed join in this case:

    CREATE TABLE b
    WITH (KAFKA_TOPIC='b_by_id', VALUE_FORMAT='avro', KEY='id');

At this point everything works fine: I can play with my streams, tables, and joins, and changes in my source DB are immediately reflected in my streaming queries in KSQL. My issue arises when I perform an aggregate function on my data and try to mirror the results in my Postgres DB (the same one as the source DB). To do that, I create a new KSQL table as the result of a SELECT:

    CREATE TABLE grouped_data AS
    SELECT x, y, z, MAX(date) AS max_date
    FROM a_by_b_id
    INNER JOIN b ON a_by_b_id.b_id = b.id
    GROUP BY x, y, z
    EMIT CHANGES;

Then, I set up a JDBC sink connector to dump the grouped_data changelog topic of my new table to my DB with the following configuration:

    {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:postgresql://app-db:5432/xxx",
        "insert.mode": "upsert",
        "auto.create": true,
        "auto.evolve": true,
        "topics": "grouped_data",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry-svc:8081",
        "pk.mode": "record_value",
        "pk.fields": "x, y, z",
        "table.name.format": "kafka_${topic}",
        "transforms": "TimestampConverter",
        "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.TimestampConverter.field": "max_date",
        "transforms.TimestampConverter.target.type": "Timestamp"
    }

Unfortunately nothing happens: there are no errors and no data in my sink DB. The connector is created and configured properly, but even if I force new messages through my streaming queries, no data is transferred to the sink DB, and the destination table doesn't even get created. I tried creating the connector multiple times with different names and configurations, different values for pk.mode, etc., but I couldn't get it to work. Creating a connector for my table "b" above works perfectly fine, and all the data gets transferred immediately.

Here are additional details on the KSQL table that I am trying to mirror to Postgres:

    describe extended grouped_data;

    Name                 : GROUPED_DATA
    Type                 : TABLE
    Key field            :
    Key format           : STRING
    Timestamp field      : Not set - using <ROWTIME>
    Value format         : AVRO
    Kafka topic          : GROUPED_DATA (partitions: 1, replication: 1)

     Field              | Type
    ------------------------------------------------
     ROWTIME            | BIGINT           (system)
     ROWKEY             | VARCHAR(STRING)  (system)
     X                  | BIGINT
     Y                  | BIGINT
     Z                  | BIGINT
     MAX_DATE           | BIGINT
    ------------------------------------------------

Thanks!

Kudos for a well-written question with clear examples, versioning, etc 👏 (Robin Moffatt)

Thanks @RobinMoffatt! (Roberto Francescangeli)

1 Answer


You've configured Kafka Connect to use the lower-case topic name:

    "topics": "grouped_data",

but per your DESCRIBE output the topic that the table is writing to is in upper-case:

    Kafka topic          : GROUPED_DATA (partitions: 1, replication: 1)
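
If you want to double-check the exact name from the ksqlDB CLI, something like the following should list the topics the server can see and print one message from the upper-case topic. This is only a sketch; the quoted topic name is taken from the DESCRIBE output in the question.

```sql
-- List the Kafka topics visible to the ksqlDB server (names are shown with their exact case).
SHOW TOPICS;

-- Print a single message from the table's changelog topic; quote the name to preserve its case.
PRINT 'GROUPED_DATA' FROM BEGINNING LIMIT 1;
```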

If you check your Kafka Connect worker log closely you'll find this:

    Error while fetching metadata with correlation id 2 : {grouped_data=LEADER_NOT_AVAILABLE}

Kafka topic names are case-sensitive, so grouped_data and GROUPED_DATA are two different topics. Kafka Connect won't abort if you give it a topic that doesn't exist, because you may have specified it deliberately and intend to populate it later.

So you can either amend your Kafka Connect connector config to use the upper-case topic name, or redefine your ksqlDB table and include …WITH (KAFKA_TOPIC='grouped_data') in the DDL.
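
As a rough sketch of the second option, here is the query from the question with the output topic named explicitly. Only the WITH clause is new; everything else is copied from the question. The existing table and the persistent query that populates it would have to be terminated and dropped first, which is not shown because the query ID is specific to your server.

```sql
-- Sketch only: same aggregation as in the question, but writing to a
-- lower-case topic so that it matches "topics": "grouped_data" in the
-- sink connector config.
CREATE TABLE grouped_data
WITH (KAFKA_TOPIC='grouped_data') AS
SELECT x, y, z, MAX(date) AS max_date
FROM a_by_b_id
INNER JOIN b ON a_by_b_id.b_id = b.id
GROUP BY x, y, z
EMIT CHANGES;
```

The first option needs no ksqlDB changes at all: only the `topics` value in the sink connector config changes to `GROUPED_DATA`.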