2
votes

We are running on mule Community Edition 3.4.1 Investment to make code work there currently outweighs upgrade options.

The Mongo connector at that version leaks resources so bad, that is basically unusable for mass data processing. We are processing approximately 10M-100M records a day (which is not all that huge) and with the Mongo connector out of the box, we would have to restart Mule approximately every three hours. Now you can do that intelligently and not, and we can invest time to make it relatively painless however with the amount of automation and dependent processes we instead opted to write our own Mongo connector.

For usability purposes though and to fit the skills of our existing talent pool, we wanted to create something that provides easy usability and NO leaks on resources.

We basically created a simple Mule Transformer in Java, that exposes it's functionality through static methods. Each static method receives an instance reference, and the instance has the current MuleEvent. Basically, we simply override the process method to save the MuleEvent and the doProcess method to save the payload to a private property, then return the instance reference as the payload (this).

The next element in the flow is an Expression transformer, that calls the static method with the payload as the instance, and contains all expressions as simple MEL expressions, just as you could do in the Mongo connector.

We are using a POJO, and Mule Studio is able to pick up the static method references, and the use of this connector is very simple, and easy. It can also process hundreds of millions of calls without any issue whatsoever, since we are very carefully managing resources (actually, too conservative) and all is great. Except. When the processing is really fast, sometimes Mule loses its cool and throws a null exception. We know for a fact there is no null exception possible at a normal scenario at that point. We actually even explicitly added expression filters that would not allow a null expression to raise, however it still happens. But only, if there is an asynchronous flow. Now before we would jump on that, inside an asycnhronous flow the components still execute in sequence, and our solution puts the instance of the Mongo connector on the message payload and picks it right up. We want to have some input how this issue happens and why.

Here is some parts of the code, I can add more. EDIT: I added the line in the flow before. You will see, currentSku cannot be null

<set-session-variable variableName="currentSku" value="#[payload.?sku]" doc:name="currentSku"/>
<logger message="#[currentSku]" level="INFO" category="fiuze.plugins.contentproviders.ExtraData.asycnhFlow" doc:name="Logger"/>
<expression-filter expression="#[currentSku != null ]" doc:name="filterForNoSKUinpayload"/> 
<custom-transformer class="com.fiuze.components.Mongo" doc:name="mongoInit" />
<expression-transformer expression="#[com.fiuze.components.Mongo.many(payload, &quot;fizueDemoMongo&quot;,&quot;products&quot;,&quot;{'productChannelData.default.ExtraData':{$exists:false}}&quot;,100)]" doc:name="mongoGetProductsWithoutExtraData"/>

add here is the Java code that reflects this call:

    public static synchronized Object many(Mongo instance, String configRef, String collectionName, String query, Integer limit) throws TransformerException {
      return Mongo.exec(instance, configRef, collectionName, query, limit, null, null, false);
    }

and here us the Exec implementation:

    private static synchronized Object exec(Mongo instance, String configRef, String collectionName, Object query, Integer limit, Integer skip, Object sort, Boolean count) throws TransformerException {

    instance.getMuleMessage().setPayload(instance.getPayload());
    instance.setConfigref(configRef);
    instance.setCollectionName(collectionName);
    instance.setLimit(limit);
    instance.setSkip(skip);
    sort = instance.evaluate(sort);
    instance.setQuery(instance.evaluate(query));
    if (sort instanceof BasicDBObject) {
        instance.setSort((BasicDBObject)sort);
    } else if (sort instanceof String) {
        instance.setSort((String)sort);
    } else if (sort == null){
        instance.setSort((String)null);
    } else {
        throw new TransformerException( (Message) instance.getMuleMessage() );
    }
    instance.setCount(count);
    Object result = instance.execFind();
    instance.dispose();
    return result;
}

as you noticed, we use synchronized calls, so the static methods don't get confused.

Since this is a complicated issue, and I am not sure I am expressing it clearly, please do not hesitate to ask me details, I will be more than happy to provide.

EDIT:

Here is part of the log, I will include the part with no issues, the error, and then no issues further. In the section, you will see that the process continues, but a particular Asynch thread is messed up.

INFO  2016-05-20 21:22:00,760 [[fiuze].getExtraProductData.async1.1579] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO  2016-05-20 21:22:00,769 [[fiuze].getExtraProductData.async1.1581] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
ERROR 2016-05-20 21:22:00,812 [[fiuze].getExtraProductData.async1.1590] org.mule.api.processor.LoggerMessageProcessor: no match found
INFO  2016-05-20 21:22:00,825 [[fiuze].getExtraProductData.async1.01] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
ERROR 2016-05-20 21:22:00,831 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:00,887 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:00,959 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:00,964 [[fiuze].getExtraProductData.async1.1580] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:00,964 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
INFO  2016-05-20 21:22:00,995 [[fiuze].getExtraProductData.async1.1580] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84422
INFO  2016-05-20 21:22:00,996 [[fiuze].getExtraProductData.async1.1579] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84436
INFO  2016-05-20 21:22:00,998 [[fiuze].getExtraProductData.async1.1578] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84454
INFO  2016-05-20 21:22:00,999 [[fiuze].getExtraProductData.async1.1587] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84455
INFO  2016-05-20 21:22:01,000 [[fiuze].getExtraProductData.async1.1590] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84446
INFO  2016-05-20 21:22:01,002 [[fiuze].getExtraProductData.async1.1586] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84445
INFO  2016-05-20 21:22:01,006 [[fiuze].getExtraProductData.async1.1589] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84470
INFO  2016-05-20 21:22:01,006 [[fiuze].getExtraProductData.async1.1577] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84477
INFO  2016-05-20 21:22:01,004 [[fiuze].getExtraProductData.async1.1591] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84462
INFO  2016-05-20 21:22:01,004 [[fiuze].getExtraProductData.async1.1588] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84464
ERROR 2016-05-20 21:22:01,010 [[fiuze].getExtraProductData.async1.1577] org.mule.exception.DefaultMessagingExceptionStrategy:
********************************************************************************
Message               : Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException)
Code                  : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. null (java.lang.NullPointerException)
2. cannot invoke method: update (java.lang.RuntimeException)
  org.mvel2.optimizers.impl.refl.nodes.MethodAccessor:88 (null)
3. Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException)
  org.mule.el.mvel.MVELExpressionLanguage:218 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/expression/ExpressionRuntimeException.html)
4. Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException) (org.mule.api.transformer.TransformerException)
  org.mule.expression.transformers.ExpressionTransformer:66 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transformer/TransformerException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.lang.NullPointerException
    + 0 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************

INFO  2016-05-20 21:22:01,004 [[fiuze].getExtraProductData.async1.1583] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84447
INFO  2016-05-20 21:22:01,003 [[fiuze].getExtraProductData.async1.1585] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84456
ERROR 2016-05-20 21:22:01,014 [[fiuze].getExtraProductData.async1.1577] org.mule.exception.DefaultMessagingExceptionStrategy:
********************************************************************************
Message               : Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException)
Code                  : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. null (java.lang.NullPointerException)
2. cannot invoke method: update (java.lang.RuntimeException)
  org.mvel2.optimizers.impl.refl.nodes.MethodAccessor:88 (null)
3. Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException)
  org.mule.el.mvel.MVELExpressionLanguage:218 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/expression/ExpressionRuntimeException.html)
4. Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException) (org.mule.api.transformer.TransformerException)
  org.mule.expression.transformers.ExpressionTransformer:66 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transformer/TransformerException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.lang.NullPointerException
    + 0 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************

INFO  2016-05-20 21:22:01,015 [[fiuze].getExtraProductData.async1.1582] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84482
INFO  2016-05-20 21:22:01,015 [[fiuze].getExtraProductData.async1.01] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84483
INFO  2016-05-20 21:22:01,023 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.api.processor.LoggerMessageProcessor: starting ForEach
WARN  2016-05-20 21:22:01,024 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.module.mongo.api.MongoCollection: Method toArray needs to consume all the element. It is inefficient and thus should be used with care
ERROR 2016-05-20 21:22:01,034 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,035 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,145 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,153 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,268 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,269 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
WARN  2016-05-20 21:22:01,385 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.routing.Foreach$CollectionMapSplitter: Splitter returned no results. If this is not expected, please check your split expression
WARN  2016-05-20 21:22:01,586 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.module.mongo.api.MongoCollection: Method toArray needs to consume all the element. It is inefficient and thus should be used with care
WARN  2016-05-20 21:22:01,600 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.routing.Foreach$CollectionMapSplitter: Splitter returned no results. If this is not expected, please check your split expression
INFO  2016-05-20 21:22:01,667 [[fiuze].getExtraProductData.async1.01] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO  2016-05-20 21:22:01,667 [[fiuze].getExtraProductData.async1.1580] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO  2016-05-20 21:22:01,675 [[fiuze].getExtraProductData.async1.1583] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO  2016-05-20 21:22:01,675 [[fiuze].getExtraProductData.async1.1586] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO  2016-05-20 21:22:01,820 [[fiuze].getExtraProductData.async1.1581] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84485
INFO  2016-05-20 21:22:01,821 [[fiuze].getExtraProductData.async1.1584] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84488
INFO  2016-05-20 21:22:01,821 [[fiuze].getExtraProductData.async1.1577] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84490
ERROR 2016-05-20 21:22:01,892 [[fiuze].getExtraProductData.async1.1580] org.mule.api.processor.LoggerMessageProcessor: no match found
INFO  2016-05-20 21:22:01,911 [[fiuze].getExtraProductData.async1.1585] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO  2016-05-20 21:22:02,136 [[fiuze].getExtraProductData.async1.1591] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO  2016-05-20 21:22:02,231 [[fiuze].getExtraProductData.async1.1588] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO  2016-05-20 21:22:02,356 [[fiuze].getExtraProductData.async1.1587] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
ERROR 2016-05-20 21:22:02,425 [[fiuze].getExtraProductData.async1.1587] org.mule.api.processor.LoggerMessageProcessor: no match found
1
Providing the stacktrace of the exception would help. - David Dossot
Added the log. David -- thank you for looking, I have learned a lot from you before. - Menashe Borbely
So it seems the update method on com.fiuze.components.Mongo throws an NPE. Any idea why? - David Dossot
David, not sure about that. See it says 2. cannot invoke method: update (java.lang.RuntimeException), which tells me the exception is thrown in the phase where Mule is evaluating the expression transformer to call it. Somehow at that moment, evaluating the values, it throws an NPE. I am thinking maybe thread safety between me calling the transformer to initialize and then using it. Only happens in Asynch, not in a synch use case. - Menashe Borbely
How is currentSku set? It seems it's the only thing that could cause an eval issue. This can of problem are hard to solve on StackOverflow, I'd try to put a breakpoint on org.mule.api.expression.ExpressionRuntimeException and step debug to find out what's wrong with the state of the app. And then remove this unfortunate synchronized keyword :) - David Dossot

1 Answers

0
votes

You are most likely stung by a variant of this bug I reported long ago: MULE-6630 which was fixed in 3.5.0 (see MULE-7414. There was some flakiness in MEL multi-threading in early versions.