3
votes

I'm using Spring Cloud Stream with Spring Boot. My application is very simple:

ExampleService.class:

@EnableBinding(Processor1.class)
@Service
public class ExampleService {

    @StreamListener(Processor1.INPUT)
    @SendTo(Processor1.OUTPUT)
    public String dequeue(String message){
        System.out.println("New message: " + message);
        return message;
    }

    @SendTo(Processor1.OUTPUT)
    public String queue(String message){
        return message;
    }
}

Procesor1.class:

public interface Processor1 {

    String INPUT = "input1";
    String OUTPUT = "output1";

    @Input(Processor1.INPUT)
    SubscribableChannel input1();

    @Output(Processor1.OUTPUT)
    MessageChannel output1();
}

application.properties:

spring.cloud.stream.bindings.input1.destination=test_input
spring.cloud.stream.bindings.input1.group=test_group
spring.cloud.stream.bindings.input1.binder=binder1

spring.cloud.stream.bindings.output1.destination=test_output
spring.cloud.stream.bindings.output1.binder=binder1

spring.cloud.stream.binders.binder1.type=rabbit

spring.cloud.stream.binders.binder1.environment.spring.rabbitmq.host=localhost

Scenarios:

1) When I push a message in 'test_input.test_group' queue, message is correctly printed and correctly sent to 'test_output' exchange. So ExampleService::dequeue works well.

2) When I invoke ExampleService::queue method (from outside the class, in a test), message is never sent to 'test_output' exchange.

I'm working with Spring Boot 2.0.6.RELEASE and Spring Cloud Stream 2.0.2.RELEASE.

Anybody knows why scenario 2) is not working? Thanks in advance.

1
I have the same issueJorge Machado

1 Answers

4
votes

What leads you to believe that @SendTo on its own is supported? @SendTo is a secondary annotation used by many projects, not just Spring Cloud Stream; as far as I know, there is nothing that will look for it on its own.

Try Spring Integration's @Publisher annotation instead (with @EnablePublisher).

EDIT

To force proxying with CGLIB instead of a JDK proxy, you can do this...

@Bean
public static BeanFactoryPostProcessor bfpp() {
    return bf -> {
        bf.getBean(IntegrationContextUtils.PUBLISHER_ANNOTATION_POSTPROCESSOR_NAME,
                PublisherAnnotationBeanPostProcessor.class).setProxyTargetClass(true);
    };
}