0
votes

I need to take data(input.xml) from one file which is size in 100MB-200MB and need to write into four different files based on some logic.

input xml :

            <?xml version="1.0"?>
            <Orders>
                 <Order><OrderId>1</OrderId><Total>10</Total><Name>jon1</Name></Order>
                <Order><OrderId>2</OrderId><Total>20</Total><Name>jon2</Name></Order>
                <Order><OrderId>3</OrderId><Total>30</Total><Name>jon3</Name></Order>
                <Order><OrderId>4</OrderId><Total>40</Total><Name>jon4</Name></Order>
            <Orders>

logic is if Total is 1-10 then write to file1 and if Total is 11-20 then write to file2.....,

expected output:

1 10 jon1 -->write into file1

2 20 jon2 -->write into file2

3 30 jon3 -->write into file3

4 40 jon4 -->write into file4

Here i have enabled streaming in datamapper which is under configuration but i'm not getting proper output. The problem is i'm getting only some recodes into only one file which should come into that file after satisfying the condition.

But if i disable streaming button in datamapper it is working fine. As there are lakes of records i must use streaming option.

Is there any otherway to configure datamapper to enable streaming option..?

Please suggest me on this., Thanks.,

1

1 Answers

0
votes

It is difficult to see a problem without a little more detail on what you are doing. Nevertheless, I think this probably will help you to try another approach.

The data mapper will load the full XML document into memory although you activate streaming, it has to do it in order to support XPATH (it loads the full xml input into a DOM). So if you can not afford to load 200Mb document into memory you will need to try a workaround.

What I have done before is creating a java component that transforms the input stream to an iterator with the help of a stax parser. With a very simple implementation you can code an iterator that pulls from the stream to create the next element (a pojo, a map, a string...). In the mule flow, after the "java component", you should be able to use a "for-each" with a "choice" within and apply your logic.

A quick example for your data:

package tests;

import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

import javax.xml.stream.FactoryConfigurationError;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class OrdersStreamIterator implements Iterator<Map<String,String>> {

    final static Log LOGGER = LogFactory.getLog(OrdersStreamIterator.class);

    final InputStream is;
    final XMLStreamReader xmlReader;

    boolean end = false;
    HashMap<String,String> next;

    public OrdersStreamIterator(InputStream is)
            throws XMLStreamException, FactoryConfigurationError {
        this.is = is;
        xmlReader = XMLInputFactory.newInstance().createXMLStreamReader(is);
    }

    protected HashMap<String,String> _next() throws XMLStreamException {
        int event;
        HashMap<String,String> order = null;
        String orderChild = null;
        String orderChildValue = null;
        while (xmlReader.hasNext()) {
            event = xmlReader.getEventType();
            if (event == XMLStreamConstants.START_ELEMENT) {
                if (order==null) {
                    if (checkOrder()) {
                        order = new HashMap<String,String>();
                    }
                }
                else {
                    orderChild = xmlReader.getLocalName();
                }
            }
            else if (event == XMLStreamConstants.END_ELEMENT) {
                if (checkOrders()) {
                    end = true;
                    return null;
                }
                else if (checkOrder()) {
                    xmlReader.next();
                    return order;
                }
                else if (order!=null) {
                    order.put(orderChild, orderChildValue);
                    orderChild = null;
                    orderChildValue = null;
                }
            }
            else if (order!=null && orderChild!=null){
                switch (event) {
                case XMLStreamConstants.SPACE:
                case XMLStreamConstants.CHARACTERS:
                case XMLStreamConstants.CDATA:
                    int start = xmlReader.getTextStart();
                    int length = xmlReader.getTextLength();
                    if (orderChildValue==null) {
                        orderChildValue = new String(xmlReader.getTextCharacters(), start, length);
                    }
                    else {
                        orderChildValue += new String(xmlReader.getTextCharacters(), start, length);
                    }
                    break;
                }
            }
            xmlReader.next();
        }
        end = true;
        return null;
    }

    protected boolean checkOrder() {
        return "Order".equals(xmlReader.getLocalName());
    }

    protected boolean checkOrders() {
        return "Orders".equals(xmlReader.getLocalName());
    }

    @Override
    public boolean hasNext() {
        if (end) {
            return false;
        }
        else if (next==null) {
            try {
                next = _next();
            } catch (XMLStreamException e) {
                LOGGER.error(e.getMessage(), e);
                end = true;
            }
            return !end;
        }
        else {
            return true;
        }
    }


    @Override
    public Map<String,String> next() {
        if (hasNext()) {
            final HashMap<String,String> n = next;
            next = null;
            return n;
        }
        else {
            return null;
        }
    }


    @Override
    public void remove() {
        throw new RuntimeException("ReadOnly!");
    }

    // Test

    public static String dump(Map<String,String> o) {
        String s = "{";
        for (Entry<String,String> e : o.entrySet()) {
            if (s.length()>1) {
                s+=", ";
            }
            s+= "\"" + e.getKey() + "\" : \"" + e.getValue() + "\"";
        }
        return s + "}";
    }

    public static void main(String[] argv) throws XMLStreamException, FactoryConfigurationError {
        final InputStream is = OrdersStreamIterator.class.getClassLoader().getResourceAsStream("orders.xml");
        final OrdersStreamIterator i = new OrdersStreamIterator(is);
        while (i.hasNext()) {
            System.out.println(dump(i.next()));
        }
    }
}

An example flow:

 <flow name="testsFlow">
        <http:listener config-ref="HTTP_Listener_Configuration" path="/" doc:name="HTTP"/>
        <scripting:component doc:name="Groovy">
            <scripting:script engine="Groovy"><![CDATA[return tests.OrdersStreamIterator.class.getClassLoader().getResourceAsStream("orders.xml");]]></scripting:script>
        </scripting:component>
        <set-payload value="#[new tests.OrdersStreamIterator(payload)]" doc:name="Iterator"/>
        <foreach doc:name="For Each">
            <logger message="#[tests.OrdersStreamIterator.dump(payload)]" level="INFO" doc:name="Logger"/>
        </foreach>
    </flow>