
I wrote a custom processor similar to this one. The processor takes an InputDto and returns JSON. Following guidance in the Q&A, my custom processor has an application.properties file with this content:

spring.cloud.stream.bindings.input.content-type=application/x-java-object;com.company.InputDto
spring.cloud.stream.bindings.output.content-type=application/json
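
To give a clearer picture, this is roughly the shape of the processor. It's a minimal sketch, not my actual code; only InputDto comes from the real app, and the class, method, and OutputDto names are placeholders.

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;

// Sketch only: the real processor is more involved, but the binding setup is the same.
@EnableBinding(Processor.class)
public class MyProcessorConfiguration {

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public OutputDto process(InputDto input) {
        // OutputDto is a placeholder; the returned object is serialized to JSON
        // because the output binding's content type is application/json.
        return new OutputDto(input);
    }
}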

From this Q&A I created a spring.integration.properties file with this line:

spring.integration.readOnly.headers=contentType

And I have working automated integration tests. So far so good.

I created a stream in the SCDF shell that includes my processor. The time and httpclient apps are working well, so I haven't shown their detailed args. I've added newlines here and throughout for readability.

stream create --name test --definition "time <args> 
  | httpclient <args> 
  | splitter --expression=\"#jsonPath(payload,'$..[0:]')\" 
  | myprocessor 
  | log"

I have debug logging enabled. httpclient produces messages with contentType=text/plain and payload:

payload=[{"name":"first","url":"url1"},{"name":"second","url":"url2"}]

The splitter creates two messages like this, per logs (as expected):

message: GenericMessage [payload={name=first, url=url1}, 
  headers={...contentType=text/plain, ... }]

message: GenericMessage [payload={name=second, url=url2}, 
  headers={...contentType=text/plain, ... }]

And the custom processor I wrote fails with this exception:

org.springframework.messaging.converter.MessageConversionException: Cannot 
convert from [java.util.LinkedHashMap] to [com.company.InputDto] for 
GenericMessage [payload={name=first, url=url1}, headers={sequenceNumber=1, 
   kafka_offset=xx, sequenceSize=x, correlationId=yy, id=zzz, 
   kafka_receivedPartitionId=0, contentType=text/plain, 
   kafka_receivedTopic=test.splitter, timestamp=1503591622851}]
    at 
 org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:142) ~[spring-messaging-4.3.10.RELEASE.jar!/:4.3.10.RELEASE] 
....

I'm not sure where the LinkedHashMap is coming from. I tried changing the application.properties file to:

spring.cloud.stream.bindings.input.content-type=application/json;com.company.InputDto

but that didn't help. I also tried adding

--spring.cloud.stream.bindings.output.contentType='application/json'

to the splitter when creating the stream (following directions in a sample app), but still get the exception.

I've spent hours and just cannot see what I'm missing. Appreciate any assistance.

My custom processor uses Spring Cloud Dalston.SR3. I'm using SCDF Server and Shell 1.3.0.M1, with the Kafka binder.


Update: more info. I looked at the code in the Splitter starter and wrote a little test case to simulate what it's doing.

import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.splitter.ExpressionEvaluatingSplitter;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.GenericMessage;

final ExpressionEvaluatingSplitter splitter = new ExpressionEvaluatingSplitter(
    new SpelExpressionParser().parseExpression(
        "T(com.jayway.jsonpath.JsonPath).read(payload, '$..[0:]')"));
final PollableChannel channel = new QueueChannel();
splitter.setOutputChannel(channel);
splitter.handleMessage(new GenericMessage<Object>(
    "[{\"name\":\"first\",\"url\":\"url1\"},"
        + "{\"name\":\"second\",\"url\":\"url2\"}]"));
final Message<?> message = channel.receive(10);
System.out.println("payload class: " + message.getPayload().getClass());
System.out.println("payload: " + message.getPayload());
System.out.println(channel.receive(10));

This produces output:

payload class: class java.util.LinkedHashMap
payload: {name=first, url=url1}
GenericMessage [payload={name=second, url=url2}, headers={sequenceNumber=2,
  correlationId=xx, id=yy, sequenceSize=2, timestamp=1503609649518}]

Aha, the LinkedHashMap! Now I just need to convince the splitter to send the output as plain text or JSON, not a Map.
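
For what it's worth, converting such a Map back into the DTO by hand is easy enough with Jackson. A sketch like the following could be dropped into the processor (assuming InputDto has name and url properties), but I'd rather get JSON across the wire in the first place:

import java.util.LinkedHashMap;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;

// Illustration only: re-bind the Map the splitter emits onto the DTO.
Map<String, Object> mapPayload = new LinkedHashMap<>();
mapPayload.put("name", "first");
mapPayload.put("url", "url1");
InputDto dto = new ObjectMapper().convertValue(mapPayload, InputDto.class);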


Update 2: I've been able to reproduce this issue without using any custom processors.

stream create --name test --definition "time <args>
  | httpclient <args>
  | splitter --expression=\"#jsonPath(payload,'$.[0:]')\" 
         --outputType=\"application/json\"
  | transform --expression=\"new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(payload) != null\" 
         --outputType=\"application/json\"
  | log"

When run, the splitter log file contains this exception (abridged), the infamous "payload must not be null" error:

2017-08-25 13:09:30,322 DEBUG -kafka-listener-2 o.s.i.c.DirectChannel:411 - preSend on channel 'output', message: GenericMessage [payload={name=first, url=url1}, headers={sequenceNumber=1, kafka_offset=xx, sequenceSize=2, correlationId=yy, id=zz, kafka_receivedPartitionId=0, contentType=text/plain, kafka_receivedTopic=test.httpclient, timestamp=1503680970322}]
2017-08-25 13:09:30,328 ERROR -kafka-listener-2 o.s.k.l.LoggingErrorHandler:37 - Error while processing: ConsumerRecord(topic = test.httpclient, partition = 0, offset = 52606, CreateTime = 1503680967030, checksum = 2166925613, serialized key size = -1, serialized value size = 470, key = null, value = [B@6567451e)
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.IllegalArgumentException: payload must not be null
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]

It looks like the splitter is having difficulty converting a LinkedHashMap to JSON. Is there any way to make that conversion happen?

I also tried explicitly setting the outputType of the httpclient processor to application/json, and it didn't seem to make a difference. (I was following the docs; the example there shows the outputType values with shell quotes, and I also tried without them, with no difference.)

Apps were loaded into the SCDF server using this command (I replaced the '.' in bit-ly so SO would accept the link):

app import --uri http://bit-ly/Bacon-RELEASE-stream-applications-kafka-10-maven

I noticed a few more things. The debug logging shows the content type as text/plain throughout, so it appears not to be picking up the content type I set explicitly. Also, the errors go away if I remove the transform processor; I see the data in the log, but not in JSON format, just this:

{name=first, url=url1}

2 Answers


Can you please try setting the output content type of the splitter, as you said, and just remove any contentType definition from your custom processor? JSON-to-POJO conversion should be automatic based on the message headers.
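
In stream-definition terms, that would look something like this (reusing the definition from your question, with <args> unchanged), together with removing the spring.cloud.stream.bindings.input.content-type line from the processor's application.properties:

stream create --name test --definition "time <args>
  | httpclient <args>
  | splitter --expression=\"#jsonPath(payload,'$.[0:]')\"
       --outputType=\"application/json\"
  | myprocessor
  | log"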


I was able to get this working, for both the pure Spring stream and a stream including my custom processor, by explicitly setting both inputType and outputType as shown below.

stream create --name test --definition "time 
  | httpclient <args> --outputType=\"application/json\" 
  | splitter --expression=\"#jsonPath(payload,'$.[0:]')\" 
     --inputType=\"application/json\" 
     --outputType=\"application/json\" 
  | myprocessor 
     --inputType=\"application/json\" 
     --outputType=\"application/json\" 
  | log"

Gary Russell mentions that this issue has been corrected in Spring Cloud Stream Ditmars.M2 (1.3.0.M2).