I am trying to run Kafka Connect workers in distributed mode. Unlike standalone mode, distributed mode does not let us pass the connector properties file when starting the worker. Instead, the workers are started separately, and we deploy and manage connectors on them using the REST API.

Reference Link - https://docs.confluent.io/current/connect/managing/configuring.html#connect-managing-distributed-mode

I tried creating a connector by passing the values below in a curl command and executing it:

curl -X POST -H "Content-Type: application/json" --data '{"name":"sailpointdb","connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector","tasks.max":"1","connection.password " : " abc","connection.url " : "jdbc:mysql://localhost:3306/db","connection.user " : "abc" ,"query" : " SELECT * FROM (SELECT NAME, FROM_UNIXTIME(completed/1000) AS 
TASKFAILEDON FROM abc WHERE COMPLETION_STATUS = 'Error') as A","mode" : " timestamp","timestamp.column.name" : "TASKFAILEDON","topic.prefix" : "dbevents","validate.non.null" : "false" }}' http://localhost:8089/connectors/

I am getting the following error: curl: (3) URL using bad/illegal format or missing URL

Please let me know what is wrong with the above curl statement. Am I missing anything here?

1 Answer

  1. You have an extra closing curly brace at the end of your JSON, which makes the payload invalid.
  2. If you POST to /connectors, the payload needs "name" and "config" as root-level elements. However, I recommend using PUT on /connectors/<name>/config instead, because it creates the connector if it does not exist and you can re-run it to update the config if you need to.

Try this (the stray spaces inside the config keys and values are removed, and the single quotes around Error are written as '\'' so that they survive the shell's single-quoted string):

curl -X PUT -H "Content-Type: application/json" \
      http://localhost:8089/connectors/source-jdbc-sailpointdb-00/config \
      -d '{
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.password": "abc",
        "connection.url": "jdbc:mysql://localhost:3306/db",
        "connection.user": "abc",
        "query": "SELECT * FROM (SELECT NAME, FROM_UNIXTIME(completed/1000) AS TASKFAILEDON FROM abc WHERE COMPLETION_STATUS = '\''Error'\'') as A",
        "mode": "timestamp",
        "timestamp.column.name": "TASKFAILEDON",
        "topic.prefix": "dbevents",
        "validate.non.null": "false"
    }'
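
For comparison, if you do want to POST to /connectors, the payload needs the "name" and "config" root-level elements mentioned in point 2. Below is a minimal sketch of that shape, not a definitive script. The CONNECT_URL variable and the create_connector helper are hypothetical names introduced only for illustration; the worker address and connector settings are the ones from the question. The payload is built with a quoted heredoc so the shell leaves the single quotes in the SQL alone.

```shell
# Assumption: the Connect worker listens on the address used in the question.
CONNECT_URL="${CONNECT_URL:-http://localhost:8089}"

# POST /connectors expects "name" and "config" as the two root-level elements.
# The quoted delimiter ('EOF') disables shell expansion inside the heredoc,
# so 'Error' reaches the server exactly as written.
PAYLOAD=$(cat <<'EOF'
{
  "name": "source-jdbc-sailpointdb-00",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.password": "abc",
    "connection.url": "jdbc:mysql://localhost:3306/db",
    "connection.user": "abc",
    "query": "SELECT * FROM (SELECT NAME, FROM_UNIXTIME(completed/1000) AS TASKFAILEDON FROM abc WHERE COMPLETION_STATUS = 'Error') as A",
    "mode": "timestamp",
    "timestamp.column.name": "TASKFAILEDON",
    "topic.prefix": "dbevents",
    "validate.non.null": "false"
  }
}
EOF
)

# Hypothetical helper: sends the payload to the worker when called.
create_connector() {
  curl -s -X POST -H "Content-Type: application/json" \
       --data "$PAYLOAD" "$CONNECT_URL/connectors"
}
```

Calling create_connector sends the request. Unlike the PUT form, a second POST with the same name is rejected because the connector already exists, which is why PUT is more convenient while you are still iterating on the config.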