1
votes

I am new to Spring Cloud Stream. My use case is to read from a file source and publish messages (to Kafka) for each line in the file. I have tried using the file source app starter (https://github.com/spring-cloud-stream-app-starters/file/tree/master/spring-cloud-starter-stream-source-file) and have been able to publish messages.

However the I now need to tweak the body of the message before publishing. The app starter generates generic messages and I need to modify the structure before publishing. I have tried searching on SO but haven't found any suitable example. Can anyone please suggest on how to achieve this?

Many thanks.

2

2 Answers

1
votes

Actually this is a new feature that we're going to blog soon, but I'll try to explain it here. I believe you want to extend the existing app, so in this case you simply want to create a new app that extends source-file and then use newly added Spring Cloud Function support to simply compose your transformer into the existing app. First, you need to make sure you're using the newest Spring Cloud Stream which should be Fishtown.RC1 (2.1.0.RC1). Also, we have an example (which is going to be used for the blog) that you may find useful. It actually does exactly what you're looking for; Only instead of extending file-source it extends http-source which means you simply have to swap a dependency in the pom from spring-cloud-starter-stream-source-http to spring-cloud-starter-stream-source-file and then simply define a Bean of type Function where you define your transformation and provide a property during teh startup --spring.cloud.stream.function.definition=uppercase where uppercase is the name of the function you want to compose at the tail of file source.

@SpringBootApplication
public class MyAppExtender {

    public static void main(String[] args) {
        SpringApplication.run(MyAppExtender.class, "--spring.cloud.stream.function.definition=uppercase");
    }

    @Bean
    public Function<String, String> uppercase() {
        return x -> x.toUpperCase();
    } 
}

Anyway, I know the above explanation may be missing a few parts, but give it a shot and see if you have a follow up questions. I'll make sure I'll post the blog when it's ready.

0
votes

Messages are immutable.

Add a .transform() method to the flowBuilder().

See transformers.