0
votes

I am trying to build a custom transformer application by following the guide at https://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#streams-dev-guide

I have started Kafka on my Windows machine. An http source is running on the same machine and writes to the destination transformData. Command:

    java -Dserver.port=8123 -Dhttp.path-pattern=/data -Dspring.cloud.stream.bindings.output.destination=transformData -jar http-source-kafka-10-1.3.1.RELEASE.jar

My transform application is running; it reads its input from transformData and writes its output to the destination transformedData. Command:

    java -Dserver.port=8090 -Dspring.cloud.stream.bindings.input.destination=transformData -Dspring.cloud.stream.bindings.output.destination=transformedData -jar transformer-0.0.1-SNAPSHOT.jar

A log sink is running and reads from the destination transformedData. Command:

    java -Dserver.port=8888 -Dspring.cloud.stream.bindings.input.destination=transformedData -jar log-sink-kafka-10-1.3.1.RELEASE.jar

Problem: the custom transformer fails when I send this curl request:

    curl -H "Content-Type: application/json" -X POST -d '{"id":"1", "temp":"400"}' http://172.20.24.47:8123/data

On the custom Transformer console I see errors:

Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token '▒': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"? contentType "text/plain"originalContentType "application/json;charset=UTF-8"{"id":"1", "temp":"400"}"; line: 1, column: 4]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) ~[jackson-core-2.9.6.jar!/:2.9.6]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679) ~[jackson-core-2.9.6.jar!/:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526) ~[jackson-core-2.9.6.jar!/:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621) ~[jackson-core-2.9.6.jar!/:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826) ~[jackson-core-2.9.6.jar!/:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723) ~[jackson-core-2.9.6.jar!/:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141) ~[jackson-databind-2.9.6.jar!/:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000) ~[jackson-databind-2.9.6.jar!/:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3121) ~[jackson-databind-2.9.6.jar!/:2.9.6]
    at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertParameterizedType(ApplicationJsonMessageMarshallingConverter.java:114) ~[spring-cloud-stream-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
    ... 37 common frames omitted

Can anyone help?

2
BTW, if I run only the http source and the log sink, the curl request works fine. I am wondering whether I need to set some consumer or producer properties, such as headerMode=raw. - Deepali Bhandari

2 Answers

1
votes

I finally got this to work. When generating the custom application with Spring Initializr, instead of selecting the 2.0.4 release as the starter version, I reverted to the 1.5.15 release. With that change I no longer need to pass headerMode set to embeddedHeaders on the subscriber end, that is, to the custom app and the log sink app.

0
votes

It appears that your custom app is using Spring Cloud Stream 2.0.0.RELEASE, while the http source and log sink are 1.3.x apps. Can you set spring.cloud.stream.bindings.input.consumer.headerMode to embeddedHeaders in the processor app where it is failing? In 2.0, the default is not to embed headers in the payload, because Kafka now supports headers out of the box. In versions prior to 2.0 (which the 1.3.x apps are built on), the default is to embed the headers. You need to set the property explicitly when mixing the two.
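
For illustration, the suggestion above could be applied by adding the property to the transformer launch command from the question. This is a sketch only: the port, destinations, and jar name are copied from the question, and the property is passed as a JVM system property in the same way as the other settings there.

```shell
# Transformer command from the question, plus the headerMode property suggested above.
# The 1.3.x http source embeds headers in the payload, so the 2.0 consumer is told to expect them.
java -Dserver.port=8090 \
  -Dspring.cloud.stream.bindings.input.destination=transformData \
  -Dspring.cloud.stream.bindings.output.destination=transformedData \
  -Dspring.cloud.stream.bindings.input.consumer.headerMode=embeddedHeaders \
  -jar transformer-0.0.1-SNAPSHOT.jar
```

The backslash line continuations assume a Unix-style shell (for example Git Bash on Windows); in cmd.exe the command would go on a single line.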