I installed Apache Kafka on centos 7 (confluent), am trying to run filestream kafka connect in distributed mode but I was getting below error:
[2017-08-10 05:26:27,355] INFO Added alias 'ValueToKey' to plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:290)
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "internal.key.converter" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:463)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
at org.apache.kafka.connect.runtime.WorkerConfig.<init>(WorkerConfig.java:197)
at org.apache.kafka.connect.runtime.distributed.DistributedConfig.<init>(DistributedConfig.java:289)
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:65)
Which is now resolved by updating the workers.properties as mentioned in http://docs.confluent.io/current/connect/userguide.html#connect-userguide-distributed-config
Command used:
/home/arun/kafka/confluent-3.3.0/bin/connect-distributed.sh ../../../properties/file-stream-demo-distributed.properties
Filestream properties file (workers.properties):
name=file-stream-demo-distributed
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/demo-file.txt
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
config.storage.topic=demo-2-distributed
offset.storage.topic=demo-2-distributed
status.storage.topic=demo-2-distributed
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
group.id=""
I added below properties and command went through without any errors.
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
config.storage.topic=demo-2-distributed
offset.storage.topic=demo-2-distributed
status.storage.topic=demo-2-distributed
group.id=""
But, now when I run consumer command, I am unable to see the messages in /tmp/demo-file.txt. Please let me know if there is a way I can check if the messages are published to kafka topics and partitions ?
kafka-console-consumer --zookeeper localhost:2181 --topic demo-2-distributed --from-beginning
I believe I am missing something really basic here. Can some one please help?
internal.key.converter
configuration property must be defined (along with others). - Randall Hauch...replication.factor
properties; see docs.confluent.io/current/connect/… - Randall Hauch