I want my application to read something from the "messages" queue in RabbitMQ and respond with "pong" to the same queue if the message was "ping". I have tried two things so far:
A Consumer and Supplier connected with a queue, but I always get "Can NOT compose anything with Consumer" when the application starts.
spring:
cloud:
function:
definition: ping,pong
stream:
bindings:
ping-in-0:
destination: messages
pong-out-0:
destination: messages
rabbitmq:
username: rabbitmq
password: rabbitmq
@SpringBootApplication
public class SpringStreamPingPongApplication {
public static void main(String[] args) {
SpringApplication.run(SpringStreamPingPongApplication.class, args);
}
@Bean
public Consumer<String> ping() {
return input -> {
System.out.println("Received: " + input);
if (input.equals("ping"))
outQueue.add("pong");
};
}
private Queue<String> outQueue = new LinkedList<String>();
@Bean
public Supplier<String> pong() {
return () -> {
System.out.println("poll");
return outQueue.poll();
};
}
}
This works, when the input and output queues are different, but my requirement is them to be the same.
A java.util.Function that returns null if the condition is not met.
spring:
cloud:
function:
definition: pingPong
stream:
bindings:
pingPong-in-0:
destination: messages
pingPong-out-0:
destination: messages
rabbitmq:
username: rabbitmq
password: rabbitmq
@SpringBootApplication
public class SpringStreamPingPongApplication {
public static void main(String[] args) {
SpringApplication.run(SpringStreamPingPongApplication.class, args);
}
@Bean
public Function<String, String> pingPong() {
return input -> {
System.out.println("Received: " + input);
if (input.equals("ping"))
return "pong";
return null;
};
}
}
When something that is not "ping" is received I see the message "Received: ..." three times and the it fails with
2021-02-14 10:21:11.103 ERROR 382793 --- [fGaUNiBJ_Uu8A-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@22f545cd]; nested exception is java.lang.NullPointerException: Cannot invoke "org.springframework.messaging.Message.getHeaders()" because "requestMessage" is null, failedMessage=GenericMessage [payload=byte[6], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=messages, amqp_deliveryTag=3, deliveryAttempt=3, amqp_consumerQueue=messages.anonymous.JP2O2DzUTfGaUNiBJ_Uu8A, amqp_redelivered=false, id=db87cc60-6ec0-a8cc-4862-115c31a7f9b2, amqp_consumerTag=amq.ctag-5BBJ5nSfxegSvLHbqRlLug, sourceData=(Body:'[B@5119ad84(byte[6])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=PERSISTENT, redelivered=false, receivedExchange=messages, receivedRoutingKey=, deliveryTag=3, consumerTag=amq.ctag-5BBJ5nSfxegSvLHbqRlLug, consumerQueue=messages.anonymous.JP2O2DzUTfGaUNiBJ_Uu8A]), contentType=application/json, timestamp=1613294468100, target-protocol=amqp}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:66)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:308)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:304)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.NullPointerException: Cannot invoke "org.springframework.messaging.Message.getHeaders()" because "requestMessage" is null
at org.springframework.messaging.core.GenericMessagingTemplate.sendTimeout(GenericMessagingTemplate.java:252)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractDestinationResolvingMessagingTemplate.send(AbstractDestinationResolvingMessagingTemplate.java:72)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.doSendMessage(FunctionConfiguration.java:585)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:571)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
... 27 more
What's the correct way to return nothing?
Are there any other solutions I am not seeing?