2
votes

I'm trying to use Kafka Connect to sync data from an old DB2 database to a Postgres database using the JDBC Source and Sink Connectors. It works fine, but only if I am very strict on the case I use for table names.

For example, I have a table in DB2 called ACTION and it also exists in Postgres with the same columns, etc. The only difference is in DB2 it's upper case ACTION and in Postgres it's lowercase action.

Here's a sink file that works:

{
    "name": "jdbc_sink_pg_action",
    "config": {
        "_comment": "The JDBC connector class",
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",

        "_comment": "How to serialise the value of keys ",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",


        "_comment": "As above, but for the value of the message. Note that these key/value serialisation settings can be set globally for Connect and thus omitted for individual connector configs to make them shorter and clearer",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",

        "_comment": " --- JDBC-specific configuration below here  --- ",

        "_comment": "JDBC connection URL.",
        "connection.url": "jdbc:postgresql://localhost:5435/postgres",
        "connection.user": "postgres",
        "connection.password": "*****",

        "topics": "ACTION",
        "table.name.format": "action",

        "_comment": "The insertion mode to use",
        "insert.mode": "upsert",

        "_comment": "The primary key mode",
        "pk.mode": "record_value",

        "_comment": "List of comma-separated primary key field names. The runtime interpretation of this config depends on the pk.mode",
        "pk.fields": "ACTION_ID",

        "quote.sql.identifiers": "never"
    }
}

This is ok, but it's not very flexible. For example, I have many other tables and I'd like to sync them too, but I don't want to create a connector file for each and every table. So I try using:

"table.name.format": "${topic}",

When I do this, I get the following error in the logs when I try to load my sink connector:

Caused by: org.apache.kafka.connect.errors.ConnectException: Table "ACTION" is missing and auto-creation is disabled

So it seems to me that "quote.sql.identifiers": "never" is not actually working otherwise the query the sink connector is doing would be unquoted and it would allow for any case (it would convert to lower).

Why isn't this working? I get the same results if I just use ACTION as the table.name.format.

1

1 Answers

1
votes

Your PostgreSQL table name (action) is not equal to the topic name (ACTION). Kafka Connect JDBC Connector uses getTables() method to check if a table exists, where tableNamePattern param is case sensitive (according the docs: must match the table name as it is stored in the database).

You can use ChangeTopicCase transformation from Kafka Connect Common Transformations.