
I am working on a Camunda BPM Spring Boot Application. The application reads messages from rabbitmq queue using Spring Cloud Stream. Once the message is received, the application invokes a process instance in Camunda.

If there are messages already in the rabbitmq queue during application startup, the cloud stream listener starts reading messages even before Camunda is initialized.

Is it possible to stop the cloud stream listener from listening to the queue till some event is fired - in this case PostDeployEvent.

I have created a sample application for reference https://github.com/kpkurian/spring-cloud-stream-camunda

Thanks!!

I am looking at your autowired RuntimeService and assuming that's the source of the problem. How is it configured (not clear)? Is it a Lifecycle type? – Oleg Zhurakousky
Hi @OlegZhurakousky. RuntimeService is the API to start a new process instance in Camunda BPM. Ref: docs.camunda.org/manual/7.9/user-guide/process-engine/…. When Spring Boot starts, Camunda first tries to deploy the process definitions (from the classpath, simple_process.bpmn) and registers them in the database. The issue is that Cloud Stream reads the message before Camunda is able to deploy and register the process definition in the DB, so the subsequent call to RuntimeService fails. Hope this helps. – KP Kurian
I don't know anything about Camunda, but RuntimeService is autowired, and by the time the application is started the assumption is that all services, beans etc. are fully initialized. If it is still going through the process of initialization and startup, then it is not properly implemented from a Spring idioms perspective, hence my question about Lifecycle. What you can try is to somehow wrap RuntimeService with a custom Lifecycle implementation which would not return until its start() method is executed, ensuring that RuntimeService is ready to go. – Oleg Zhurakousky
Hi @OlegZhurakousky. Thanks for your suggestion. I have implemented it as you suggested and it is working as expected. The GitHub repo is updated as well. If you get a chance, please take a look and let me know if there is any room for improvement. Appreciate your help. – KP Kurian
I have also updated the question, since I can clearly see that the stream listener starts listening only after the application context is completely loaded. – KP Kurian

2 Answers


As suggested by @OlegZhurakousky:

Issue

RuntimeService is autowired, and by the time the application is started, the assumption is that all services, beans etc. are fully initialized. If it is still going through the process of initialization and startup, then it is not properly implemented from a Spring idioms perspective.

Solution

Wrap RuntimeService with a custom Lifecycle implementation whose start() method does not return until RuntimeService is ready to go.

I have implemented this in the sample GitHub application.
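The repository has the full Spring version; the core of the pattern, stripped of the Spring and Camunda specifics, is just a latch that the wrapper's start() blocks on until the engine reports it has deployed. A plain-Java sketch (ReadinessGate and its method names are illustrative, not taken from the repo):

```java
import java.util.concurrent.CountDownLatch;

// Illustrative stand-in for the custom Lifecycle wrapper: start()
// blocks on a latch that is released when the engine reports it has
// deployed (in the real application, from a PostDeployEvent listener).
class ReadinessGate {
    private final CountDownLatch deployed = new CountDownLatch(1);
    private volatile boolean running;

    void onDeployed() {           // call from the PostDeployEvent handler
        deployed.countDown();
    }

    void start() throws InterruptedException {
        deployed.await();         // do not return until the engine is ready
        running = true;
    }

    boolean isRunning() {
        return running;
    }
}

public class Main {
    public static void main(String[] args) throws Exception {
        ReadinessGate gate = new ReadinessGate();
        Thread starter = new Thread(() -> {
            try {
                gate.start();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        starter.start();

        System.out.println("running before deploy: " + gate.isRunning());
        gate.onDeployed();        // simulate Camunda's PostDeployEvent
        starter.join();
        System.out.println("running after deploy: " + gate.isRunning());
    }
}
```

In the real wrapper, implementing Spring's Lifecycle (or SmartLifecycle, for phase control) means the context refresh does not complete this phase until start() returns, which is what keeps the stream listener from consuming too early.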


Spring Cloud Stream (Kafka binder) added methods for pausing and resuming the consumer:

import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
        System.out.println(in);
        // pause the consumer after processing the message
        consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
    }

    @Bean
    public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
        return event -> {
            System.out.println(event);
            // resume any paused partitions once the container goes idle
            if (event.getConsumer().paused().size() > 0) {
                event.getConsumer().resume(event.getConsumer().paused());
            }
        };
    }
}

Please check the docs: https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_usage_examples

But I think there is an issue with the pause method: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/479

PS: you can get the partition id and topic name in the listener itself:

@StreamListener(Sink.INPUT)
public void in(String in,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
    System.out.println(in);
    TopicPartition p = new TopicPartition(topic, partition);
    consumer.pause(Collections.singleton(p));
}

or in a global errorChannel listener:

@StreamListener("errorChannel")
public void errorGlobal(Message<?> message) {
    Message<?> failedMessage = ((ErrorMessage) message).getOriginalMessage();
    Consumer<?, ?> consumer = (Consumer<?, ?>) failedMessage.getHeaders().get(KafkaHeaders.CONSUMER);
    int partition = (int) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID);
    String topic = (String) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
    TopicPartition p = new TopicPartition(topic, partition);
    // pause the partition whose message failed
    consumer.pause(Collections.singleton(p));
}
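For the original RabbitMQ scenario, another option worth checking (whether it is available depends on your Spring Cloud Stream version, which is an assumption here) is the consumer-level autoStartup binding property, so the listener does not start consuming with the application and can be started later, e.g. after Camunda's PostDeployEvent, via the bindings actuator endpoint:

```
spring:
  cloud:
    stream:
      bindings:
        input:
          consumer:
            auto-startup: false
```

Please verify the property name and the bindings endpoint against the reference docs for the version you are on before relying on this.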