2
votes

I'm trying to create a bundle and run it inside ServiceMix. I've encoutered a problem, any help is appreciated.

What I'm trying to do:

Produce SOAP messages from a CXF Endpoint and validate them against a XSD schema using Apache ServiceMix 5.0.0 (Camel 2.12.3).

What I've done:

Implemented a route that produces messages from a CXF endpoint and then validates them.

The route's configure method:

private RouteBuilder getInputRoute() {

    SoapValidatingProcessor soapValidatingProcessor = new SoapValidatingProcessor();

    inputRoute = new RouteBuilder() {

      @Override
      public void configure() throws Exception {

        from(cxfEndpointInId())
          .convertBodyTo(java.lang.String.class, "UTF-8")
          .onException(org.apache.camel.ValidationException.class)
            .log(LoggingLevel.INFO, LOG_NAME, "Invalid message received!")
            .handled(true)
            .stop()
            .end()
          .bean(soapValidatingProcessor).id("SoapHeaderValidatingProcessor")
          .to("browse:foo")

}

Created a validating processor:

public class SoapValidatingProcessor {

  private final String SCHEMA = "schema.xsd";  

  public SoapValidatingProcessor() {

    validatingProcessor = new ValidatingProcessor();
    validatingProcessor.setFailOnNullHeader(false);
    validatingProcessor.setFailOnNullBody(false);

  }

  @Handler
  public void validate(Exchange exchange) throws Exception {

    Resource validationSchema = context.getApplicationContext().getResource(SCHEMA);
    validatingProcessor.setSchemaUrl(validationSchema.getURL());
    validatingProcessor.loadSchema();

    /* Creating a new SchemaFactory instance */
    SchemaFactory xmlSchema = SchemaFactory
        .newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);   

    validatingProcessor.setSchemaFactory(xmlSchema);    
    validatingProcessor.process(exchange);

  }

}

Problem:

The Apache Camel ValidatingProcessor [1] does not behaves well in multi-threating. I'm sending SOAP messages at aprox. 20ms interval and I'm getting the following exception. If I increase the sending interval to something above 200ms, it's all good.

2014-07-11 17:11:47,404 | WARN  | ult-workqueue-18 | PhaseInterceptorChain            | ?                                   ? | 129 - org.ap
ache.cxf.cxf-api - 2.7.10 | Application {http://ws.service}WSImplService#{http://foo.bar}Update has thrown exception, unwinding now
org.apache.cxf.interceptor.Fault: FWK005 parse may not be called while parsing.
        at org.apache.camel.component.cxf.CxfConsumer$1.checkFailure(CxfConsumer.java:228)[181:org.apache.camel.camel-cxf:2.12.3]
        at org.apache.camel.component.cxf.CxfConsumer$1.setResponseBack(CxfConsumer.java:206)[181:org.apache.camel.camel-cxf:2.12.3]
        at org.apache.camel.component.cxf.CxfConsumer$1.syncInvoke(CxfConsumer.java:140)[181:org.apache.camel.camel-cxf:2.12.3]
        at org.apache.camel.component.cxf.CxfConsumer$1.invoke(CxfConsumer.java:75)[181:org.apache.camel.camel-cxf:2.12.3]
        at org.apache.cxf.interceptor.ServiceInvokerInterceptor$1.run(ServiceInvokerInterceptor.java:58)[129:org.apache.cxf.cxf-api:2.7.10]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)[:1.7.0_45]
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)[:1.7.0_45]
        at org.apache.cxf.workqueue.SynchronousExecutor.execute(SynchronousExecutor.java:37)[129:org.apache.cxf.cxf-api:2.7.10]
        at org.apache.cxf.interceptor.ServiceInvokerInterceptor.handleMessage(ServiceInvokerInterceptor.java:107)[129:org.apache.cxf.cxf-api
:2.7.10]
        at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:272)[129:org.apache.cxf.cxf-api:2.7.10]
        at org.apache.cxf.phase.PhaseInterceptorChain.resume(PhaseInterceptorChain.java:242)[129:org.apache.cxf.cxf-api:2.7.10]
        at org.apache.cxf.interceptor.OneWayProcessorInterceptor$1.run(OneWayProcessorInterceptor.java:144)[129:org.apache.cxf.cxf-api:2.7.1
0]
        at org.apache.cxf.workqueue.AutomaticWorkQueueImpl$3.run(AutomaticWorkQueueImpl.java:428)[129:org.apache.cxf.cxf-api:2.7.10]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)[:1.7.0_45]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)[:1.7.0_45]
        at org.apache.cxf.workqueue.AutomaticWorkQueueImpl$AWQThreadFactory$1.run(AutomaticWorkQueueImpl.java:353)[129:org.apache.cxf.cxf-ap
i:2.7.10]
        at java.lang.Thread.run(Thread.java:744)[:1.7.0_45]
Caused by: org.xml.sax.SAXException: FWK005 parse may not be called while parsing.
        at org.apache.xerces.jaxp.validation.Util.toSAXException(Unknown Source)[:]

The only way I'm not getting that exception is if the validate method is synchronized but I don't want that.

What I've found:

I see here [2] that the bug was resolved. I've tried to create a new SchemaFactory [3] instance for each message but still no luck. Any ideeas why I'm still getting the error? What I'm doing wrong?

[1] http://camel.apache.org/maven/camel-2.8.0/camel-core/apidocs/org/apache/camel/processor/validation/ValidatingProcessor.html

[2] https://issues.apache.org/jira/browse/CAMEL-6630

[3] http://docs.oracle.com/javase/6/docs/api/javax/xml/validation/SchemaFactory.html

Thanks!

1

1 Answers

2
votes

Your code is not thread-safe. In your SoapValidatingProcessor you're using shared validatingProcessor, while modifying its state/configuration (by loading a different schema every time).

The schema, however, doesn't change in your example. In this case you should only configure the processor once (by loading the schema at startup).

Your handler should only call methods that do not modify processor state:

@Handler
public void validate(Exchange exchange) throws Exception {
    validatingProcessor.process(exchange);
}

And the state should be configured once, before any exchanges are processed, e.g.:

public SoapValidatingProcessor(CamelContext context) {
    validatingProcessor = new ValidatingProcessor();
    validatingProcessor.setFailOnNullHeader(false);
    validatingProcessor.setFailOnNullBody(false);

    Resource validationSchema = context.getApplicationContext().getResource(SCHEMA);
    validatingProcessor.setSchemaUrl(validationSchema.getURL());
    validatingProcessor.loadSchema();

    /* Creating a new SchemaFactory instance */
    SchemaFactory xmlSchema = SchemaFactory
        .newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);   

    validatingProcessor.setSchemaFactory(xmlSchema);    
}

In fact you would probably be better off by removing your SoapValidatingProcessor class and moving all the validatingProcessor configuration to your RouteBuilder.configure() method, and then using validatingProcessor directly in the route.