1
votes

I have problems with a service which uses spring cloud stream and kafka. The service had been working ok, but yesterday started reporting a series of exceptions on startup:

Checking for rethrow: count=2
2018-09-11 10:43:34.904 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.s.retry.support.RetryTemplate          : Retry: count=2
2018-09-11 10:43:34.904 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.s.integration.channel.DirectChannel    : preSend on channel 'payment-reply', message: GenericMessage [payload=byte[1478], headers={errorChannel=e61450f9-fa47-446f-95ae-5021868cadfa:602, deliveryAttempt=3, X-B3-ParentSpanId=a9fe9b1c87b14698, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedTopic=paymentResponse, spanTraceId=966a10371583367f, spanId=7aa71302bc18bb4c, spanParentSpanId=a9fe9b1c87b14698, replyChannel=e61450f9-fa47-446f-95ae-5021868cadfa:601, nativeHeaders={spanTraceId=[966a10371583367f], spanId=[7aa71302bc18bb4c], spanParentSpanId=[a9fe9b1c87b14698], spanSampled=[0]}, kafka_offset=2299, X-B3-SpanId=7aa71302bc18bb4c, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2fb81502, X-B3-Sampled=0, X-B3-TraceId=966a10371583367f, spanSampled=0, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1536592853999}]
2018-09-11 10:43:34.904 DEBUG [payment-gateway,966a10371583367f,c94b21ccaaed668b,false] 1 --- [container-0-C-1] o.s.c.s.i.m.TracingChannelInterceptor    : Created a new span in pre sendNoopSpan{context=966a10371583367f/c94b21ccaaed668b}
2018-09-11 10:43:34.905 DEBUG [payment-gateway,966a10371583367f,4476713d70434d52,false] 1 --- [container-0-C-1] o.s.c.s.i.m.TracingChannelInterceptor    : Created a new span in before handleNoopSpan{context=966a10371583367f/e1d1a2a6b9ad093e}
2018-09-11 10:43:34.905 DEBUG [payment-gateway,966a10371583367f,4476713d70434d52,false] 1 --- [container-0-C-1] o.s.c.s.i.m.TracingChannelInterceptor    : Will finish the current span after message handled NoopSpan{context=966a10371583367f/4476713d70434d52}
2018-09-11 10:43:34.905 DEBUG [payment-gateway,966a10371583367f,c94b21ccaaed668b,false] 1 --- [container-0-C-1] o.s.c.s.i.m.TracingChannelInterceptor    : Will finish the current span after completion NoopSpan{context=966a10371583367f/c94b21ccaaed668b}
2018-09-11 10:43:34.905 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-11 10:43:35.001 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {}
2018-09-11 10:43:35.002 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-2, groupId=payment-gateway] Fetch READ_UNCOMMITTED at offset 0 for partition refundResponse-0 returned fetch data (error=NONE, highWaterMark=0, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=0)
2018-09-11 10:43:35.002 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-2, groupId=payment-gateway] Added READ_UNCOMMITTED fetch request for partition refundResponse-0 at offset 0 to node 10.244.0.194:9092 (id: 2 rack: null)
2018-09-11 10:43:35.002 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-2, groupId=payment-gateway] Sending READ_UNCOMMITTED fetch for partitions [refundResponse-0] to broker 10.244.0.194:9092 (id: 2 rack: null)
2018-09-11 10:43:35.003 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.s.retry.support.RetryTemplate          : Checking for rethrow: count=3
2018-09-11 10:43:35.003 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.s.retry.support.RetryTemplate          : Retry failed last attempt: count=3
2018-09-11 10:43:35.004 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.s.i.h.a.ErrorMessageSendingRecoverer   : Sending ErrorMessage: failedMessage: GenericMessage [payload=byte[1478], headers={errorChannel=e61450f9-fa47-446f-95ae-5021868cadfa:602, deliveryAttempt=3, X-B3-ParentSpanId=7aa71302bc18bb4c, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=paymentResponse, spanTraceId=966a10371583367f, spanId=c94b21ccaaed668b, spanParentSpanId=7aa71302bc18bb4c, replyChannel=e61450f9-fa47-446f-95ae-5021868cadfa:601, nativeHeaders={spanTraceId=[966a10371583367f], spanId=[c94b21ccaaed668b], spanParentSpanId=[7aa71302bc18bb4c], spanSampled=[0]}, kafka_offset=2299, X-B3-SpanId=c94b21ccaaed668b, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2fb81502, X-B3-Sampled=0, X-B3-TraceId=966a10371583367f, id=83994228-ba45-2303-1f7e-2eaf8f49c400, spanSampled=0, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1536592853999, timestamp=1536662614904}]
2018-09-11 08:44:19.837 ERROR [payment-gateway,bd9888a7d590ebf7,535db983ae0aedab,false] 1 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : 
org.springframework.messaging.MessageDeliveryException: 
Dispatcher has no subscribers for channel 'application-1.payment-reply'.; nested exception is org.springframework.integration.MessageDispatchingException: 
Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[1197], headers={errorChannel=e61450f9-fa47-446f-95ae-5021868cadfa:426, deliveryAttempt=3, X-B3-ParentSpanId=760139e0bc5d9ac0, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=paymentResponse, spanTraceId=bd9888a7d590ebf7, spanId=5c6ac2c521faf6e7, spanParentSpanId=760139e0bc5d9ac0, replyChannel=e61450f9-fa47-446f-95ae-5021868cadfa:425, nativeHeaders={spanTraceId=[bd9888a7d590ebf7], spanId=[535db983ae0aedab], spanParentSpanId=[5c6ac2c521faf6e7], spanSampled=[0], X-B3-TraceId=[bd9888a7d590ebf7], X-B3-SpanId=[535db983ae0aedab], X-B3-ParentSpanId=[5c6ac2c521faf6e7], X-B3-Sampled=[0]}, kafka_offset=2258, X-B3-SpanId=5c6ac2c521faf6e7, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@59715a4a, X-B3-Sampled=0, X-B3-TraceId=bd9888a7d590ebf7, id=88531659-3fb0-a59f-bb69-54c9ba82d608, spanSampled=0, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1536592840192, timestamp=1536655459828}], failedMessage=GenericMessage [payload=byte[1197], headers={errorChannel=e61450f9-fa47-446f-95ae-5021868cadfa:426, deliveryAttempt=3, X-B3-ParentSpanId=760139e0bc5d9ac0, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=paymentResponse, spanTraceId=bd9888a7d590ebf7, spanId=5c6ac2c521faf6e7, spanParentSpanId=760139e0bc5d9ac0, replyChannel=e61450f9-fa47-446f-95ae-5021868cadfa:425, nativeHeaders={spanTraceId=[bd9888a7d590ebf7], spanId=[535db983ae0aedab], spanParentSpanId=[5c6ac2c521faf6e7], spanSampled=[0], X-B3-TraceId=[bd9888a7d590ebf7], X-B3-SpanId=[535db983ae0aedab], X-B3-ParentSpanId=[5c6ac2c521faf6e7], X-B3-Sampled=[0]}, kafka_offset=2258, X-B3-SpanId=5c6ac2c521faf6e7, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@59715a4a, X-B3-Sampled=0, X-B3-TraceId=bd9888a7d590ebf7, id=88531659-3fb0-a59f-bb69-54c9ba82d608, spanSampled=0, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1536592840192, timestamp=1536655459828}]
            at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)

after some time we then see exceptions like this:

Caused by: org.springframework.messaging.core.DestinationResolutionException: failed to look up MessageChannel with name '946859a6-bc27-466d-91ba-3da93af50ac9:1' in the BeanFactory.; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named '946859a6-bc27-466d-91ba-3da93af50ac9:1' available

the connection to kafka is configured with a property: spring.kafka.bootstrap-server = kafka.kafka:9092

and the topics are configured with spring cloud stream properties: spring.cloud.stream.bindings.[topic-name].destination = blah

The interaction with kafka goes via spring integration with code like this:

@MessagingGateway
public interface StreamGateway {

    @Gateway(requestChannel = KafkaConfig.ENRICH_PAYMENT, replyChannel = ChannelNames.PAYMENT_REPLY, replyTimeout = 10000)
    String processPayment(String payload);

}

//Different class:

private final StreamGateway gateway;
...
gateway.processPayment(message)

This is running on an azure kubernetes deployment, and kafka is in a separate pod from the spring boot service.

thanks in advance.

Update: The problem reoccured and some further investigation has highlighted a couple of things

  • Because we're using spring integration @MessagingGateway and @Gateway to create a synchronous interaction with Kafka, there is no normal topic StreamListener or subscriber
  • The problem is occurring when there is a lag on the topic, i.e. there are messages in the topic beyond the topic offset.
  • The lack of a normal StreamListener means the lag messages have no means of being processed. Only when a connection is made by the MessageGateway, is it possible for messages to be read from the topic.
  • One means of getting rid of the problem is to read all 'lag' messages, so that the lag is 0. The service will then start normally, however if I manually post messages to the topic (out-with the MessageGateway interaction), then the error reoccurs.
  • A second partial solution (which I dont fully understand yet) is to add a @DependsOn annotation to the MessageGateway, indicating that it requires a bean separately created with a @Input SubscribableChannel object. This means the SubscribableChannel must be created before the MessageGateway, therefore creating a Subscriber, however there is still no StreamListener, so exceptions are still thrown as lag messages are pulled from the topic, with no-where to go ????
2

2 Answers

1
votes

While I am not sure about the details of your application, what is clear is that a Message gets delivered to an application-1.payment-reply channel which, as the error states, has no subscriber. Basically it means there is no listener on that channel (such as @StreamListener or @ServiceActivator etc).

It is a very common Spring Integration miss-configuration, but without looking at your app it is hard to say where it is.

0
votes

On looking at the debug log I noticed that the service was connecting to other topics correctly, but having problems with the payment-reply topic. I tried deleting this topic and restarting the service. This fixed the problem.