I am new to Mule 4 and have the following questions about streams. I would really appreciate any help.

  1. When does a Mule 4 component generate a stream? For example, a database select can return either an array of objects or a single value. Is the returned payload a stream in both cases?
  2. How can I check whether the payload returned by a Mule 4 component is a stream? If it is, how can I inspect the stream, that is, where are the files created when the repeatable file store stream strategy is used, and how much of the payload has already been consumed?

For example, I created the Mule 4 application below, which reads a CSV file containing 1 million records and does the following:

  • Reads the CSV file [ Streaming strategy: repeatable file store stream, in-memory size: 512 KB ]
  • Iterates with a For Each scope with a batch size of 10k
  • Inside the For Each, a Transform Message converts the CSV rows to JSON, and a file write operation creates files named by this DataWeave expression:
p('destination.dir')  ++ "Output_" ++ vars.counter ++ ".txt"

100 files are generated, each containing 10k records. Each file is 913 KB in size.

Expected: each For Each iteration processes n records, where the n records total 512 KB, and the remaining (batch size - n) records are processed in the next iteration.

Actual: each For Each iteration processed 10000 records. How did that happen?

Mule flow code:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:http="http://www.mulesoft.org/schema/mule/http"
    xmlns:file="http://www.mulesoft.org/schema/mule/file"
    xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
    xmlns="http://www.mulesoft.org/schema/mule/core"
    xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd 
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">

    <file:config name="File_Config_1" doc:name="File Config" doc:id="565a1655-beba-4048-942f-ae68887e9b96" />
    <file:config name="File_Config" doc:name="File Config" doc:id="5182b61a-dddf-41c1-8d3c-6dbd895b5db7" />
    <http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="f5bdb2ca-dc6f-4b6b-8407-8543bf7522e2" >
        <http:listener-connection host="0.0.0.0" port="8081" />
    </http:listener-config>
    <flow name="dw-streamingFlow">
        <http:listener doc:name="Listener" doc:id="78635a2a-959d-41e9-a294-3cc7bb22d36f" config-ref="HTTP_Listener_config" path="/app/fileStream"/>
        <file:read path="C:\Users\bbazazx\Documents\TestFolder\InputDirectory\input.csv"
            config-ref="File_Config"
            outputMimeType="application/csv; streaming=true; header=true" />
        <foreach doc:name="For Each" doc:id="eacd49dc-c49f-437a-927e-976add7e57fc" batchSize="500" collection="payload">
            <ee:transform doc:name="Transform Message" doc:id="bec02dd1-4c15-47c0-81cf-a7d0bfd64b39">
                <ee:message>
                    <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
payload map(item,index) -> {
    "Country" : item."Country",
    "FoodItems" : item."Item Type"
}]]></ee:set-payload>
                </ee:message>
            </ee:transform>
            <file:write doc:name="Write" config-ref="File_Config_1" path='#["C:\\Users\\bbazazx\\Documents\\TestFolder\\OutputDirectory\\output" ++ vars.counter ++ ".json"]' />
            <logger level="INFO" doc:name="Logger" message="#[payload]" />
        </foreach>

    </flow>

</mule>

1 Answer

It depends on the connector. Most connectors in Mule 4 support streaming, and many let you configure the streaming strategy. See https://docs.mulesoft.com/mule-runtime/4.3/streaming-about for details.

It may be the case that the DB connector returns an array wrapped in a stream. Mule understands this and processes it as an array transparently.

In the debugger, you can check whether the class of the payload is some kind of stream, iterable, or managed cursor. That seems to be an indication of streaming.

The 512 KB mentioned is the size of the in-memory buffer, not a limit on how much data each For Each iteration handles. For the repeatable file store stream strategy, the documentation linked above explains:

This strategy initially uses an in-memory buffer size of 512 KB. For larger streams, the strategy creates a temporary file to the disk to store the contents, without overflowing your memory.
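
To make the buffer setting concrete, here is a minimal sketch of how the strategy could be declared explicitly on the read operation from the question. It assumes the standard `repeatable-file-store-stream` child element (available on the Enterprise Edition runtime); the path, config name, and MIME type are copied from the question, and the 512 KB value simply restates the default.

```xml
<file:read config-ref="File_Config"
           path="C:\Users\bbazazx\Documents\TestFolder\InputDirectory\input.csv"
           outputMimeType="application/csv; streaming=true; header=true">
    <!-- Assumption: explicit form of the default strategy.
         Up to 512 KB of the stream is buffered in memory;
         content beyond that is kept in a temporary file on disk. -->
    <repeatable-file-store-stream inMemorySize="512" bufferUnit="KB" />
</file:read>
```

Note that `inMemorySize` only controls how much of the stream is held in memory. The `batchSize` attribute on `foreach` counts records, so each iteration still receives that many records regardless of how large they are in bytes.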