0
votes

Spring Integration headers are not getting injected into the Service Activator POJO. This happens only during load testing (at less than 10 TPS); in the normal flow it works as expected. The Spring Integration flow is:

Splitter -> executor channel -> service activator -> aggregator.

SI version 4.1.2.RELEASE
Spring Version 4.1.2.RELEASE
Java version 1.8.0_60

//service activator -Method     
public ProductDetail getProductDetails(final String productID,
            @Header(STATS) final Stats statsType,
            @Header(SKU) final String sku,
            @Header(PRODUCT_LIST) final Collection<String> productList,
            @Header(NO_OF_PRODUCTS) final String size) 

final MessageBuilder<Map<String, Object>> msgBuilder = MessageBuilder
                .withPayload(payload);//Map<String, Object> payload
this.addMessageHeaders(msgBuilder, httpHeadersMap, statsType);


/**
 * Copies the given HTTP headers and the stats object onto a message builder.
 *
 * <p>Every entry of {@code httpHeadersMap} becomes a message header of the same name.
 * A {@code null} or empty map contributes nothing. The {@code STATS} header is only
 * set when {@code statsType} is non-null, so a {@code null} value leaves it absent.
 *
 * @param msgBuilder     builder that receives the headers
 * @param httpHeadersMap header names and values to copy; may be {@code null}
 * @param statsType      value for the {@code STATS} header; may be {@code null}
 * @return always {@code true}
 */
private final boolean addMessageHeaders(MessageBuilder<?> msgBuilder,
        final Map<String, String> httpHeadersMap, final Stats statsType) {
    if (httpHeadersMap != null) {
        // Iterating an empty map does nothing, so no separate size check is needed.
        httpHeadersMap.forEach((headerName, headerValue) ->
                msgBuilder.setHeader(headerName, headerValue));
    }
    if (statsType != null) {
        msgBuilder.setHeader(STATS, statsType);
    }
    return true;
}

org.springframework.messaging.MessageHandlingException: nested exception is java.lang.IllegalArgumentException: required header not available: STATS at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:78) ~[spring-integration-core-4.1.2.RELEASE.jar:na] at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:71) ~[spring-integration-core-4.1.2.RELEASE.jar:na] at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99) ~[spring-integration-core-4.1.2.RELEASE.jar:na] at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) ~[spring-integration-core-4.1.2.RELEASE.jar:na] at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.1.2.RELEASE.jar:na] at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101) ~[spring-integration-core-4.1.2.RELEASE.jar:na] at org.springframework.integration.dispatcher.UnicastingDispatcher.access$000(UnicastingDispatcher.java:48) ~[spring-integration-core-4.1.2.RELEASE.jar:na] at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:92) ~[spring-integration-core-4.1.2.RELEASE.jar:na] at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52) ~[spring-integration-core-4.1.2.RELEASE.jar:na] at xx.run(DelegatingContextRunnable.java:40) ~[xx-core-3.xx.jar:3.24.0.52] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_60] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_60] at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_60] Caused by: 
java.lang.IllegalArgumentException: required header not available: STATS at org.springframework.util.Assert.isTrue(Assert.java:68) ~[spring-core-4.2.2.RELEASE.jar:4.2.2.RELEASE] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_60] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_60] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_60] at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_60]

<!-- Channels of the retrieve-product flow. All are declared without a dispatcher
     element except retrieve-product.sa.in. -->
<int:channel id="retrieve-product.in" />
<int:channel id="retrieve-product.out" />
<int:channel id="retrieve-product.err" />
<int:channel id="retrieve-product.splitter.in"/>
<!-- Input of the service activator: messages are dispatched through the
     testExecutor task executor (bean not defined in this snippet), with
     failover switched off. -->
<int:channel id="retrieve-product.sa.in">
    <int:dispatcher task-executor="testExecutor" failover="false"/>
</int:channel>
<int:channel id="retrieve-product.aggregator.in"/>
<int:channel id="retrieve-product.transformer.in"/>

<!-- Beans for the retrieve-product flow. The service activator, aggregator,
     response transformer and release strategy are referenced by the endpoints
     declared further down; the remaining beans are not referenced anywhere in
     this snippet. -->
<bean id="retrieveProductsServiceActivatorSupport" class="com.test.eip.serviceactivator.support.RetrieveProductsServiceActivatorSupport" />
<bean id="retrieveProductsServiceActivator" class="com.test.eip.serviceactivator.RetrieveProductsServiceActivator" />
<bean id="retrieveProductsAggregator" class="com.test.eip.aggregator.RetrieveProductsAggregator" />
<bean id="retrieveProductResponseTransformer" class="com.test.eip.transformer.RetrieveProductsResponseTransformer" />
<!-- Constructed with the mapping file name ProductMapping.xml as its first argument. -->
<bean id="productGroupsConfig" class="com.test.support.ProductGroupsConfig" >
    <constructor-arg index="0" value="ProductMapping.xml" />
</bean>
<bean id="payloadSizeBasedReleaseStrategy" class="com.test.eip.aggregator.support.PayloadSizeBasedReleaseStrategy" />
<!-- JAXB marshaller bound to the com.test.product.config.types context path. -->
<oxm:jaxb2-marshaller id="productConfigJaxbMarshaller" context-path="com.test.product.config.types" />

<!-- Entry point of the flow: IProductGateway.getProductsResponse sends its argument
     to retrieve-product.in and takes its reply from retrieve-product.out.
     Errors are routed to retrieve-product.err. -->
<int:gateway id="retrieveProductsGateway" service-interface="com.test.eip.gateway.IProductGateway" error-channel ="retrieve-product.err">
    <int:method name="getProductsResponse" request-channel="retrieve-product.in" reply-channel="retrieve-product.out" />
</int:gateway>

<!-- Adds the API_NAME header (value "retrieve-product") and forwards to the splitter. -->
<int:header-enricher input-channel="retrieve-product.in" output-channel="retrieve-product.splitter.in">
    <int:header name="API_NAME" value="retrieve-product" />
</int:header-enricher>
<!-- The incoming payload contains the product numbers. No ref or expression is
     configured, so this is the default splitter; presumably it emits one message
     per product number (confirm against the Spring Integration reference). -->
<int:splitter id="productListSplitter" input-channel="retrieve-product.splitter.in" 
                output-channel="retrieve-product.sa.in"></int:splitter>
<!-- Retrieves the product details by calling
     retrieveProductsServiceActivator.getProductDetails; its input channel is the
     executor-backed retrieve-product.sa.in. -->
<int:service-activator input-channel="retrieve-product.sa.in"
    ref="retrieveProductsServiceActivator" method="getProductDetails"
    output-channel="retrieve-product.aggregator.in"
    id="retrieve-product-service-activator" />
<!-- Aggregates the service activator results via
     retrieveProductsAggregator.aggregateProductDetails; the group is released when
     payloadSizeBasedReleaseStrategy.canRelease allows it. -->
<int:aggregator input-channel="retrieve-product.aggregator.in"
    output-channel="retrieve-product.transformer.in"
    method="aggregateProductDetails" ref="retrieveProductsAggregator"
    release-strategy="payloadSizeBasedReleaseStrategy"
    release-strategy-method="canRelease">
</int:aggregator>

<!-- Generates the service response and sends it to the gateway reply channel. -->
<int:transformer id="retrieve-product-ResponseTransformer" method="transform"
        ref="retrieveProductResponseTransformer" input-channel="retrieve-product.transformer.in" output-channel="retrieve-product.out" />

<!--
    Listens on the error channel and generates a response: messages arriving on
    retrieve-product.err are passed to testErrorHandler.handleExceptions and the
    result is sent to retrieve-product.out. The testErrorHandler bean is not
    defined in this snippet.
 -->
<int:service-activator input-channel="retrieve-product.err"
    ref="testErrorHandler" method="handleExceptions"
    output-channel="retrieve-product.out"
    id="retrieve-product-err-handler" />

/**
 * Gateway interface for the retrieve-product integration flow.
 */
public interface IProductGateway {

    /**
     * Sends the given message into the flow and returns the resulting response.
     *
     * @param inputMessage the request message, including any headers the flow needs
     * @return the response produced by the flow
     */
    RetrieveProductsResponse getProductsResponse(Message<?> inputMessage);
}

//Service
/**
 * Builds the request message for a product lookup and sends it through the
 * injected {@link IProductGateway}.
 */
@Service
public class ProductsService implements IProductsService {

    @Autowired
    private IProductGateway retrieveProductsGateway;

    /**
     * Wraps the product list in a message, attaches the headers, and invokes the gateway.
     *
     * <p>The {@code SKU} and {@code NO_OF_PRODUCTS} entries are written into
     * {@code httpHeadersMap} itself, so the caller's map is modified and must be
     * non-null and mutable.
     *
     * @param retrieveProductsRequest the incoming request
     * @param httpHeadersMap          HTTP headers to forward as message headers
     * @param statsType               stats object forwarded to {@code addMessageHeaders}
     * @return the response returned by the gateway
     */
    @Override
    public RetrieveProductsResponse getProductsResponse(
            final RetrieveProductsRequest retrieveProductsRequest,
            final Map<String, String> httpHeadersMap, final Stats statsType) {
        final ProductsReqType products = retrieveProductsRequest.getProducts();
        //populate productList From products
        final Collection<String> productIds = new HashSet<>();

        httpHeadersMap.put(SKU, retrieveProductsRequest.getSKU());
        final String productCount = String.valueOf(productIds.size());
        httpHeadersMap.put(NO_OF_PRODUCTS, productCount);

        final MessageBuilder<Collection<String>> requestBuilder =
                MessageBuilder.withPayload(productIds);
        // The header gets its own copy of the product list, separate from the payload.
        requestBuilder.setHeader(PRODUCT_LIST, new HashSet<>(productIds));
        this.addMessageHeaders(requestBuilder, httpHeadersMap, statsType);
        final Message<Collection<String>> request = requestBuilder.build();

        LOG.debug("retrieveProductsGateway:: {}", this.retrieveProductsGateway);
        return this.retrieveProductsGateway.getProductsResponse(request);
    }
}

//A Jersey filter (javax.ws.rs.container.ContainerRequestFilter) creates the Stats object and stores it on the requestContext
/**
 * Request filter that creates a new {@code Stats} object, sets its begin time from
 * a freshly constructed {@link GregorianCalendar}, and stores it on the request
 * context under the {@code STATS} property.
 *
 * @param requestContext the context of the request being filtered
 * @throws IOException declared by the overridden method
 */
@Override
public void filter(final ContainerRequestContext requestContext) throws IOException {
    LOG.debug("filter(requestContext)::Entry");
    final Stats requestStats = STATS_OBJ_FACTORY.createStats();
    // The request headers are fetched here but not used in the visible code.
    final MultivaluedMap<String, String> requestHeaders = requestContext.getHeaders();
    final GregorianCalendar now = new GregorianCalendar();
    requestStats.setBegin(DT_FACTORY.newXMLGregorianCalendar(now));
    requestContext.setProperty(STATS, requestStats);
    LOG.debug("filter(requestContext)::Exit");
}

//Resource Method taking statsType and passing to ProductsService
/*javax.ws.rs.core.Context*/
/**
 * Handles POST requests to {@code retrieve-product}.
 *
 * <p>Reads the {@code STATS} property from the request context, converts the HTTP
 * headers to a map, delegates to the products service, and wraps the result in an
 * OK response.
 *
 * <p>NOTE(review): if no {@code STATS} property was set on the request context,
 * the stats value passed on is presumably {@code null}; confirm against the filter.
 *
 * @param retrieveProductsRequest the request body
 * @param containerRequestContext the injected request context holding the {@code STATS} property
 * @param headers                 the injected HTTP headers
 * @return an OK response carrying the service result
 */
@POST
@Path("retrieve-product")
@Produces(MyMediaType.PD_XML_MEDIA_TYPE)
@Consumes(MyMediaType.PD_XML_MEDIA_TYPE)
public Response getProducts(RetrieveProductsRequest retrieveProductsRequest,
        @Context final ContainerRequestContext containerRequestContext,
        @Context final HttpHeaders headers) {
    LOG.debug("getProducts::Entry");
    final Object statsProperty = containerRequestContext.getProperty(STATS);
    final Stats requestStats = (Stats) statsProperty;
    final Map<String, String> headerMap = ResourceHelper.getHttpHeadersAsMap(headers);
    final RetrieveProductsResponse productsResponse = this.productsService
            .getProductsResponse(retrieveProductsRequest, headerMap, requestStats);
    LOG.debug("getProducts::Exit");
    return Response.ok(productsResponse).build();
}

Thanks,

1

1 Answers

0
votes

required header not available: STATS

This simply means that whatever is upstream that creates the message is not populating the headers; you need to show all your configuration (and possibly code). Enabling DEBUG logging and following the messages through the flow will generally help in debugging such conditions.

A common mistake is a lack of thread-safety - such as in your splitter.