I have a simple SCDF stream that looks like this:

http --port=12346 | mvmn-transform | file --name=tmp.txt --directory=/tmp

The mvmn-transform is a simple custom transformer that looks like this:

import java.util.HashMap;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binder.kafka.BinderHeaderMapper;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Transformer;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.messaging.Message;

@SpringBootApplication
@EnableBinding(Processor.class)
@EnableConfigurationProperties(ScdfTestTransformerProperties.class)
@Configuration
public class ScdfTestTransformer {
    public static void main(String[] args) {
        SpringApplication.run(ScdfTestTransformer.class, args);
    }

    @Autowired
    protected ScdfTestTransformerProperties config;

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Object transform(Message<?> message) {
        Object payload = message.getPayload();
        Map<String, Object> result = new HashMap<>();
        Map<String, String> headersStr = new HashMap<>();

        message.getHeaders().forEach((k, v) -> headersStr.put(k, v != null ? v.toString() : null));

        result.put("headers", headersStr);
        result.put("payload", payload);
        result.put("configProp", config.getSomeConfigProp());

        return result;
    }

    // See https://stackguides.com/questions/59155689/could-not-decode-json-type-for-key-file-name-in-a-spring-cloud-data-flow-stream
    @Bean("kafkaBinderHeaderMapper")
    public KafkaHeaderMapper kafkaBinderHeaderMapper() {
        BinderHeaderMapper mapper = new BinderHeaderMapper();
        mapper.setEncodeStrings(true);
        return mapper;
    }
}

This works fine.

But I've read that Spring Cloud Function should allow me to implement such apps without having to specify the binding and transformer annotations, so I've changed it to this:

@SpringBootApplication
// @EnableBinding(Processor.class)
@EnableConfigurationProperties(ScdfTestTransformerProperties.class)
@Configuration
public class ScdfTestTransformer {
    public static void main(String[] args) {
        SpringApplication.run(ScdfTestTransformer.class, args);
    }

    @Autowired
    protected ScdfTestTransformerProperties config;

    // @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    @Bean
    public Function<Message<?>, Map<String, Object>> transform() {
        return message -> {
            Object payload = message.getPayload();
            Map<String, Object> result = new HashMap<>();
            Map<String, String> headersStr = new HashMap<>();

            message.getHeaders().forEach((k, v) -> headersStr.put(k, v != null ? v.toString() : null));

            result.put("headers", headersStr);
            result.put("payload", payload);
            result.put("configProp", "Config prop val: " + config.getSomeConfigProp());

            return result;
        };
    }

    // See https://stackguides.com/questions/59155689/could-not-decode-json-type-for-key-file-name-in-a-spring-cloud-data-flow-stream
    @Bean("kafkaBinderHeaderMapper")
    public KafkaHeaderMapper kafkaBinderHeaderMapper() {
        BinderHeaderMapper mapper = new BinderHeaderMapper();
        mapper.setEncodeStrings(true);
        return mapper;
    }
}

And now I have a problem: the source and target topic names set up by SCDF are apparently ignored when Spring Cloud Function is used, and topics transform-in-0 and transform-out-0 are created instead.

SCDF creates topics named <stream-name>.<app-name>, e.g. TestStream123.http and TestStream123.mvmn-transform.

Previously the transformer used those topics, as it should, since it is part of the SCDF stream. Now they are ignored, and transform-in-0 and transform-out-0 are used instead.

As a result my transformer no longer receives any input, because it listens on the wrong Kafka topic, and it presumably also writes its output to the wrong topic, so nothing would reach the rest of the stream either.
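
To spell out the mismatch: my function bean is named transform, so the binder derives the binding names transform-in-0 and transform-out-0 by default. For the app to consume from and produce to the SCDF-managed topics, those bindings would have to point at destinations like the following (assuming the example stream name TestStream123 above; I'm showing this only to illustrate which topics are involved, not as a fix):

spring.cloud.stream.bindings.transform-in-0.destination=TestStream123.http
spring.cloud.stream.bindings.transform-out-0.destination=TestStream123.mvmn-transform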

P.S. Just in case, the full project code is on GitHub: https://github.com/mvmn/scdftest-transformer/tree/scfunc

In order to run it locally, start up Kafka, Skipper, SCDF and the SCDF console, do mvn clean install in the app folder, and then do app register --name mvmn-transform-1 --type processor --uri maven://x.mvmn.study.scdf.scdftest:scdftest-transformer:0.1.1-SNAPSHOT --metadata-uri maven://x.mvmn.study.scdf.scdftest:scdftest-transformer:0.1.1-SNAPSHOT in the console. Then you can deploy the stream using the definition http --port=12346 | mvmn-transform | file --name=tmp.txt --directory=/tmp
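
For reference, the corresponding SCDF shell session would look roughly like this (a sketch: it assumes the app is registered under the same name used in the stream definition, mvmn-transform, and an example stream name of TestStream123):

app register --name mvmn-transform --type processor --uri maven://x.mvmn.study.scdf.scdftest:scdftest-transformer:0.1.1-SNAPSHOT --metadata-uri maven://x.mvmn.study.scdf.scdftest:scdftest-transformer:0.1.1-SNAPSHOT
stream create TestStream123 --definition "http --port=12346 | mvmn-transform | file --name=tmp.txt --directory=/tmp"
stream deploy TestStream123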

1 Answer

Since you are using the functional model of writing Spring Cloud Stream applications, when you deploy this app, you need to pass two properties on the custom processor to restore the Spring Cloud Data Flow behavior.

spring.cloud.stream.function.bindings.transform-in-0=input
spring.cloud.stream.function.bindings.transform-out-0=output

Can you try that and see if that makes a difference?
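
For example (a sketch, assuming the stream name TestStream123 and the processor label mvmn-transform from your stream definition), you could pass them as deployment properties from the SCDF shell:

stream deploy TestStream123 --properties "app.mvmn-transform.spring.cloud.stream.function.bindings.transform-in-0=input,app.mvmn-transform.spring.cloud.stream.function.bindings.transform-out-0=output"

Alternatively, you can set the same two properties in the processor's own application.properties.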