0
votes

Support for sending HashMaps between Source/Processor/Sink through RabbitBinder seems changed in new SpringBoot (2.0.2).

the common parent pom.xml fro all modules is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>demo.stream</groupId>
   <artifactId>demo-stream-parent</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <modules>
      <module>source-module</module>
      <module>processor-module</module>
      <module>sink-module</module>
   </modules>
   <packaging>pom</packaging>
   <name>demo-stream</name>
   <description>Demo project for Spring Boot</description>
   <parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>2.0.2.RELEASE</version>
      <relativePath />
      <!-- lookup parent from repository -->
   </parent>
   <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
      <java.version>1.8</java.version>
      <spring-cloud.version>Finchley.RC2</spring-cloud.version>
   </properties>
   <dependencies>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-starter-config</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-stream</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-stream-reactive</artifactId>
      </dependency>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
      </dependency>
   </dependencies>
   <dependencyManagement>
      <dependencies>
         <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
         </dependency>
      </dependencies>
   </dependencyManagement>
   <build>
      <plugins>
         <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
         </plugin>
      </plugins>
   </build>
   <repositories>
      <repository>
         <id>spring-milestones</id>
         <name>Spring Milestones</name>
         <url>https://repo.spring.io/milestone</url>
         <snapshots>
            <enabled>false</enabled>
         </snapshots>
      </repository>
   </repositories>
</project>

It is clearly stated the following:

If no content-type property is set on an outbound channel, Spring Cloud Stream will serialize the payload using a serializer based on the Kryo serialization framework. Deserializing messages at the destination requires the payload class to be present on the receiver’s classpath.

Given the rules it is expected to work with the standard java types such as "HashMap" without need for the custom message converter.

Could not get this simple code to work properly work between source and processor:

Source configuration and code:

spring:
  application:
    name: test
  cloud:
    config:
      uri: http://blade1:8888
      name: scdf-tester
    stream:
      bindings:
        output:
          #content-type: 'application/x-java-serialized-object'
          #content-type: 'application/json'
          content-type:
          destination: demo-stream-source-output



@EnableBinding(Source.class)
@EnableAutoConfiguration
@EnableScheduling
@Component
public class DemoSource {

    @Autowired
    private Source channels;

    //@InboundChannelAdapter(Source.OUTPUT)
    @Scheduled(fixedRate = 2000)
    public MessageSource<Map<String, Object>> timerMessageSource() {

            Map<String, Object> mapa = new HashMap<>();
            mapa.put("string", "string");
            mapa.put("string", "string");
            mapa.put("long", 1212121L);
            mapa.put("integer", 1212121);
            Map<String, Object> mapaInner = new HashMap<>();
            mapaInner.put("string", "string");
            mapaInner.put("string", "string");
            mapaInner.put("long", 1212121L);
            mapaInner.put("integer", 1212121);

            mapa.put("innerMapa", mapaInner);

            channels.output().send(MessageBuilder.withPayload(mapa).build());
        }

}

From the debugger it clearly visible that the payload is converted into JSON string (ApplicationJsonMessageMarshallingConverter) and the content type header is set to "application/json".

This not what is expected although it is acceptable and legit in some circumstances. Expected is kryo serialized hashmap as byte array.

The debugger output is as follows:

this = {AbstractMessageChannel$ChannelInterceptorList@7877} 
 logger = {LogFactory$Log4jLog@7884} 
 interceptors = {CopyOnWriteArrayList@8890}  size = 1
  0 = {MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor@8991} 
   messageConverter = {CompositeMessageConverter@9043} "CompositeMessageConverter[converters=[org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter@2863f519, org.springframework.cloud.stream.converter.TupleJsonMessageConverter@e9dffa1, org.springframework.messaging.converter.ByteArrayMessageConverter@cc64ad, org.springframework.cloud.stream.converter.ObjectStringMessageConverter@97df9ef, org.springframework.cloud.stream.converter.JavaSerializationMessageConverter@3e011ee4, org.springframework.cloud.stream.converter.KryoMessageConverter@5f808e72, org.springframework.cloud.stream.converter.JsonUnmarshallingConverter@3c0a7023]]"
   this$0 = {MessageConverterConfigurer@9044} 
   mimeType = null
   MessageConverterConfigurer$AbstractContentTypeInterceptor.this$0 = {MessageConverterConfigurer@9044} 
 size = 1
message = {GenericMessage@9169} "GenericMessage [payload=byte[117], headers={contentType=application/json;charset=UTF-8, id=e1b9b79e-5501-0ebe-178f-59ef5538192c, timestamp=1528647360089}]"
 payload = {byte[117]@9373} {"string":"string","integer":1212121,"long":1212121,"innerMapa":{"string":"string","integer":1212121,"long":1212121}}
 headers = {MessageHeaders@9374}  size = 3
  0 = {Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntry@12201} "contentType" -> "application/json;charset=UTF-8"
  1 = {Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntry@12202} "id" -> "e1b9b79e-5501-0ebe-178f-59ef5538192c"
  2 = {Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntry@12203} "timestamp" -> "1528647360089"
channel = {DirectChannel@6940} "output"
interceptorStack = {ArrayDeque@8833}  size = 1

Processor configuration and code:

spring:
  application:
    name: test
  cloud:
    config:
      uri: http://blade1:8888
      name: scdf-tester
    stream:
      bindings:
        output:
          #content-type: 'application/x-java-object'
          destination: demo-stream-processor-output
        input:
          content-type: 'application/json;type=java.util.Map'
          destination: demo-stream-source-output

PROCESSOR CASE1: In "StreamListener" when Map is used as param "the best effort" logic gives a headers without payload

recv message: class org.springframework.messaging.MessageHeaders

@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Map handle(final Map message) {
    LOG.info("recv message: {} {}", message.getClass(),message);
    //do some transformations...
    message.put("transformer_says","hello simon..");
    return message;
}

The log output is as follows:

  2018-06-10 18:55:51 [demo-stream-source-output.anonymous.0EthCzZpTneSPp48retFQg-1] INFO  demostream.modules.DemoProcessor - recv message: class org.springframework.messaging.MessageHeaders {amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=demo-stream-source-output, amqp_receivedExchange=demo-stream-source-output, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=demo-stream-source-output.anonymous.0EthCzZpTneSPp48retFQg, amqp_redelivered=false, id=671bc2d8-aaba-5ddb-b02b-f92a9dba3e0f, amqp_consumerTag=amq.ctag-ZnNk0O3vNs0yhOPKMTM3Fg, contentType=application/json;charset=UTF-8, timestamp=1528649751852}

Of course here is expected exception:

Caused by: java.lang.UnsupportedOperationException: MessageHeaders is immutable at org.springframework.messaging.MessageHeaders.put(MessageHeaders.java:249)

PROCESSOR CASE2: In "StreamListener" when Message is used as param "the best effort" logic gives a JSON representation of map not Original HashMap

@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Object handle(Message<?> message) {
    LOG.info("recv message: {}",message);
    String jsonMap = new String((byte[]) message.getPayload());
    LOG.info("jsonMap : {}", jsonMap);
    return jsonMap;
}

The log output is as follows:

2018-06-10 18:52:43 [demo-stream-source-output.anonymous.lt21i1ZXQvKZkOdB6hQwUQ-1] INFO  demostream.modules.DemoProcessor - recv message: GenericMessage [payload=byte[117], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=demo-stream-source-output, amqp_receivedExchange=demo-stream-source-output, amqp_deliveryTag=38, deliveryAttempt=1, amqp_consumerQueue=demo-stream-source-output.anonymous.lt21i1ZXQvKZkOdB6hQwUQ, amqp_redelivered=false, id=271ad47e-7b36-c01c-0c19-b3201297ddd7, amqp_consumerTag=amq.ctag-QIVzqanCxiNF-HXUTUVn7Q, contentType=application/json;charset=UTF-8, timestamp=1528649563755}]
2018-06-10 18:52:43 [demo-stream-source-output.anonymous.lt21i1ZXQvKZkOdB6hQwUQ-1] INFO  demostream.modules.DemoProcessor - jsonMap : {"string":"string","integer":1212121,"long":1212121,"innerMapa":{"string":"string","integer":1212121,"long":1212121}}

The previous version (1.5.9) and related stream cloud dependencies work without any problem.

  <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>1.5.9.RELEASE</version>
        <relativePath></relativePath>
    </parent>

Please can you give some advice regarding this behaviour.

Assuming that I am doing this wrong, simple question would be:

How to send HashMap payload using Kryo ser/der which is in my opinion better than JSON serialization (less inter-process overhead)

Thank you !

Best regards Ivan

1

1 Answers

0
votes

Solution to make kryo binary serialization/deserialization between source/processor is to use the following configuration:

This directive:

output.content-type: application/x-java-object

triggers kryo binary serialization (otherwise JSON is default).

Source (conf and code)

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: source-output
          content-type: application/x-java-object



   @EnableBinding(Source.class)
    public class SourceTester {
        @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "5000"))
        public Message<HashMap<String,Object>> source() {
            HashMap<String,Object> mapa = new HashMap<>();
            mapa.put("foo","bar");
            mapa.put("bar",1337);
            return MessageBuilder.withPayload(mapa).build();
        }
    }

Or another approach:

@EnableBinding(Source.class)
@EnableAutoConfiguration
@EnableScheduling
@Component
public class DemoSource {

    @Autowired
    private Source channels;

    @Scheduled(fixedRate = 5000)
    public MessageSource<Map<String, Object>> timerMessageSource() { 
            HashMap<String,Object> mapa = new HashMap<>();
            mapa.put("foo","bar");
            mapa.put("bar",1337);    
        channels.output().send(MessageBuilder.withPayload(mapa).build());
        }

}

Processor (conf and code)

spring:
      cloud:
        stream:
          bindings:
            input:
              destination: source-output
            output:
              destination: processor-output
              content-type: application/x-java-object



   @EnableBinding(Processor.class)
    @EnableAutoConfiguration
    public class ProcessorTester {
        @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
        protected Message<HashMap> process(Message<HashMap> input){
            input.getPayload().put("Processor", "was here");
            return input;
        }
    }

Conclusion:

The processor handler (transformer) with the "type parameter HashMap" and Message is needed to trigger kryo deserialization of byte[] to HashMap automatically:

protected Message<HashMap> process(Message<HashMap> input)

If output content type directive is disabled (Source module or any other producer)

 spring:
      cloud:
        stream:
          bindings:
            output:
              destination: source-output
              ###content-type: application/x-java-object

Default serialization is HashMap to JSON string.

Best regards,

Ivan