I am currently using Spring Cloud Stream with Kafka binders with a GlobalChannelInterceptor to perform message-logging for my Spring Boot microservices.
I have:
- a producer to publish messages to a SubscribableChannel
- a consumer to listen to the Stream (using the @StreamListener annotation)
When a message is published to the Stream by the producer and received by the consumer, I observe that the preSend method is triggered twice:
- Once on the producer side, when the message is published to the Stream
- Once on the consumer side, when the message is received from the Stream
However, for my logging purposes, I only need to intercept and log the message on the consumer side.
Is there any way to intercept the Spring Cloud Stream (SCS) message on ONLY one side (e.g. the consumer side)?
I would appreciate any thoughts on this matter. Thank you!
Ref:
- GlobalChannelInterceptor documentation - https://docs.spring.io/spring-integration/api/org/springframework/integration/config/GlobalChannelInterceptor.html
EDIT
Producer
public void sendToPushStream(PushStreamMessage message) {
    try {
        boolean results = streamChannel.pushStream().send(MessageBuilder.withPayload(new ObjectMapper().writeValueAsString(message)).build());
        log.info("Push stream message {} sent to {}.", results ? "successfully" : "not", StreamChannel.PUSH_STREAM);
    } catch (JsonProcessingException ex) {
        log.error("Unable to parse push stream message.", ex);
    }
}
Producer's streamChannel
public interface StreamChannel {
    String PUSH_STREAM = "PushStream";
    @Output(StreamChannel.PUSH_STREAM)
    SubscribableChannel pushStream();
}
Consumer
@StreamListener(StreamChannel.PUSH_STREAM)
public void handle(Message<PushStreamMessage> message) {
    log.info("Incoming stream message from {}, {}", streamChannel.pushStream(), message);
}
Consumer's streamChannel
public interface StreamChannel {
    String PUSH_STREAM = "PushStream";
    @Input(StreamChannel.PUSH_STREAM)
    SubscribableChannel pushStream();
}
Interceptor (Common Library)
public class GlobalStreamInterceptor extends ChannelInterceptorAdapter {
    @Override
    public Message<?> preSend(Message<?> msg, MessageChannel mc) {
        log.info("presend " + msg);
        return msg;
    }
    @Override
    public void postSend(Message<?> msg, MessageChannel mc, boolean sent) {
        log.info("postSend " + msg);
    }
}
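To make the desired behaviour concrete, here is a minimal plain-Java sketch of what "log on one side only" could look like. It is not Spring code: `Interceptor`, `FilteringInterceptor` and the `loggedChannels` setting are hypothetical stand-ins, and the sketch assumes each service can pass its own configuration to the interceptor that ships in the common library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Plain-Java model only: these types are hypothetical stand-ins, not Spring classes.
class ConsumerSideLoggingSketch {

    // Stand-in for the preSend hook of a channel interceptor.
    interface Interceptor {
        String preSend(String payload, String channelName);
    }

    // Shared interceptor that logs only for channel names enabled by the owning service.
    static final class FilteringInterceptor implements Interceptor {
        private final Set<String> loggedChannels;
        final List<String> log = new ArrayList<>(); // captured log lines, for illustration

        FilteringInterceptor(Set<String> loggedChannels) {
            this.loggedChannels = loggedChannels;
        }

        @Override
        public String preSend(String payload, String channelName) {
            if (loggedChannels.contains(channelName)) {
                log.add("preSend " + channelName + ": " + payload);
            }
            return payload; // the message always passes through unchanged
        }
    }

    public static void main(String[] args) {
        // Producer service: logging disabled for every channel.
        FilteringInterceptor producerSide = new FilteringInterceptor(Set.of());
        // Consumer service: logging enabled for the PushStream channel.
        FilteringInterceptor consumerSide = new FilteringInterceptor(Set.of("PushStream"));

        producerSide.preSend("{\"id\":1}", "PushStream");
        consumerSide.preSend("{\"id\":1}", "PushStream");

        System.out.println("producer log lines: " + producerSide.log.size());
        System.out.println("consumer log lines: " + consumerSide.log.size());
    }
}
```

The sketch distinguishes the two sides by per-service configuration rather than by channel name, because both bindings above are named PushStream. For reference, @GlobalChannelInterceptor has a patterns attribute that limits the interceptor to matching channel names, but a name pattern alone would presumably not separate the two sides unless the producer and consumer channels were named differently.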