
We are testing Kafka Connect in distributed mode to pull topic records from Kafka to HDFS. We have two boxes: one running the Kafka and ZooKeeper daemons, where we have also kept one instance of Kafka Connect, and another hosting the HDFS namenode, where we run a second instance of Kafka Connect.

We started Kafka, ZooKeeper, and Kafka Connect on the first box, and Kafka Connect on the second box as well. Per the Confluent documentation, the HDFS connector (or any other connector, for that matter) has to be started through the REST API. So, after starting Kafka Connect on the two boxes, we tried starting the connector through the REST API with the command below:

curl -X POST -H "HTTP/1.1 Host: ip-10-16-34-57.ec2.internal:9092 Content-Type: application/json Accept: application/json" --data '{"name": "hdfs-sink", "config": {"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector", "format.class":"com.qubole.streamx.SourceFormat", "tasks.max":"1", "hdfs.url":"hdfs://ip-10-16-37-124:9000", "topics":"Prd_IN_TripAnalysis,Prd_IN_Alerts,Prd_IN_GeneralEvents", "partitioner.class":"io.confluent.connect.hdfs.partitioner.DailyPartitioner", "locale":"", "timezone":"Asia/Calcutta" }}' http://ip-10-16-34-57.ec2.internal:8083/connectors

As soon as we press enter, we get the following response:

    <html>
    <head>
    <meta http-equiv="Content-Type" content="text/html;charset=ISO-8859-1"/>
    <title>Error 415 </title>
    </head>
    <body>
    <h2>HTTP ERROR: 415</h2>
    <p>Problem accessing /connectors. Reason:
    <pre>    Unsupported Media Type</pre></p>
    <hr /><i><small>Powered by Jetty://</small></i>
    </body>
    </html>

The connect-distributed.properties file at etc/kafka/ is shown below; it is identical on both Kafka Connect nodes. We have also created the three required topics (connect-offsets, connect-configs, connect-status).

bootstrap.servers=ip-10-16-34-57.ec2.internal:9092
group.id=connect-cluster
key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter
enable.auto.commit=true
auto.commit.interval.ms=1000
offset.flush.interval.ms=1000
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
rest.port=8083
config.storage.topic=connect-configs
status.storage.topic=connect-status
offset.flush.interval.ms=10000

What is the issue here? Are we missing something needed to run Kafka Connect in distributed mode with the HDFS connector? Kafka Connect in standalone mode is working fine.


1 Answer


Uploading a connector configuration is a PUT command, not a POST: http://docs.confluent.io/3.1.1/connect/restapi.html#put--connectors-(string-name)-config

On a side note, I believe your curl command might be wrong:

  • You need one -H switch per header; putting all the headers into a single -H parameter is not how it works (I think).
  • I do not think the port is part of the Host header.
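Putting those points together, the request might look something like the sketch below (hostnames and connector config are taken from your question; adjust as needed). Note that the PUT endpoint takes the connector name in the URL and just the bare config map in the body, and curl sets the Host header itself from the URL, so you only need the Content-Type and Accept headers:

```shell
# One -H switch per header; no explicit Host header needed.
curl -X PUT \
  -H "Content-Type: application/json" \
  -H "Accept: application/json" \
  --data '{
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "format.class": "com.qubole.streamx.SourceFormat",
    "tasks.max": "1",
    "hdfs.url": "hdfs://ip-10-16-37-124:9000",
    "topics": "Prd_IN_TripAnalysis,Prd_IN_Alerts,Prd_IN_GeneralEvents",
    "partitioner.class": "io.confluent.connect.hdfs.partitioner.DailyPartitioner",
    "locale": "",
    "timezone": "Asia/Calcutta"
  }' \
  http://ip-10-16-34-57.ec2.internal:8083/connectors/hdfs-sink/config
```

The missing Content-Type header is most likely what triggered the 415 Unsupported Media Type: because all the headers were crammed into one -H value, the server never saw a valid Content-Type: application/json header.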