I have a Spring Boot application which has problems retrieving JMS messages of type TextMessage from an ActiveMQ broker.

When the consumer retrieves a message from the broker, the message is not converted to a TextMessage but is treated as a BytesMessage. There is a JmsListener which should read the messages from the queue as TextMessage:

...
@JmsListener(destination = "foo")
public void jmsConsumer(TextMessage message) {
...

The JmsListener produces warnings like the following, and drops the messages:

org.springframework.jms.listener.adapter.ListenerExecutionFailedException: Listener method could not be invoked with incoming message
Endpoint handler details:
Method [public void net.aschemann.demo.springboot.jmsconsumer.JmsConsumer.jmsConsumer(javax.jms.TextMessage)]
Bean [net.aschemann.demo.springboot.jmsconsumer.JmsConsumer@4715f07]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [javax.jms.TextMessage] for org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage@7c49d298, failedMessage=org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage@7c49d298
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:118) ~[spring-jms-5.1.4.RELEASE.jar:5.1.4.RELEASE]

I have extracted a small sample application to debug the problem: https://github.com/ascheman/springboot-camel-jms

The producer in real life is a commercial application which makes use of Apache Camel. Hence, I can hardly change/customize the producer. I have tried to build a sample producer which shows the same behavior.

Could I somehow tweak the consumer to treat the message as TextMessage?

Besides: Is there any way to retrieve the additional AMQP properties from the message programmatically, directly in Spring? Of course, I could still read the message as a BytesMessage and try to parse the properties out of it myself. But I am looking for a cleaner way that is backed by a Spring API. The Spring @Headers annotation hasn't helped so far.

You are receiving a byte[], and by default that gets converted to a BytesMessage. I would suggest receiving just the byte[] rather than a specific JMS message type. Try String instead of TextMessage. Properties aren't headers (AFAIK), so I'm not sure where they get mapped to in this scenario. If you really want to use a TextMessage, write a custom message converter that converts the byte[] into a TextMessage. - M. Deinum
Thanks, but the problem is not about changing the type: I can get the contents as a byte[] as well as a String. However, in this case the message also contains some additional data, the so-called properties (check out the mentioned AMQP spec for details). I am interested in the genuine text message that was originally sent by the producer, and I would not like to parse it out of the byte[] or String object myself. I expect to get a correct JMS object if the transport is JMS compatible, not a number of bytes which I need to handle myself. - Gerd Aschemann
But you aren't using JMS as the transport; that is the main problem, as you are using AMQP. You are basically tunneling JMS through AMQP (which is different from JMS). So messages are adapted, and the TextMessage gets converted to something that AMQP understands, which, if you look at the Java API, is a byte[]. The fact is that you aren't using JMS but rather AMQP, which you are trying to shoehorn into JMS. Why not simply use AMQP instead? - M. Deinum
Is this really AMQP, or ActiveMQ (AMQ)? You can receive Message<?> instead, which is a spring-messaging abstraction over multiple transports (including JMS). It has a payload and headers (which are mapped from the JMS message). - Gary Russell
To avoid people wasting their time: this question was also cross-posted as a Spring Boot issue. My suspicion is that it's a broker configuration problem, with the default native mapping being used rather than the desired jms mapping. See activemq.apache.org/amqp.html for details on the various mapping options. - Andy Wilkinson

3 Answers

0
votes

I faced the same issue as the question owner. After I followed the comment from @AndyWilkinson and added the transport.transformer option to the transportConnector in activemq.xml as follows, the issue was solved.

<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;transport.transformer=jms"/>
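As a quick sanity check, the connector URI can be inspected for the transformer option before the broker is restarted. The following is a minimal sketch in plain Java and is not part of ActiveMQ: the class AmqpConnectorCheck and its methods are hypothetical names, and the fallback to "native" assumes the default mapping mentioned in the comments above. It expects the URI as it reads after XML unescaping (& instead of &amp;).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not an ActiveMQ API: inspects the query options
// of a transportConnector URI such as the one configured in activemq.xml.
final class AmqpConnectorCheck {

    private AmqpConnectorCheck() {
    }

    // Splits the part after '?' into key/value pairs, preserving order.
    static Map<String, String> queryOptions(String connectorUri) {
        Map<String, String> options = new LinkedHashMap<>();
        int queryStart = connectorUri.indexOf('?');
        if (queryStart < 0 || queryStart == connectorUri.length() - 1) {
            return options; // no query part at all
        }
        for (String pair : connectorUri.substring(queryStart + 1).split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            int eq = pair.indexOf('=');
            String key = eq < 0 ? pair : pair.substring(0, eq);
            String value = eq < 0 ? "" : pair.substring(eq + 1);
            options.put(key, value);
        }
        return options;
    }

    // Returns the configured transformer; "native" is assumed to be the
    // broker default when the option is absent.
    static String transformer(String connectorUri) {
        return queryOptions(connectorUri).getOrDefault("transport.transformer", "native");
    }

    // True only when the connector explicitly asks for the jms mapping.
    static boolean usesJmsMapping(String connectorUri) {
        return "jms".equals(transformer(connectorUri));
    }
}
```

For the connector shown above, usesJmsMapping returns true; for a URI without the transport.transformer option, transformer falls back to "native".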
0
votes

I had the same error. It occurs because LazyResolutionMessage is invoked from MessagingMessageConverter, the default MessageConverter implementation, which is supposed to convert your message (it actually doesn't, since it is only the default):

return ((org.springframework.messaging.Message) payload).getPayload();

I got what you want working; in the end my consumer looked like this:

@JmsListener(destination = "${someName}")
public void consumeSomeMessages(MyCustomEvent e) {
  ....
}

What I had to do was:

@Bean(name = "jmsListenerContainerFactory")
public DefaultJmsListenerContainerFactory whateverNameYouWant(final ConnectionFactory genericCF) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setErrorHandler(t -> log.error("bad consumer, bad", t));
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    factory.setConnectionFactory(genericCF);
    factory.setMessageConverter(
            new MessageConverter() {
                @Override
                public Message toMessage(Object object, Session session) {
                    throw new UnsupportedOperationException("since it's only for consuming!");
                }

                @Override
                public MyCustomEvent fromMessage(Message m) {
                    try {
                      // whatever transformation you want here...
                      // here you could print the message, try casting,
                      // building new objects with message's attributes, so on...
                      // example:
                      return (new ObjectMapper()).readValue(((TextMessage) m).getText(), MyCustomEvent.class);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }
    );
    return factory;
}

A few key points:

  1. If the method that creates your DefaultJmsListenerContainerFactory is itself named jmsListenerContainerFactory, you don't need the name attribute on the @Bean annotation.

  2. Note that you can also implement an ErrorHandler to deal with exceptions thrown while converting or casting your message.

  3. The ConnectionFactory was a Spring-managed bean backed by Amazon's SQSConnectionFactory, since I wanted to consume from an SQS queue. Provide the equivalent for your own broker. Mine was:

    @Bean("connectionFactory")
    public SQSConnectionFactory someOtherNome() {
        return new SQSConnectionFactory(
                new ProviderConfiguration(),
                AmazonSQSClientBuilder.standard()
                        .withRegion(Regions.US_EAST_1)
                        .withCredentials(
                                new AWSStaticCredentialsProvider(
                                        new BasicAWSCredentials(
                                                "keyAccess",
                                                "keySecret"
                                        )
                                )
                        )
                        .build()
        );
    }
    
-1
votes

If you have a problem converting from byte[] to String in a Camel route, use:

.convertBodyTo(String.class)

Route example:

from(QUEUE_URL)
                .routeId("consumer")
                .convertBodyTo(String.class)
                .log("${body}")
                .to("mock:mockRoute");
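
Outside of a Camel route, the same conversion amounts to decoding the bytes with an explicit charset. The following is a minimal sketch in plain Java: PayloadText is a hypothetical helper, and UTF-8 is an assumption about how the producer encoded the text. It only yields the original text if the byte[] holds nothing but the encoded body; as described in the question, the bytes may also carry the additional properties.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: normalizes a received payload to text, whether it
// arrived as a String or as a raw byte[].
final class PayloadText {

    private PayloadText() {
    }

    static String asText(Object payload) {
        if (payload instanceof String) {
            return (String) payload; // already text, nothing to decode
        }
        if (payload instanceof byte[]) {
            // Assumption: the producer encoded the text as UTF-8.
            return new String((byte[]) payload, StandardCharsets.UTF_8);
        }
        String type = payload == null ? "null" : payload.getClass().getName();
        throw new IllegalArgumentException("Unsupported payload type: " + type);
    }
}
```

Such a helper could be called from a listener that accepts the payload as byte[] or String, so that the rest of the consumer only deals with text.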