I have a simple SCDF stream that looks like this:
http --port=12346 | mvmn-transform | file --name=tmp.txt --directory=/tmp
The mvmn-transform is a simple custom transformer that looks like this:
@SpringBootApplication
@EnableBinding(Processor.class)
@EnableConfigurationProperties(ScdfTestTransformerProperties.class)
@Configuration
public class ScdfTestTransformer {

    public static void main(String[] args) {
        SpringApplication.run(ScdfTestTransformer.class, args);
    }

    @Autowired
    protected ScdfTestTransformerProperties config;

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Object transform(Message<?> message) {
        Object payload = message.getPayload();
        Map<String, Object> result = new HashMap<>();
        Map<String, String> headersStr = new HashMap<>();
        message.getHeaders().forEach((k, v) -> headersStr.put(k, v != null ? v.toString() : null));
        result.put("headers", headersStr);
        result.put("payload", payload);
        result.put("configProp", config.getSomeConfigProp());
        return result;
    }

    // See https://stackguides.com/questions/59155689/could-not-decode-json-type-for-key-file-name-in-a-spring-cloud-data-flow-stream
    @Bean("kafkaBinderHeaderMapper")
    public KafkaHeaderMapper kafkaBinderHeaderMapper() {
        BinderHeaderMapper mapper = new BinderHeaderMapper();
        mapper.setEncodeStrings(true);
        return mapper;
    }
}
This works fine.
But I've read that Spring Cloud Function should let me implement such apps without having to specify the binding and transformer annotations, so I've changed it to this:
@SpringBootApplication
// @EnableBinding(Processor.class)
@EnableConfigurationProperties(ScdfTestTransformerProperties.class)
@Configuration
public class ScdfTestTransformer {

    public static void main(String[] args) {
        SpringApplication.run(ScdfTestTransformer.class, args);
    }

    @Autowired
    protected ScdfTestTransformerProperties config;

    // @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    @Bean
    public Function<Message<?>, Map<String, Object>> transform(
            // Message<?> message
    ) {
        return message -> {
            Object payload = message.getPayload();
            Map<String, Object> result = new HashMap<>();
            Map<String, String> headersStr = new HashMap<>();
            message.getHeaders().forEach((k, v) -> headersStr.put(k, v != null ? v.toString() : null));
            result.put("headers", headersStr);
            result.put("payload", payload);
            result.put("configProp", "Config prop val: " + config.getSomeConfigProp());
            return result;
        };
    }

    // See https://stackguides.com/questions/59155689/could-not-decode-json-type-for-key-file-name-in-a-spring-cloud-data-flow-stream
    @Bean("kafkaBinderHeaderMapper")
    public KafkaHeaderMapper kafkaBinderHeaderMapper() {
        BinderHeaderMapper mapper = new BinderHeaderMapper();
        mapper.setEncodeStrings(true);
        return mapper;
    }
}
And now I have a problem: Spring Cloud Function apparently ignores the SCDF source and target topic names, and the topics transform-in-0 and transform-out-0 are created instead.
SCDF creates topics with names of the form <stream-name>.<app-name>, e.g. TestStream123.http and TestStream123.mvmn-transform.
Previously the transformer used these topics, as it should, since it is part of the SCDF stream. But now Spring Cloud Function ignores them, and transform-in-0 and transform-out-0 are created instead.
Thus my transformer no longer receives any input, because it expects it on the wrong Kafka topic. It would probably not produce any output into the stream either, since it also writes to the wrong Kafka topic.
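To make the mismatch concrete, here is a minimal, self-contained sketch of the two naming schemes involved. It assumes that the functional binding names follow Spring Cloud Stream's <functionName>-in-<index> / <functionName>-out-<index> convention, which matches the transform-in-0 and transform-out-0 topics I see, and that SCDF names topics <stream-name>.<app-name> as described above. The class BindingNameSketch and its helper methods are hypothetical and exist only for illustration. The aliasProperties helper lists the spring.cloud.stream.function.bindings.* keys that, as far as I understand, can map the functional binding names back to input and output; whether that is enough for SCDF to pick the right topics is an assumption I have not verified.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class, not part of the project.
public class BindingNameSketch {

    // Functional binding name for an input: <functionName>-in-<index>
    static String inputBinding(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Functional binding name for an output: <functionName>-out-<index>
    static String outputBinding(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Topic name as SCDF creates it in the question: <stream-name>.<app-name>
    static String scdfTopic(String streamName, String appName) {
        return streamName + "." + appName;
    }

    // Assumption: these property keys alias the functional bindings
    // to the annotation-era binding names "input" and "output".
    static Map<String, String> aliasProperties(String functionName) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("spring.cloud.stream.function.bindings." + inputBinding(functionName, 0), "input");
        props.put("spring.cloud.stream.function.bindings." + outputBinding(functionName, 0), "output");
        return props;
    }

    public static void main(String[] args) {
        // Topics the stream is expected to use
        System.out.println("expected in:  " + scdfTopic("TestStream123", "http"));
        System.out.println("expected out: " + scdfTopic("TestStream123", "mvmn-transform"));
        // Topics that actually get created when no destination is bound
        System.out.println("actual in:    " + inputBinding("transform", 0));
        System.out.println("actual out:   " + outputBinding("transform", 0));
        // Candidate properties to bridge the two naming schemes
        aliasProperties("transform").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The function name transform comes from the @Bean method name in the second version of the app, so renaming that method would change the derived binding names as well.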
P.S. Just in case, full project code on GitHub: https://github.com/mvmn/scdftest-transformer/tree/scfunc
To run it locally, start up Kafka, Skipper, SCDF and the SCDF console, run mvn clean install in the app folder, and then run the following in the console:
app register --name mvmn-transform-1 --type processor --uri maven://x.mvmn.study.scdf.scdftest:scdftest-transformer:0.1.1-SNAPSHOT --metadata-uri maven://x.mvmn.study.scdf.scdftest:scdftest-transformer:0.1.1-SNAPSHOT
Then you can deploy the stream using the definition http --port=12346 | mvmn-transform | file --name=tmp.txt --directory=/tmp