In newer versions of Spring Cloud Stream, @EnableBinding and the declarative (annotation-based) programming style are being deprecated. With the functional programming style, how can I handle messages as they pass through the pipeline?

First case

I need to do further work on a message after it has been sent to the output channel, such as logging that it was processed successfully or storing it in a database.

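import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

@EnableBinding(Processor.class)
public class MessageProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageProcessor.class);
    private final MessageChannel output;

    public MessageProcessor(MessageChannel output) {
        this.output = output;
    }

    @StreamListener(Processor.INPUT)
    public void process(Message<String> message) {
        LOGGER.info("Received message: {}", message);
        // Forward the message to the output channel first
        output.send(message);
        /* Do some work with the sent message here (e.g. store it in a database) */
        LOGGER.info("Finished processing message: {}", message);
    }
}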

Second case

I receive a message that contains a collection of DTOs, and I need to process each DTO separately.

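import java.util.Collection;
import java.util.Objects;
import java.util.stream.Stream;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
// Assumed import; Spring Integration's MessageBuilder offers the same calls
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(Processor.class)
public class BatchMessageProcessor {
    private final MessageChannel output;

    public BatchMessageProcessor(MessageChannel output) {
        this.output = output;
    }

    @StreamListener(Processor.INPUT)
    public void process(Message<PackageDto> pkg) {
        Stream.ofNullable(pkg)
                .map(Message::getPayload)
                .map(PackageDto::getMessages)
                .flatMap(Collection::stream)
                .filter(Objects::nonNull)
                /* Send each DTO as a separate message */
                .forEach(m -> output.send(MessageBuilder.withPayload(m).build()));
    }
}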

How can I implement these cases with the functional programming style in Spring Cloud Stream?

1 Answer

Use StreamBridge, or a Reactor EmitterProcessor (note that EmitterProcessor is deprecated since Reactor 3.4 in favor of the Sinks API).
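
For the first case, a minimal sketch of the shape this could take, written in plain Java so it runs without Spring on the classpath. The `sender` parameter is a stand-in for `StreamBridge::send`, which takes a binding name and the data to send and returns a boolean. The binding name `processed-out-0`, the `journal` list (standing in for the logger or database), and the class name are assumptions made for illustration. In a real application the consumer would presumably be registered as a `@Bean` with an injected `StreamBridge`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;

class SendThenProcessSketch {

    // Hypothetical binding name; with StreamBridge its destination would be
    // configured under spring.cloud.stream.bindings.<name>.destination.
    static final String OUTPUT_BINDING = "processed-out-0";

    /**
     * Builds the consumer for the first case.
     * `sender` stands in for StreamBridge::send (bindingName, data) -> boolean.
     * `journal` is a hypothetical stand-in for the logger or database.
     */
    static Consumer<String> process(BiFunction<String, Object, Boolean> sender,
                                    List<String> journal) {
        return payload -> {
            journal.add("received: " + payload);
            // Send first, as in the question's @StreamListener method.
            boolean sent = sender.apply(OUTPUT_BINDING, payload);
            // Follow-up work on the sent message goes here.
            journal.add((sent ? "sent: " : "not sent: ") + payload);
        };
    }

    public static void main(String[] args) {
        List<String> outbox = new ArrayList<>();
        List<String> journal = new ArrayList<>();
        // The lambda plays the role of streamBridge::send in this demo.
        Consumer<String> consumer =
                process((binding, data) -> outbox.add(binding + " <- " + data), journal);
        consumer.accept("hello");
        System.out.println(outbox);
        System.out.println(journal);
    }
}
```

Because the send is an explicit call rather than the function's return value, the code after it runs once the send call has returned, which is what the first case asks for.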

See "Sending arbitrary data to an output" in the Spring Cloud Stream reference documentation.
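
For the second case, the splitting logic itself can be expressed as a plain `java.util.function.Function`. The sketch below is Spring-free so it can be run as is; `PackageDto` is a simplified stand-in for the question's class (its items are plain strings here), and the class and method names are hypothetical. In a Spring Cloud Stream application, one option would be to call `StreamBridge.send` once per element of the result. Another, which is an assumption worth checking against the reference documentation for your version, is to return a `List<Message<...>>` from a function bean and let the framework send each element separately.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

class SplitPackageSketch {

    /** Simplified stand-in for the question's PackageDto; items are strings here. */
    static class PackageDto {
        private final List<String> messages;

        PackageDto(List<String> messages) {
            this.messages = messages;
        }

        List<String> getMessages() {
            return messages;
        }
    }

    /** Splits one package into its non-null items, tolerating null input. */
    static Function<PackageDto, List<String>> split() {
        return pkg -> {
            if (pkg == null || pkg.getMessages() == null) {
                return Collections.emptyList();
            }
            return pkg.getMessages().stream()
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
        };
    }

    public static void main(String[] args) {
        PackageDto pkg = new PackageDto(Arrays.asList("a", null, "b"));
        // Each element of the result would be sent as its own message.
        for (String item : split().apply(pkg)) {
            System.out.println("would send: " + item);
        }
    }
}
```

Keeping the split as a pure function makes it easy to unit test independently of the binder.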