1
votes

I'm having some problems getting a Spring Integration flow using the DSL which has a splitter operating correctly for my needs in a Spring Cloud Dataflow context. Essentially my microservice is a Processor that is trying to do the following:

  1. Some third party reports are retrieved and munged together. This produces a collection of events. (Note: my microservice is NOT a Source because it needs to be driven by a configurable schedule so I have a scheduled trigger feeding command messages to this microservice.)
  2. The event collection is split so each event can be validated. Events that pass validation are fine and flow onwards. Events which fail validation must be dead-lettered in the Dataflow context. So there is NO concept here of a batch of events that either fails or succeeds.

The problems I am trying to overcome are:

  • throwing a MessagingException on an event that fails validation results in no further event processing
  • working around this premature stoppage results in no dead-lettering in Dataflow because the workaround involves exceptions being thrown in an executor which of course go nowhere!

An event which fails validation is marked with a hasError message header which a router uses to send it to a ServiceActivator that throws a MessagingException.

After some investigation and experimentation my flawed experiment looks like this:

IntegrationFlows.from(Processor.INPUT).

    // stuff ommitted for brevity
    handle(new MyEventPublisher(.........)).

    // List of events produced, split them
    split().

    // validate each event
    transform(new MyEventValidator()).

    // attempt to circumvent premature stoppage
    channel(MessageChannels.executor(Executors.newCachedThreadPool())).

    // route events based on validation result
    <String>route("headers[hasError] != null && headers[hasError] == 'true'",
    spec -> {
                spec.resolutionRequired(false);
                spec.defaultOutputChannel(Processor.OUTPUT);

                // A failed event routes to a service that throws a MessagingException
                spec.subFlowMapping("true", sf -> sf.<String>handle(new ExceptionThrowingService()));

                // Otherwise events flow onwards
                spec.channelMapping("false", Processor.OUTPUT);
            }).
get();

Without the channel step and the cached threadpool, the processing stops when the first event validation failure is encountered, but that failed event is dead-lettered and any successful events feed through.

With the threadpool, all events are processed. However, no events are dead-lettered in the Dataflow context because the exception is thrown in an executor thread, but the successful events do flow throw the Dataflow stream.

Am I able to use a splitter, process the entire input, and communicate MessagingExceptions to the Dataflow runtime?

1

1 Answers

0
votes

Dead Lattering is related to the inbound message. And it is true that container which receives the incoming message sends it (or something based on it) to the DLQ whenever any exception is happened downstream. That's already your custom logic to produce many messages via splitter, but from the Broker perspective it is still single incoming message problem.

To overcome the issue you should consider to send to the DLQ manually on each splitter item validation. For this purpose you can take a look into ExpressionEvaluatingRequestHandlerAdvice: http://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#expression-advice. And send exceptions to some channel for analysis. And already there you can send to DLQ or something else.

From the Spring Cloud Stream and Data Flow perspective you can add one more binding configuration for the destination: http://docs.spring.io/spring-cloud-stream/docs/Chelsea.SR2/reference/htmlsingle/index.html#__code_input_code_and_code_output_code