Spring cloud stream (kafka binder) added method for paussing and resuming consumer
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
}
@Bean
public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
return event -> {
System.out.println(event);
if (event.getConsumer().paused().size() > 0) {
event.getConsumer().resume(event.getConsumer().paused());
}
};
}
}
please check the docs
https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_usage_examples
but i think there is some issue withe pause method:
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/479
PS/
you can get partion id and topic name in a sample listener :
@StreamListener(Sink.INPUT)
public void in(String in,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
TopicPartition p = new TopicPartition(topic, partition);
consumer.pause(Collections.singleton(p));
}
or in errorChannel global Listener
@StreamListener("errorChannel")
public void errorGlobal(Message<?> message) {
Message<?> failedMessage = ((ErrorMessage)message).getOriginalMessage();
Consumer consumer = (Consumer)failedMessage.getHeaders().get(KafkaHeaders.CONSUMER);
int partition = (int) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID);
String topic = (String) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
TopicPartition p = new TopicPartition(topic, partition);
// ?
consumer.pause(Collections.singleton(p));
}
RuntimeService
and assuming that's the source of the problem. How is it configured (not clear)? Is it aLifecycle
type? – Oleg ZhurakouskyRuntimeService
is autowired, and by the time application is started it the assumption is that all services, beans etc are fully initialized. If it is still going through the process of initialization and startup then it is not properly implemented from Spring idioms perspective, hence my question aboutLifecycle
. What you can try to do is to somehow wrapRuntimeService
with a customLifecycle
implementation which would not return until itsstart()
method is executed ensuring that RuntmeService is ready to go. – Oleg Zhurakousky