
I use Spring Cloud Data Flow to set up a stream that reads a CSV file, transforms it with a custom processor, and logs the result:

stream create --name testsourcecsv --definition "file --mode=lines --directory=D:/toto/ --file.filename-pattern=adresses-28.csv --maxMessages=1000 | csvToMap --spring.cloud.stream.bindings.output.content-type=application/json | log --spring.cloud.stream.bindings.input.content-type=application/json" --deploy

The file and csvToMap applications work fine, but in the log application I see this kind of exception, for every record:

2019-12-03 11:32:46.500 ERROR 1328 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$5  : Could not decode json type: adresses-28.csv for key: file_name

com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'adresses': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"adresses-28.csv"; line: 1, column: 10]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) ~[jackson-core-2.9.9.jar!/:2.9.9]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:703) ~[jackson-core-2.9.9.jar!/:2.9.9]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3532) ~[jackson-core-2.9.9.jar!/:2.9.9]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2627) ~[jackson-core-2.9.9.jar!/:2.9.9]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:832) ~[jackson-core-2.9.9.jar!/:2.9.9]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:729) ~[jackson-core-2.9.9.jar!/:2.9.9]
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141) ~[jackson-databind-2.9.9.jar!/:2.9.9]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000) ~[jackson-databind-2.9.9.jar!/:2.9.9]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091) ~[jackson-databind-2.9.9.jar!/:2.9.9]
    at org.springframework.cloud.stream.binder.kafka.BinderHeaderMapper.lambda$toHeaders$1(BinderHeaderMapper.java:268) ~[spring-cloud-stream-binder-kafka-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
    at java.lang.Iterable.forEach(Iterable.java:75) ~[na:1.8.0_202]
    at org.springframework.cloud.stream.binder.kafka.BinderHeaderMapper.toHeaders(BinderHeaderMapper.java:251) ~[spring-cloud-stream-binder-kafka-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]

This exception is also raised for the file_relativePath header. I don't understand why spring-kafka tries to read them as JSON.

That said, the log-sink still logs my records correctly:

2019-12-03 11:32:46.516  INFO 1328 --- [container-0-C-1] log-sink                                 : {"code_postal":"28200","id_fantoir":"28211_0127","source_nom_voie":"inconnue","numero":"1","code_insee":28211,"lon":1.260462,"code_insee_ancienne_commune":"","nom_afnor":"RUE DU VIEUX MOULIN","nom_voie":"Rue du Vieux Moulin","nom_ld":"","libelle_acheminement":"LOGRON","source_position":"inconnue","nom_commune":"Logron","nom_ancienne_commune":"","x":570633.27,"y":6784246.2,"alias":"","id":"28211_0127_00001","rep":"","lat":48.145756}

For debugging purposes, I log the Kafka headers in my csvToMap processor, which gives me:

2019-12-03 11:32:37.042  INFO 10788 --- [container-0-C-1] c.d.streams.processor.CsvToMapProcessor  : headers {sequenceNumber=152963, file_name=adresses-28.csv, sequenceSize=0, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, file_originalFile=NonTrustedHeaderType [headerValue="D:\\toto\\adresses-28.csv", untrustedType=java.io.File], kafka_receivedMessageKey=null, kafka_receivedTopic=testsourcecsv.file, file_relativePath=adresses-28.csv, kafka_offset=430949, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7c3e63db, correlationId=9547c02d-e617-d981-f9b5-8df231530f66, kafka_receivedPartitionId=0, contentType=text/plain, kafka_receivedTimestamp=1575299282558, kafka_groupId=testsourcecsv}

So I definitely don't understand why the log-sink tries to decode the file_name and file_relativePath headers.

I set up a local environment with:

  • Windows 7
  • Spring CDF server v 2.2.1.RELEASE
  • Spring Cloud Skipper v 2.1.2.RELEASE
  • Spring CDF shell v 2.2.1.RELEASE
  • Kafka 2.12-2.3.0

My csvToMap processor is defined as follows:

    import java.util.Map;

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectReader;
    import com.fasterxml.jackson.databind.ObjectWriter;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.cloud.stream.messaging.Processor;
    import org.springframework.integration.annotation.Transformer;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.stereotype.Component;

    @Component
    public class CsvToMapProcessor {
        private static final Logger LOGGER = LoggerFactory.getLogger(CsvToMapProcessor.class);

        @Autowired
        @Qualifier("csvMapper")
        private ObjectReader csvMapper;

        @Autowired
        @Qualifier("jsonWriter")
        private ObjectWriter jsonWriter;

        // Maps one CSV line to a Map and forwards it to the output channel;
        // returning null simply drops the line
        @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
        public Map<String, Object> transform(String csvLine, @Headers Map<String, Object> headers) {
            try {
                LOGGER.info("headers {}", headers);
                Map<String, Object> map = csvMapper.readValue(csvLine);
                return map;
            } catch (JsonProcessingException e) {
                LOGGER.error("An error occurred while reading CSV line {}: {}", csvLine, e.getMessage());
                LOGGER.debug(e.getMessage(), e);
                return null;
            }
        }
    }
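
For reference, the csvMapper and jsonWriter beans are plain Jackson reader/writer beans, wired roughly like this (a simplified sketch; the configuration class name and the exact column list are illustrative):

    import java.util.Map;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.ObjectReader;
    import com.fasterxml.jackson.databind.ObjectWriter;
    import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class JacksonConfig {

        // Parses one CSV line into a Map; with file --mode=lines each message
        // is a single line without a header row, so columns are declared explicitly
        @Bean("csvMapper")
        public ObjectReader csvMapper() {
            CsvSchema schema = CsvSchema.builder()
                    .addColumn("id")
                    .addColumn("code_postal")
                    // ... one addColumn per field of the adresses CSV
                    .build();
            return new CsvMapper().readerFor(Map.class).with(schema);
        }

        @Bean("jsonWriter")
        public ObjectWriter jsonWriter() {
            return new ObjectMapper().writer();
        }
    }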

The project is built with this parent:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

and this Spring Cloud version:

<spring-cloud.version>Hoxton.RELEASE</spring-cloud.version>
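
The version property is applied through the usual spring-cloud-dependencies BOM import (standard setup, shown for completeness):

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>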

What am I doing wrong to cause this issue?


2 Answers

4 votes

So:

  • coming with the spring-cloud Hoxton release train, the spring-cloud-stream version is 3.0.0.RELEASE (both dependency trees can be reproduced with the Maven command shown after this list):
[INFO] +- org.springframework.cloud:spring-cloud-starter-stream-kafka:jar:3.0.0.RELEASE:compile
[INFO] |  \- org.springframework.cloud:spring-cloud-stream-binder-kafka:jar:3.0.0.RELEASE:compile
[INFO] |     +- org.springframework.cloud:spring-cloud-stream-binder-kafka-core:jar:3.0.0.RELEASE:compile
[INFO] |     |  \- org.springframework.integration:spring-integration-kafka:jar:3.2.1.RELEASE:compile
[INFO] |     \- org.springframework.kafka:spring-kafka:jar:2.3.3.RELEASE:compile
  • the log-sink app 2.1.2 uses spring-cloud-stream v 2.1.4.RELEASE:
[INFO] +- org.springframework.cloud:spring-cloud-starter-stream-kafka:jar:2.1.4.RELEASE:compile
[INFO] |  \- org.springframework.cloud:spring-cloud-stream-binder-kafka:jar:2.1.4.RELEASE:compile
[INFO] |     +- org.springframework.cloud:spring-cloud-stream-binder-kafka-core:jar:2.1.4.RELEASE:compile
[INFO] |     |  \- org.springframework.integration:spring-integration-kafka:jar:3.1.0.RELEASE:compile
[INFO] |     \- org.springframework.kafka:spring-kafka:jar:2.2.8.RELEASE:compile
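
Both trees can be printed from each project with Maven's dependency plugin:

    mvn dependency:tree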

As the spring-kafka 2.3.3 documentation for the DefaultKafkaHeaderMapper.setEncodeStrings method says:

Set to true if a consumer of the outbound record is using Spring for Apache Kafka version less than 2.3

The log-sink app actually uses spring-kafka v 2.2.8, whose inbound header mapper tries to JSON-decode every String header it receives (which is exactly why the raw adresses-28.csv value fails to parse). So I have to set encodeStrings to true, using a customized header mapper:

    @Bean("kafkaBinderHeaderMapper")
    public KafkaHeaderMapper kafkaBinderHeaderMapper() {
        DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
        // JSON-encode outbound String header values so that a consumer on
        // spring-kafka < 2.3 (like log-sink 2.1.2) can decode them
        mapper.setEncodeStrings(true);
        return mapper;
    }

But if I do so, the log-sink doesn't log anything, because it can no longer understand the contentType header encoded by the DefaultKafkaHeaderMapper. The Spring Cloud Stream team provides a BinderHeaderMapper to fix this issue:

Custom header mapper for Apache Kafka. This is identical to the DefaultKafkaHeaderMapper from spring Kafka. This is provided for addressing some interoperability issues between Spring Cloud Stream 3.0.x and 2.x apps, where mime types passed as regular MimeType in the header are not de-serialized properly

So I have to configure a custom BinderHeaderMapper in my app:

    @Bean("kafkaBinderHeaderMapper")
    public KafkaHeaderMapper kafkaBinderHeaderMapper() {
        // BinderHeaderMapper (from spring-cloud-stream-binder-kafka) additionally
        // keeps the contentType header readable by 2.x apps
        BinderHeaderMapper mapper = new BinderHeaderMapper();
        mapper.setEncodeStrings(true);
        return mapper;
    }

And everything works fine.

-1 votes

It looks like the value you set in --file.filename-pattern has issues. Can you check whether the value you are passing adheres to the AntPathMatcher syntax (the filename-pattern property is based on this path matcher)?

What happens if you try something like --file.filename-pattern=*.csv?
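
If you want to sanity-check a pattern locally, here is a minimal sketch using Spring's AntPathMatcher directly (class name and sample values are illustrative):

    import org.springframework.util.AntPathMatcher;

    public class PatternCheck {
        public static void main(String[] args) {
            AntPathMatcher matcher = new AntPathMatcher();
            // a literal file name is itself a valid Ant pattern, matching only itself
            System.out.println(matcher.match("adresses-28.csv", "adresses-28.csv")); // true
            // a wildcard pattern matches any .csv file name
            System.out.println(matcher.match("*.csv", "adresses-28.csv"));           // true
        }
    }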