
Using the Kafka Streams Processor API

Scenario: a stream processor (implemented using the Kafka Streams Processor API) reads data from a source topic and writes data to a target topic based on some business logic.

Code:

  Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsProcessor");
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dev_cluster.org:9092");
  props.put(StreamsConfig.STATE_DIR_CONFIG, "streams-pipe");
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

  Topology topology = new Topology();
  topology.addSource("mySource", "source_topic");
  topology.addProcessor("StreamsProcessor", StreamsProcessor::new, "mySource");
  topology.addSink("sink1", "output_topic", "StreamsProcessor");
  topology.addSink("sink2", "output_topic2", "StreamsProcessor");
  topology.addSink("sink3", "output_topic3", "StreamsProcessor");

  KafkaStreams streams = new KafkaStreams(topology, props);
  streams.start();
  --------------------------------------------------------------
  public void init(ProcessorContext context) 
  {
      this.context = context;
      context.commit();
  }

  public void process(String key, String value) 
  {   
      // In a loop, send to sink1, sink2 or sink3
      context.forward(key, value, To.child("sink1"));
  }
  ----------------------------------------------------------------

Question:

If the stream processor fails to publish messages to one or more of the target topics above, what are some of the best ways to implement a retry mechanism using the Kafka Streams Processor API?
Please share code snippets/links/best practices for handling failure scenarios. Thanks.

It depends on what type of failure occurred. There are many reasons for failure: a deserialization issue, an error while processing an event, an error while producing a message to the destination Kafka topic, etc. If your destination Kafka cluster is temporarily unavailable, Kafka Streams provides the ability to retry via the retries property, e.g. retries: 10. Also, please take a look at error handling in Kafka Streams: stackoverflow.com/a/51299739/2335775 – Vasyl Sarzhynskyi
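
To make the comment above concrete, here is a minimal sketch of the two knobs it mentions, assuming Kafka Streams 2.x (matching the To.child API used in the question); the handler class name is an illustrative assumption, not part of the original post:

  import java.util.Map;

  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.streams.errors.ProductionExceptionHandler;

  // Consulted by Kafka Streams when a record cannot be written to a sink topic
  // even after the producer's own retries are exhausted.
  public class LoggingProductionExceptionHandler implements ProductionExceptionHandler {

      @Override
      public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                       Exception exception) {
          // Log and keep the application running; return FAIL to stop the stream threads instead.
          System.err.println("Failed to produce to " + record.topic() + ": " + exception.getMessage());
          return ProductionExceptionHandlerResponse.CONTINUE;
      }

      @Override
      public void configure(Map<String, ?> configs) {
          // No additional configuration needed for this sketch.
      }
  }

It would be registered (together with producer-level retries) in the Properties block from the question, roughly like this (ProducerConfig is org.apache.kafka.clients.producer.ProducerConfig):

  // Retry transient produce failures at the producer level before giving up.
  props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 10);
  // Decide what happens when producing to a sink topic ultimately fails.
  props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
            LoggingProductionExceptionHandler.class);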

1 Answer


You can implement some kind of KafkaProducer that acts as a message-failure handler: whenever a message fails, you send it to a dedicated Kafka topic.

If you are familiar with the concept of a dead-letter queue in Kafka Connect, this is essentially the same idea (except that in Kafka Connect it is only a matter of configuration). A rough sketch of that idea follows below.
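
Here is a rough sketch of the approach (the class name DeadLetterQueueHandler and the topic name are assumptions for illustration, not part of the original answer): a plain KafkaProducer that the processor calls whenever it detects a failed record, so the record lands on a dedicated dead-letter topic for later inspection or replay.

  import java.util.Properties;

  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerConfig;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.serialization.StringSerializer;

  public class DeadLetterQueueHandler implements AutoCloseable {

      private final KafkaProducer<String, String> producer;
      private final String dlqTopic;

      public DeadLetterQueueHandler(String bootstrapServers, String dlqTopic) {
          Properties props = new Properties();
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
          this.producer = new KafkaProducer<>(props);
          this.dlqTopic = dlqTopic;
      }

      // Forward the failed record to the dead-letter topic; the callback only logs,
      // so a DLQ write failure does not take the streams application down.
      public void handleFailure(String key, String value, Exception cause) {
          System.err.println("Routing record with key " + key + " to DLQ: " + cause.getMessage());
          producer.send(new ProducerRecord<>(dlqTopic, key, value), (metadata, exception) -> {
              if (exception != null) {
                  System.err.println("Could not write to DLQ: " + exception.getMessage());
              }
          });
      }

      @Override
      public void close() {
          producer.close();
      }
  }

You would create one instance per application (e.g. new DeadLetterQueueHandler("dev_cluster.org:9092", "my_dlq_topic"), both values assumed here) and call handleFailure(key, value, e) wherever your business logic in process() detects a failure. Asynchronous produce failures to the sink topics themselves are better caught with a ProductionExceptionHandler, as mentioned in the comment on the question.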