I am working with the Kafka Streams API. My application reads data from a source topic, applies a filter, and writes the result back to a target Kafka topic.
Dependency used:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.1.0</version>
</dependency>
Below is the code:
{ ...
    // create properties
    Properties property = new Properties();
    property.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    property.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kafka_streams_app");
    property.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
    property.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

    // create the topology builder
    StreamsBuilder streamsBuilder = new StreamsBuilder();

    // build topology: read from the source topic
    KStream<String, String> inputTopic = streamsBuilder.stream("source_topic");

    // filtering data: keep only records whose value maps to more than 10000
    KStream<String, String> filteredStream = inputTopic.filter(
        (k, val) -> filterData(val) > 10000
    );

    // write the filtered records to the target topic
    filteredStream.to("target_topic");

    KafkaStreams streams = new KafkaStreams(
        streamsBuilder.build(),
        property
    );

    // start our stream app
    streams.start();
    ...
}
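For context, filterData is not shown in the snippet. A minimal stand-in is sketched here, assuming the record value is a plain numeric string. The parsing rule, the class name FilterSketch, and the keep helper are all hypothetical and only illustrate the filter condition, not the real implementation:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FilterSketch {
    // Hypothetical stand-in for filterData: parse the value as an integer.
    // Null or non-numeric input maps to 0, so such records are filtered out.
    static int filterData(String val) {
        if (val == null) {
            return 0;
        }
        try {
            return Integer.parseInt(val.trim());
        } catch (NumberFormatException e) {
            return 0;
        }
    }

    // Same condition as the lambda passed to filter(...) in the topology.
    static boolean keep(String key, String val) {
        return filterData(val) > 10000;
    }

    public static void main(String[] args) {
        // Sample values only; the real records come from source_topic.
        List<String> values = Arrays.asList("500", "10000", "10001", "abc");
        List<String> kept = values.stream()
                .filter(v -> keep(null, v))
                .collect(Collectors.toList());
        System.out.println(kept);
    }
}
```

With this stand-in, only values strictly greater than 10000 would reach target_topic; a value of exactly 10000 is dropped.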
Here is my application architecture:
producer API (produces to the source topic) =>
Kafka Streams API (reads from the source topic and sends data to the target topic) => Kafka consumer API (reads from the target topic)
What I want is this: whenever the stream writes a record to the target topic,
I want to capture whether that write succeeded or failed.
Is there any way to hook into that callback? Thanks.