1
votes

Regarding the answer posted for How can @MessagingGateway be configured with Spring Cloud Stream MessageChannels?, what is the correct way to handle errors at the @MessagingGateway that can be returned from Spring Cloud Stream services?

To recap, I have a @MessagingGateway that provides synchronous access to asynchronous services built using Spring Cloud Stream. When an error occurs within my Spring Cloud Stream service layer, I create an error response and send it through a SubscribableChannel to other @StreamListener services that process the errors.

For example, when an account is created, I send a message to the accountCreated channel. When an error occurs I send an error response to the accountNotCreated channel.

This works fine, but I also want to send an error response to the client of the @MessagingGateway so they receive the error response synchronously. The @MessagingGateway annotation has an errorChannel attribute, but the @Gateway annotation does not. So, the client of the @MessagingGateway should be able to block and wait for either 1) an account to be created or 2) an error response.

Again, the goal here is to build "backend" services that utilize Spring Cloud Stream for transactional services (i.e., those that create, update, or delete data) while at the same time providing our clients with "gateway" access that blocks and waits for the responses to be returned. The solution Artem Bilan provided me works for the happy path, but when an error occurs, it is not clear to me how this is best handled with Spring Integration.

UPDATE with code example

GatewayApplication.java

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@EnableBinding({GatewayApplication.GatewayChannels.class})
@SpringBootApplication
public class GatewayApplication {

  @Component
  public interface GatewayChannels {
    String TO_UPPERCASE_REPLY = "to-uppercase-reply";
    String TO_UPPERCASE_REQUEST = "to-uppercase-request";

    @Input(TO_UPPERCASE_REPLY)
    SubscribableChannel toUppercaseReply();

    @Output(TO_UPPERCASE_REQUEST)
    SubscribableChannel toUppercaseRequest();
  }

  @MessagingGateway
  public interface StreamGateway {
    public static final String ENRICH = "enrich";

    @Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.TO_UPPERCASE_REPLY)
    StringWrapper process(StringWrapper payload) throws MyException;
  }

  @RestController
  public class UppercaseController {
    @Autowired
    StreamGateway gateway;

    @GetMapping(value = "/string/{string}",
        produces = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE})
    public ResponseEntity<StringWrapper> getUser(@PathVariable("string") String string) {
      try {
        StringWrapper result = gateway.process(new StringWrapper(string));
        // Instead of catching the exception in the below catch clause, here we have just a string
        // representation of the stack trace when an exception occurs.
        return new ResponseEntity<StringWrapper>(result, HttpStatus.OK);
      } catch (MyException e) {
        // Why is the exception not caught here?
        return new ResponseEntity<StringWrapper>(new StringWrapper("An error has occurred"),
            HttpStatus.INTERNAL_SERVER_ERROR);
      }
    }
  }

  public static void main(String[] args) {
    SpringApplication.run(GatewayApplication.class, args);
  }

  @Bean
  public IntegrationFlow headerEnricherFlow() {
    return IntegrationFlows.from(StreamGateway.ENRICH)
        .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
        .channel(GatewayChannels.TO_UPPERCASE_REQUEST).get();
  }

}

application.yml

# Configuration for GatewayApplication: Spring Cloud Stream bindings and the HTTP port.
spring:
  cloud:
    stream:
      bindings:
        # Binding for the to-uppercase-request channel (declared @Output in GatewayChannels).
        # NOTE(review): `group` is set on a channel this application writes to — confirm intended.
        to-uppercase-request:
          destination: to-uppercase-request
          content-type: application/json
          group: stream-to-uppercase-request
        # Binding for the to-uppercase-reply channel (declared @Input in GatewayChannels).
        # NOTE(review): `producer.required-groups` is set on a channel this application
        # reads from — confirm intended.
        to-uppercase-reply:
          destination: to-uppercase-reply
          content-type: application/json
          producer:
            required-groups: gateway-to-uppercase-reply, stream-listener-to-uppercase-reply
server:
  port: 8088

StringWrapper.java (used across all three projects)

package com.example.demo;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * Payload type exchanged between the applications: a string value plus a
 * timestamp that is initialised from {@link System#currentTimeMillis()} when the
 * object is constructed.
 */
public class StringWrapper {

  @JsonProperty
  private String string;

  @JsonProperty
  private long time = System.currentTimeMillis();

  /** Creates a wrapper whose {@code string} is left unset. */
  public StringWrapper() {
  }

  /**
   * Creates a wrapper around the given value.
   *
   * @param string the value to carry
   */
  public StringWrapper(String string) {
    this.string = string;
  }

  /** @return the wrapped value */
  public String getString() {
    return this.string;
  }

  /** @param string the new wrapped value */
  public void setString(String string) {
    this.string = string;
  }

  /** @return the timestamp held by this wrapper, in milliseconds */
  public long getTime() {
    return this.time;
  }

  @Override
  public String toString() {
    StringBuilder text = new StringBuilder("StringWrapper [string=");
    text.append(this.string);
    text.append(", time=");
    text.append(this.time);
    text.append("]");
    return text.toString();
  }

}

CloudStreamApplication.java

package com.example.demo;

import java.util.Random;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * Boot application hosting the "to uppercase" service: it consumes requests from
 * {@code to-uppercase-request} and publishes either the result or the caught
 * {@code MyException} to {@code to-uppercase-reply}.
 */
@EnableBinding({CloudStreamApplication.CloudStreamChannels.class})
@SpringBootApplication
public class CloudStreamApplication {

  /** Bound channels: requests come in, replies go out. */
  @Component
  interface CloudStreamChannels {

    String TO_UPPERCASE_REPLY = "to-uppercase-reply";
    String TO_UPPERCASE_REQUEST = "to-uppercase-request";

    @Output(TO_UPPERCASE_REPLY)
    SubscribableChannel toUppercaseReply();

    @Input(TO_UPPERCASE_REQUEST)
    SubscribableChannel toUppercaseRequest();
  }

  /** Listener that upper-cases the request payload and replies on the reply channel. */
  @Component
  public class Processor {

    /** Header naming the payload class of the reply. */
    private static final String TYPE_ID_HEADER = "__TypeId__";

    /** Shared generator for the simulated failures; avoids creating one per request. */
    private final Random random = new Random();

    @Autowired
    CloudStreamChannels channels;

    /**
     * Handles one request: replies with the upper-cased payload, or with the
     * {@code MyException} itself when the conversion fails.
     *
     * @param request incoming message; its headers are copied onto the reply
     */
    @StreamListener(CloudStreamChannels.TO_UPPERCASE_REQUEST)
    public void process(Message<StringWrapper> request) {
      StringWrapper uppercase;
      try {
        uppercase = toUppercase(request);
      } catch (MyException e) {
        sendReply(request, e);
        return;
      }
      sendReply(request, uppercase);
    }

    /**
     * Publishes {@code payload} to the reply channel with the request's headers.
     *
     * <p>The type header is set after the request headers are copied: copying
     * afterwards would let a type header carried by the request replace the one that
     * describes this reply.
     */
    private void sendReply(Message<?> request, Object payload) {
      channels.toUppercaseReply()
          .send(MessageBuilder.withPayload(payload)
              .copyHeaders(request.getHeaders())
              .setHeader(TYPE_ID_HEADER, payload.getClass().getName())
              .build());
    }

    /**
     * Upper-cases the request's string, failing on purpose for half of the calls.
     *
     * @throws MyException when the random draw (1..50) is above 25
     */
    private StringWrapper toUppercase(Message<StringWrapper> request) throws MyException {
      int number = random.nextInt(50) + 1;
      if (number > 25) {
        throw new MyException("An error occurred.");
      }
      return new StringWrapper(request.getPayload().getString().toUpperCase());
    }
  }

  public static void main(String[] args) {
    SpringApplication.run(CloudStreamApplication.class, args);
  }

}

application.yml

# Configuration for CloudStreamApplication: Spring Cloud Stream bindings and the server port.
spring:
  cloud:
    stream:
      bindings:
        # Binding for the to-uppercase-request channel (declared @Input in CloudStreamChannels).
        to-uppercase-request:
          destination: to-uppercase-request
          content-type: application/json
          group: stream-to-uppercase-request
        # Binding for the to-uppercase-reply channel (declared @Output in CloudStreamChannels).
        to-uppercase-reply:
          destination: to-uppercase-reply
          content-type: application/json
          producer:
            required-groups: gateway-to-uppercase-reply, stream-listener-to-uppercase-reply
# NOTE(review): same port as the gateway application's configuration — confirm the two
# are not meant to run on the same host.
server:
  port: 8088

StreamListenerApplication.java

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

/**
 * Boot application that listens on {@code to-uppercase-reply} and prints every
 * payload it receives.
 */
@EnableBinding({StreamListenerApplication.CloudStreamChannels.class})
@SpringBootApplication
public class StreamListenerApplication {

  /** Bound channel for incoming replies. */
  @Component
  interface CloudStreamChannels {

    String TO_UPPERCASE_REPLY = "to-uppercase-reply";

    @Input(TO_UPPERCASE_REPLY)
    SubscribableChannel toUppercaseReply();

  }

  @Autowired
  CloudStreamChannels channels;

  /**
   * Prints the payload of each message arriving on the reply channel.
   *
   * @param message the received reply
   */
  @StreamListener(CloudStreamChannels.TO_UPPERCASE_REPLY)
  public void processToUppercaseReply(Message<StringWrapper> message) {
    StringWrapper payload = message.getPayload();
    System.out.println("Processing message: " + payload);
  }

  public static void main(String[] args) {
    SpringApplication.run(StreamListenerApplication.class, args);
  }

}

application.yml

# Configuration for StreamListenerApplication: one input binding and the server port.
spring:
  cloud:
    stream:
      bindings:
        # Binding for the to-uppercase-reply channel (declared @Input in CloudStreamChannels).
        to-uppercase-reply:
          destination: to-uppercase-reply
          content-type: application/json
          group: stream-listener-to-uppercase-reply
server:
  port: 8089
1
You should either accept Artem's answer to that other question (or at least give him a vote up so he gets some credit if you decide to accept your own answer that summarizes your final solution). See What should I do when someone answers my question? – Gary Russell

1 Answer

3
votes

There is only one global errorChannel on @MessagingGateway that is used for all @Gateway methods. If you have a gateway with multiple @Gateway methods, each method can set a message header to indicate which method failed.

If you send a Message<Throwable> to the gateway's reply channel (and there is no error channel) the payload will be thrown to the caller.

If the gateway method has a throws clause, an attempt to unwrap the cause tree is made looking for that exception.

If you add an errorChannel, instead of throwing the exception to the caller, an ErrorMessage with the exception as its payload is sent to the error channel - you can then do any further post-processing on the error channel flow and throw some other exception to the caller if desired. It sounds like you don't need that, though.

So, putting it all together...

  1. Have the error handling service send some message to another destination.
  2. In the gateway service, add a @StreamListener for that destination.
  3. In the @StreamListener construct a Message with an Exception payload and send it to the gateway's reply channel.
  4. The gateway will then throw the payload to the caller.

Something like this should work...

// Gateway method: the request is sent to the ENRICH channel and the reply is awaited on
// TO_UPPERCASE_REPLY. The throws clause declares MyException to the caller.
@Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.TO_UPPERCASE_REPLY)
String process(String payload) throws MyException;

.

@StreamListener(CloudStreamChannels.TO_UPPERCASE_FAILURES)
public void failed(Message<FailInfo> failed) { // payload has info about the failure
    Message m = MessageBuilder.withPayload(new MyException(failed.getPayload())).
         .copyHeaders(failed.getHeaders())
         .build();
    this.reply.send(m); // send directly to the gateway's reply channel (perhaps @Autowired)
}

It's important to propagate the reply channel header end to end, regardless of how many remote services are involved.

EDIT

/**
 * Single-application demo of a messaging gateway in front of a stream listener.
 * The gateway writes to {@code gateOut} and reads replies from {@code gateIn};
 * the listener reads from {@code serviceIn} and replies on {@code serviceOut}.
 */
@SpringBootApplication
@EnableBinding(TwoAsyncPipes.class)
public class So47948454aApplication {

    /** Starts the application and closes the context as soon as startup has finished. */
    public static void main(String[] args) {
        SpringApplication.run(So47948454aApplication.class, args).close();
    }

    /**
     * Exercises the gateway once with a normal payload and once with the payload
     * that makes the listener answer with an exception.
     */
    @Bean
    public ApplicationRunner runner(Gate gate) {
        return args -> {
            System.out.println(gate.foo(new Foo("foo")));
            try {
                gate.foo(new Foo("fail"));
            }
            catch (MyException e) {
                System.out.println(e);
            }
        };
    }

    /** Gateway sending to {@code enrich} and awaiting the reply on {@code transformed}. */
    @MessagingGateway
    public interface Gate {

        @Gateway(requestChannel = "enrich", replyChannel = "transformed")
        Foo foo(Foo foo) throws MyException;

    }

    /**
     * Flow from {@code enrich} through the {@code headerChannelsToString} header
     * enricher to the bound {@code gateOut} channel.
     */
    @Bean
    public IntegrationFlow headerEnricherFlow() {
        return IntegrationFlows.from("enrich")
                .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
                .channel("gateOut").get();
    }

    /** Channel on which the gateway receives the already-converted reply. */
    @Bean
    public MessageChannel transformed() {
        return new DirectChannel();
    }

    /**
     * Passes each message from {@code gateIn} through the JSON-to-object transformer
     * and forwards the result to {@code transformed}.
     */
    @Transformer(inputChannel = "gateIn", outputChannel = "transformed")
    public Object jsonToObject(Message<?> in) {
        return jtot().transform(in);
    }

    @Bean
    public JsonToObjectTransformer jtot() {
        return new JsonToObjectTransformer();
    }

    /**
     * Service side: answers with a {@link MyException} payload when the request's
     * {@code foo} is {@code "fail"}, otherwise with {@code Foo("bar")}. The reply
     * carries the payload's class name in the {@code JsonHeaders.TYPE_ID} header.
     *
     * @param in the request payload
     * @return the reply message sent to {@code serviceOut}
     */
    @StreamListener("serviceIn")
    @SendTo("serviceOut")
    public Message<?> listen(Foo in) {
        // Constant-first comparison: a request without a "foo" value is treated as a
        // normal request instead of raising a NullPointerException.
        if ("fail".equals(in.getFoo())) {
            return MessageBuilder.withPayload(new MyException("failed"))
                    .setHeader(JsonHeaders.TYPE_ID,
                            MyException.class.getName())
                    .build();
        }
        else {
            return MessageBuilder.withPayload(new Foo("bar"))
                    .setHeader(JsonHeaders.TYPE_ID,
                            Foo.class.getName())
                    .build();
        }
    }

    /** Request/reply payload with a single string property. */
    public static class Foo {

        private String foo;

        public Foo() {
            super();
        }

        public Foo(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Foo [foo=" + this.foo + "]";
        }

    }

    /** Exception sent back as a reply payload; carries its text in {@code error}. */
    @SuppressWarnings("serial")
    public static class MyException extends RuntimeException {

        private String error;

        public MyException() {
            super();
        }

        public MyException(String error) {
            this.error = error;
        }

        public String getError() {
            return this.error;
        }

        public void setError(String error) {
            this.error = error;
        }

        @Override
        public String toString() {
            return "MyException [error=" + this.error + "]";
        }

    }

    /** The four bound channels: one output/input pair for each direction. */
    public interface TwoAsyncPipes {

        @Output("gateOut")
        MessageChannel gateOut();

        @Input("serviceIn")
        MessageChannel serviceIn();

        @Output("serviceOut")
        MessageChannel serviceOut();

        @Input("gateIn")
        MessageChannel gateIn();

    }

}

and

Foo [foo=bar]
MyException [error=failed]

POM

<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor for the so47948454a demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>so47948454a</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>so47948454a</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spring-cloud.version>Edgware.RELEASE</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-java-dsl</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <!-- Imports the Spring Cloud BOM so the dependencies above need no explicit versions. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Rabbit binder 1.3.0.RELEASE, Spring Integration 4.3.12

2017-12-26 13:56:18.121  INFO 39008 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: SpringAMQP#7e87ef9e:0/SimpleConnection@45843650 [delegate=amqp://[email protected]:5672/, localPort= 60995]
Foo [foo=bar]
MyException [error=failed]
2017-12-26 13:56:18.165  INFO 39008 --- [           main] com.example.So47948454aApplication       : Started So47948454aApplication in 3.422 seconds (JVM running for 3.858)

application.yml:

# Bindings for the single-application demo. gateOut and serviceIn share the serviceIn
# destination; serviceOut and gateIn share the serviceOut destination.
spring:
  cloud:
    stream:
      bindings:
        # Replies read by the gateway side.
        gateIn:
          destination: serviceOut
          content-type: application/json
        # Requests written by the gateway side.
        gateOut:
          destination: serviceIn
          content-type: application/json
        # Requests read by the service side.
        serviceIn:
          destination: serviceIn
          content-type: application/json
        # Replies written by the service side.
        serviceOut:
          destination: serviceOut
          content-type: application/json