Using Spring Cloud DataFlow 1.3.0.M2 with Spring Cloud Stream Starters Celsius.M1.
I have two processors. First produces a List<Map>
that is supposed to be consumed by the other. Here is simplified code.
// Processor 1
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
// Note: had Object instead of List<> as the return, hoped perhaps using a
// specific type would help, but no difference.
public List<Map<String, Object>> process(final @Payload MyPojo payload) {
final List<Map<String, Object>> results = worker.doWork(payload);
LOG.debug("Returning " + results.size() + " objects");
return results;
}
// Processor 2
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Object process(final @Payload List<Map<String, Object>> payload) {
LOG.debug("Received " + payload.size() + " objects");
final List<Map<String, Object>> results = worker.moreWork(payload);
return results;
}
I am deploying these two processors in a pipeline using the SCDF shell:
<source> | otherProcessors | processor1 | processor2 | log
The debug message for Processor 1 says it has 2 objects in the List. Processor 2's debug message says it received 40 objects (each map has 20 key=value pairs) - it appears the two maps got flattened into one list of key=value pairs.
I have debug logging enabled for org.spring.integration
and the message appears to have a list of maps format (this is from Processor 2):
preSend on channel 'input', message: GenericMessage
[payload=[{"m1key1":"val1","m1key2":"val2",...,"m1key20":"val20"},
{"m2key1":"val1","m2key2":"val2",...,"m2key20":"val20"}], headers={..}]
I'd like Processor 2 to receive the 2 maps produced by Processor 1. I wonder if this is something related to generic types. Can someone point me towards the configuration to make this happen?
---- Update for Artem's comments ----
Processor 1 has this in its application.properties
file:
spring.cloud.stream.bindings.output.content-type=application/json
I had also tried modifying the stream definition like this but it didn't seem to make a difference:
<source> | otherProcessors | processor1 --outputType=application/json | processor2 --inputType=application/json | log
GenericMessage.toString()
. Would you mind to shareheaders
you have there on the matter? At leastcontentType
– Artem Bilaninput/output
configuration do you have on those processors? I meanbinding
setting – Artem BilancontentType=application/x-java-object;type=java.util.ArrayList
.) This solves my problem, thanks! I'm still curious as to why the contentType of JSON didn't work though, and what I would need to change if I had to make it work for some reason. Link to docs gratefully accepted; I searched before asking but may not have found the right section. I'm learning a bunch of new tools at once and sometimes just don't know which tool docs holds the answer I want. – user944849