2
votes

I'm trying to run this (https://github.com/cloudfoundry-samples/wgrus/tree/master/spring-integration) wgrus sample application using spring integration v2.2.6.RELEASE and keep getting a MessageTransformationException. The application is split into 3 components - a store UI, an inventory service and a shipping service (this service is irrelevant here) and uses the AMQP protocol to send messages to RabbitMQ. In brief, you enter order details in the UI and send them to an order channel in the form of a message. Behind the scenes the application performs a claim check, which stores the message in Mongo (I can see that it gets stored) and returns a new message, whose payload is the id of the stored message - so far so good, the message is stored and I can see the amqpOut channel returning the Id of the stored message, i.e. 826bcbfb-21fa-424d-aecd-0bab3d1a690b - this can be seen in the debug shown at the bottom of my question. Next, the message is sent to RabbitMQ through an outbound AMQP channel and is supposed to be picked up by the inventory service and at this point the exception is thrown. I can see that at some point something happens with the id of the stored message as the debug prints Payload=???sr?java.util.UUID????m?/

Has anyone come across this issue and knows how to resolve it?

Config used by the store UI:

<int:object-to-json-transformer input-channel="orderChannel" output-channel="jsonOrders" content-type="text/x-json"/>

<int:claim-check-in input-channel="jsonOrders" output-channel="amqpOut" message-store="messageStore"/>

<amqp:outbound-channel-adapter id="amqpOut" amqp-template="rabbitTemplate" routing-key="orders"/>
...

Config used by the inventory service:

<int:claim-check-out message-store="messageStore" input-channel="orderChannel" output-channel="inventoryChannel" remove-message="true" />

<amqp:inbound-channel-adapter channel="orderChannel"
  connection-factory="rabbitConnectionFactory"
  queue-names="orders"
  error-channel="errorLogger" />

<int:logging-channel-adapter id="errorLogger" log-full-message="true" level="INFO"/>

<int:chain input-channel="inventoryChannel" output-channel="amqpOut">
  <int:json-to-object-transformer type="org.wgrus.Order"/>
  <int:enricher request-channel="creditCheck">
    <int:property name="approved" expression="payload.startsWith('OK')"/>
  </int:enricher>
  <int:enricher request-channel="inventoryRouter">
    <int:property name="reserved" expression="payload"/>
  </int:enricher>
  <int:object-to-json-transformer content-type="text/x-json" />
  <int:claim-check-in/>
</int:chain>
...

DEBUG:

DEBUG: http-bio-8080-exec-3 org.springframework.integration.channel.DirectChannel - postSend (sent=true) on channel 'amqpOut', message: [Payload=826bcbfb-21fa-424d-aecd-0bab3d1a690b][Headers={timestamp=1384617973920, id=e98cd0c1-bd8c-4786-be31-1e77b0200934, content-type=text/x-json}]

DEBUG: http-bio-8080-exec-3 org.springframework.integration.channel.DirectChannel - postSend (sent=true) on channel 'jsonOrders', message: [Payload={"id":1,"approved":false,"reserved":false,"customerId":"","quantity":1,"productId":"widget"}][Headers={timestamp=1384617973852, id=826bcbfb-21fa-424d-aecd-0bab3d1a690b, content-type=text/x-json}]

DEBUG: http-bio-8080-exec-3 org.springframework.integration.channel.DirectChannel - postSend (sent=true) on channel 'orderChannel', message: [Payload=Order #1: 1 widgets for ][Headers={timestamp=1384617973824, id=1b700a4b-35e1-4d16-8ca0-7cd20ccfb85e}]

DEBUG: SimpleAsyncTaskExecutor-1 org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper - headerName=[content-type] WILL be mapped, matched pattern=content-type

DEBUG: SimpleAsyncTaskExecutor-1 org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper - headerName=[amqp_receivedRoutingKey] WILL be mapped, matched pattern=amqp_receivedRoutingKey

DEBUG: SimpleAsyncTaskExecutor-1 org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper - headerName=[amqp_deliveryMode] WILL be mapped, matched pattern=amqp_deliveryMode

DEBUG: SimpleAsyncTaskExecutor-1 org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper - headerName=[amqp_redelivered] WILL be mapped, matched pattern=amqp_redelivered

DEBUG: SimpleAsyncTaskExecutor-1 org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper - headerName=[amqp_deliveryTag] WILL be mapped, matched pattern=amqp_deliveryTag

DEBUG: SimpleAsyncTaskExecutor-1 org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper - headerName=[content-type] WILL be mapped, matched pattern=content-type

DEBUG: SimpleAsyncTaskExecutor-1 org.springframework.integration.channel.DirectChannel - preSend on channel 'orderChannel', message: [Payload=???sr?java.util.UUID????m?/?J?leastSigBitsJ?mostSigBitsxp???=i?k??!?][Headers={timestamp=1384617974260, id=81a2fb77-0f1e-4be7-9148-84da86a30ed8, content-type=text/x-json, amqp_receivedRoutingKey=orders, amqp_deliveryMode=PERSISTENT, amqp_redelivered=false, amqp_deliveryTag=1}]

DEBUG: SimpleAsyncTaskExecutor-1 org.springframework.integration.transformer.MessageTransformingHandler - org.springframework.integration.transformer.MessageTransformingHandler#1 received message: [Payload=???sr?java.util.UUID????m?/?J?

1

1 Answers

3
votes

Well, your issue is here:

<int:object-to-json-transformer input-channel="orderChannel" output-channel="jsonOrders" content-type="text/x-json"/>

Can you explain what is the reason to set content-type, if you after that use <claim-check-in>, who just return UUID - id of the stored Message?

And what is happened?

Your UUID is convert to serialized bytes by SimpleMessageConverter and this converter sets contentType as application/x-java-serialized-object to the MessageProperties. But after that is called AmqpHeaderMapper, who changes contentType with your value text/x-json from MessageHeaders. That's fine for Producer.

But Consumer can't convert the Body correctly, because here also works SimpleMessageConverter by default. And it checks contentType.startsWith("text"). And just create simple String from bytes of serialized UUID.

Hope it is clear

UPDATE

Unfortunately <object-to-json-transformer> sets content-type header anyway, to application/json by default.

To prevent it you should configure like this content-type="".