3
votes

Spring Cloud Stream has no annotation for sending a new message to a stream (@SendTo only works on a method that is also annotated with @StreamListener), so I tried to use the Spring Integration @Publisher annotation for that purpose.

Because @Publisher takes a channel name, and Spring Cloud Stream's @EnableBinding can bind an output channel declared with @Output, I tried to combine them as follows:

@EnableBinding(MessageSource.class)
@Service
public class ExampleService {

    @Publisher(channel = MessageSource.OUTPUT)
    public String sendMessage(String message){
        return message;
    }
}

I also declared the @EnablePublisher annotation on the application class:

@SpringBootApplication
@EnablePublisher("")
public class ExampleApplication {

    public static void main(String[] args){
        SpringApplication.run(ExampleApplication.class, args);
    }
}

My test:

@RunWith(SpringRunner.class)
@SpringBootTest
public class ExampleServiceTest {

    @Autowired
    private ExampleService exampleService;

    @Test
    public void testSendMessage(){
        // ExampleService declares sendMessage(String); there is no queue(String) method
        exampleService.sendMessage("Hi!");
        System.out.println("Ready!");
    }
}

But I'm getting the following error:

org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'com.example.ExampleServiceTest': Unsatisfied dependency expressed through field 'exampleService'; nested exception is 
org.springframework.beans.factory.BeanNotOfRequiredTypeException: Bean named 'exampleService' is expected to be of type 'com.example.ExampleService' but was actually of type 'com.sun.proxy.$Proxy86'

The problem is that the ExampleService bean cannot be injected.

Does anyone know how I can make this work?

Thanks!


2 Answers

2
votes

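Since you use the @Publisher annotation in your ExampleService, the bean is wrapped in a proxy that performs the publishing. The com.sun.proxy.$Proxy86 type in the error shows that it is a JDK dynamic proxy, which implements only the interfaces of the target class, so it cannot be assigned to a field of type ExampleService.

The way to overcome the issue is to expose an interface for your ExampleService and inject that interface into your test class: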

public interface ExampleServiceInterface {

     String sendMessage(String message);

}

...

public class ExampleService implements ExampleServiceInterface {

...


@Autowired
private ExampleServiceInterface exampleService;
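
To illustrate why injecting by interface works while injecting by class fails, here is a minimal plain-Java sketch. It assumes the proxy Spring creates here is a JDK dynamic proxy, as the com.sun.proxy.$Proxy86 type in the error suggests. No Spring is involved: JdkProxyDemo and proxyFor are hypothetical names, and the pass-through handler only stands in for the real publishing advice.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

class JdkProxyDemo {

    // Same shape as the interface suggested in the answer.
    public interface ExampleServiceInterface {
        String sendMessage(String message);
    }

    public static class ExampleService implements ExampleServiceInterface {
        @Override
        public String sendMessage(String message) {
            return message;
        }
    }

    // Hypothetical helper: wraps the target in a JDK dynamic proxy that
    // delegates every call to it (a stand-in for the publishing advice).
    static Object proxyFor(ExampleService target) {
        InvocationHandler handler = (proxy, method, args) -> method.invoke(target, args);
        return Proxy.newProxyInstance(
                ExampleService.class.getClassLoader(),
                new Class<?>[] { ExampleServiceInterface.class },
                handler);
    }

    public static void main(String[] args) {
        Object bean = proxyFor(new ExampleService());

        // The proxy implements the interface...
        System.out.println(bean instanceof ExampleServiceInterface); // true
        // ...but it is not a subclass of the concrete class.
        System.out.println(bean instanceof ExampleService);          // false

        // Calling through the interface reaches the target.
        ExampleServiceInterface service = (ExampleServiceInterface) bean;
        System.out.println(service.sendMessage("Hi!"));              // Hi!
    }
}
```

A field typed as the concrete class asks for exactly the assignment that the second check shows to be impossible, which is what BeanNotOfRequiredTypeException reports.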

On the other hand, your ExampleService.sendMessage() does nothing with the message, so you may want to use a @MessagingGateway on an interface instead: https://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#gateway

1
votes

Why not just send the message to the stream manually, as shown below?

@Component
@Configuration
@EnableBinding(Processor.class)
public class Sender {

    @Autowired
    private Processor processor;

    public void send(String message) {

        processor.output().send(MessageBuilder.withPayload(message).build());

    }

}

You can test it with a MessageCollector:

@RunWith(SpringRunner.class) // needed under JUnit 4 so that the @Autowired fields are injected
@SpringBootTest
public class SenderTest {

    @Autowired
    private MessageCollector messageCollector;

    @Autowired
    private Processor processor;

    @Autowired
    private Sender sender;

    @SuppressWarnings("unchecked")
    @Test
    public void testSend() throws Exception{

        sender.send("Hi!");
        Message<String> message = (Message<String>) this.messageCollector.forChannel(this.processor.output()).poll(1, TimeUnit.SECONDS);
        String messageData = message.getPayload().toString();
        System.out.println(messageData);

    }

}

You should see "Hi!" in the console.