I want my application to read something from the "messages" queue in RabbitMQ and respond with "pong" to the same queue if the message was "ping". I have tried two things so far:

A Consumer and a Supplier connected with a queue. With this setup I always get "Can NOT compose anything with Consumer" when the application starts.

    spring:
      cloud:
        function:
          definition: ping,pong
        stream:
          bindings:
            ping-in-0:
              destination: messages
            pong-out-0:
              destination: messages        
      rabbitmq:
        username: rabbitmq
        password: rabbitmq
    @SpringBootApplication
    public class SpringStreamPingPongApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringStreamPingPongApplication.class, args);
        }
    
        @Bean
        public Consumer<String> ping() {
            return input -> {
                System.out.println("Received: " + input);
                if (input.equals("ping"))
                    outQueue.add("pong");
            };
        }
        
        private Queue<String> outQueue = new LinkedList<String>();
        
        @Bean
        public Supplier<String> pong() {
            return () -> {
                System.out.println("poll");
                return outQueue.poll();
            };
        }
    
    }

This works when the input and output queues are different, but I need them to be the same.

A java.util.function.Function that returns null if the condition is not met.

    spring:
      cloud:
        function:
          definition: pingPong
        stream:
          bindings:
            pingPong-in-0:
              destination: messages
            pingPong-out-0:
              destination: messages
      rabbitmq:
        username: rabbitmq
        password: rabbitmq
    @SpringBootApplication
    public class SpringStreamPingPongApplication {

        public static void main(String[] args) {
            SpringApplication.run(SpringStreamPingPongApplication.class, args);
        }

        @Bean
        public Function<String, String> pingPong() {
            return input -> {
                System.out.println("Received: " + input);
                if (input.equals("ping"))
                    return "pong";
                return null;
            };
        }

    }

When something that is not "ping" is received, I see the message "Received: ..." three times and then it fails with

2021-02-14 10:21:11.103 ERROR 382793 --- [fGaUNiBJ_Uu8A-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@22f545cd]; nested exception is java.lang.NullPointerException: Cannot invoke "org.springframework.messaging.Message.getHeaders()" because "requestMessage" is null, failedMessage=GenericMessage [payload=byte[6], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=messages, amqp_deliveryTag=3, deliveryAttempt=3, amqp_consumerQueue=messages.anonymous.JP2O2DzUTfGaUNiBJ_Uu8A, amqp_redelivered=false, id=db87cc60-6ec0-a8cc-4862-115c31a7f9b2, amqp_consumerTag=amq.ctag-5BBJ5nSfxegSvLHbqRlLug, sourceData=(Body:'[B@5119ad84(byte[6])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=PERSISTENT, redelivered=false, receivedExchange=messages, receivedRoutingKey=, deliveryTag=3, consumerTag=amq.ctag-5BBJ5nSfxegSvLHbqRlLug, consumerQueue=messages.anonymous.JP2O2DzUTfGaUNiBJ_Uu8A]), contentType=application/json, timestamp=1613294468100, target-protocol=amqp}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:66)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:308)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:304)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
    at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.NullPointerException: Cannot invoke "org.springframework.messaging.Message.getHeaders()" because "requestMessage" is null
    at org.springframework.messaging.core.GenericMessagingTemplate.sendTimeout(GenericMessagingTemplate.java:252)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractDestinationResolvingMessagingTemplate.send(AbstractDestinationResolvingMessagingTemplate.java:72)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.doSendMessage(FunctionConfiguration.java:585)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:571)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    ... 27 more

What's the correct way to return nothing?

Are there any other solutions I am not seeing?

1 Answer

First, you have definition: ping,pong, which is equivalent to definition: ping|pong, and that is function composition. You are composing a single function from two, and since a Consumer does not produce any output, it cannot be composed with anything that follows it. It is the end of the line, hence the message you see. If you want to register two or more functions as separate message handlers, use ; as the delimiter, for example definition: ping;pong. The Spring Cloud Stream reference documentation has more information on this.
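
The composition point can be illustrated with plain java.util.function types, outside Spring. This is only an analogy and not how Spring Cloud Function implements composition internally; the class and method names are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical illustration class; not part of the question's application.
class CompositionSketch {

    // Function -> Function composes, because the first stage produces a value
    // that the second stage can take as its input.
    static Function<String, String> trimThenUpper() {
        Function<String, String> trim = String::trim;
        Function<String, String> upper = String::toUpperCase;
        return trim.andThen(upper);
    }

    // Function -> Consumer can be chained by hand, but the result returns
    // nothing, so no further stage can be attached after it.
    static Consumer<String> upperThenCollect(List<String> sink) {
        Function<String, String> upper = String::toUpperCase;
        Consumer<String> collect = sink::add;
        return input -> collect.accept(upper.apply(input));
    }

    public static void main(String[] args) {
        System.out.println(trimThenUpper().apply("  ping ")); // prints PING
        List<String> sink = new ArrayList<>();
        upperThenCollect(sink).accept("pong");
        System.out.println(sink); // prints [PONG]
    }
}
```

A Consumer returns void, so a chain that starts with one has no value to hand to the next stage. That is the situation the startup message complains about.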

If the input and output are the same destination, you would be creating an infinite loop, so it is hard for me to assume that this is what you want or expect. Perhaps explain your business requirement so we can help you orchestrate the right solution. Saying "...read something from the "messages" queue in RabbitMQ and respond with "pong" to the same queue if the message was "ping"..." is not a business requirement but an assumed solution, and one that seems to be wrong.
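
To make the loop concern concrete, here is a minimal in-memory sketch. It assumes a plain ArrayDeque as a stand-in for the "messages" destination, with no RabbitMQ or Spring involved, and it assumes null replies are simply dropped. The class name, the run method, and the maxDeliveries cap are hypothetical and exist only so the sketch terminates.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

// Hypothetical in-memory stand-in for a destination that is both input and output.
class SameDestinationSketch {

    // Delivers messages to the handler and publishes every non-null reply back
    // to the same queue. Stops after maxDeliveries so the sketch always ends.
    static List<String> run(Function<String, String> handler, String first, int maxDeliveries) {
        Queue<String> messages = new ArrayDeque<>();
        List<String> delivered = new ArrayList<>();
        messages.add(first);
        while (!messages.isEmpty() && delivered.size() < maxDeliveries) {
            String input = messages.poll();
            delivered.add(input);
            String reply = handler.apply(input);
            if (reply != null) {
                messages.add(reply); // the reply lands where the handler listens
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // The handler from the question also receives its own "pong".
        System.out.println(run(in -> "ping".equals(in) ? "pong" : null, "ping", 10));
        // prints [ping, pong]

        // A handler that replies to every message never drains the queue;
        // only the delivery cap stops it.
        System.out.println(run(in -> "echo", "ping", 5));
        // prints [ping, echo, echo, echo, echo]
    }
}
```

In the first run the handler sees its own reply once and then stops, because "pong" produces no further output. In the second run every reply triggers another delivery, which is the infinite loop described above.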