0
votes

I am using the confluent-3.0.1 platform and building a Kafka-Elasticsearch connector. For this I am extending SinkConnector and SinkTask (Kafka Connect APIs) to get data from Kafka.

As part of this code I am overriding the taskConfigs method of SinkConnector to return "max.poll.records" so that only 100 records are fetched at a time. But it's not working: I am getting all records at once, and I am failing to commit offsets within the stipulated time. Can anyone please help me configure "max.poll.records"?

  public List<Map<String, String>> taskConfigs(int maxTasks) {
    ArrayList<Map<String, String>> configs = new ArrayList<Map<String, String>>();
    for (int i = 0; i < maxTasks; i++) {
      Map<String, String> config = new HashMap<String, String>();
      config.put(ConfigurationConstants.CLUSTER_NAME, clusterName);
      config.put(ConfigurationConstants.HOSTS, hosts);
      config.put(ConfigurationConstants.BULK_SIZE, bulkSize);
      config.put(ConfigurationConstants.IDS, elasticSearchIds);
      config.put(ConfigurationConstants.TOPICS_SATELLITE_DATA, topics);
      config.put(ConfigurationConstants.PUBLISH_TOPIC, topicTopublish);
      config.put(ConfigurationConstants.TYPES, elasticSearchTypes);
      config.put("max.poll.records", "100");

      configs.add(config);
    }
    return configs;
  }
2
BTW, Confluent 3.1 (released today) includes an Elasticsearch sink connector, in case that would meet your needs: docs.confluent.io/3.1.0/connect/connect-elasticsearch/docs/… (shikhar)

2 Answers

7
votes

You can't override most Kafka consumer configs, such as max.poll.records, in the connector configuration. You can override them in the Connect worker configuration, though, by adding the "consumer." prefix to the setting name (for example, consumer.max.poll.records).
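
To illustrate the idea, here is a minimal sketch of how a "consumer."-prefixed worker setting can be turned into a plain consumer setting. This is not Connect's actual code; the class name ConsumerPrefixSketch and the method consumerOverrides are hypothetical and exist only for this example. It also shows why a key placed in the task config never reaches the consumer: only worker properties carrying the prefix are picked up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: mimics the effect of the "consumer." prefix
// in a worker properties file. It is not the Kafka Connect implementation.
public class ConsumerPrefixSketch {
    static final String CONSUMER_PREFIX = "consumer.";

    // Returns the worker properties that start with "consumer.",
    // with the prefix stripped, i.e. the settings meant for the consumer.
    static Map<String, String> consumerOverrides(Map<String, String> workerProps) {
        Map<String, String> overrides = new HashMap<>();
        for (Map.Entry<String, String> entry : workerProps.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(CONSUMER_PREFIX)) {
                overrides.put(key.substring(CONSUMER_PREFIX.length()), entry.getValue());
            }
        }
        return overrides;
    }

    public static void main(String[] args) {
        Map<String, String> workerProps = new HashMap<>();
        workerProps.put("bootstrap.servers", "localhost:9092"); // assumed example value
        workerProps.put("consumer.max.poll.records", "100");
        // Only the prefixed key survives, without its prefix.
        System.out.println(consumerOverrides(workerProps)); // prints {max.poll.records=100}
    }
}
```

In practice you don't write any of this yourself; you only add the prefixed line to the worker properties file and restart the worker.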

2
votes

This is solved. I added the following configuration to connect-avro-standalone.properties:

 group.id=mygroup
 consumer.max.poll.records=1000

and ran the following command to start my connector:

sh ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-elasticsearch/connect-elasticsearch-sink.properties