
I am working with the Kafka Streams API. My application reads data from a source topic, applies a filter, and writes the result to a target Kafka topic.

Dependency used:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.1.0</version>
    </dependency>

Below is the code:

    { ...

        // create the configuration
        Properties property = new Properties();
        property.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        property.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kafka_streams_app");
        property.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
        property.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

        // create the topology builder
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // read from the source topic
        KStream<String, String> inputTopic = streamsBuilder.stream("source_topic");

        // keep only records whose extracted value exceeds 10000
        KStream<String, String> filteredStream = inputTopic.filter(
                (k, val) -> filterData(val) > 10000
        );

        // write the filtered records to the target topic
        filteredStream.to("target_topic");

        KafkaStreams streams = new KafkaStreams(
                streamsBuilder.build(),
                property
        );

        // start the streams application
        streams.start();
    ...
    }

Here is my application architecture:

Producer API (produces to the source topic) =>
Kafka Streams API (reads from the source topic and sends data to the target topic) => Kafka Consumer API (reads from the target topic)

When the stream writes data to the target topic, I want to capture whether the write succeeded or not.

Is there any way to capture that callback? Thanks.

@ValBonn I am trying to capture the event when the stream writes to the filtered topic. For now, I just want to print a success message once the data is written to the topic. - programoholic
I added an answer: it's not possible. I am still interested in why you need this. What are you trying to achieve with the callback? - Matthias J. Sax

1 Answer


The producer callback that you can specify on send() in the plain producer API

Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

is not exposed in the Streams API.
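
For reference, here is a minimal sketch of how that callback is used with the plain producer API, that is, outside of Kafka Streams. The class name ProducerCallbackSketch and the helper sendWithCallback are hypothetical, and the sketch uses MockProducer from kafka-clients (which kafka-streams pulls in transitively) so that it can run without a broker. With a real KafkaProducer the callback would be invoked asynchronously once the broker acknowledges or rejects the record.

```java
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerCallbackSketch {

    // Hypothetical helper: sends one record and reports the outcome
    // through the callback passed to Producer.send().
    static Future<RecordMetadata> sendWithCallback(Producer<String, String> producer,
                                                   String topic, String key, String value) {
        Callback callback = (metadata, exception) -> {
            if (exception == null) {
                // The record was written; metadata describes where it landed.
                System.out.println("Written to " + metadata.topic()
                        + ", partition " + metadata.partition()
                        + ", offset " + metadata.offset());
            } else {
                // The write failed; exception carries the cause.
                System.err.println("Write failed: " + exception.getMessage());
            }
        };
        return producer.send(new ProducerRecord<>(topic, key, value), callback);
    }

    public static void main(String[] args) {
        // Assumption for this sketch: a MockProducer with autoComplete=true,
        // which completes every send immediately instead of contacting a broker.
        Producer<String, String> producer =
                new MockProducer<>(true, new StringSerializer(), new StringSerializer());
        try {
            sendWithCallback(producer, "target_topic", "key-1", "12345");
        } finally {
            producer.close();
        }
    }
}
```

Kafka Streams creates and manages its own internal producer, so there is no place in the code from the question where such a callback could be passed in.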