I'm trying to connect MySQL to Kafka using a Kafka Connect connector (the Debezium MySQL connector).

When I run bin/connect-standalone.sh config/connect-standalone.properties test.config, an error occurs.

[2019-11-20 06:02:05,219] ERROR Failed to create job for test.config (org.apache.kafka.connect.cli.ConnectStandalone:110)
[2019-11-20 06:02:05,219] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:121)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector config {"config"={, "database.user"="root",, "database.port"="3306",, "include.schema.changes"="true", "database.server.name"="asgard",, "connector.class"="io.debezium.connector.mysql.MySqlConnector",, "database.history.kafka.topic"="dbhistory.demo" ,, "database.server.id"="42",, "name"="mysql-source-demo-customers",, "database.hostname"="localhost",, {=, "database.password"="dsm1234",, }=, "database.history.kafka.bootstrap.servers"="localhost:9092",, "table.whitelist"="demo.customers",} contains no connector type
    at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
    at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:118)
Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector config {"config"={, "database.user"="root",, "database.port"="3306",, "include.schema.changes"="true", "database.server.name"="asgard",, "connector.class"="io.debezium.connector.mysql.MySqlConnector",, "database.history.kafka.topic"="dbhistory.demo" ,, "database.server.id"="42",, "name"="mysql-source-demo-customers",, "database.hostname"="localhost",, {=, "database.password"="dsm1234",, }=, "database.history.kafka.bootstrap.servers"="localhost:9092",, "table.whitelist"="demo.customers",} contains no connector type
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:287)
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:192)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:115)
[2019-11-20 06:02:05,221] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:66)
[2019-11-20 06:02:05,221] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:241)
[2019-11-20 06:02:05,224] INFO Stopped http_8083@2a7686a7{HTTP/1.1,[http/1.1]}{0.0.0.0:8083} (org.eclipse.jetty.server.AbstractConnector:341)
[2019-11-20 06:02:05,225] INFO node0 Stopped scavenging (org.eclipse.jetty.server.session:167)
[2019-11-20 06:02:05,226] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer:258)
[2019-11-20 06:02:05,226] INFO Herder stopping (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:98)
[2019-11-20 06:02:05,226] INFO Worker stopping (org.apache.kafka.connect.runtime.Worker:194)
[2019-11-20 06:02:05,226] INFO Stopped FileOffsetBackingStore (org.apache.kafka.connect.storage.FileOffsetBackingStore:66)
[2019-11-20 06:02:05,226] INFO Worker stopped (org.apache.kafka.connect.runtime.Worker:215)
[2019-11-20 06:02:05,227] INFO Herder stopped (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:115)
[2019-11-20 06:02:05,227] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:71)

Here is my test.config:

{
  "name": "mysql-source-demo-customers",
  "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "localhost",
      "database.port": "3306",
      "database.user": "root",
      "database.password": "dsm1234",
      "database.server.id": "42",
      "database.server.name": "asgard",
      "table.whitelist": "demo.customers",
      "database.history.kafka.bootstrap.servers": "localhost:9092",
      "database.history.kafka.topic": "dbhistory.demo" ,
      "include.schema.changes": "true"
  }
}

Here is my connect-standalone.properties:

bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

plugin.path=/home/ec2-user/share/confluent-hub-components

The error log says the connector config "contains no connector type".

I found a similar question on Stack Overflow and followed its answer, but it either didn't work for me or wasn't relevant to my case. (I also asked another similar question myself, but that one was about plugin.path.)

1 Answer

It worked once I changed test.config from the JSON format to the regular properties format, as shown below.

test.config

name=mysql-source-demo-customers
tasks.max=1
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=localhost
database.port=3306
database.user=root
database.password=dsm1234
database.server.id=1234
database.server.name=jin
table.whitelist=demo.customers
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=dbhistory.demo
include.schema.changes=true
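
For a longer JSON definition, the same conversion could be scripted instead of retyped. The following is only a minimal sketch: the function name connector_json_to_properties is made up for illustration, it assumes the JSON has the "name" plus nested "config" layout from the question, and it does no escaping, so values containing backslashes, newlines, or leading spaces would need extra care.

```python
import json


def connector_json_to_properties(json_text: str) -> str:
    """Flatten a {"name": ..., "config": {...}} document into key=value lines.

    Hypothetical helper for illustration; it does not escape special characters.
    """
    doc = json.loads(json_text)
    # "name" sits beside "config" in the JSON layout but is an ordinary key
    # in the properties layout, so both are merged into one flat mapping.
    settings = {"name": doc["name"]}
    settings.update(doc.get("config", {}))
    return "\n".join(f"{key}={value}" for key, value in settings.items()) + "\n"


example = """
{
  "name": "mysql-source-demo-customers",
  "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.port": "3306"
  }
}
"""
print(connector_json_to_properties(example), end="")
```

The output can be redirected into a file and passed to bin/connect-standalone.sh in place of the JSON file. Settings that exist only in the properties version above, such as tasks.max, would still have to be added by hand.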

I also tried renaming test.config to test.properties, but changing the file extension made no difference.
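
A likely explanation, judging from the mangled keys in the error message (for example "connector.class" with the quotes included, and {=): the standalone worker in this Kafka version appears to read the connector file as a Java properties file regardless of its extension. The sketch below is a simplified Python model of how java.util.Properties splits a line, written only to illustrate the effect; it ignores escapes and line continuations, and parse_properties_line is a made-up name, not part of Kafka.

```python
def parse_properties_line(line: str):
    """Simplified model of java.util.Properties line splitting.

    Returns (key, value), or None for blank lines and comments.
    Escapes and line continuations are deliberately not handled.
    """
    text = line.rstrip("\r\n").lstrip(" \t\f")
    if not text or text[0] in "#!":
        return None
    # The key ends at the first '=', ':' or whitespace character.
    end = 0
    while end < len(text) and text[end] not in "=: \t\f":
        end += 1
    key = text[:end]
    rest = text[end:].lstrip(" \t\f")
    # One optional '=' or ':' separator and the whitespace after it are skipped.
    if rest[:1] in ("=", ":"):
        rest = rest[1:].lstrip(" \t\f")
    return key, rest


# A few lines of the JSON file from the question, fed through the model.
json_lines = [
    "{",
    '  "name": "mysql-source-demo-customers",',
    '  "config": {',
    '      "connector.class": "io.debezium.connector.mysql.MySqlConnector",',
    "  }",
    "}",
]
parsed = dict(filter(None, map(parse_properties_line, json_lines)))
for key, value in parsed.items():
    print(f"{key}={value}")

# The key that was found includes the quote characters, so a lookup for the
# plain key fails.
print("connector.class" in parsed)
```

Under this model every key keeps its surrounding quotes and every value keeps its trailing comma, which matches the entries printed in the error message. A lookup for a plain connector.class key then finds nothing, which would explain "contains no connector type".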