1
votes

I am creating a simple spring boot app using spring integration. Below are the three main constructs of this app:

  1. Inbound Gateway: WebFluxInboundEndpoint which accepts http request
  2. Outbound Gateway: PubSubMessageHandler which pushes message to google cloud pubsub topic
  3. Message Channel: FluxMessageChannel acting as request channel

The Google Cloud PubSubMessageHandler reports the publish result only through its success/failure callback, so no success or error response is returned to the WebFlux endpoint and the HTTP request waits indefinitely.

Question: How can a success/failure response be returned to the HTTP caller once the publish result has been received from Pub/Sub?

Working copy of application is available here: https://github.com/piyushpcegarg/spring-gcp-pubsub-webflux-sample

To run application please put your google cloud service key in serviceAccountKey.json file and then provide environment variable GOOGLE_APPLICATION_CREDENTIALS=/PATH_TO/serviceAccountKey.json

Sample request: curl -d "name=piyush" http://localhost:8080/createPerson

Below is the sample file, which accepts the above request, converts it into a Spring message, and publishes it to the Pub/Sub topic "person":

package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * Entry point into the sample application.
 *
 * <p>Declares a WebFlux inbound endpoint for {@code POST /createPerson} whose request
 * channel feeds a {@link PubSubMessageHandler} publishing to the {@code person} topic.
 *
 * @author Piyush Garg
 */
@SpringBootApplication
public class PubSubWebFluxApplication {

    private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);

    /** Name of the Pub/Sub topic the outbound handler publishes to. */
    private static final String TOPIC_NAME = "person";

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(PubSubWebFluxApplication.class, args);
    }

    /**
     * Registers a Jackson-based Pub/Sub message converter.
     *
     * @param objectMapper the object mapper handed to the converter
     * @return the converter bean
     */
    @Bean
    public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
        return new JacksonPubSubMessageConverter(objectMapper);
    }

    /**
     * Flux-based channel connecting the inbound endpoint to the Pub/Sub handler.
     *
     * @return the request channel bean
     */
    @Bean
    public MessageChannel pubSubOutputChannel() {
        return MessageChannels.flux().get();
    }

    /**
     * Message handler which consumes messages from {@code pubSubOutputChannel}
     * and publishes them to the Google Cloud Pub/Sub topic.
     *
     * <p>The publish callback only logs the outcome; it does not send anything
     * back to the caller.
     *
     * @param pubSubTemplate template used to publish to Pub/Sub
     * @return the outbound message handler bean
     */
    @Bean
    @ServiceActivator(inputChannel = "pubSubOutputChannel")
    public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
        PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
        handler.setPublishCallback(new ListenableFutureCallback<>() {
            @Override
            public void onFailure(Throwable ex) {
                // Log at ERROR and keep the cause so the failure can be diagnosed.
                LOGGER.error("There was an error sending the message.", ex);
            }

            @Override
            public void onSuccess(String result) {
                LOGGER.info("Message was sent successfully.");
            }
        });

        return handler;
    }

    /**
     * WebFlux endpoint which consumes form-encoded {@code POST /createPerson}
     * requests and forwards them to {@code pubSubOutputChannel}.
     *
     * @return the inbound endpoint bean
     */
    @Bean
    public WebFluxInboundEndpoint webFluxInboundEndpoint() {

        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();

        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setMethods(HttpMethod.POST);
        requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
        requestMapping.setPathPatterns("/createPerson");
        endpoint.setRequestMapping(requestMapping);

        endpoint.setRequestChannel(pubSubOutputChannel());

        return endpoint;
    }
}

The build.gradle dependencies are:

// Build plugins: Spring Boot, Spring dependency management, and Java.
plugins {
    id 'org.springframework.boot' version '2.2.6.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
// Java source level used by this project.
sourceCompatibility = '11'

configurations {
    // compileOnly inherits the dependencies declared for annotationProcessor.
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

ext {
    // Spring Cloud release train referenced by the BOM import in dependencyManagement.
    set('springCloudVersion', "Hoxton.SR4")
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.boot:spring-boot-starter-integration'
    implementation 'org.springframework.integration:spring-integration-webflux'
    implementation 'org.springframework.cloud:spring-cloud-gcp-starter-pubsub'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        // Exclude the JUnit vintage engine from the test starter.
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

dependencyManagement {
    imports {
        // Import the Spring Cloud BOM for the version set in ext above.
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

test {
    // Run tests on the JUnit Platform.
    useJUnitPlatform()
}

Below is the new application file after making the PubSubMessageHandler synchronous and adding an ExpressionEvaluatingRequestHandlerAdvice. It fails with the error "'beanFactory' must not be null" when MessagingGatewaySupport creates the reply correlator.

package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * Bootstraps the sample: a WebFlux inbound endpoint, a synchronous Pub/Sub
 * outbound handler, and an expression-evaluating advice wired to dedicated
 * reply and error channels.
 *
 * @author Piyush Garg
 */
@SpringBootApplication
public class PubSubWebFluxApplication {

    private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);

    /** Pub/Sub topic that receives the published messages. */
    private static final String TOPIC_NAME = "person";

    /**
     * Launches the Spring Boot application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(PubSubWebFluxApplication.class, args);
    }

    /**
     * Jackson-backed Pub/Sub message converter.
     *
     * @param objectMapper mapper passed to the converter
     * @return the converter bean
     */
    @Bean
    public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
        return new JacksonPubSubMessageConverter(objectMapper);
    }

    /** Request channel between the HTTP endpoint and the Pub/Sub handler. */
    @Bean
    public MessageChannel pubSubOutputChannel() {
        return MessageChannels.flux().get();
    }

    /** Channel used as the endpoint's reply channel and the advice's success channel. */
    @Bean
    public MessageChannel replyChannel() {
        return MessageChannels.flux().get();
    }

    /**
     * Channel used as the endpoint's error channel and the advice's failure channel.
     *
     * <p>NOTE(review): the bean name {@code errorChannel} looks like it coincides with
     * Spring Integration's default error channel name; confirm the override is intended.
     */
    @Bean
    public MessageChannel errorChannel() {
        return MessageChannels.flux().get();
    }

    /**
     * Outbound handler that reads from {@code pubSubOutputChannel} and publishes
     * to the Pub/Sub topic in synchronous mode, advised by {@code expressionAdvice}.
     *
     * @param pubSubTemplate template used for publishing
     * @return the configured handler bean
     */
    @Bean
    @ServiceActivator(inputChannel = "pubSubOutputChannel", adviceChain = "expressionAdvice")
    public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
        final PubSubMessageHandler pubSubHandler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
        pubSubHandler.setSync(true);
        return pubSubHandler;
    }

    /**
     * Inbound HTTP endpoint: form-encoded {@code POST /createPerson}, sending to
     * {@code pubSubOutputChannel} with explicit reply and error channels.
     *
     * @return the inbound endpoint bean
     */
    @Bean
    public WebFluxInboundEndpoint webFluxInboundEndpoint() {
        final WebFluxInboundEndpoint inboundEndpoint = new WebFluxInboundEndpoint();

        // Describe which requests this endpoint accepts.
        final RequestMapping mapping = new RequestMapping();
        mapping.setMethods(HttpMethod.POST);
        mapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
        mapping.setPathPatterns("/createPerson");
        inboundEndpoint.setRequestMapping(mapping);

        // Wire the request, reply and error channels.
        inboundEndpoint.setRequestChannel(pubSubOutputChannel());
        inboundEndpoint.setReplyChannel(replyChannel());
        inboundEndpoint.setErrorChannel(errorChannel());

        return inboundEndpoint;
    }

    /**
     * Advice applied to the Pub/Sub handler; its success channel is
     * {@code replyChannel} and its failure channel is {@code errorChannel}.
     *
     * @return the advice bean
     */
    @Bean
    public Advice expressionAdvice() {
        final ExpressionEvaluatingRequestHandlerAdvice handlerAdvice =
                new ExpressionEvaluatingRequestHandlerAdvice();
        handlerAdvice.setSuccessChannel(replyChannel());
        handlerAdvice.setFailureChannel(errorChannel());
        return handlerAdvice;
    }
}

Stacktrace of error which is coming after sending http request:

2020-05-04 16:23:47.371 ERROR 59089 --- [ctor-http-nio-3] a.w.r.e.AbstractErrorWebExceptionHandler : [fd79ecbb-1]  500 Server Error for HTTP POST "/createPerson"

java.lang.IllegalArgumentException: 'beanFactory' must not be null
    at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ HTTP POST "/createPerson" [ExceptionHandlingWebHandler]
Stack trace:
        at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
        at org.springframework.integration.channel.ChannelUtils.getErrorHandler(ChannelUtils.java:52) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
        at org.springframework.integration.endpoint.ReactiveStreamsConsumer.onInit(ReactiveStreamsConsumer.java:126) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
        at org.springframework.integration.context.IntegrationObjectSupport.afterPropertiesSet(IntegrationObjectSupport.java:214) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
        at org.springframework.integration.gateway.MessagingGatewaySupport.registerReplyMessageCorrelatorIfNecessary(MessagingGatewaySupport.java:799) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
        at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessageReactive(MessagingGatewaySupport.java:602) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
2

2 Answers

0
votes

The PubSubMessageHandler is not designed for request/reply behavior. In most cases it is used as send-n-forget.

Since you really care about a success/failure reply, I can only suggest something like:

  1. PubSubMessageHandler.setSync(true):

    /**
     * Set publish method to be synchronous or asynchronous.
     *
     * <p>Publish is asynchronous be default.
     * @param sync true for synchronous, false for asynchronous
     */
    public void setSync(boolean sync) {
    

This way your PubSubMessageHandler is going to wait for pubsubFuture.get(); and if it fails a MessageHandlingException is going to be thrown.

  2. To handle success or failure in this sync scenario, I suggest taking a look at the ExpressionEvaluatingRequestHandlerAdvice with its successChannel and failureChannel. I think the onSuccessExpression should be #root, pointing to the requestMessage. The onFailureExpression may consult the #exception SpEL expression variable, but should still propagate the requestMessage to the failureChannel. The reason I talk about the requestMessage is that it carries the important replyChannel header needed to respond to the WebFluxInboundEndpoint request. See more info in the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chain

  3. The successChannel and failureChannel sub-flows should then reply properly by returning some value while leaving their outputChannel empty.

But at the same time I totally agree that it would be much easier if PubSubMessageHandler were an AbstractReplyProducingMessageHandler returning some ListenableFuture, so that we could process the publishing result.

0
votes

Thanks @Artem. I resolved it by providing a custom request handler advice which, in the success scenario, identifies the replyChannel from the message headers and sends the message payload back as the response to the WebFlux endpoint.

For the error scenario, I am relying on the error handling mechanism of ReactiveStreamsConsumer, which internally uses the errorChannel to send the error back to the WebFlux endpoint.

Please advise whether this implementation is correct.

Below is the code for PubSubRequestHandlerAdvice:

package com.example;

import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice;
import org.springframework.integration.message.AdviceMessage;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

/**
 * Request handler advice that, after the advised handler has run, sends the
 * request payload to the reply channel found in the request message headers.
 */
public class PubSubRequestHandlerAdvice extends AbstractRequestHandlerAdvice {

  private final MessagingTemplate messagingTemplate = new MessagingTemplate();

  /**
   * Executes the advised handler and then, when the request carries a
   * {@link MessageChannel} reply channel header, sends the request payload to it
   * wrapped in an {@link AdviceMessage}.
   *
   * @param callback callback that invokes the advised handler
   * @param target the advised object (not used here)
   * @param message the request message
   * @return whatever the advised handler returned
   */
  @Override
  protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) {
    // Run the advised handler first; if it throws, nothing below is executed.
    final Object handlerResult = callback.execute();

    final Object payload = message.getPayload();
    final Object replyTarget = message.getHeaders().getReplyChannel();

    // Nothing to reply with, or the header is not an actual channel instance.
    if (payload == null || !(replyTarget instanceof MessageChannel)) {
      return handlerResult;
    }

    this.messagingTemplate.send((MessageChannel) replyTarget, new AdviceMessage<>(payload, message));
    return handlerResult;
  }
}

Final application file which is using PubSubRequestHandlerAdvice for PubSubMessageHandler.

package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * Bootstraps the sample: a WebFlux inbound endpoint feeding a synchronous
 * Pub/Sub outbound handler that is advised by {@link PubSubRequestHandlerAdvice}.
 *
 * @author Piyush Garg
 */
@SpringBootApplication
public class PubSubWebFluxApplication {

    private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);

    /** Pub/Sub topic that receives the published messages. */
    private static final String TOPIC_NAME = "person";

    /**
     * Launches the Spring Boot application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(PubSubWebFluxApplication.class, args);
    }

    /**
     * Jackson-backed Pub/Sub message converter.
     *
     * @param objectMapper mapper passed to the converter
     * @return the converter bean
     */
    @Bean
    public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
        return new JacksonPubSubMessageConverter(objectMapper);
    }

    /** Request channel between the HTTP endpoint and the Pub/Sub handler. */
    @Bean
    public MessageChannel pubSubOutputChannel() {
        return MessageChannels.flux().get();
    }

    /**
     * Outbound handler that reads from {@code pubSubOutputChannel} and publishes
     * to the Pub/Sub topic in synchronous mode, advised by {@code pubSubAdvice}.
     *
     * @param pubSubTemplate template used for publishing
     * @return the configured handler bean
     */
    @Bean
    @ServiceActivator(inputChannel = "pubSubOutputChannel", adviceChain = "pubSubAdvice")
    public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
        final PubSubMessageHandler pubSubHandler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
        pubSubHandler.setSync(true);
        return pubSubHandler;
    }

    /**
     * Inbound HTTP endpoint: form-encoded {@code POST /createPerson}, sending to
     * {@code pubSubOutputChannel}.
     *
     * @return the inbound endpoint bean
     */
    @Bean
    public WebFluxInboundEndpoint webFluxInboundEndpoint() {
        final WebFluxInboundEndpoint inboundEndpoint = new WebFluxInboundEndpoint();

        // Describe which requests this endpoint accepts.
        final RequestMapping mapping = new RequestMapping();
        mapping.setMethods(HttpMethod.POST);
        mapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
        mapping.setPathPatterns("/createPerson");
        inboundEndpoint.setRequestMapping(mapping);

        inboundEndpoint.setRequestChannel(pubSubOutputChannel());

        return inboundEndpoint;
    }

    /**
     * Advice applied to the Pub/Sub handler via the service activator's advice chain.
     *
     * @return a new {@link PubSubRequestHandlerAdvice}
     */
    @Bean
    public Advice pubSubAdvice() {
        return new PubSubRequestHandlerAdvice();
    }

}

Working copy of application is available here: https://github.com/piyushpcegarg/spring-gcp-pubsub-webflux-sample