0
votes

Using Spring Cloud DataFlow 1.3.0.M2 with Spring Cloud Stream Starters Celsius.M1.

I have two processors. First produces a List<Map> that is supposed to be consumed by the other. Here is simplified code.

// Processor 1
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
// Note: had Object instead of List<> as the return, hoped perhaps using a
// specific type would help, but no difference.
public List<Map<String, Object>> process(final @Payload MyPojo payload) {
    final List<Map<String, Object>> results = worker.doWork(payload);
    LOG.debug("Returning " + results.size() + " objects");
    return results;
}

// Processor 2
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Object process(final @Payload List<Map<String, Object>> payload) {
    LOG.debug("Received " + payload.size() + " objects");
    final List<Map<String, Object>> results = worker.moreWork(payload);
    return results;
}

I am deploying these two processors in a pipeline using the SCDF shell:

<source> | otherProcessors | processor1 | processor2 | log

The debug message for Processor 1 says it has 2 objects in the List. Processor 2's debug message says it received 40 objects (each map has 20 key=value pairs) - it appears the two maps got flattened into one list of key=value pairs.

I have debug logging enabled for org.spring.integration and the message appears to have a list of maps format (this is from Processor 2):

preSend on channel 'input', message: GenericMessage 
  [payload=[{"m1key1":"val1","m1key2":"val2",...,"m1key20":"val20"},
   {"m2key1":"val1","m2key2":"val2",...,"m2key20":"val20"}], headers={..}]

I'd like Processor 2 to receive the 2 maps produced by Processor 1. I wonder if this is something related to generic types. Can someone point me towards the configuration to make this happen?

---- Update for Artem's comments ----

Processor 1 has this in its application.properties file:

spring.cloud.stream.bindings.output.content-type=application/json

I had also tried modifying the stream definition like this but it didn't seem to make a difference:

<source> | otherProcessors | processor1 --outputType=application/json | processor2 --inputType=application/json | log
1
You know that doesn't look like JSON, but indeed as GenericMessage.toString(). Would you mind to share headers you have there on the matter? At least contentTypeArtem Bilan
And yes, what is input/output configuration do you have on those processors? I mean binding settingArtem Bilan
Updated post to answer questions, Artem, thank you.user944849
OK. Thanks for the update. How does it work if you don't use any JSON in between? I mean how is it with the standard Java (de)serialization?Artem Bilan
Removing the contentType settings lets the conversion work as expected. (Header in processor2's inbound message is contentType=application/x-java-object;type=java.util.ArrayList.) This solves my problem, thanks! I'm still curious as to why the contentType of JSON didn't work though, and what I would need to change if I had to make it work for some reason. Link to docs gratefully accepted; I searched before asking but may not have found the right section. I'm learning a bunch of new tools at once and sometimes just don't know which tool docs holds the answer I want.user944849

1 Answers

1
votes

Ok, you actually stumbled upon a bug :)

This has been fixed on the 2.0 branch, which is a bit unstable for the moment considering it's a snapshot.

Things should be better once we release in a few days.

Team is discussing the path forward on back porting the fix to 1.3 line.