
I have two components (let's call them producer and consumer) connected to the same RabbitMQ topic-based exchange.

The producer can send two different message types: Foo and Bar (the contents of each message are irrelevant, but let's say that they both have an id field). The routing keys used are msg.foo and msg.bar respectively. Rather than relying on the default Java serialization, the producer uses the Jackson2JsonMessageConverter.

The consumer has a queue which is bound to the same exchange with a routing key of msg.#. Once consumed, all the consumer will want to do is print the id of each message in a log file. In order to retrieve the value for the id field, the JSON payload needs to be transformed into some sort of object.

The message classes (Foo and Bar) are not shared between the two components. The consumer's classes may have more or fewer fields than the producer's representation of the message. This is fine; any fields missing from the payload may simply be set to null.

Is there an elegant way to convert/serialize these messages from JSON to objects of type Foo and Bar? The only solution I can come up with is to manually write code which reads the amqp_receivedRoutingKey or the json__TypeId__ headers in order to determine the object type. Example:

@Bean
public IntegrationFlow inbound(ConnectionFactory connectionFactory, Queue queue) {
    // Reuse a single mapper instead of creating a new one for every message.
    final Jackson2JsonObjectMapper mapper = new Jackson2JsonObjectMapper();

    return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, queue))
        .handle(message -> {
            // Without a JSON converter on the adapter, the payload arrives as raw bytes.
            final byte[] payload = (byte[]) message.getPayload();
            // Decode as UTF-8 (java.nio.charset.StandardCharsets) rather than the platform default charset.
            final String payloadStr = new String(payload, StandardCharsets.UTF_8);

            final String routingKey = (String) message.getHeaders().get("amqp_receivedRoutingKey");

            try {
                // Constant-first equals() avoids a NullPointerException if the header is missing.
                if ("msg.foo".equals(routingKey)) {
                    final Foo foo = mapper.fromJson(payloadStr, Foo.class);

                    System.out.println("FOO id: " + foo.getId());
                } else if ("msg.bar".equals(routingKey)) {
                    final Bar bar = mapper.fromJson(payloadStr, Bar.class);

                    System.out.println("BAR id: " + bar.getId());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        })
        .get();
}

Unfortunately, this is quite flaky and also quite ugly due to the repeated if/else clauses. Are there any Spring Integration tricks I am missing that would make my code more readable?

I managed to find a similar question (spring boot rabbitmq MappingJackson2MessageConverter custom object conversion) but apart from the solution not working, it is very RabbitMQ specific. I would rather be RabbitMQ agnostic and use standard AMQP classes/imports where possible.

1 Answer


It looks like you are missing the fact that Jackson2JsonMessageConverter populates type headers (such as __TypeId__) in the AMQP message. A converter on the consumer side can rely on those headers to reconstruct the POJO.

So, consider configuring your Amqp.inboundAdapter(connectionFactory, queue) with a Jackson2JsonMessageConverter as well.
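A minimal sketch of that configuration follows, under a few assumptions that are not in the question. Because the classes are not shared, the type id written by the producer (by default its fully qualified class name) has to be mapped to the consumer's own classes. The ids com.example.producer.Foo and com.example.producer.Bar, the HasId interface, the String id type, and the ConsumerConfig class name are all hypothetical placeholders. It needs Spring AMQP and Spring Integration AMQP on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
public class ConsumerConfig {

    // Hypothetical consumer-side view: both message types expose an id.
    public interface HasId {
        String getId();
    }

    // Consumer's own representations (assumed shape; not shared with the producer).
    public static class Foo implements HasId {
        private String id;
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
    }

    public static class Bar implements HasId {
        private String id;
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
    }

    @Bean
    public Jackson2JsonMessageConverter jsonConverter() {
        // Map the producer's __TypeId__ values (assumed names) to local classes.
        Map<String, Class<?>> idClassMapping = new HashMap<>();
        idClassMapping.put("com.example.producer.Foo", Foo.class);
        idClassMapping.put("com.example.producer.Bar", Bar.class);

        DefaultClassMapper classMapper = new DefaultClassMapper();
        classMapper.setIdClassMapping(idClassMapping);

        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        converter.setClassMapper(classMapper);
        return converter;
    }

    @Bean
    public IntegrationFlow inbound(ConnectionFactory connectionFactory, Queue queue,
            Jackson2JsonMessageConverter jsonConverter) {
        return IntegrationFlows
                .from(Amqp.inboundAdapter(connectionFactory, queue)
                        .messageConverter(jsonConverter))
                .handle(message -> {
                    // The payload is already a Foo or Bar here, so no if/else on the routing key.
                    Object payload = message.getPayload();
                    if (payload instanceof HasId) {
                        System.out.println(payload.getClass().getSimpleName()
                                + " id: " + ((HasId) payload).getId());
                    }
                })
                .get();
    }
}
```

With this in place the handler no longer parses JSON or inspects headers itself. If the producer's actual type ids differ from the placeholders above, only the idClassMapping entries need to change.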

See the docs for more information about the converter: https://docs.spring.io/spring-amqp/docs/2.2.0.RC1/reference/html/#json-message-converter