Support for sending HashMaps between Source/Processor/Sink through RabbitBinder seems to have changed in the new Spring Boot (2.0.2).
The common parent pom.xml for all modules is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>demo.stream</groupId>
<artifactId>demo-stream-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<modules>
<module>source-module</module>
<module>processor-module</module>
<module>sink-module</module>
</modules>
<packaging>pom</packaging>
<name>demo-stream</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.2.RELEASE</version>
<relativePath />
<!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Finchley.RC2</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>
The documentation clearly states the following:
If no content-type property is set on an outbound channel, Spring Cloud Stream will serialize the payload using a serializer based on the Kryo serialization framework. Deserializing messages at the destination requires the payload class to be present on the receiver’s classpath.
Given these rules, standard Java types such as "HashMap" are expected to work without a custom message converter.
However, I could not get this simple code to work properly between the source and the processor:
Source configuration and code:
spring:
  application:
    name: test
  cloud:
    config:
      uri: http://blade1:8888
      name: scdf-tester
    stream:
      bindings:
        output:
          #content-type: 'application/x-java-serialized-object'
          #content-type: 'application/json'
          content-type:
          destination: demo-stream-source-output
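import java.util.HashMap;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
// Assumption: spring-messaging's MessageBuilder; Spring Integration's would also work.
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@EnableBinding(Source.class)
@EnableAutoConfiguration
@EnableScheduling
@Component
public class DemoSource {
    @Autowired
    private Source channels;

    // Sends a nested map every two seconds. Declared void because the
    // message is sent directly instead of being returned as a MessageSource.
    //@InboundChannelAdapter(Source.OUTPUT)
    @Scheduled(fixedRate = 2000)
    public void timerMessageSource() {
        Map<String, Object> mapa = new HashMap<>();
        mapa.put("string", "string");
        mapa.put("long", 1212121L);
        mapa.put("integer", 1212121);
        Map<String, Object> mapaInner = new HashMap<>();
        mapaInner.put("string", "string");
        mapaInner.put("long", 1212121L);
        mapaInner.put("integer", 1212121);
        mapa.put("innerMapa", mapaInner);
        channels.output().send(MessageBuilder.withPayload(mapa).build());
    }
}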
In the debugger it is clearly visible that the payload is converted into a JSON string (by ApplicationJsonMessageMarshallingConverter) and the content type header is set to "application/json".
This is not what I expected, although it is acceptable and legitimate in some circumstances. I expected a Kryo-serialized HashMap as a byte array.
The debugger output is as follows:
this = {AbstractMessageChannel$ChannelInterceptorList@7877}
logger = {LogFactory$Log4jLog@7884}
interceptors = {CopyOnWriteArrayList@8890} size = 1
0 = {MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor@8991}
messageConverter = {CompositeMessageConverter@9043} "CompositeMessageConverter[converters=[org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter@2863f519, org.springframework.cloud.stream.converter.TupleJsonMessageConverter@e9dffa1, org.springframework.messaging.converter.ByteArrayMessageConverter@cc64ad, org.springframework.cloud.stream.converter.ObjectStringMessageConverter@97df9ef, org.springframework.cloud.stream.converter.JavaSerializationMessageConverter@3e011ee4, org.springframework.cloud.stream.converter.KryoMessageConverter@5f808e72, org.springframework.cloud.stream.converter.JsonUnmarshallingConverter@3c0a7023]]"
this$0 = {MessageConverterConfigurer@9044}
mimeType = null
MessageConverterConfigurer$AbstractContentTypeInterceptor.this$0 = {MessageConverterConfigurer@9044}
size = 1
message = {GenericMessage@9169} "GenericMessage [payload=byte[117], headers={contentType=application/json;charset=UTF-8, id=e1b9b79e-5501-0ebe-178f-59ef5538192c, timestamp=1528647360089}]"
payload = {byte[117]@9373} {"string":"string","integer":1212121,"long":1212121,"innerMapa":{"string":"string","integer":1212121,"long":1212121}}
headers = {MessageHeaders@9374} size = 3
0 = {Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntry@12201} "contentType" -> "application/json;charset=UTF-8"
1 = {Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntry@12202} "id" -> "e1b9b79e-5501-0ebe-178f-59ef5538192c"
2 = {Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntry@12203} "timestamp" -> "1528647360089"
channel = {DirectChannel@6940} "output"
interceptorStack = {ArrayDeque@8833} size = 1
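To make the expectation concrete, here is a minimal sketch of a binary round trip that keeps the HashMap and its value types intact. It uses plain JDK serialization purely as a stand-in for Kryo (an assumption, chosen so the example needs no extra dependencies), and the class name MapRoundTrip is hypothetical. In the JSON payload shown in the debugger output, the "long" and "integer" entries are both plain numbers, so that type distinction is not carried in the payload itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class, not part of the project above.
class MapRoundTrip {

    // Builds a map with the same entries as the payload created in DemoSource.
    static Map<String, Object> sampleMap() {
        Map<String, Object> inner = new HashMap<>();
        inner.put("string", "string");
        inner.put("long", 1212121L);
        inner.put("integer", 1212121);
        Map<String, Object> outer = new HashMap<>(inner);
        outer.put("innerMapa", inner);
        return outer;
    }

    // JDK serialization, used here only as a stand-in for Kryo.
    static byte[] toBytes(Map<String, Object> map) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(map);
        }
        return buffer.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static Map<String, Object> fromBytes(byte[] bytes)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Map<String, Object>) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> original = sampleMap();
        Map<String, Object> copy = fromBytes(toBytes(original));

        // The binary round trip preserves the container and value types.
        System.out.println(copy.getClass().getName());        // java.util.HashMap
        System.out.println(copy.get("long") instanceof Long); // true
        System.out.println(copy.equals(original));            // true
    }
}
```

This is the behaviour I expected from the binder when no content type is set: the receiver gets back a HashMap equal to the one that was sent, with the Long and Integer values still distinguishable.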
Processor configuration and code:
spring:
  application:
    name: test
  cloud:
    config:
      uri: http://blade1:8888
      name: scdf-tester
    stream:
      bindings:
        output:
          #content-type: 'application/x-java-object'
          destination: demo-stream-processor-output
        input:
          content-type: 'application/json;type=java.util.Map'
          destination: demo-stream-source-output
PROCESSOR CASE 1: When a Map is used as the "StreamListener" parameter, the "best effort" logic passes in the message headers instead of the payload:
recv message: class org.springframework.messaging.MessageHeaders
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Map handle(final Map message) {
    LOG.info("recv message: {} {}", message.getClass(), message);
    //do some transformations...
    message.put("transformer_says", "hello simon..");
    return message;
}
The log output is as follows:
2018-06-10 18:55:51 [demo-stream-source-output.anonymous.0EthCzZpTneSPp48retFQg-1] INFO demostream.modules.DemoProcessor - recv message: class org.springframework.messaging.MessageHeaders {amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=demo-stream-source-output, amqp_receivedExchange=demo-stream-source-output, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=demo-stream-source-output.anonymous.0EthCzZpTneSPp48retFQg, amqp_redelivered=false, id=671bc2d8-aaba-5ddb-b02b-f92a9dba3e0f, amqp_consumerTag=amq.ctag-ZnNk0O3vNs0yhOPKMTM3Fg, contentType=application/json;charset=UTF-8, timestamp=1528649751852}
Of course, the following exception is expected here:
Caused by: java.lang.UnsupportedOperationException: MessageHeaders is immutable at org.springframework.messaging.MessageHeaders.put(MessageHeaders.java:249)
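For illustration only, the failure mode can be reproduced without Spring. The sketch below uses Collections.unmodifiableMap as a stand-in for MessageHeaders (an assumption; the class name ImmutableMapDemo and its helper methods are hypothetical) to show why put() is rejected and that only a mutable copy accepts new entries. It does not explain why the headers are handed to the listener instead of the payload.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; the unmodifiable map stands in for MessageHeaders.
class ImmutableMapDemo {

    // Returns true if the map refuses the put() call.
    static boolean rejectsPut(Map<String, Object> map) {
        try {
            map.put("transformer_says", "hello simon..");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    // Copies the entries into a fresh HashMap that can be modified.
    static Map<String, Object> mutableCopy(Map<String, Object> source) {
        return new HashMap<>(source);
    }

    public static void main(String[] args) {
        Map<String, Object> backing = new HashMap<>();
        backing.put("contentType", "application/json;charset=UTF-8");
        Map<String, Object> headers = Collections.unmodifiableMap(backing);

        System.out.println(rejectsPut(headers));              // true
        System.out.println(rejectsPut(mutableCopy(headers))); // false
    }
}
```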
PROCESSOR CASE 2: When a Message is used as the "StreamListener" parameter, the "best effort" logic delivers a JSON representation of the map, not the original HashMap:
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Object handle(Message<?> message) {
    LOG.info("recv message: {}", message);
    String jsonMap = new String((byte[]) message.getPayload());
    LOG.info("jsonMap : {}", jsonMap);
    return jsonMap;
}
The log output is as follows:
2018-06-10 18:52:43 [demo-stream-source-output.anonymous.lt21i1ZXQvKZkOdB6hQwUQ-1] INFO demostream.modules.DemoProcessor - recv message: GenericMessage [payload=byte[117], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=demo-stream-source-output, amqp_receivedExchange=demo-stream-source-output, amqp_deliveryTag=38, deliveryAttempt=1, amqp_consumerQueue=demo-stream-source-output.anonymous.lt21i1ZXQvKZkOdB6hQwUQ, amqp_redelivered=false, id=271ad47e-7b36-c01c-0c19-b3201297ddd7, amqp_consumerTag=amq.ctag-QIVzqanCxiNF-HXUTUVn7Q, contentType=application/json;charset=UTF-8, timestamp=1528649563755}]
2018-06-10 18:52:43 [demo-stream-source-output.anonymous.lt21i1ZXQvKZkOdB6hQwUQ-1] INFO demostream.modules.DemoProcessor - jsonMap : {"string":"string","integer":1212121,"long":1212121,"innerMapa":{"string":"string","integer":1212121,"long":1212121}}
The previous version (1.5.9) and the related Spring Cloud Stream dependencies work without any problem.
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>1.5.9.RELEASE</version>
<relativePath></relativePath>
</parent>
Could you please give some advice regarding this behaviour?
Assuming that I am doing this wrong, the simple question would be:
How do I send a HashMap payload using Kryo serialization/deserialization, which in my opinion is better than JSON serialization (less inter-process overhead)?
Thank you!
Best regards, Ivan