4
votes

I'm running a kafka-connect distributed setup.

I was testing with a single machine/process setup (still in distributed mode), which worked fine. Now I'm working with 3 nodes (and 3 connect processes); the logs contain no errors, but when I submit an S3 connector request through the REST API, it returns: {"error_code":409,"message":"Cannot complete request because of a conflicting operation (e.g. worker rebalance)"}.

When I stop the kafka-connect process on one of the nodes, I can actually submit the job and everything is running fine.

I have 3 brokers in my cluster, and the topic has 32 partitions.

This is the connector I'm trying to launch:

{
    "name": "s3-sink-new-2",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "32",
        "topics": "rawEventsWithoutAttribution5",
        "s3.region": "us-east-1",
        "s3.bucket.name": "dy-raw-collection",
        "s3.part.size": "64000000",
        "flush.size": "10000",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
        "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
        "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
        "partition.duration.ms": "60000",
        "path.format": "\'year\'=YYYY/\'month\'=MM/\'day\'=dd/\'hour\'=HH",
        "locale": "US",
        "timezone": "GMT",
        "timestamp.extractor": "RecordField",
        "timestamp.field": "procTimestamp",
        "name": "s3-sink-new-2"
    }
}
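
For context, the request is submitted to the Connect REST API roughly like this (the worker hostname below is a placeholder for one of my nodes; 8083 is the default rest.port):

# POST the connector definition above to any worker's REST endpoint
curl -X POST -H "Content-Type: application/json" \
     --data @s3-sink-new-2.json \
     http://connect-worker-1:8083/connectors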

Nothing in the logs indicates a problem, and I'm really lost here.

Will provide more info from logs if necessary :) – OmriManor
I think I figured it out: I have 3 workers and 32 partitions/tasks. I think kafka-connect is trying to distribute the work evenly between the 3 workers and is unable to (32 / 3 = 10.66667). I will test tomorrow with 4 workers. – OmriManor
I've seen this error before when the rest.advertised.host.name cannot be resolved by each worker – OneCricketeer
Thanks for the comment; the documentation on this configuration parameter is lacking, to say the least. What exactly should this resolve to? The hostname of one of the workers? I thought they communicated strictly through Kafka. – OmriManor
It needs to be set to the external host or IP of the machine, and the port is set with rest.port. But rebalancing and REST requests pass between the workers directly, not just through Kafka, from what I've noticed. If this isn't the issue, the consumer group is literally rebalancing and there may be other instabilities in the cluster, not just Connect – OneCricketeer

3 Answers

5
votes

I had the same problem with my setup on Kubernetes. The issue was that I had CONNECT_REST_ADVERTISED_HOST_NAME set to the same value on each of the 16 nodes, which caused a constant rebalancing loop. Give each worker a unique value and you should be fine.

The solution for K8s, which works for me:

- env:
  # Kubernetes Downward API: inject each pod's own IP so every
  # Connect worker advertises a unique, resolvable address.
  - name: CONNECT_REST_ADVERTISED_HOST_NAME
    valueFrom:
      fieldRef:
        fieldPath: status.podIP
1
votes

As Wojciech Sznapka has said, CONNECT_REST_ADVERTISED_HOST_NAME (rest.advertised.host.name if you're not using Docker) is the issue here. It needs to be set not just to a unique value, but to the correct hostname of the worker, one that can be resolved from the other workers.

rest.advertised.host.name is used by Kafka Connect to determine how to contact the other workers - for example, when a worker that receives a REST request is not the leader and needs to forward the request on. If this config is not set correctly, problems ensue.

If you have a cluster of workers and you shut all but one down and suddenly things work, that's because by shutting the others down you've guaranteed that the remaining worker is the leader and thus won't have to forward the request on.
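
As a rough illustration (hostnames and port are placeholders), each worker's distributed properties should advertise its own resolvable address:

# connect-distributed.properties on worker 1 (hostname is a placeholder)
rest.port=8083
rest.advertised.host.name=connect-worker-1.example.internal

# connect-distributed.properties on worker 2
rest.port=8083
rest.advertised.host.name=connect-worker-2.example.internal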

For more details see https://rmoff.net/2019/11/22/common-mistakes-made-when-configuring-multiple-kafka-connect-workers/

0
votes

As for @OmriManor, in my case it was an issue with one of the nodes causing a rebalance loop. What I did was pause the connector, then stop all nodes except for one; I was then able to delete the connector, since the single remaining node did not cause the rebalance loop.
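
For reference, a rough sketch of those REST calls, issued against the single remaining worker (host, port and connector name are placeholders):

# Pause the connector, then delete it while only one worker is left running
curl -X PUT http://connect-worker-1:8083/connectors/s3-sink-new-2/pause
curl -X DELETE http://connect-worker-1:8083/connectors/s3-sink-new-2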