I'm using Kafka Connect in distributed mode. I have now observed a strange behavior several times: after some time (sometimes hours, sometimes days), what appears to be a rebalancing error occurs, and the same tasks get assigned to multiple workers. As a result, they run concurrently and, depending on the nature of the connector, either fail or produce "unpredictable" output.
The simplest setup in which I could reproduce the behavior is two Kafka Connect workers and two connectors, each with only one task. Kafka Connect is deployed in Kubernetes, and Kafka itself runs in Confluent Cloud. Kafka Connect and Kafka are the same version (5.3.1).
Relevant messages from the log:
Worker A:
[2019-10-30 12:44:23,925] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Successfully joined group with generation 488 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:469)
[2019-10-30 12:44:23,926] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Joined group at generation 488 and got assignment: Assignment{error=0, leader='connect-1-d5c19893-b33c-4f07-85fb-db9736795759', leaderUrl='http://10.16.0.15:8083/', offset=250, connectorIds=[some-hdfs-sink, some-mqtt-source], taskIds=[some-hdfs-sink-0, some-mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
Worker B:
[2019-10-30 12:44:23,930] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Successfully joined group with generation 488 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:469)
[2019-10-30 12:44:23,936] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Joined group at generation 488 and got assignment: Assignment{error=0, leader='connect-1-d5c19893-b33c-4f07-85fb-db9736795759', leaderUrl='http://10.16.0.15:8083/', offset=250, connectorIds=[some-mqtt-source], taskIds=[some-mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
In the log extracts above, you can see that the same task (some-mqtt-source-0) is assigned to both workers in generation 488. After these messages, I also see log messages from the task instances on both workers.
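To check for this automatically instead of comparing log lines by eye, the assignment messages can be parsed and compared across workers. The following is a minimal sketch that assumes the DistributedHerder log format quoted above. find_duplicate_tasks is a hypothetical helper, not part of Kafka Connect. It collects the taskIds from each worker's "Joined group at generation ... and got assignment" line and reports any task that more than one worker received in the same generation.

```python
import re
from collections import defaultdict

# Assumption: the DistributedHerder line format quoted above, i.e.
# "Joined group at generation <N> and got assignment: Assignment{... taskIds=[a, b], ...}".
# The pattern is case-sensitive, so "revokedTaskIds" is not matched.
ASSIGNMENT_RE = re.compile(
    r"Joined group at generation (\d+) and got assignment: .*?taskIds=\[([^\]]*)\]"
)


def find_duplicate_tasks(logs_by_worker):
    """Return {generation: {task_id: sorted worker names}} for every task
    assigned to more than one worker within the same generation.

    logs_by_worker maps a worker name (e.g. "A") to an iterable of log lines.
    """
    owners = defaultdict(lambda: defaultdict(set))  # generation -> task -> workers
    for worker, lines in logs_by_worker.items():
        for line in lines:
            match = ASSIGNMENT_RE.search(line)
            if not match:
                continue
            generation = int(match.group(1))
            tasks = [t.strip() for t in match.group(2).split(",") if t.strip()]
            for task in tasks:
                owners[generation][task].add(worker)

    duplicates = {}
    for generation, tasks in owners.items():
        shared = {task: sorted(ws) for task, ws in tasks.items() if len(ws) > 1}
        if shared:
            duplicates[generation] = shared
    return duplicates


if __name__ == "__main__":
    # Abbreviated versions of the worker A and worker B lines quoted above.
    worker_a = [
        "[2019-10-30 12:44:23,926] INFO ... Joined group at generation 488 and got "
        "assignment: Assignment{error=0, connectorIds=[some-hdfs-sink, some-mqtt-source], "
        "taskIds=[some-hdfs-sink-0, some-mqtt-source-0], revokedConnectorIds=[], "
        "revokedTaskIds=[], delay=0}"
    ]
    worker_b = [
        "[2019-10-30 12:44:23,936] INFO ... Joined group at generation 488 and got "
        "assignment: Assignment{error=0, connectorIds=[some-mqtt-source], "
        "taskIds=[some-mqtt-source-0], revokedConnectorIds=[], "
        "revokedTaskIds=[], delay=0}"
    ]
    print(find_duplicate_tasks({"A": worker_a, "B": worker_b}))
    # {488: {'some-mqtt-source-0': ['A', 'B']}}
```

This only flags overlaps within a single generation. With Incremental Cooperative Rebalancing I'm not certain that every assignment line lists the full set of tasks a worker is running (it may list only newly assigned ones), so an empty result is not proof that no duplicates exist.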
This behavior doesn't depend on the connector (I have observed it with other tasks as well). It also doesn't happen immediately after the workers start, only after some time.
My question is: what could be causing this behavior?
EDIT 1: I tried running three workers instead of two, thinking it might be a distributed consensus issue. That doesn't appear to be the cause: with three workers the issue still occurs.
EDIT 2:
I've noticed that just before worker A is assigned a task that originally ran on worker B, worker B encounters an error while joining the group. For example, if tasks get "duplicated" in generation N, worker B's logs contain no "Successfully joined group with generation N" message. Moreover, between generations N-1 and N+1, worker B typically logs errors such as "Attempt to heartbeat failed for since member id" and "Group coordinator bx-xxx-xxxxx.europe-west1.gcp.confluent.cloud:9092 (id: 1234567890 rack: null) is unavailable or invalid". Worker B typically joins generation N+1 shortly after generation N (sometimes as little as about 3 seconds later). So it is now clear what triggers the behavior. However:
- Although I understand that temporary issues like these can occur and are probably normal in general, why doesn't rebalancing fix the issue once all workers have successfully joined the next generation? More rebalances do follow, but they don't distribute the tasks correctly, and the "duplicates" persist forever (until I restart the workers).
- In some periods, a rebalance happens roughly once every several hours, while in other periods it happens every 5 minutes (precise to the second). What could be the reason for this, and what is normal?
- What could cause the "Group coordinator is unavailable or invalid" errors, given that I use Confluent Cloud? Are there any configuration parameters in Kafka Connect that can be tweaked to make it more resilient to this error? I know about session.timeout.ms and heartbeat.interval.ms, but the documentation is so minimal that it is not even clear what the practical impact of making these parameters smaller or larger would be.
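For reference, both parameters are ordinary worker configuration properties. As far as I understand the Kafka Connect worker documentation, session.timeout.ms is how long the broker waits for a worker heartbeat before removing that worker from the group and starting a rebalance, and heartbeat.interval.ms is how often heartbeats are sent. The heartbeat interval must be lower than the session timeout and should typically be no higher than a third of it. The sketch below is a hypothetical check (check_heartbeat_settings is not a Kafka tool) that applies those two rules to the text of a worker properties file. The defaults it falls back to (10000 ms and 3000 ms) are the Connect worker defaults as I understand them, so they should be verified against the documentation for the version in use.

```python
import re

# Assumption: Connect worker defaults; verify against the docs for your version.
DEFAULTS = {"session.timeout.ms": 10000, "heartbeat.interval.ms": 3000}


def parse_properties(text):
    """Minimal .properties parser: handles "key=value" and "key: value" lines
    and skips blank lines and # or ! comments (no line continuations)."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        parts = re.split(r"\s*[=:]\s*", line, maxsplit=1)
        if len(parts) == 2:
            props[parts[0]] = parts[1]
    return props


def check_heartbeat_settings(text):
    """Return warnings if heartbeat.interval.ms is not lower than
    session.timeout.ms, or is higher than a third of it."""
    props = parse_properties(text)
    session = int(props.get("session.timeout.ms", DEFAULTS["session.timeout.ms"]))
    heartbeat = int(props.get("heartbeat.interval.ms", DEFAULTS["heartbeat.interval.ms"]))
    warnings = []
    if heartbeat >= session:
        warnings.append(
            f"heartbeat.interval.ms ({heartbeat}) must be lower than "
            f"session.timeout.ms ({session})"
        )
    elif heartbeat * 3 > session:
        warnings.append(
            f"heartbeat.interval.ms ({heartbeat}) is more than 1/3 of "
            f"session.timeout.ms ({session})"
        )
    return warnings


if __name__ == "__main__":
    # Hypothetical worker settings, for illustration only.
    worker_properties = """
    group.id=some-kafka-connect-cluster
    session.timeout.ms=30000
    heartbeat.interval.ms=15000
    """
    for warning in check_heartbeat_settings(worker_properties):
        print(warning)
    # heartbeat.interval.ms (15000) is more than 1/3 of session.timeout.ms (30000)
```

The trade-off, as I understand it, is that a larger session.timeout.ms lets a worker ride out a longer coordinator hiccup without being evicted from the group, at the cost of taking longer to detect a worker that has really died.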
EDIT 3: I observed that the issue is not critical for sink tasks. Although the same sink tasks get assigned to multiple workers, the corresponding consumers are assigned different partitions, as they normally should be, and everything works almost as it should: I simply get more tasks than I originally asked for. For source tasks, however, the behavior is breaking, because the tasks run concurrently and compete for resources on the source side.
EDIT 4: Meanwhile, I downgraded Kafka Connect to version 2.2 (Confluent Platform 5.2.3), a version that predates "Incremental Cooperative Rebalancing". It has been working fine for the last 2 days, so I assume the behavior is related to the new rebalancing mechanism.