
I'm busy learning Kafka, specifically the downstream side: using a JDBC sink connector to send data from a Kafka topic to MySQL.

I've built an image from the Confluent cp-kafka-connect-base image with the JDBC connector and the MySQL JDBC driver, using a Dockerfile like this:

FROM confluentinc/cp-kafka-connect-base

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:5.4.1

ENV MYSQL_DRIVER_VERSION 5.1.39

RUN curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-${MYSQL_DRIVER_VERSION}.tar.gz" \
     | tar -xzf - -C /usr/share/java/kafka/ --strip-components=1 mysql-connector-java-${MYSQL_DRIVER_VERSION}/mysql-connector-java-${MYSQL_DRIVER_VERSION}-bin.jar

I'll be honest, I'm getting a little lost in the documentation, and I've reached the point where I want to include the config for the sink, but I have no idea how to include it or where to copy it to. I have created the sink configuration file, but I'm not sure where it belongs: in the image build, or when running the kafka-connect container?

The end game for this is to create a Helm deployment for this connector, but I'm not there yet.

Any help would be greatly appreciated as I learn all about Kafka and Kafka Connect.


1 Answer


Since you're using Kafka Connect in distributed mode (which is generally the best option), you pass your connector configuration as a REST call.

Here's an example JDBC sink configuration - you'd need to modify it to suit your source topic, serialisation, etc:

curl -X PUT http://localhost:8083/connectors/sink_mysql_foo_00/config -H "Content-Type: application/json" -d '{
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url": "jdbc:mysql://mysql-host:3306/",
      "connection.user": "user",
      "connection.password": "pw",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "true",
      "tasks.max": "1",
      "topics": "foo",
      "auto.create": "true",
      "auto.evolve":"true",
      "pk.mode":"none"          
    }'
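Once the PUT call returns, you can check that the connector and its task are actually running by querying the worker's REST API. This is a minimal sketch, assuming the worker is reachable on localhost:8083; substitute your own connector name:

    # List connectors registered on this worker
    curl -s http://localhost:8083/connectors

    # Check the state of a specific connector and its tasks
    # (replace <connector-name> with the name you used in the PUT call)
    curl -s http://localhost:8083/connectors/<connector-name>/status

If the sink fails (for example, a missing JDBC driver or a bad connection URL), the `status` output will show the task in `FAILED` state along with a stack trace, which is usually the quickest way to diagnose config problems.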

If you want to pass your connector config when the container is instantiated you can embed it in the container's start command: launch the worker, wait for its REST endpoint to become available, and then submit the config. See https://rmoff.net/2018/12/15/docker-tips-and-tricks-with-ksql-and-kafka/ for more detail.
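A minimal sketch of that pattern as a container entrypoint script (the connector name, the 5-second polling interval, and the elided config body are illustrative placeholders, not prescriptive):

    #!/bin/sh
    # Start the Connect worker in the background using the image's normal entrypoint
    /etc/confluent/docker/run &

    # Poll the worker's REST API until it responds with HTTP 200
    echo "Waiting for Kafka Connect to start listening on localhost:8083 ..."
    while [ "$(curl -s -o /dev/null -w '%{http_code}' http://localhost:8083/connectors)" -ne 200 ]; do
      sleep 5
    done

    # Worker is up: create the connector with the same PUT call shown above
    curl -X PUT http://localhost:8083/connectors/sink_mysql_foo_00/config \
      -H "Content-Type: application/json" \
      -d '{ ... }'

    # Keep the container alive on the worker process
    wait

In a Dockerfile you'd copy this script in and set it as the `CMD`; the same approach carries over to a Helm deployment, where the script (or an equivalent `command:` override) becomes the container's start command.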