I am looking at Kafka-streams for eventing. I tried adding a interceptor(for the consumer) for Kakfa-Streams.
I added a RecordInterceptor like the following :
configMap.put(consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), "com.package.to.interceptor.MyCustomRecordInterceptor");
But I get an error during startup :
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
Caused by: java.lang.ClassCastException: class com.package.to.interceptor.MyCustomRecordInterceptor
It works fine if I add an interceptor that implements
org.apache.kafka.clients.consumer.ConsumerInterceptor
.
But I require a RecordInterceptor.
My question is, is there a way to add a RecordInterceptor implementation as a Consumer interceptor to Kafka-streams? Any help is greatly appreciated.