I'm new to Kafka and can't figure out how to make "transforms.router.type" work with my Debezium setup. I wrote a custom event transformation Java class and prepared a connector configuration to deploy to the container, which looks like this:

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d

{
  "name": "task-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "slot.name" : "task_engine_saga",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "tasks",
    "schema.whitelist": "public",
    "table.whitelist" : "public.task",
    "tombstones.on.delete" : "false",
    "transforms" : "router",
    "transforms.router.type" : "com.task.connect.TaskEventRouter"
  }
}

The response says that the transformation class could not be found:

CREATE kafka connector task-connector....
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1091  100   516  100   575   8322   9274 --:--:-- --:--:-- --:--:-- 17596
{"error_code":400,"message":"Connector configuration is invalid and contains the following 3 error(s):\nInvalid value com.task.connect.TaskEventRouter for configuration transforms.router.type: Class com.task.connect.TaskEventRouter could not be found.\nInvalid value null for configuration transforms.router.type: Not a Transformation\nA value is required\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}

Then I copied the host folder containing the JAR file (which holds my Java class with the event transformation logic) into the container's /connect directory, but that didn't help either. Could someone please tell me what I should do to make this custom transforms.router.type work with my Debezium setup?

My docker-compose setup for the containers:

version: '3'
services:
    pgadmin:
        container_name: pgadmin_container
        image: dpage/pgadmin4
        environment:
            PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL:[email protected]}
            PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD:-admin}
        volumes:
            - pgadmin:/root/.pgadmin
        ports:
            - "${PGADMIN_PORT:-5050}:80"
        restart: unless-stopped

    zookeeper:
        image: debezium/zookeeper:1.3
        ports:
            - 2181:2181
            - 2888:2888
            - 3888:3888
    kafka:
        image: debezium/kafka:1.3
        ports:
            - 9092:9092
        links:
            - zookeeper
        environment:
            - ZOOKEEPER_CONNECT=zookeeper:2181
    postgres:
        image: debezium/example-postgres:1.3
        ports:
            - 5432:5432
        environment:
            - POSTGRES_USER=postgres
            - POSTGRES_PASSWORD=postgres
            - PGDATA=/data/postgres
            - POSTGRES_DB=${POSTGRES_DB:-task_engine}
    connect:
        image: debezium/connect:1.3
        ports:
            - 8083:8083
        links:
            - kafka
            - postgres
        environment:
            - BOOTSTRAP_SERVERS=kafka:9092
            - GROUP_ID=1
            - CONFIG_STORAGE_TOPIC=my_connect_configs
            - OFFSET_STORAGE_TOPIC=my_connect_offsets
            - STATUS_STORAGE_TOPIC=my_connect_statuses

volumes:
    postgres:
    pgadmin:

What does your plugin.path property look like? Have you tried putting the JAR containing your custom transform in the same location as the JAR containing PostgresConnector? - dnault
Alternatively, you could add the JAR containing your custom transform to the kafka lib directory, but that wouldn't be as clean. - dnault
@dnault I have plugin.path=/kafka/connect in my connect-distributed.properties. The debezium-connector-postgres folder is also there, and I put the JAR file in it, but still no luck. I also tried putting the JAR into kafka/libs, with the same result. Maybe some kind of reconfiguration script has to be executed after I put this file into the Kafka folders, and only after that should I deploy the configuration? - xeLL
It's unclear what com.task.connect.TaskEventRouter is... Did you add this yourself? Because this does not exist in Debezium, as the error suggests - OneCricketeer
I created a custom Docker image with the transformation copied in, and that helped - xeLL

1 Answer

I made special event transformation java class

From what I can tell, you have not mounted the JAR containing this compiled class into your Debezium container, nor changed the plugin.path property to include the directory that holds that JAR.
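As a rough sketch of what mounting could look like, the connect service might get a volume entry like the one below. The host path ./task-event-router is hypothetical and is assumed to hold the JAR with com.task.connect.TaskEventRouter; /kafka/connect is the plugin.path value reported in the comments. Kafka Connect scans plugin.path when the worker starts, so the container has to be (re)started after the JAR is in place.

```yaml
services:
    connect:
        image: debezium/connect:1.3
        ports:
            - 8083:8083
        links:
            - kafka
            - postgres
        volumes:
            # Assumption: ./task-event-router (hypothetical host directory)
            # contains the JAR with com.task.connect.TaskEventRouter.
            # It is mounted as its own subdirectory of the plugin.path.
            - ./task-event-router:/kafka/connect/task-event-router
        environment:
            - BOOTSTRAP_SERVERS=kafka:9092
            - GROUP_ID=1
            - CONFIG_STORAGE_TOPIC=my_connect_configs
            - OFFSET_STORAGE_TOPIC=my_connect_offsets
            - STATUS_STORAGE_TOPIC=my_connect_statuses
```

If the transform depends on other libraries that are not already available to the worker, those JARs would need to sit in the same directory.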

If you have created your own Docker image with that data, then you should change image: debezium/connect:1.3 in your docker-compose file so that it references your image instead.

This would explain why the transform is not available
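For the custom image route, the compose change might look roughly like this. Both the ./connect build context and the task-connect:1.3 tag are made-up names for illustration; the assumption is that the Dockerfile in that context starts FROM debezium/connect:1.3 and copies the transform JAR into a subdirectory of /kafka/connect.

```yaml
services:
    connect:
        # Assumption: ./connect is a hypothetical build context whose
        # Dockerfile extends debezium/connect:1.3 and adds the transform JAR.
        build: ./connect
        # Hypothetical tag for the resulting image.
        image: task-connect:1.3
        ports:
            - 8083:8083
        links:
            - kafka
            - postgres
        environment:
            - BOOTSTRAP_SERVERS=kafka:9092
            - GROUP_ID=1
            - CONFIG_STORAGE_TOPIC=my_connect_configs
            - OFFSET_STORAGE_TOPIC=my_connect_offsets
            - STATUS_STORAGE_TOPIC=my_connect_statuses
```

With an image built this way, the JAR is already on the plugin path when the worker starts, so no separate copy step is needed after the container is up.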