I'm having trouble getting a Spring Integration flow, written with the Java DSL and containing a splitter, to behave the way I need in a Spring Cloud Dataflow context. Essentially, my microservice is a Processor that does the following:
- Some third-party reports are retrieved and munged together. This produces a collection of events. (Note: my microservice is NOT a Source because it needs to be driven by a configurable schedule, so I have a scheduled trigger feeding command messages to this microservice.)
- The event collection is split so each event can be validated. Events that pass validation are fine and flow onwards. Events which fail validation must be dead-lettered in the Dataflow context. So there is NO concept here of a batch of events that either fails or succeeds.
The problems I am trying to overcome are:
- throwing a MessagingException on an event that fails validation results in no further event processing
- working around this premature stoppage results in no dead-lettering in Dataflow because the workaround involves exceptions being thrown in an executor which of course go nowhere!
An event which fails validation is marked with a hasError message header which a router uses to send it to a ServiceActivator that throws a MessagingException.
After some investigation and experimentation my flawed experiment looks like this:
    IntegrationFlows.from(Processor.INPUT).
        // stuff omitted for brevity
        handle(new MyEventPublisher(.........)).
        // List of events produced, split them
        split().
        // validate each event
        transform(new MyEventValidator()).
        // attempt to circumvent premature stoppage
        channel(MessageChannels.executor(Executors.newCachedThreadPool())).
        // route events based on validation result
        <String>route("headers[hasError] != null && headers[hasError] == 'true'",
            spec -> {
                spec.resolutionRequired(false);
                spec.defaultOutputChannel(Processor.OUTPUT);
                // A failed event routes to a service that throws a MessagingException
                spec.subFlowMapping("true", sf -> sf.<String>handle(new ExceptionThrowingService()));
                // Otherwise events flow onwards
                spec.channelMapping("false", Processor.OUTPUT);
            }).
        get();
Without the channel step and the cached threadpool, the processing stops when the first event validation failure is encountered, but that failed event is dead-lettered and any successful events feed through.
With the thread pool, all events are processed and the successful events do flow through the Dataflow stream. However, no events are dead-lettered in the Dataflow context, because the exception is thrown in an executor thread.
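To make the difference between those two behaviours concrete, here is a plain-Java analogy of what I think is going on. It is only a sketch and does not use Spring Integration at all: SplitFailureDemo, Result, validateAndDeliver, runDirect and runWithExecutor are made-up names, the "bad" prefix stands in for my hasError header, and IllegalStateException stands in for the MessagingException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-Java analogy only; every name here is hypothetical.
class SplitFailureDemo {

    // What the caller (the thread that received the input message) observes.
    static class Result {
        final boolean callerSawFailure;
        final List<String> delivered;

        Result(boolean callerSawFailure, List<String> delivered) {
            this.callerSawFailure = callerSawFailure;
            this.delivered = delivered;
        }
    }

    // Stand-in for validate + route: an invalid event throws, a valid one is delivered.
    static void validateAndDeliver(String event, List<String> delivered) {
        if (event.startsWith("bad")) {
            throw new IllegalStateException("validation failed: " + event);
        }
        delivered.add(event);
    }

    // Direct hand-off: the exception unwinds through the split loop to the caller,
    // so the caller sees the failure but the remaining events are never processed.
    static Result runDirect(List<String> events) {
        List<String> delivered = new ArrayList<>();
        try {
            for (String event : events) {
                validateAndDeliver(event, delivered);
            }
            return new Result(false, delivered);
        } catch (IllegalStateException e) {
            return new Result(true, delivered);
        }
    }

    // Executor hand-off: each event runs on a pool thread. Any exception is captured
    // in the Future returned by submit(), which is discarded here, so the calling
    // thread never observes it and every event still gets processed.
    static Result runWithExecutor(List<String> events) {
        List<String> delivered = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newCachedThreadPool();
        boolean callerSawFailure = false;
        try {
            for (String event : events) {
                pool.submit(() -> validateAndDeliver(event, delivered));
            }
        } catch (IllegalStateException e) {
            callerSawFailure = true; // not reached: the failure happens on another thread
        } finally {
            pool.shutdown();
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Result(callerSawFailure, delivered);
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("ok-1", "bad-2", "ok-3");
        Result direct = runDirect(events);
        Result async = runWithExecutor(events);
        System.out.println("direct:   failure seen=" + direct.callerSawFailure
                + ", delivered=" + direct.delivered);
        System.out.println("executor: failure seen=" + async.callerSawFailure
                + ", delivered=" + async.delivered);
    }
}
```

In this sketch the direct variant reports the failure but stops after the first bad event, while the executor variant processes everything and reports nothing back. Those are the two outcomes I am seeing, and what I want is a combination of the two.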
Am I able to use a splitter, process the entire input, and communicate MessagingExceptions to the Dataflow runtime?