I am building a simple Spring Boot application with Spring Integration. It has three main components:
- Inbound Gateway: a WebFluxInboundEndpoint that accepts HTTP requests
- Outbound Gateway: a PubSubMessageHandler that publishes messages to a Google Cloud Pub/Sub topic
- Message Channel: a FluxMessageChannel acting as the request channel
The Google Cloud PubSubMessageHandler reports the publish result only through success and failure callbacks, so no success or error response is sent back to the WebFlux endpoint and the HTTP request waits indefinitely.
Question: how can a success or failure response be returned to the caller once the result from Pub/Sub has been received?
A working copy of the application is available here: https://github.com/piyushpcegarg/spring-gcp-pubsub-webflux-sample
To run the application, put your Google Cloud service account key in a serviceAccountKey.json file and set the environment variable GOOGLE_APPLICATION_CREDENTIALS=/PATH_TO/serviceAccountKey.json
Sample request: curl -d "name=piyush" http://localhost:8080/createPerson
Below is the application class that accepts the request above, converts it into a Spring message, and publishes it to the Pub/Sub topic "person":
package com.example;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* Entry point into the sample application.
*
* @author Piyush Garg
*/
@SpringBootApplication
public class PubSubWebFluxApplication {
private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);
private static final String TOPIC_NAME = "person";
public static void main(String[] args) {
SpringApplication.run(PubSubWebFluxApplication.class, args);
}
/**
* Bean that converts message payloads to and from JSON.
*/
@Bean
public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
return new JacksonPubSubMessageConverter(objectMapper);
}
@Bean
public MessageChannel pubSubOutputChannel() {
return MessageChannels.flux().get();
}
/**
* Message handler that consumes messages from the message channel
* and publishes them to the Google Cloud Pub/Sub topic.
*/
@Bean
@ServiceActivator(inputChannel = "pubSubOutputChannel")
public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
handler.setPublishCallback(new ListenableFutureCallback<>() {
@Override
public void onFailure(Throwable ex) {
LOGGER.error("There was an error sending the message.", ex);
}
@Override
public void onSuccess(String result) {
LOGGER.info("Message was sent successfully.");
}
});
return handler;
}
/**
* WebFlux endpoint that accepts the HTTP request.
*/
@Bean
public WebFluxInboundEndpoint webFluxInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setMethods(HttpMethod.POST);
requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
requestMapping.setPathPatterns("/createPerson");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannel(pubSubOutputChannel());
return endpoint;
}
}
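The core difficulty in the listing above is that the publish result arrives only inside a callback, so nothing carries it back to the waiting caller. As a minimal, framework-free sketch of one way to bridge that gap, the code below adapts a success/failure callback into a CompletableFuture that a caller can wait on or chain. The names PublishCallback and CallbackBridge are hypothetical stand-ins invented for this illustration; they are not part of Spring Integration or Spring Cloud GCP, and this is not how PubSubMessageHandler itself is wired.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical stand-in for a success/failure publish callback. */
interface PublishCallback {
    void onSuccess(String messageId);

    void onFailure(Throwable ex);
}

/** Hypothetical helper that turns a callback-style result into a future. */
final class CallbackBridge {
    private CallbackBridge() {
    }

    /**
     * Starts an operation that reports its outcome through a callback and
     * returns a future completed by whichever callback method fires.
     */
    static CompletableFuture<String> toFuture(Consumer<PublishCallback> asyncOperation) {
        CompletableFuture<String> future = new CompletableFuture<>();
        asyncOperation.accept(new PublishCallback() {
            @Override
            public void onSuccess(String messageId) {
                // Success path: the caller receives the message id.
                future.complete(messageId);
            }

            @Override
            public void onFailure(Throwable ex) {
                // Failure path: the caller receives the exception.
                future.completeExceptionally(ex);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        // Simulated publisher that succeeds immediately.
        CompletableFuture<String> ok = toFuture(cb -> cb.onSuccess("id-1"));
        System.out.println("published: " + ok.join());

        // Simulated publisher that fails immediately.
        CompletableFuture<String> failed =
                toFuture(cb -> cb.onFailure(new IllegalStateException("boom")));
        System.out.println("failed: " + failed.isCompletedExceptionally());
    }
}
```

In a reactive application the same idea would presumably be expressed with a reactive type instead of CompletableFuture; the point here is only that the callback must complete something the request side is subscribed to.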
The build.gradle file is:
plugins {
id 'org.springframework.boot' version '2.2.6.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
ext {
set('springCloudVersion', "Hoxton.SR4")
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-integration'
implementation 'org.springframework.integration:spring-integration-webflux'
implementation 'org.springframework.cloud:spring-cloud-gcp-starter-pubsub'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
}
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
}
test {
useJUnitPlatform()
}
Below is the updated application class after making the PubSubMessageHandler synchronous and adding an ExpressionEvaluatingRequestHandlerAdvice. It fails with the error "'beanFactory' must not be null" when MessagingGatewaySupport creates the reply correlator.
package com.example;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* Entry point into the sample application.
*
* @author Piyush Garg
*/
@SpringBootApplication
public class PubSubWebFluxApplication {
private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);
private static final String TOPIC_NAME = "person";
public static void main(String[] args) {
SpringApplication.run(PubSubWebFluxApplication.class, args);
}
/**
* Bean that converts message payloads to and from JSON.
*/
@Bean
public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
return new JacksonPubSubMessageConverter(objectMapper);
}
@Bean
public MessageChannel pubSubOutputChannel() {
return MessageChannels.flux().get();
}
@Bean
public MessageChannel replyChannel() {
return MessageChannels.flux().get();
}
@Bean
public MessageChannel errorChannel() {
return MessageChannels.flux().get();
}
/**
* Message handler that consumes messages from the message channel
* and publishes them to the Google Cloud Pub/Sub topic.
*/
@Bean
@ServiceActivator(
inputChannel = "pubSubOutputChannel",
adviceChain = "expressionAdvice"
)
public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
handler.setSync(true);
return handler;
}
/**
* WebFlux endpoint that accepts the HTTP request.
*/
@Bean
public WebFluxInboundEndpoint webFluxInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setMethods(HttpMethod.POST);
requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
requestMapping.setPathPatterns("/createPerson");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannel(pubSubOutputChannel());
endpoint.setReplyChannel(replyChannel());
endpoint.setErrorChannel(errorChannel());
return endpoint;
}
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannel(replyChannel());
advice.setFailureChannel(errorChannel());
return advice;
}
}
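The intent of the listing above is that a synchronous publish either returns or throws, and the outcome is then routed to a reply channel or an error channel. As a simplified model of that idea only, the plain-Java sketch below wraps a blocking call and hands the result to one of two consumers. SyncOutcomeRouter and its parameters are hypothetical names made up for this illustration; the sketch does not reproduce the actual behavior or API of ExpressionEvaluatingRequestHandlerAdvice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

/** Hypothetical model of routing a blocking call's outcome to two sinks. */
final class SyncOutcomeRouter {
    private SyncOutcomeRouter() {
    }

    /**
     * Runs the blocking call. A normal return goes to onSuccess,
     * an exception thrown by the call goes to onFailure.
     */
    static <T> void invoke(Callable<T> blockingCall,
                           Consumer<T> onSuccess,
                           Consumer<Exception> onFailure) {
        T result;
        try {
            result = blockingCall.call();
        } catch (Exception ex) {
            // Only failures of the call itself are routed here.
            onFailure.accept(ex);
            return;
        }
        onSuccess.accept(result);
    }

    public static void main(String[] args) {
        List<String> replies = new ArrayList<>();
        List<Exception> errors = new ArrayList<>();
        Consumer<String> replySink = replies::add;
        Consumer<Exception> errorSink = errors::add;

        // Simulated synchronous publish that succeeds.
        Callable<String> succeeding = () -> "sent";
        invoke(succeeding, replySink, errorSink);

        // Simulated synchronous publish that fails.
        Callable<String> failing = () -> {
            throw new IllegalStateException("publish failed");
        };
        invoke(failing, replySink, errorSink);

        System.out.println("replies=" + replies.size() + ", errors=" + errors.size());
    }
}
```

In this model every request produces exactly one message on one of the two sinks, which is the property the HTTP endpoint needs in order to stop waiting.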
Stack trace of the error that occurs after sending the HTTP request:
2020-05-04 16:23:47.371 ERROR 59089 --- [ctor-http-nio-3] a.w.r.e.AbstractErrorWebExceptionHandler : [fd79ecbb-1] 500 Server Error for HTTP POST "/createPerson"
java.lang.IllegalArgumentException: 'beanFactory' must not be null
at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ HTTP POST "/createPerson" [ExceptionHandlingWebHandler]
Stack trace:
at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.channel.ChannelUtils.getErrorHandler(ChannelUtils.java:52) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.endpoint.ReactiveStreamsConsumer.onInit(ReactiveStreamsConsumer.java:126) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.context.IntegrationObjectSupport.afterPropertiesSet(IntegrationObjectSupport.java:214) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.registerReplyMessageCorrelatorIfNecessary(MessagingGatewaySupport.java:799) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessageReactive(MessagingGatewaySupport.java:602) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]