0
votes

I am using kafka connect distribution. The command is : bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties

The worker configuration is:


    bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
    group.id=connect-cluster
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false

The kafka connect start over with no errors!

The topic connect-configs,connect-offsets,connect-statuses has been created. The topic mysiteview has been created.

Then i create kafka connectors using RESTful API like this:


    curl -X POST -H "Content-Type: application/json" --data '{"name":"hdfs-sink-mysiteview","config":{"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector","tasks.max":"3","topics":"mysiteview","hdfs.url":"hdfs://master1:8020","topics.dir":"/kafka/topics","logs.dir":"/kafka/logs","format.class":"io.confluent.connect.hdfs.avro.AvroFormat","flush.size":"1000","rotate.interval.ms":"1000","partitioner.class":"io.confluent.connect.hdfs.partitioner.DailyPartitioner","path.format":"YYYY-MM-dd","schema.compatibility":"BACKWARD","locale":"zh_CN","timezone":"Asia/Shanghai"}}'  http://kafka1:8083/connectors

And when i producer data to topic "mysiteview" something like this:


    {"f1":"192.168.1.1","f2":"aa.example.com"}

The java code is following:

Properties props = new Properties();
props.put("bootstrap.servers","kafka1:9092");
props.put("acks","all");
props.put("retries",3);
props.put("batch.size", 16384);
props.put("linger.ms",30);
props.put("buffer.memory",33554432);
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String,String>(props);
Random rnd = new Random();
for(long nEvents = 0; nEvents < events; nEvents++) {
    long runtime = new Date().getTime();
    String site = "www.example.com";
    String ipString = "192.168.2." + rnd.nextInt(255);
    String key = "" + rnd.nextInt(255);
    User u = new User();
    u.setF1(ipString);
    u.setF2(site+" "+rnd.nextInt(255));
    System.out.println(JSON.toJSONString(u));
    producer.send(new ProducerRecord<String,String>("mysiteview",JSON.toJSONString(u)));
    Thread.sleep(50);
}

producer.flush();
producer.close();

The weird things occured. I get data from kafka-logs but no data in hdfs(no topic directory). I try the connector command:


    curl -X GET http://kafka1:8083/connectors/hdfs-sink-mysiteview/status

output is:

    {"name":"hdfs-sink-mysiteview","connector":{"state":"RUNNING","worker_id":"10.255.223.178:8083"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"10.255.223.178:8083"},{"state":"RUNNING","id":1,"worker_id":"10.255.223.178:8083"},{"state":"RUNNING","id":2,"worker_id":"10.255.223.178:8083"}]}

But when i inspect the task status using following command:

    curl -X GET http://kafka1:8083/connectors/hdfs-sink-mysiteview/hdfs-sink-siteview-1

I get the result: "Error 404" . Three tasks is as the same error!

What' going wrong?

2

2 Answers

0
votes

Without seeing the worker's log, I'm not sure with which exception exactly your HDFS Connector instances are failing when you use the settings you describe above. However I can spot a few issues with the configuration:

  1. You mention that you start your Connect worker with: bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties. These properties default to having key and value converters set to AvroConverter and require you to run the schema-registry service. If indeed you've edited the configuration in connect-avro-distributed.properties to use the JsonConverter instead, your HDFS connector will probably fail during the conversion of Kafka records to Connect's SinkRecord data type, just before it tries to export your data to HDFS.
  2. Until recently, the HDFS connector was able to export only Avro records, to files of Avro or Parquet format. And that requires using the AvroConverter as mentioned above. The capability to export records to text files as JSON was added recently, and will appear in version 4.0.0 of the connector (you may try this capability by checking-out and building the connector from source).

At this point, my first suggestion would be to try and import your data with bin/kafka-avro-console-producer. Define their schema, confirm that the data are imported successfully with bin/kafka-avro-console-consumer and then set your HDFS Connector to use AvroFormat as above. The quickstart at the connector's page describes a very similar process, and maybe it would be a great starting point for your use case.

0
votes

maybe you are just using the REST-Api wrong. According to the documentation the call should be /connectors/:connector_name/tasks/:task_id

https://docs.confluent.io/3.3.1/connect/restapi.html#get--connectors-(string-name)-tasks-(int-taskid)-status