Being new to Spring Reactor, I am trying to stream the data using Spring cloud stream(using rabbitMQ). I need to add some custom headers before the message is sent to the queue.
My spring-cloud-stream's configuration is:
spring:
cloud:
stream:
default:
producer:
errorChannelEnabled: true
bindings:
input:
binder: rabbitInput
destination: inputDestination
output:
binder: rabbitOutput
destination: outputDestination
function:
definition: processMessage|addHeaders
binders:
rabbitInput:
type: rabbit
environment:
spring:
rabbitmq:
port: 5672
host: localhost
rabbitOutput:
type: rabbit
environment:
spring:
rabbitmq:
port: 5670
host: localhost
Producer reference:
@SpringBootApplication
@EnableBinding(Processor.class)
public class MessageProcessor {
public static void main(String[] args) {
SpringApplication.run(MessageProcessor.class, args);
}
@Bean
Function<Flux<String>, Flux<String>> processMessage(List<String> students) {
return data -> data.map(d -> match(d, students));
}
private String match(String message, List<String> students){
return Objects.isNull(message) || message.isBlank()
? message
: String.valueOf(matchStudentName(message, students));
}
private Optional<String> matchStudentName(String message, List<String> students){
return students.stream()
.filter(name -> name.equals(message)).findFirst();
}
@Bean
Function<Flux<String>, Flux<Message<String>>> addHeaders() {
return data-> data.map(d-> MessageBuilder
.withPayload( d )
.setHeader("a", 1)
.setHeader("b", "999")
.build());
}
}
Headers are being added to the Message successfully, but it's getting overridden somewhere and not getting propagated to the consumer.
Could someone please share their thoughts on how we can add custom headers to a Message using Spring Cloud Stream.
Thanks in advance!