0
votes

I am looking at Kafka-streams for eventing. I tried adding a interceptor(for the consumer) for Kakfa-Streams.

I added a RecordInterceptor like the following :

  configMap.put(consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), "com.package.to.interceptor.MyCustomRecordInterceptor");

But I get an error during startup :

Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
Caused by: java.lang.ClassCastException: class com.package.to.interceptor.MyCustomRecordInterceptor

It works fine if I add an interceptor that implements

org.apache.kafka.clients.consumer.ConsumerInterceptor.

But I require a RecordInterceptor.

My question is, is there a way to add a RecordInterceptor implementation as a Consumer interceptor to Kafka-streams? Any help is greatly appreciated.

1

1 Answers

0
votes

If you need the methods of the Spring RecordInterceptor, you need make your class implement both interfaces, however only the methods of the ConsumerInterceptor will be invoked directly by the Kafka Streams framework

Kafka Consumers will only accept subclasses of ConsumerInterceptor