We are running on mule Community Edition 3.4.1 Investment to make code work there currently outweighs upgrade options.
The Mongo connector at that version leaks resources so bad, that is basically unusable for mass data processing. We are processing approximately 10M-100M records a day (which is not all that huge) and with the Mongo connector out of the box, we would have to restart Mule approximately every three hours. Now you can do that intelligently and not, and we can invest time to make it relatively painless however with the amount of automation and dependent processes we instead opted to write our own Mongo connector.
For usability purposes though and to fit the skills of our existing talent pool, we wanted to create something that provides easy usability and NO leaks on resources.
We basically created a simple Mule Transformer in Java, that exposes it's functionality through static methods. Each static method receives an instance reference, and the instance has the current MuleEvent. Basically, we simply override the process method to save the MuleEvent and the doProcess method to save the payload to a private property, then return the instance reference as the payload (this).
The next element in the flow is an Expression transformer, that calls the static method with the payload as the instance, and contains all expressions as simple MEL expressions, just as you could do in the Mongo connector.
We are using a POJO, and Mule Studio is able to pick up the static method references, and the use of this connector is very simple, and easy. It can also process hundreds of millions of calls without any issue whatsoever, since we are very carefully managing resources (actually, too conservative) and all is great. Except. When the processing is really fast, sometimes Mule loses its cool and throws a null exception. We know for a fact there is no null exception possible at a normal scenario at that point. We actually even explicitly added expression filters that would not allow a null expression to raise, however it still happens. But only, if there is an asynchronous flow. Now before we would jump on that, inside an asycnhronous flow the components still execute in sequence, and our solution puts the instance of the Mongo connector on the message payload and picks it right up. We want to have some input how this issue happens and why.
Here is some parts of the code, I can add more. EDIT: I added the line in the flow before. You will see, currentSku cannot be null
<set-session-variable variableName="currentSku" value="#[payload.?sku]" doc:name="currentSku"/>
<logger message="#[currentSku]" level="INFO" category="fiuze.plugins.contentproviders.ExtraData.asycnhFlow" doc:name="Logger"/>
<expression-filter expression="#[currentSku != null ]" doc:name="filterForNoSKUinpayload"/>
<custom-transformer class="com.fiuze.components.Mongo" doc:name="mongoInit" />
<expression-transformer expression="#[com.fiuze.components.Mongo.many(payload, "fizueDemoMongo","products","{'productChannelData.default.ExtraData':{$exists:false}}",100)]" doc:name="mongoGetProductsWithoutExtraData"/>
add here is the Java code that reflects this call:
public static synchronized Object many(Mongo instance, String configRef, String collectionName, String query, Integer limit) throws TransformerException {
return Mongo.exec(instance, configRef, collectionName, query, limit, null, null, false);
}
and here us the Exec implementation:
private static synchronized Object exec(Mongo instance, String configRef, String collectionName, Object query, Integer limit, Integer skip, Object sort, Boolean count) throws TransformerException {
instance.getMuleMessage().setPayload(instance.getPayload());
instance.setConfigref(configRef);
instance.setCollectionName(collectionName);
instance.setLimit(limit);
instance.setSkip(skip);
sort = instance.evaluate(sort);
instance.setQuery(instance.evaluate(query));
if (sort instanceof BasicDBObject) {
instance.setSort((BasicDBObject)sort);
} else if (sort instanceof String) {
instance.setSort((String)sort);
} else if (sort == null){
instance.setSort((String)null);
} else {
throw new TransformerException( (Message) instance.getMuleMessage() );
}
instance.setCount(count);
Object result = instance.execFind();
instance.dispose();
return result;
}
as you noticed, we use synchronized calls, so the static methods don't get confused.
Since this is a complicated issue, and I am not sure I am expressing it clearly, please do not hesitate to ask me details, I will be more than happy to provide.
EDIT:
Here is part of the log, I will include the part with no issues, the error, and then no issues further. In the section, you will see that the process continues, but a particular Asynch thread is messed up.
INFO 2016-05-20 21:22:00,760 [[fiuze].getExtraProductData.async1.1579] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:00,769 [[fiuze].getExtraProductData.async1.1581] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
ERROR 2016-05-20 21:22:00,812 [[fiuze].getExtraProductData.async1.1590] org.mule.api.processor.LoggerMessageProcessor: no match found
INFO 2016-05-20 21:22:00,825 [[fiuze].getExtraProductData.async1.01] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
ERROR 2016-05-20 21:22:00,831 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:00,887 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:00,959 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:00,964 [[fiuze].getExtraProductData.async1.1580] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:00,964 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
INFO 2016-05-20 21:22:00,995 [[fiuze].getExtraProductData.async1.1580] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84422
INFO 2016-05-20 21:22:00,996 [[fiuze].getExtraProductData.async1.1579] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84436
INFO 2016-05-20 21:22:00,998 [[fiuze].getExtraProductData.async1.1578] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84454
INFO 2016-05-20 21:22:00,999 [[fiuze].getExtraProductData.async1.1587] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84455
INFO 2016-05-20 21:22:01,000 [[fiuze].getExtraProductData.async1.1590] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84446
INFO 2016-05-20 21:22:01,002 [[fiuze].getExtraProductData.async1.1586] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84445
INFO 2016-05-20 21:22:01,006 [[fiuze].getExtraProductData.async1.1589] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84470
INFO 2016-05-20 21:22:01,006 [[fiuze].getExtraProductData.async1.1577] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84477
INFO 2016-05-20 21:22:01,004 [[fiuze].getExtraProductData.async1.1591] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84462
INFO 2016-05-20 21:22:01,004 [[fiuze].getExtraProductData.async1.1588] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84464
ERROR 2016-05-20 21:22:01,010 [[fiuze].getExtraProductData.async1.1577] org.mule.exception.DefaultMessagingExceptionStrategy:
********************************************************************************
Message : Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException)
Code : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. null (java.lang.NullPointerException)
2. cannot invoke method: update (java.lang.RuntimeException)
org.mvel2.optimizers.impl.refl.nodes.MethodAccessor:88 (null)
3. Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException)
org.mule.el.mvel.MVELExpressionLanguage:218 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/expression/ExpressionRuntimeException.html)
4. Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException) (org.mule.api.transformer.TransformerException)
org.mule.expression.transformers.ExpressionTransformer:66 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transformer/TransformerException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.lang.NullPointerException
+ 0 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************
INFO 2016-05-20 21:22:01,004 [[fiuze].getExtraProductData.async1.1583] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84447
INFO 2016-05-20 21:22:01,003 [[fiuze].getExtraProductData.async1.1585] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84456
ERROR 2016-05-20 21:22:01,014 [[fiuze].getExtraProductData.async1.1577] org.mule.exception.DefaultMessagingExceptionStrategy:
********************************************************************************
Message : Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException)
Code : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. null (java.lang.NullPointerException)
2. cannot invoke method: update (java.lang.RuntimeException)
org.mvel2.optimizers.impl.refl.nodes.MethodAccessor:88 (null)
3. Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException)
org.mule.el.mvel.MVELExpressionLanguage:218 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/expression/ExpressionRuntimeException.html)
4. Execution of the expression "com.fiuze.components.Mongo.update(payload, "fizueDemoMongo","products",true,true,"{ sku: #[currentSku] }","{$set: { 'productChannelData.default.Extra.attemptedAt':'#[server.dateTime]'}}")" failed. (org.mule.api.expression.ExpressionRuntimeException) (org.mule.api.transformer.TransformerException)
org.mule.expression.transformers.ExpressionTransformer:66 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transformer/TransformerException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.lang.NullPointerException
+ 0 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************
INFO 2016-05-20 21:22:01,015 [[fiuze].getExtraProductData.async1.1582] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84482
INFO 2016-05-20 21:22:01,015 [[fiuze].getExtraProductData.async1.01] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84483
INFO 2016-05-20 21:22:01,023 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.api.processor.LoggerMessageProcessor: starting ForEach
WARN 2016-05-20 21:22:01,024 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.module.mongo.api.MongoCollection: Method toArray needs to consume all the element. It is inefficient and thus should be used with care
ERROR 2016-05-20 21:22:01,034 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,035 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,145 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,153 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,268 [[fiuze].getExtraProductData.async1.1581] org.mule.api.processor.LoggerMessageProcessor: no match found
ERROR 2016-05-20 21:22:01,269 [[fiuze].getExtraProductData.async1.1584] org.mule.api.processor.LoggerMessageProcessor: no match found
WARN 2016-05-20 21:22:01,385 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.routing.Foreach$CollectionMapSplitter: Splitter returned no results. If this is not expected, please check your split expression
WARN 2016-05-20 21:22:01,586 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.module.mongo.api.MongoCollection: Method toArray needs to consume all the element. It is inefficient and thus should be used with care
WARN 2016-05-20 21:22:01,600 [[fiuze].ConnectorWithoutMuleSessionHTTP.receiver.64] org.mule.routing.Foreach$CollectionMapSplitter: Splitter returned no results. If this is not expected, please check your split expression
INFO 2016-05-20 21:22:01,667 [[fiuze].getExtraProductData.async1.01] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:01,667 [[fiuze].getExtraProductData.async1.1580] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:01,675 [[fiuze].getExtraProductData.async1.1583] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:01,675 [[fiuze].getExtraProductData.async1.1586] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:01,820 [[fiuze].getExtraProductData.async1.1581] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84485
INFO 2016-05-20 21:22:01,821 [[fiuze].getExtraProductData.async1.1584] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84488
INFO 2016-05-20 21:22:01,821 [[fiuze].getExtraProductData.async1.1577] fiuze.plugins.contentproviders.Extra.asycnhFlow: 84490
ERROR 2016-05-20 21:22:01,892 [[fiuze].getExtraProductData.async1.1580] org.mule.api.processor.LoggerMessageProcessor: no match found
INFO 2016-05-20 21:22:01,911 [[fiuze].getExtraProductData.async1.1585] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:02,136 [[fiuze].getExtraProductData.async1.1591] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:02,231 [[fiuze].getExtraProductData.async1.1588] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
INFO 2016-05-20 21:22:02,356 [[fiuze].getExtraProductData.async1.1587] org.mule.transport.jdbc.sqlstrategy.SelectSqlStatementStrategy: SQL query received a result
ERROR 2016-05-20 21:22:02,425 [[fiuze].getExtraProductData.async1.1587] org.mule.api.processor.LoggerMessageProcessor: no match found
updatemethod oncom.fiuze.components.Mongothrows an NPE. Any idea why? - David DossotcurrentSkuset? It seems it's the only thing that could cause an eval issue. This can of problem are hard to solve on StackOverflow, I'd try to put a breakpoint onorg.mule.api.expression.ExpressionRuntimeExceptionand step debug to find out what's wrong with the state of the app. And then remove this unfortunatesynchronizedkeyword :) - David Dossot