
I am currently using Spring Cloud Stream with Kafka binders with a GlobalChannelInterceptor to perform message-logging for my Spring Boot microservices.

I have:

  1. a producer to publish messages to a SubscribableChannel
  2. a consumer to listen from the Stream (using the @StreamListener annotation)

Throughout the process when a message is published to the Stream from the producer and listened by the consumer, it is observed that the preSend method was triggered twice:

  1. Once at producer side - when the message is published to the Stream
  2. Once at consumer side - when the message is listened from the Stream

However, for my logging purposes, I only need to intercept and log the message at consumer side.

Is there any way to intercept the SCS message ONLY at one side (e.g. consumer side)?

I would appreciate any thoughts on this matter. Thank you!


  1. GlobalChannelInterceptor documentation - https://docs.spring.io/spring-integration/api/org/springframework/integration/config/GlobalChannelInterceptor.html



public void sendToPushStream(PushStreamMessage message) {
        try {
            boolean results = streamChannel.pushStream().send(MessageBuilder.withPayload(new ObjectMapper().writeValueAsString(message)).build());
        log.info("Push stream message {} sent to {}.", results ? "successfully" : "not", StreamChannel.PUSH_STREAM);
        } catch (JsonProcessingException ex) {
            log.error("Unable to parse push stream message.", ex);

Producer's streamChannel

public interface StreamChannel {

    String PUSH_STREAM = "PushStream";

    SubscribableChannel pushStream();



public void handle(Message<PushStreamMessage> message) {
    log.info("Incoming stream message from {}, {}", streamChannel.pushStream(), message);


Consumer's streamChannel

public interface StreamChannel {

    String PUSH_STREAM = "PushStream";

    SubscribableChannel pushStream();


Interceptor (Common Library)

public class GlobalStreamInterceptor extends ChannelInterceptorAdapter {

    public Message<?> preSend(Message<?> msg, MessageChannel mc) {
       log.info("presend " + msg);
        return msg;

    public void postSend(Message<?> msg, MessageChannel mc, boolean sent) {
        log.info("postSend " + msg);


1 Answers


Right, why don't follow GlobalChannelInterceptor options and don't apply

An array of simple patterns against which channel names will be matched.


So, you may have something like this:

@GlobalChannelInterceptor(patterns = Processor.INPUT)

Or use a custom name of input channel to your SCSt app.