I have a problem with a service that uses Spring Cloud Stream and Kafka. The service had been working fine, but yesterday it started reporting a series of exceptions on startup:
Checking for rethrow: count=2
2018-09-11 10:43:34.904 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Retry: count=2
2018-09-11 10:43:34.904 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.s.integration.channel.DirectChannel : preSend on channel 'payment-reply', message: GenericMessage [payload=byte[1478], headers={errorChannel=e61450f9-fa47-446f-95ae-5021868cadfa:602, deliveryAttempt=3, X-B3-ParentSpanId=a9fe9b1c87b14698, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedTopic=paymentResponse, spanTraceId=966a10371583367f, spanId=7aa71302bc18bb4c, spanParentSpanId=a9fe9b1c87b14698, replyChannel=e61450f9-fa47-446f-95ae-5021868cadfa:601, nativeHeaders={spanTraceId=[966a10371583367f], spanId=[7aa71302bc18bb4c], spanParentSpanId=[a9fe9b1c87b14698], spanSampled=[0]}, kafka_offset=2299, X-B3-SpanId=7aa71302bc18bb4c, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2fb81502, X-B3-Sampled=0, X-B3-TraceId=966a10371583367f, spanSampled=0, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1536592853999}]
2018-09-11 10:43:34.904 DEBUG [payment-gateway,966a10371583367f,c94b21ccaaed668b,false] 1 --- [container-0-C-1] o.s.c.s.i.m.TracingChannelInterceptor : Created a new span in pre sendNoopSpan{context=966a10371583367f/c94b21ccaaed668b}
2018-09-11 10:43:34.905 DEBUG [payment-gateway,966a10371583367f,4476713d70434d52,false] 1 --- [container-0-C-1] o.s.c.s.i.m.TracingChannelInterceptor : Created a new span in before handleNoopSpan{context=966a10371583367f/e1d1a2a6b9ad093e}
2018-09-11 10:43:34.905 DEBUG [payment-gateway,966a10371583367f,4476713d70434d52,false] 1 --- [container-0-C-1] o.s.c.s.i.m.TracingChannelInterceptor : Will finish the current span after message handled NoopSpan{context=966a10371583367f/4476713d70434d52}
2018-09-11 10:43:34.905 DEBUG [payment-gateway,966a10371583367f,c94b21ccaaed668b,false] 1 --- [container-0-C-1] o.s.c.s.i.m.TracingChannelInterceptor : Will finish the current span after completion NoopSpan{context=966a10371583367f/c94b21ccaaed668b}
2018-09-11 10:43:34.905 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-11 10:43:35.001 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {}
2018-09-11 10:43:35.002 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-2, groupId=payment-gateway] Fetch READ_UNCOMMITTED at offset 0 for partition refundResponse-0 returned fetch data (error=NONE, highWaterMark=0, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=0)
2018-09-11 10:43:35.002 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-2, groupId=payment-gateway] Added READ_UNCOMMITTED fetch request for partition refundResponse-0 at offset 0 to node 10.244.0.194:9092 (id: 2 rack: null)
2018-09-11 10:43:35.002 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-2, groupId=payment-gateway] Sending READ_UNCOMMITTED fetch for partitions [refundResponse-0] to broker 10.244.0.194:9092 (id: 2 rack: null)
2018-09-11 10:43:35.003 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Checking for rethrow: count=3
2018-09-11 10:43:35.003 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Retry failed last attempt: count=3
2018-09-11 10:43:35.004 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.s.i.h.a.ErrorMessageSendingRecoverer : Sending ErrorMessage: failedMessage: GenericMessage [payload=byte[1478], headers={errorChannel=e61450f9-fa47-446f-95ae-5021868cadfa:602, deliveryAttempt=3, X-B3-ParentSpanId=7aa71302bc18bb4c, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=paymentResponse, spanTraceId=966a10371583367f, spanId=c94b21ccaaed668b, spanParentSpanId=7aa71302bc18bb4c, replyChannel=e61450f9-fa47-446f-95ae-5021868cadfa:601, nativeHeaders={spanTraceId=[966a10371583367f], spanId=[c94b21ccaaed668b], spanParentSpanId=[7aa71302bc18bb4c], spanSampled=[0]}, kafka_offset=2299, X-B3-SpanId=c94b21ccaaed668b, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2fb81502, X-B3-Sampled=0, X-B3-TraceId=966a10371583367f, id=83994228-ba45-2303-1f7e-2eaf8f49c400, spanSampled=0, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1536592853999, timestamp=1536662614904}]
2018-09-11 08:44:19.837 ERROR [payment-gateway,bd9888a7d590ebf7,535db983ae0aedab,false] 1 --- [container-0-C-1] o.s.integration.handler.LoggingHandler :
org.springframework.messaging.MessageDeliveryException:
Dispatcher has no subscribers for channel 'application-1.payment-reply'.; nested exception is org.springframework.integration.MessageDispatchingException:
Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[1197], headers={errorChannel=e61450f9-fa47-446f-95ae-5021868cadfa:426, deliveryAttempt=3, X-B3-ParentSpanId=760139e0bc5d9ac0, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=paymentResponse, spanTraceId=bd9888a7d590ebf7, spanId=5c6ac2c521faf6e7, spanParentSpanId=760139e0bc5d9ac0, replyChannel=e61450f9-fa47-446f-95ae-5021868cadfa:425, nativeHeaders={spanTraceId=[bd9888a7d590ebf7], spanId=[535db983ae0aedab], spanParentSpanId=[5c6ac2c521faf6e7], spanSampled=[0], X-B3-TraceId=[bd9888a7d590ebf7], X-B3-SpanId=[535db983ae0aedab], X-B3-ParentSpanId=[5c6ac2c521faf6e7], X-B3-Sampled=[0]}, kafka_offset=2258, X-B3-SpanId=5c6ac2c521faf6e7, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@59715a4a, X-B3-Sampled=0, X-B3-TraceId=bd9888a7d590ebf7, id=88531659-3fb0-a59f-bb69-54c9ba82d608, spanSampled=0, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1536592840192, timestamp=1536655459828}], failedMessage=GenericMessage [payload=byte[1197], headers={errorChannel=e61450f9-fa47-446f-95ae-5021868cadfa:426, deliveryAttempt=3, X-B3-ParentSpanId=760139e0bc5d9ac0, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=paymentResponse, spanTraceId=bd9888a7d590ebf7, spanId=5c6ac2c521faf6e7, spanParentSpanId=760139e0bc5d9ac0, replyChannel=e61450f9-fa47-446f-95ae-5021868cadfa:425, nativeHeaders={spanTraceId=[bd9888a7d590ebf7], spanId=[535db983ae0aedab], spanParentSpanId=[5c6ac2c521faf6e7], spanSampled=[0], X-B3-TraceId=[bd9888a7d590ebf7], X-B3-SpanId=[535db983ae0aedab], X-B3-ParentSpanId=[5c6ac2c521faf6e7], X-B3-Sampled=[0]}, kafka_offset=2258, X-B3-SpanId=5c6ac2c521faf6e7, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@59715a4a, X-B3-Sampled=0, X-B3-TraceId=bd9888a7d590ebf7, id=88531659-3fb0-a59f-bb69-54c9ba82d608, spanSampled=0, kafka_receivedPartitionId=0, contentType=application/json, 
kafka_receivedTimestamp=1536592840192, timestamp=1536655459828}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
After some time we then see exceptions like this:
Caused by: org.springframework.messaging.core.DestinationResolutionException: failed to look up MessageChannel with name '946859a6-bc27-466d-91ba-3da93af50ac9:1' in the BeanFactory.; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named '946859a6-bc27-466d-91ba-3da93af50ac9:1' available
The connection to Kafka is configured with a property: spring.kafka.bootstrap-server = kafka.kafka:9092
The topics are configured with Spring Cloud Stream properties: spring.cloud.stream.bindings.[topic-name].destination = blah
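For reference, the two settings above might look like this in application.properties. This is only a sketch: the binding name payment-reply, the destination paymentResponse and the group payment-gateway are read off the log output above rather than copied from the real configuration, and Spring Boot documents the broker key in the plural (bootstrap-servers).

```properties
# Broker address (Spring Boot spells this key "bootstrap-servers")
spring.kafka.bootstrap-servers=kafka.kafka:9092

# Assumed binding: channel 'payment-reply' bound to topic 'paymentResponse'
spring.cloud.stream.bindings.payment-reply.destination=paymentResponse
# Assumed consumer group, taken from groupId=payment-gateway in the log
spring.cloud.stream.bindings.payment-reply.group=payment-gateway
```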
The interaction with Kafka goes via Spring Integration, with code like this:
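import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;

@MessagingGateway
public interface StreamGateway {
    // Sends the payload to the request channel and waits up to 10 s for a reply on the reply channel
    @Gateway(requestChannel = KafkaConfig.ENRICH_PAYMENT, replyChannel = ChannelNames.PAYMENT_REPLY, replyTimeout = 10000)
    String processPayment(String payload);
}

// Different class:
private final StreamGateway gateway;
...
gateway.processPayment(message);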
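As a rough mental model of what such a gateway call involves, the sketch below turns an asynchronous reply into a blocking call by keeping pending requests in memory under a correlation id. It is a stdlib-only toy, not Spring Integration's implementation, and every name in it (ReplyCorrelationSketch, register, await, onReply) is hypothetical. It is here mainly to show that a reply whose id is no longer known, for example one produced before a restart, has no waiting caller to go to, which might be related to the failed channel lookup shown above.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical toy model of request/reply correlation; not Spring Integration code.
final class ReplyCorrelationSketch {
    // Pending calls keyed by correlation id. In-memory only, so lost on restart.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a pending reply and returns its correlation id. */
    String register() {
        String id = UUID.randomUUID().toString();
        pending.put(id, new CompletableFuture<>());
        return id;
    }

    /** Blocks until the reply for the id arrives; returns null on timeout. */
    String await(String id, long timeoutMillis) {
        CompletableFuture<String> future = pending.get(id);
        if (future == null) {
            throw new IllegalStateException("unknown correlation id: " + id);
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // no reply within the timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pending.remove(id); // the id is single-use
        }
    }

    /** Called for each reply read from the topic; false if no caller is waiting for it. */
    boolean onReply(String id, String payload) {
        CompletableFuture<String> future = pending.get(id);
        return future != null && future.complete(payload);
    }
}
```

In this model a caller would call register(), send the request with that id attached, and then call await(id, 10000), mirroring the replyTimeout above. A record already sitting on the reply topic at startup carries an id that nobody registered in this process, so onReply returns false for it.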
This is running on an Azure Kubernetes deployment, and Kafka is in a separate pod from the Spring Boot service.
Thanks in advance.
Update: The problem recurred, and further investigation has highlighted a few things:
- Because we're using Spring Integration @MessagingGateway and @Gateway to create a synchronous interaction with Kafka, there is no normal topic StreamListener or subscriber.
- The problem occurs when there is a lag on the topic, i.e. there are messages in the topic beyond the consumer's committed offset.
- The lack of a normal StreamListener means the lag messages have no means of being processed. Only when a connection is made by the MessageGateway is it possible for messages to be read from the topic.
- One means of getting rid of the problem is to read all 'lag' messages, so that the lag is 0. The service then starts normally. However, if I manually post messages to the topic (outside the MessageGateway interaction), the error recurs.
- A second, partial solution (which I don't fully understand yet) is to add a @DependsOn annotation to the MessageGateway, indicating that it requires a bean separately created with an @Input SubscribableChannel object. This means the SubscribableChannel must be created before the MessageGateway, therefore creating a subscriber. However, there is still no StreamListener, so exceptions are still thrown as lag messages are pulled from the topic with nowhere to go.
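The behaviour described in these points can be pictured with a small stdlib-only sketch. A direct channel hands each message to a subscriber on the sender's thread, so a backlog record delivered before anything has subscribed fails, while the same send succeeds once a subscriber exists. The class DirectChannelSketch and its methods are hypothetical stand-ins, not Spring Integration's DirectChannel.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical toy model of a subscribable channel; not Spring Integration code.
final class DirectChannelSketch {
    private final String name;
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    DirectChannelSketch(String name) {
        this.name = name;
    }

    /** Registers a handler that will receive messages sent to this channel. */
    void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    /** Delivers to the first subscriber on the caller's thread; fails if there is none yet. */
    void send(String payload) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException(
                    "Dispatcher has no subscribers for channel '" + name + "'");
        }
        subscribers.get(0).accept(payload);
    }
}
```

With a lag of 0 the consumer has nothing to deliver at startup, so the first send happens only after the gateway has subscribed. With a backlog, the consumer can call send before that point, which in this model is the failing branch.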