1
votes

I'm using Spring Cloud Stream (3.0.4.RELEASE) with the Kafka-Streams binder (3.0.0.RELEASE). I'm also using the 'Functional Programming Model' (so no @StreamListener etc). What a lovely piece of tech!

I need to be able to pause stream processing / consuming of new events at certain times of the day. This creates a 'blackout period' for events. After the 'blackout period' is over I shall resume stream processing. As a result I want to be able to pause or turn on/off the KStream consumer with code. I cannot seem to manage it!

What have I tried so far? - Using the /actuator/bindings endpoint to start/stop kafka-streams bindings. It seemed like this was not available for the kafka-streams binder, only for the kafka binder :(.

Any help would be much appreciated! Thanks!

1

1 Answers

0
votes

The actuator binding endpoints for controlling stream processing is not supported out of the box for Kafka Streams binder. This use case has come up before.

If you are ok with adding extra input/output topics (and potential latency depending on a number of factors) in front of the Kafka streams processor, there is a way to solve this problem. See the comments added here.

The basic idea is that the first processor is a simple passthrough processor in which it doesn't use Kafka Streams, but the standard messaging based binder in Spring Cloud Stream. There you can control the flow of events using the actuator binding endpoints. The output of this processor becomes the input for the Kafkfa Streams processor.

Once again, it doesn't take much code to implement this pattern (maybe 3 or 4 lines), but there might be performance implications based on the requirements and throughput of your application. Nevertheless, if that is not a concern, this is a pattern that you can try.

Hope this helps.