1
votes

I have implemented my own partition assignment strategy by implementing RangeAssignor in my spring boot application. I have overridden its subscriptionUserData method and adding some user data. Whenever this data is getting changed I want to trigger partition rebalance by invoking below kafkaConsumer's api kafkaconsumer apis enforce rebalance

I am not sure how can I get the object of kafka consumer and invoke this api. Please suggest

1

1 Answers

-1
votes

You can call consumer.wakeup() function

consumer.wakeup() is the only consumer method that is safe to call from a different thread. Calling wakeup will cause poll() to exit with WakeupException, or if consumer.wakeup() was called while the thread was not waiting on poll, the exception will be thrown on the next iteration when poll() is called. The WakeupException doesn’t need to be handled, but before exiting the thread, you must call consumer.close(). Closing the consumer will commit off‐ sets if needed and will send the group coordinator a message that the consumer is leaving the group. The consumer coordinator will trigger rebalancing immediately

  Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            System.out.println("Starting exit...");
            consumer.wakeup();   **//1**
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
} });
    ...
    Duration timeout = Duration.ofMillis(100);
    try {
        // looping until ctrl-c, the shutdown hook will cleanup on exit
        while (true) {
            ConsumerRecords<String, String> records =
                movingAvg.consumer.poll(timeout);
            System.out.println(System.currentTimeMillis() +
                "--  waiting for data...");
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n",
                    record.offset(), record.key(), record.value());
            }
            for (TopicPartition tp: consumer.assignment())
                System.out.println("Committing offset at position:" +
                    consumer.position(tp));
                movingAvg.consumer.commitSync();
        }
    } catch (WakeupException e) {
        // ignore for shutdown. **//2**
    } finally {
        consumer.close(); **//3**
        System.out.println("Closed consumer and we are done");
    }
  1. ShutdownHook runs in a separate thread, so the only safe action we can take is to call wakeup to break out of the poll loop.
  2. Another thread calling wakeup will cause poll to throw a WakeupException. You’ll want to catch the exception to make sure your application doesn’t exit unexpect‐ edly, but there is no need to do anything with it.
  3. Before exiting the consumer, make sure you close it cleanly.

full example at:

https://github.com/gwenshap/kafka-examples/blob/master/SimpleMovingAvg/src/main/java/com/shapira/examples/newconsumer/simplemovingavg/SimpleMovingAvgNewConsumer.java