0
votes

I am trying to run kafka connect in distributed mode to read from topic and write to HDFS.
The process is starting successfully with the required properties files given, but I am not able to commit the events to HDFS.

Please see the full logs below.

Note: Broker details have been masked and all other details are mentioned in the logs.

[2018-06-26 11:49:48,474] INFO ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [****:****, ****:****]
        check.crcs = true
        client.id = consumer-3
        connections.max.idle.ms = 540000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = connect-cluster
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig:180)
[2018-06-26 11:49:48,476] WARN The configuration 'config.storage.topic' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2018-06-26 11:49:48,476] WARN The configuration 'status.storage.topic' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2018-06-26 11:49:48,476] WARN The configuration 'internal.key.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2018-06-26 11:49:48,476] WARN The configuration 'rest.port' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2018-06-26 11:49:48,476] WARN The configuration 'value.converter.schema.registry.url' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2018-06-26 11:49:48,476] WARN The configuration 'internal.key.converter' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2018-06-26 11:49:48,476] WARN The configuration 'internal.value.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2018-06-26 11:49:48,476] WARN The configuration 'internal.value.converter' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2018-06-26 11:49:48,476] WARN The configuration 'offset.storage.topic' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2018-06-26 11:49:48,476] WARN The configuration 'value.converter' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2018-06-26 11:49:48,476] WARN The configuration 'key.converter' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2018-06-26 11:49:48,477] WARN The configuration 'key.converter.schema.registry.url' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:188)
[2018-06-26 11:49:48,477] INFO Kafka version : 0.10.1.0-IBM-8 (org.apache.kafka.common.utils.AppInfoParser:83)
[2018-06-26 11:49:48,477] INFO Kafka commitId : unknown (org.apache.kafka.common.utils.AppInfoParser:84)
[2018-06-26 11:49:48,682] INFO Discovered coordinator ****:**** (id: 2147483644 rack: null) for group connect-cluster. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:555)
[2018-06-26 11:49:48,686] INFO Finished reading KafkaBasedLog for topic connect-test-configs (org.apache.kafka.connect.util.KafkaBasedLog:148)
[2018-06-26 11:49:48,686] INFO Started KafkaBasedLog for topic connect-test-configs (org.apache.kafka.connect.util.KafkaBasedLog:150)
[2018-06-26 11:49:48,686] INFO Started KafkaConfigBackingStore (org.apache.kafka.connect.storage.KafkaConfigBackingStore:261)
[2018-06-26 11:49:48,687] INFO Herder started (org.apache.kafka.connect.runtime.distributed.DistributedHerder:171)
[2018-06-26 11:49:48,689] INFO Discovered coordinator ****:**** (id: 2147483644 rack: null) for group connect-cluster. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:555)
[2018-06-26 11:49:48,691] INFO (Re-)joining group connect-cluster (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:381)
[2018-06-26 11:49:48,702] INFO Successfully joined group connect-cluster with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:349)
[2018-06-26 11:49:48,702] INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-67ad9053-4b20-41dd-9a6a-ec8e4f4d622e', leaderUrl='http://****:****/', offset=-1, connectorIds=[], taskIds=[]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1009)
[2018-06-26 11:49:48,702] INFO Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:745)
[2018-06-26 11:49:48,702] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:752)
[2018-06-26 11:49:55,748] INFO Reflections took 7022 ms to scan 225 urls, producing 15521 keys and 98977 values  (org.reflections.Reflections:229)
1
Looks like it's starting to me... What's the problem? Where's the error? Please show your JSON connector property file that you've posted to connect (if you have) - OneCricketeer
Just want to know where should we give the properties of the topics,hdfs url,task in a distributed mode? Should we pass it to a separate hdfs.properties file? - BARATH

1 Answers

1
votes

but I am not able to commit the events to HDFS

When you start distributed mode, it doesn't take any individual connector properties, you load those later

Just want to know where should we give the properties of the topics,hdfs url,task in a distributed mode?

You POST JSON into it.

Make a connect-hdfs.json file, for example

{
   "name": "quickstart-hdfs-sink"
   "config": {
      "store.url": "hdfs:///apps/kafka-connect",
      "hadoop.conf.dir": "/etc/hadoop/conf",
      "topics" :  ... 
   }
} 

Send it to Connect

curl -XPOST [email protected] http://connect-server:8083/connectors