
I'm using Spring Cloud Stream (Edgware.SR5) with Spring Boot (1.5.10.RELEASE). My @StreamListener processes every message it receives twice.

The idea of the example is to publish a message to a queue and then process it.

Service:

@EnableBinding(ExampleBindings.class)
@Service
public class ExampleService {

    @Publisher(channel = ExampleBindings.OUTPUT)
    public String queue(String message){
        return message;
    }

    @StreamListener(ExampleBindings.INPUT)
    public void dequeue(String message){
        System.out.println("New message: " + message);
    }
}

Bindings:

public interface ExampleBindings {

    String INPUT = "input1";
    String OUTPUT = "output1";

    @Input(ExampleBindings.INPUT)
    SubscribableChannel input();

    @Output(ExampleBindings.OUTPUT)
    MessageChannel output();
}

application.properties:

spring.cloud.stream.default.group=group1
spring.cloud.stream.default.binder=binder1

spring.cloud.stream.bindings.input1.destination=dest_1
spring.cloud.stream.bindings.output1.destination=dest_1

spring.cloud.stream.binders.binder1.type=rabbit
spring.cloud.stream.binders.binder1.environment.spring.rabbitmq.host=localhost

Configuration (for injecting proxied service in the test):

@Configuration
public class ExampleConfig {

    @Bean
    public PublisherAnnotationBeanPostProcessor publisherAnnotationBeanPostProcessor(){
        PublisherAnnotationBeanPostProcessor publisherAnnotationBeanPostProcessor =
            new PublisherAnnotationBeanPostProcessor();
        publisherAnnotationBeanPostProcessor.setProxyTargetClass(true);
        return publisherAnnotationBeanPostProcessor;
    }
}

Test:

@RunWith(SpringRunner.class)
@SpringBootTest
public class ExampleServiceTest {

    @Autowired
    private ExampleService exampleService;

    @Test
    public void testQueue() throws InterruptedException {
        exampleService.queue("Hello!");
        Thread.sleep(1000); // Wait for message processing
        System.out.println("Ready!");
    }
}

I get the following output:

17:19:10.230 [dest1.group1-2] DEBUG o.s.c.s.b.StreamListenerMessageHandler - org.springframework.cloud.stream.binding.StreamListenerMessageHandler@575c3e9b received message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=2f22ce16-bb5a-350c-8b3d-e6c898760888, amqp_consumerTag=amq.ctag-sxu6zQHJTGrsazfwbmol9Q, contentType=text/plain, timestamp=1547583550230}]
New message: Hello!
17:19:10.231 [dest1.group1-1] DEBUG o.s.c.s.b.StreamListenerMessageHandler - handler 'org.springframework.cloud.stream.binding.StreamListenerMessageHandler@575c3e9b' produced no reply for request Message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=788e8bbf-4ae4-86cc-0859-d4f153cb5807, amqp_consumerTag=amq.ctag-fV0aaDzYUZfq08JsODq6pA, contentType=text/plain, timestamp=1547583550230}]
17:19:10.231 [dest1.group1-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'input1', message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=788e8bbf-4ae4-86cc-0859-d4f153cb5807, amqp_consumerTag=amq.ctag-fV0aaDzYUZfq08JsODq6pA, contentType=text/plain, timestamp=1547583550230}]
New message: Hello!
17:19:10.232 [dest1.group1-2] DEBUG o.s.c.s.b.StreamListenerMessageHandler - handler 'org.springframework.cloud.stream.binding.StreamListenerMessageHandler@575c3e9b' produced no reply for request Message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=2f22ce16-bb5a-350c-8b3d-e6c898760888, amqp_consumerTag=amq.ctag-sxu6zQHJTGrsazfwbmol9Q, contentType=text/plain, timestamp=1547583550230}]
17:19:10.232 [dest1.group1-2] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'input1', message: GenericMessage [payload=Hello!, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=dest1, amqp_receivedExchange=dest1, amqp_deliveryTag=1, amqp_consumerQueue=dest1.group1, amqp_redelivered=false, id=2f22ce16-bb5a-350c-8b3d-e6c898760888, amqp_consumerTag=amq.ctag-sxu6zQHJTGrsazfwbmol9Q, contentType=text/plain, timestamp=1547583550230}]
Ready!

I can't figure out whether the problem is in my configuration or whether this is a bug. Any advice?

Thanks!

EDITED:

I uploaded a (non) working example here

You can create a RabbitMQ instance using:

docker run -p 5672:5672 -p 15672:15672 rabbitmq:3-management
You show Processor for input1/output1, but I see input2 in the logs. Does that tell you anything? - Artem Bilan
I edited the logs manually because I don't want to expose business logic. I will fix that. - italktothewind
Now it's edited. - italktothewind
OK. Could we have a simple sample somewhere on GitHub to play with? Thanks - Artem Bilan
Those messages are not the same. The content may be the same, but id, amqp_consumerTag, etc. are all different. So please do post a reproducible sample somewhere (you can exclude the business logic). - Oleg Zhurakousky

3 Answers

1
votes

I found that @Publisher was publishing twice because of the configuration in ExampleConfig. This new configuration (borrowed from here) seems to work fine:

@Bean
public static BeanFactoryPostProcessor bfpp() {
    return bf -> bf.getBean(IntegrationContextUtils.PUBLISHER_ANNOTATION_POSTPROCESSOR_NAME,
        PublisherAnnotationBeanPostProcessor.class).setProxyTargetClass(true);
}
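
To picture why the original ExampleConfig could publish twice, here is a plain-Java sketch with no Spring involved. It assumes (this is not confirmed in the thread) that the framework had already registered its own PublisherAnnotationBeanPostProcessor, so declaring a second one left the service wrapped in two publishing layers. The names DoublePublishDemo, publishing and publishCount are hypothetical and only stand in for the real proxies and channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

class DoublePublishDemo {

    /** Stand-in for the output channel: records every published payload. */
    static final List<String> channel = new ArrayList<>();

    /** Wraps a method so that its return value is also "published" (assumed proxy behaviour). */
    static UnaryOperator<String> publishing(UnaryOperator<String> target) {
        return message -> {
            String result = target.apply(message);
            channel.add(result); // one publish per wrapping layer
            return result;
        };
    }

    /** Calls queue("Hello!") through the given number of layers and counts the publishes. */
    static int publishCount(int proxyLayers) {
        channel.clear();
        UnaryOperator<String> queue = message -> message; // the plain queue(...) method
        for (int i = 0; i < proxyLayers; i++) {
            queue = publishing(queue);
        }
        queue.apply("Hello!");
        return channel.size();
    }

    public static void main(String[] args) {
        System.out.println("one post-processor:  " + publishCount(1));
        System.out.println("two post-processors: " + publishCount(2));
    }
}
```

Under that assumption, the fix above works because it reconfigures the post-processor that already exists instead of adding another one.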
1
votes

I was running my application in debug mode (IntelliJ), and for some reason the offset wasn't getting updated. Running it in normal run mode solved my problem, so try that.

0
votes

From the config, I think you are publishing the same message again to the same destination, dest_1.

spring.cloud.stream.bindings.input1.destination=dest_1
spring.cloud.stream.bindings.output1.destination=dest_1

The log also makes it clear that the second message has a different ID:

id=788e8bbf-4ae4-86cc-0859-d4f153cb5807
id=2f22ce16-bb5a-350c-8b3d-e6c898760888
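
A quick way to run the same check on a longer log is to collect the distinct id= header values. The sketch below is only an illustration: MessageIdCheck and distinctIds are hypothetical names, and the two sample lines are abridged from the log in the question. More than one distinct id for the same payload suggests two separate messages were published, rather than one message being delivered twice.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class MessageIdCheck {

    // Matches the "id=<uuid>" header; the leading "{" or space avoids matching inside other keys.
    private static final Pattern ID = Pattern.compile("[{ ]id=([0-9a-f-]{36})");

    /** Returns the distinct message ids found in the given log lines, in order of appearance. */
    static Set<String> distinctIds(List<String> logLines) {
        Set<String> ids = new LinkedHashSet<>();
        for (String line : logLines) {
            Matcher m = ID.matcher(line);
            while (m.find()) {
                ids.add(m.group(1));
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        // Abridged from the question's log output.
        List<String> lines = Arrays.asList(
            "received message: GenericMessage [payload=Hello!, headers={amqp_deliveryTag=1, id=2f22ce16-bb5a-350c-8b3d-e6c898760888, contentType=text/plain}]",
            "produced no reply for request Message: GenericMessage [payload=Hello!, headers={amqp_deliveryTag=1, id=788e8bbf-4ae4-86cc-0859-d4f153cb5807, contentType=text/plain}]");
        System.out.println(distinctIds(lines).size() + " distinct message id(s)");
    }
}
```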