
I installed Apache Kafka (Confluent 3.3.0) on CentOS 7 and am trying to run the FileStream Kafka Connect connector in distributed mode, but I was getting the error below:

[2017-08-10 05:26:27,355] INFO Added alias 'ValueToKey' to plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:290)
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "internal.key.converter" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:463)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
at org.apache.kafka.connect.runtime.WorkerConfig.<init>(WorkerConfig.java:197)
at org.apache.kafka.connect.runtime.distributed.DistributedConfig.<init>(DistributedConfig.java:289)
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:65)

This is now resolved by updating workers.properties as described in http://docs.confluent.io/current/connect/userguide.html#connect-userguide-distributed-config

Command used:

/home/arun/kafka/confluent-3.3.0/bin/connect-distributed.sh ../../../properties/file-stream-demo-distributed.properties

Filestream properties file (workers.properties):

name=file-stream-demo-distributed
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/demo-file.txt
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
config.storage.topic=demo-2-distributed
offset.storage.topic=demo-2-distributed
status.storage.topic=demo-2-distributed
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
group.id=""

I added the properties below, and the command went through without any errors.

bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
config.storage.topic=demo-2-distributed
offset.storage.topic=demo-2-distributed
status.storage.topic=demo-2-distributed
group.id=""

But now when I run the consumer command, I am unable to see the messages from /tmp/demo-file.txt. Please let me know if there is a way I can check whether the messages are published to Kafka topics and partitions.

kafka-console-consumer --zookeeper localhost:2181 --topic demo-2-distributed --from-beginning

I believe I am missing something really basic here. Can someone please help?

What is in the standalone worker properties? The internal.key.converter configuration property must be defined (along with others). - Randall Hauch
Thank you for the response. I updated the question with the updated properties. Can you please suggest? Also, can I know the difference between config.storage.topic, offset.storage.topic, and status.storage.topic? All seem to refer to the Kafka topic to publish to. Please correct me if I am incorrect. - Shankar Guru
How many brokers are running in your Kafka cluster? If less than 3, you'll have to specify the ...replication.factor properties; see docs.confluent.io/current/connect/… - Randall Hauch

1 Answer


You need to define unique topics for the Kafka Connect framework to store its config, offsets, and status.

In your workers.properties file, change these parameters to something like the following:

config.storage.topic=demo-2-distributed-config
offset.storage.topic=demo-2-distributed-offset
status.storage.topic=demo-2-distributed-status
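
If topic auto-creation is disabled on your brokers, you can create these internal topics up front. A minimal sketch assuming a single local broker and the default ZooKeeper port; the names match the properties above, and on a real cluster you would raise --replication-factor (the config topic must have exactly one partition):

kafka-topics --zookeeper localhost:2181 --create --topic demo-2-distributed-config --partitions 1 --replication-factor 1 --config cleanup.policy=compact
kafka-topics --zookeeper localhost:2181 --create --topic demo-2-distributed-offset --partitions 25 --replication-factor 1 --config cleanup.policy=compact
kafka-topics --zookeeper localhost:2181 --create --topic demo-2-distributed-status --partitions 5 --replication-factor 1 --config cleanup.policy=compact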

These topics are used to store the state and configuration metadata of Connect, not the messages for any of the connectors that run on top of Connect. Do not use the console consumer on any of these three topics and expect to see your messages.

The messages are stored in the topic configured in the connector configuration JSON with the parameter called "topics" (for sink connectors; source connectors use "topic").

Example file-sink-config.json file:

{
  "name": "MyFileSink",
  "config": {
      "topics": "mytopic",
      "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
      "tasks.max": 1,
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "file": "/tmp/demo-file.txt"
    }
}
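
To check that messages are actually reaching the data topic, run the console consumer against that topic rather than the internal ones (here the hypothetical mytopic from the example config):

kafka-console-consumer --zookeeper localhost:2181 --topic mytopic --from-beginning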

Once the distributed worker is running, you need to apply the config file to it using curl, like so:

curl -X POST -H "Content-Type: application/json" --data @file-sink-config.json http://localhost:8083/connectors
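
To confirm the connector was registered and its task is running, you can also query the worker's REST API (a sketch assuming the default REST port 8083 and the MyFileSink name from the example above):

curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/MyFileSink/status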

After that, the config will be safely stored in the config topic you created, for all distributed workers to use. Make sure the config topic (and the status and offset topics) will not expire messages, or you will lose your connector configuration when it does; these internal topics should use log compaction (cleanup.policy=compact) rather than time-based retention.
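
One way to check, and if necessary fix, the cleanup policy of an existing internal topic is the kafka-configs tool; a sketch assuming the default ZooKeeper port and the config topic name used above:

kafka-configs --zookeeper localhost:2181 --describe --entity-type topics --entity-name demo-2-distributed-config
kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name demo-2-distributed-config --add-config cleanup.policy=compact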