0
votes

I am playing with Spring-cloud-stream and RabbitMQ.

I have a REST endpoint which produces the messages.

@SpringBootApplication
@EnableBinding(MyProcessor.class)
public class ProducerDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerDemoApplication.class, args);
    }
}

@RestController
class ProducerController {

    @Autowired
    MyProcessor myProcessor;

    @RequestMapping(value = "sendmessage/{message}", method = RequestMethod.GET)
    public String sendMessage(@PathVariable("message") String message) {
        myProcessor.anOutput().send(MessageBuilder.withPayload(message).build());
        return "sent";  
    }

}

interface MyProcessor {
    String INPUT = "myInput";

    @Output("myOutput")
    MessageChannel anOutput();
}

Through another application I am consuming these messages.

@StreamListener(MyProcessor.INPUT)
public void eventHandler(String message) {
    System.out.println("**************  Message received => "+message);
}

When both applications are up and running, I am able to publish a message and consume it at the consumer.

The problem I am facing in the following scenario:

I deliberately stop the consumer and publish a message through the producer. When the consumer starts again, it does not receive that message.

I thought RabbitMQ guarantees message delivery.

Github links
https://github.com/govi20/producer-demo
https://github.com/govi20/consumer-demo

2
Can you post your app on Github so we can take a look? There is something missing from what you've provided above. For example, String INPUT = "myInput"; - I don't see an @Input configuration, etc. - Oleg Zhurakousky
@OlegZhurakousky I've added GitHub links to the original question. - Govinda Sakhare

2 Answers

1
votes

You need a group on the consumer input binding. Otherwise the binding is anonymous and uses an auto-delete queue, which only exists while the consumer is running.

1
votes

As I mentioned before, you already have a misconfiguration in 'myInput': there is no @Input configuration, which results in the error "A component required a bean named 'myInput' that could not be found." during consumer startup. So something like this would be required on the consumer side:

interface MyProcessor {
    String INPUT = "myInput";

    // SubscribableChannel is the conventional type for an input binding
    @Input(INPUT)
    SubscribableChannel myInput();
}

Furthermore, if you don't define a group, the binding uses an anonymous queue on the Rabbit side (something like myInput.anonymous.pZg03h0zQ2-SHLh1_QL8DQ), which means the queue gets a different name every time you start. So

spring.cloud.stream.bindings.myInput.destination=myInput
spring.cloud.stream.bindings.myInput.group=myGroup

will result in the queue name myInput.myGroup, which is persistent and consistent between startups.
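To make the naming pattern concrete, here is a minimal plain-Java sketch. QueueNames, queueName and randomSuffix are hypothetical names used for illustration only; this is not the binder's actual code, it just models the two name shapes mentioned above (destination.group versus destination.anonymous.<random suffix>).

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

// Hypothetical helper, NOT part of spring-cloud-stream.
// It only models the queue-name shapes described in this answer.
class QueueNames {

    // With a group the name is stable: <destination>.<group>.
    // Without one, a fresh random suffix is generated on every call,
    // which stands in for "a different name on every startup".
    static String queueName(String destination, String group) {
        if (group != null && !group.isEmpty()) {
            return destination + "." + group;
        }
        return destination + ".anonymous." + randomSuffix();
    }

    // URL-safe Base64 of a random UUID (16 bytes -> 22 characters, no padding).
    static String randomSuffix() {
        UUID uuid = UUID.randomUUID();
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.putLong(uuid.getMostSignificantBits());
        buffer.putLong(uuid.getLeastSignificantBits());
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buffer.array());
    }

    public static void main(String[] args) {
        System.out.println(queueName("myInput", "myGroup")); // prints myInput.myGroup
        System.out.println(queueName("myInput", null));      // random suffix
        System.out.println(queueName("myInput", null));      // a different random suffix
    }
}
```

Because the anonymous name changes on each start, a restarted consumer is not reading from the queue it used before, whereas the grouped name always points at the same queue.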

Also, on the producer side, myOutput results in the creation of a Rabbit exchange that has no binding to the aforementioned (or any other) queue, so Rabbit drops the messages. You cannot receive messages from the producer until there is a route between the myOutput exchange and the myInput.myGroup queue. However, if you configure the input as I described above, spring-cloud-stream will also create an exchange called myInput, which is automatically bound to myInput.myGroup. So if you change your producer to send to that destination, you will receive the messages on the consumer.
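One way to point the producer at that destination is through configuration. This is a sketch that assumes the output binding keeps the name myOutput from the question and that the property goes into the producer's application.properties:

```properties
# Producer side: publish to the exchange named myInput instead of myOutput
spring.cloud.stream.bindings.myOutput.destination=myInput
```

The consumer's myInput.myGroup queue needs to exist (that is, the consumer has to have started at least once with the group configured) before messages published while it is down can be kept for it.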