
I'm struggling to customize my Spring Kafka Streams application. I have been trying to configure handling of uncaught (runtime) exceptions in my KStreams.

Referring to the documentation https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_streams_binder - it should be done like this:

@Configuration
@Slf4j
public class CustomKafkaStreamsConfiguration {

    @Bean
    public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
        return factoryBean -> {
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
                        log.error("An exception has occurred={}", e.getMessage());
                    });

                }
            });
        };
    }

}

Later on I define a KStream:

    @Bean
    public Function<KStream<String, Transaction>,
            KStream<String, Transaction>> paymentExecution() {
        return stream -> stream
                .peek((k, v) -> {
                    if (v.getStatus().equals(PaymentStatus.UNKNOWN)) {
                        throw new IllegalStateException();
                    }
                });
    }

If I send a Transaction with UNKNOWN status, the StreamThread dies due to the IllegalStateException, and my KStream no longer consumes any incoming requests.

Exception in thread "payment-d4b6ddd2-40ab-4eeb-afe4-e7fc3caa2b9c-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=payment-request, partition=0, offset=13, stacktrace=java.lang.IllegalStateException
    at payment.process.PaymentExecutionRequestProcessor.lambda$paymentExecution$4(PaymentExecutionRequestProcessor.java:48)
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)

    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:696)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: java.lang.IllegalStateException
    at payment.process.PaymentExecutionRequestProcessor.lambda$paymentExecution$4(PaymentExecutionOrchestratorProcessor.java:48)
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)

Is there something I'm missing? Or is there a more appropriate way to handle runtime exceptions that arise while processing a stream?

After a runtime exception, I want to commit the event that caused it and have StreamThread-1 keep consuming events.


2 Answers


If the application reaches the UncaughtExceptionHandler, the stream thread has already stopped and it is too late to recover: it won't continue processing. The handler is provided so that you can take graceful shutdown measures before the application exits. You need to restart the application to continue processing.

Instead, you can look into something like the branching feature in Kafka Streams to ignore the records with UNKNOWN status, something like below (I haven't tested this, but it should work). The idea is to branch the incoming KStream so that records with the UNKNOWN status are filtered out and ignored rather than causing an exception. It is still possible to log the unknown ones and apply any additional logic.

    @Bean
    public Function<KStream<String, Transaction>,
            KStream<String, Transaction>[]> paymentExecution() {
        return stream -> stream
                // Keep only records with a known status; records matching no predicate are dropped.
                .branch((k, v) -> !v.getStatus().equals(PaymentStatus.UNKNOWN));
    }
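
To illustrate why the handler cannot bring the thread back, here is a minimal plain-JDK sketch. It is not Kafka Streams code: it only uses java.lang.Thread, and the names UncaughtHandlerDemo, Result, and run are made up for this example. It assumes the stream thread behaves like an ordinary Java thread in this respect, which matches the behaviour described in the question: the handler is called while the thread is dying, and the records after the failing one are never processed.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK illustration only; every name in this class is hypothetical.
class UncaughtHandlerDemo {

    /** Outcome of one run of the worker thread. */
    static final class Result {
        final int processed;          // records handled before the failure
        final boolean handlerCalled;  // whether the uncaught-exception handler ran
        final boolean workerAlive;    // whether the worker thread is still running

        Result(int processed, boolean handlerCalled, boolean workerAlive) {
            this.processed = processed;
            this.handlerCalled = handlerCalled;
            this.workerAlive = workerAlive;
        }
    }

    static Result run(String[] records) {
        AtomicInteger processed = new AtomicInteger();
        AtomicBoolean handlerCalled = new AtomicBoolean();

        Thread worker = new Thread(() -> {
            for (String record : records) {
                if ("UNKNOWN".equals(record)) {
                    // Mirrors the IllegalStateException thrown inside peek().
                    throw new IllegalStateException("unknown status");
                }
                processed.incrementAndGet();
            }
        }, "worker-1");

        // The handler only observes the failure; it cannot resume the loop above.
        worker.setUncaughtExceptionHandler((t, e) -> handlerCalled.set(true));
        worker.start();
        try {
            worker.join(); // returns once the worker thread has terminated
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return new Result(processed.get(), handlerCalled.get(), worker.isAlive());
    }

    public static void main(String[] args) {
        Result r = run(new String[] {"OK", "UNKNOWN", "OK"});
        // prints: processed=1, handlerCalled=true, workerAlive=false
        System.out.println("processed=" + r.processed
                + ", handlerCalled=" + r.handlerCalled
                + ", workerAlive=" + r.workerAlive);
    }
}
```

The third record is never reached, which is why avoiding the exception in the first place (for example by filtering or branching on the status) is the safer route if the stream has to keep consuming.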