I'm attempting to create a custom exception handler for my Spring Cloud Dataflow stream to route some errors to be requeued and others to be DLQ'd.

To do this I'm utilizing the global Spring Integration "errorChannel" and routing based on exception type.

This is the code for the Spring Integration error router:

package com.acme.error.router;

import com.acme.exceptions.DlqException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;

@MessageEndpoint
@EnableBinding({ ErrorMessageChannels.class })
public class ErrorMessageMappingRouter {
    private static final Logger LOGGER = LoggerFactory.getLogger(ErrorMessageMappingRouter.class);

    public static final String ERROR_CHANNEL = "errorChannel";

    // Subscribes to the global Spring Integration error channel and picks a target channel.
    @Router(inputChannel = ERROR_CHANNEL)
    public String onError(Message<Object> message) {
        LOGGER.debug("ERROR ROUTER - onError");
        if (message.getPayload() instanceof MessageTransformationException) {
            MessageTransformationException exception = (MessageTransformationException) message.getPayload();
            Message<?> failedMessage = exception.getFailedMessage();
            // A DlqException anywhere in the cause chain goes to the DLQ; everything else is requeued.
            if (exceptionChainContainsDlq(exception)) {
                return ErrorMessageChannels.DLQ_QUEUE_NAME;
            }
            return ErrorMessageChannels.REQUEUE_CHANNEL;
        }
        // Any other payload type is dead-lettered.
        return ErrorMessageChannels.DLQ_QUEUE_NAME;
    }

    ...

}
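
The exceptionChainContainsDlq helper is elided above. For readers who want something concrete, here is a minimal sketch of how such a check could be written by walking the cause chain. The names ExceptionChains and containsCause, the depth limit, and the stand-in DlqException class are assumptions made for illustration; the real com.acme.exceptions.DlqException and the original helper are not shown in the post.

```java
import java.util.Objects;

// Hypothetical stand-in for com.acme.exceptions.DlqException (the real class is not shown).
class DlqException extends RuntimeException {
    DlqException(String message) {
        super(message);
    }
}

final class ExceptionChains {
    // Assumed safety limit so a malformed (cyclic) cause chain cannot loop forever.
    private static final int MAX_DEPTH = 100;

    private ExceptionChains() {
    }

    /** Returns true if the throwable itself or any of its causes is an instance of the given type. */
    static boolean containsCause(Throwable throwable, Class<? extends Throwable> type) {
        Objects.requireNonNull(type, "type");
        int depth = 0;
        for (Throwable t = throwable; t != null && depth < MAX_DEPTH; t = t.getCause(), depth++) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }
}
```

In the router, the check would then amount to something like ExceptionChains.containsCause(exception, DlqException.class).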

The error router is picked up by each of the stream apps through a package scan on the Spring Boot App for each:

@ComponentScan(basePackages = { "com.acme.error.router" })
@SpringBootApplication
public class StreamApp {}

When this is deployed and run with the local Spring Cloud Dataflow server (version 1.5.0-RELEASE), and a DlqException is thrown, the message is successfully routed to the onError method in the errorRouter and then placed into the dlq topic.

However, when this is deployed as a Docker container with the SCDF Kubernetes server (also version 1.5.0-RELEASE), the onError method is never hit: the log statement at the beginning of the router is never output.

In the startup logs for the stream apps, it looks like the bean is picked up correctly and registers as a listener for the errorChannel, but for some reason, when exceptions are thrown they do not get handled by the onError method in our router.

Startup Logs:

o.s.i.endpoint.EventDrivenConsumer : Adding {router:errorMessageMappingRouter.onError.router} as a subscriber to the 'errorChannel' channel
o.s.i.channel.PublishSubscribeChannel : Channel 'errorChannel' has 1 subscriber(s).
o.s.i.endpoint.EventDrivenConsumer : started errorMessageMappingRouter.onError.router

We are using all default settings for the Spring Cloud Stream and Kafka binder configurations:

spring.cloud:
  stream:
    binders:
      kafka:
        type: kafka
        environment.spring.cloud.stream.kafka.binder.brokers: brokerlist
        environment.spring.cloud.stream.kafka.binder.zkNodes: zklist

Edit: Added pod args from kubectl describe <pod>

Args:
--spring.cloud.stream.bindings.input.group=delivery-stream
--spring.cloud.stream.bindings.output.producer.requiredGroups=delivery-stream
--spring.cloud.stream.bindings.output.destination=delivery-stream.enricher
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.zkNodes=<zkNodes>
--spring.cloud.stream.binders.xdkafka.type=kafka
--spring.cloud.stream.binders.xdkafka.defaultCandidate=true
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.brokers=<brokers>
--spring.cloud.stream.bindings.input.destination=delivery-stream.config-enricher

We also tried using Spring Cloud Stream's Spring Integration error channel support to send errors to a broker topic, but since messages don't seem to land in the global Spring Integration errorChannel at all, that didn't work either.

Is there anything special we need to do in SCDF Kubernetes to enable the global Spring Integration errorChannel?

What am I missing here?

Apart from setting the Kafka config settings for connection/creds, we don't do anything special in the K8s server or muck around with the properties. It could be that the way you're creating the Docker container has a side effect on how the properties are propagated. See here for entryPoint options; it'd be good to review your Dockerfile and perhaps also the sample app. – Sabby Anandan
If we're using the default errorChannel settings though, would that even matter? It would just be part of the default configuration (that we're not overriding in any way with runtime environment variables). Is there some other way that the entrypoint style could affect that? – pclem12
By the way, we're using the exec style. – pclem12
After reviewing your configuration I am now pretty sure I know what the issue is. You have a multi-binder configuration scenario. Even if you only deal with a single binder instance, the existence of spring.cloud.stream.binders.... is what's going to make the framework treat it as multi-binder. Basically this is a bug: github.com/spring-cloud/spring-cloud-stream/issues/1384. As you can see it was fixed, but you need to upgrade to Elmhurst.SR2 or grab the latest snapshot (we're in RC2 and 2.1.0.RELEASE is in a few weeks anyway). – Oleg Zhurakousky
I couldn't care less about credits; the fact that I was able to help is more than enough! – Oleg Zhurakousky

1 Answer

Update with solution from the comments:

After reviewing your configuration I am now pretty sure I know what the issue is. You have a multi-binder configuration scenario. Even if you only deal with a single binder instance, the existence of spring.cloud.stream.binders.... is what's going to make the framework treat it as multi-binder. Basically this is a bug: github.com/spring-cloud/spring-cloud-stream/issues/1384. As you can see it was fixed, but you need to upgrade to Elmhurst.SR2 or grab the latest snapshot (we're in RC2 and 2.1.0.RELEASE is in a few weeks anyway). – Oleg Zhurakousky

This was indeed the problem with our setup. Instead of upgrading, we just eliminated our multi-binder usage for now and the issue was resolved.
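
For anyone trying to tell whether their own deployment falls into the same case, a rough diagnostic sketch follows. It only scans launch arguments (such as the pod args listed in the question) for keys under spring.cloud.stream.binders., which is the condition described in the comment. The class and method names are made up for illustration, and this is not the framework's own detection logic.

```java
import java.util.List;

final class MultiBinderCheck {
    // Prefix that, per the comment above, makes the framework treat the app as multi-binder.
    static final String BINDERS_PREFIX = "spring.cloud.stream.binders.";

    private MultiBinderCheck() {
    }

    /** Returns true if any argument (with or without a leading "--") sets a key under the binders prefix. */
    static boolean declaresNamedBinders(List<String> args) {
        for (String arg : args) {
            String key = arg.startsWith("--") ? arg.substring(2) : arg;
            if (key.startsWith(BINDERS_PREFIX)) {
                return true;
            }
        }
        return false;
    }
}
```

Run against the pod args from the question, the --spring.cloud.stream.binders.xdkafka.* entries would trip this check, while the spring.cloud.stream.bindings.* entries would not. In a single-binder setup the broker list would instead typically sit directly under spring.cloud.stream.kafka.binder.brokers.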