I am new to Mule 4 and have the following questions about streams. I would really appreciate any help.
- When does a Mule 4 component generate a stream? For example, a database select can return either an array of objects or a single value. Is the returned payload a stream in both cases?
- How can I check whether the payload returned by a Mule 4 component is a stream? If it is, how can I inspect the stream, i.e. where are the files created when a repeatable file store stream is used, and how much of the payload has already been consumed?
For example, I created the Mule 4 application below, which reads a CSV file containing 1 million records and does the following:
- Read the CSV file [ Streaming Strategy : Repeatable file store stream, in memory size: 512 KB ]
- Using a for each loop of batch size 10k
- Inside the For Each, a Transform Message that converts the CSV rows to JSON, and a File Write operation that creates files named according to this DataWeave expression:
p('destination.dir') ++ "Output_" ++ vars.counter ++ ".txt"
100 files are generated, each containing 10k records. When checked, each file is 913 KB in size.
Expected: For Each processes n records per iteration, where the n records amount to 512 KB, and processes the remaining (batch size - n) records in the next iteration. Actual: For Each processed 10000 records in each batch. How did this happen?
Mule flow code:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:file="http://www.mulesoft.org/schema/mule/file"
xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">
<!-- File connector configuration referenced by the file:write operation in dw-streamingFlow.
     It declares no child connection element, so no connection settings are overridden here. -->
<file:config name="File_Config_1"
             doc:name="File Config"
             doc:id="565a1655-beba-4048-942f-ae68887e9b96"/>
<!-- File connector configuration referenced by the file:read operation in dw-streamingFlow.
     It declares no child connection element, so no connection settings are overridden here. -->
<file:config name="File_Config"
             doc:name="File Config"
             doc:id="5182b61a-dddf-41c1-8d3c-6dbd895b5db7"/>
<!-- HTTP listener configuration used by the flow's trigger: host 0.0.0.0, port 8081. -->
<http:listener-config name="HTTP_Listener_config"
                      doc:name="HTTP Listener config"
                      doc:id="f5bdb2ca-dc6f-4b6b-8407-8543bf7522e2">
    <http:listener-connection port="8081" host="0.0.0.0"/>
</http:listener-config>
<!--
    dw-streamingFlow
    Triggered by an HTTP request on /app/fileStream. Reads input.csv, iterates over
    the rows in batches of 500, converts each batch to JSON, writes the batch to
    its own output file and logs the batch payload.
-->
<flow name="dw-streamingFlow">
    <http:listener config-ref="HTTP_Listener_config"
                   path="/app/fileStream"
                   doc:name="Listener"
                   doc:id="78635a2a-959d-41e9-a294-3cc7bb22d36f"/>

    <!-- The MIME type parameters ask the CSV reader to stream the input and to
         treat the first line as a header. No streaming strategy element is
         declared on this operation, so the runtime default applies. -->
    <file:read config-ref="File_Config"
               path="C:\Users\bbazazx\Documents\TestFolder\InputDirectory\input.csv"
               outputMimeType="application/csv; streaming=true; header=true"/>

    <!-- batchSize groups the rows by record count (500 per iteration). -->
    <foreach collection="payload"
             batchSize="500"
             doc:name="For Each"
             doc:id="eacd49dc-c49f-437a-927e-976add7e57fc">

        <!-- Maps every CSV row of the current batch to a JSON object that keeps
             only the "Country" column and the "Item Type" column (as FoodItems). -->
        <ee:transform doc:name="Transform Message"
                      doc:id="bec02dd1-4c15-47c0-81cf-a7d0bfd64b39">
            <ee:message>
                <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
payload map ((row) -> {
    Country: row.Country,
    FoodItems: row."Item Type"
})]]></ee:set-payload>
            </ee:message>
        </ee:transform>

        <!-- vars.counter is not assigned anywhere in this flow; it relies on the
             counter variable that the For Each scope provides (default name
             "counter" per the Mule documentation), giving one file per batch. -->
        <file:write config-ref="File_Config_1"
                    path='#["C:\\Users\\bbazazx\\Documents\\TestFolder\\OutputDirectory\\output" ++ vars.counter ++ ".json"]'
                    doc:name="Write"/>

        <!-- Logs the payload of the current iteration at INFO level. -->
        <logger level="INFO"
                message="#[payload]"
                doc:name="Logger"/>
    </foreach>
</flow>
</mule>