2
votes

Let's say I have a service that consumes messages through kafka-rest-proxy, always on the same consumer group, from a topic that has one partition. When the service starts, it creates a new consumer in kafka-rest-proxy and uses the generated consumer URL until the service is shut down. When the service comes back up, it creates another consumer in kafka-rest-proxy and uses the new URL (and new consumer) for consuming.
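For concreteness, here is a minimal sketch of that lifecycle against the Confluent REST Proxy v2 API. The proxy address, group name, and topic name are assumptions for illustration, and error handling is omitted:

```python
import requests

PROXY = "http://localhost:8082"   # assumed REST Proxy address
GROUP = "my-service-group"        # hypothetical consumer group
TOPIC = "my-topic"                # hypothetical single-partition topic

# Create a new consumer instance in the group. The proxy responds with a
# base_uri that all subsequent calls for this instance must use.
resp = requests.post(
    f"{PROXY}/consumers/{GROUP}",
    headers={"Content-Type": "application/vnd.kafka.v2+json"},
    json={"format": "json", "auto.offset.reset": "earliest"},
)
base_uri = resp.json()["base_uri"]

# Subscribe the instance to the topic.
requests.post(
    f"{base_uri}/subscription",
    headers={"Content-Type": "application/vnd.kafka.v2+json"},
    json={"topics": [TOPIC]},
)

# Poll for records via the instance URL until the service shuts down.
records = requests.get(
    f"{base_uri}/records",
    headers={"Accept": "application/vnd.kafka.json.v2+json"},
).json()
```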

My Questions

  1. Since Kafka allows at most one consumer per partition within a consumer group, what will happen in Kafka and kafka-rest-proxy when the consumer is restarted? i.e., a new consumer is created in kafka-rest-proxy, but the old one didn't have a chance to be destroyed. So now there are 'n' consumers in kafka-rest-proxy after 'n' restarts of my service, but only one of them is actively consuming. Will I even be able to consume messages on my new consumer, since there are more consumers than partitions?

  2. Let's make this more complicated and say that I have 5 instances of my service on the same consumer group and 5 partitions in the topic. After 'n' restarts of all 5 instances of my service, would I even be guaranteed to consume all messages without ensuring the proper destruction of the existing consumers? i.e., what do Kafka and kafka-rest-proxy do during consumer creation when the consumers outnumber the partitions?

  3. What is considered kafka-rest-proxy best practice for ensuring stale consumers are always cleaned up? Do you suggest persisting the consumer URL? Should I force a kafka-rest-proxy restart to ensure existing consumers are destroyed before starting my service?

* EDIT * I believe part of my question is answered with this configuration, but not all of it.

consumer.instance.timeout.ms - Amount of idle time before a consumer instance is automatically destroyed. (Type: int; Default: 300000; Importance: low)


1 Answer

2
votes
  1. If you cannot cleanly shut down the consumer, it will stay alive for a period after the last request was made to it. The proxy garbage collects stale consumers for exactly this case: if a consumer isn't cleanly shut down, it would otherwise hold on to some partitions indefinitely. By automatically garbage collecting consumers, the proxy removes the need for separate durable storage to keep track of your consumer instances. As you discovered, you can control this timeout via the consumer.instance.timeout.ms config.

  2. Since instances will be garbage collected, you are guaranteed to eventually consume all the messages. But during the timeout period, some partitions may still be assigned to the old set of consumers and you will not make any progress on those partitions.

  3. Ideally, unclean shutdown of your app is rare, so best practice is simply to clean up the consumer when your app is shutting down. Even in exceptional cases, you can use the finally block of a try/catch/finally to destroy the consumer (see the sketch after this list). If a consumer is left alive anyway, the group will eventually recover once the instance times out. Beyond that, consider lowering the consumer.instance.timeout.ms setting if your application can tolerate it. It just needs to be larger than the longest period between calls that use the consumer, and you should keep possible error cases in mind: e.g., if processing a message requires interacting with another system that can become slow or inaccessible, account for that when setting this config.
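A minimal sketch of that cleanup pattern, reusing the hypothetical `base_uri` from the question's setup; `process` is a hypothetical message handler, and the endpoints follow the v2 API:

```python
import requests

def run_service(base_uri: str) -> None:
    """Consume until interrupted, then destroy the proxy-side consumer."""
    try:
        while True:
            # Each poll also counts as activity, resetting the idle clock
            # that consumer.instance.timeout.ms measures against.
            records = requests.get(
                f"{base_uri}/records",
                headers={"Accept": "application/vnd.kafka.json.v2+json"},
            ).json()
            process(records)  # hypothetical message handler
    finally:
        # DELETE the consumer instance so its partitions are released
        # immediately instead of waiting for the idle timeout.
        requests.delete(
            base_uri,
            headers={"Content-Type": "application/vnd.kafka.v2+json"},
        )
```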

You can persist the URLs, but even that carries some risk of losing track of consumers, since you can't atomically create the consumer and save its URL to some other persistent storage. Also, since completely uncontrolled failures that leave you no chance to clean up shouldn't be common, persisting the URL often doesn't buy you much. If you need really fast recovery from that failure mode, the consumer instance timeout can probably be reduced significantly for your application anyway.

Re: forcing a restart of the proxy, this would be fairly uncommon since the REST Proxy is often a shared service and doing so would affect all other applications that are using it.