We have a source like follows and we are using spring cloud stream rabbit binder 3.0.1.RELEASE.
@Component
public class Handlers {
private EmitterProcessor<String> sourceGenerator = EmitterProcessor.create();
public void emitData(String str){
sourceGenerator.onNext(str);
}
@Bean
public Supplier<Flux<String>> generate() {
return () -> sourceGenerator;
}
@Bean
public Function<String, String> process() {
return str -> str.toUpperCase();
}
}
application.yml
spring:
profiles: dev
cloud:
stream:
function:
definition: generate;process
bindings:
generate-out-0: source1
process-in-0: source1
process-out-0: processed
bindingServiceProperties:
defaultBinder: local_rabbit
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
While calling emitData
method, we are not seeing data in RabbitMQ queue.
We also observed that consumer binding is working. That we checked by means of direct sending of messages into a consumer linked queue through RabbitMQ Admin. But supplier binding is not working.
Also, we observed that Supplier
without Flux
is working fine with the same application.yml
configuration.
Are we missing any configuration here?
Even test case with TestChannelBinderConfiguration is working fine as follows.
@Slf4j
@TestPropertySource(
properties = {"spring.cloud.function.definition = generate|process"}
)
public class HandlersTest extends AbstractTest {
@Autowired
private OutputDestination outputDestination;
@Test
public void testGeneratorAndProcessor() {
final String testStr = "test";
handlers.emitData(testStr);
Object eventObj;
final Message<byte[]> message = outputDestination.receive(1000);
assertNotNull(message, "processing timeout");
eventObj = message.getPayload();
assertEquals(new String((byte[]) eventObj), testStr.toUpperCase());
}
}
emitData
? To be specific, how do you get reference toHandlers
? – Oleg Zhurakousky