I have a large (> 1 GB) text file. I need to process it row by row (applying business logic to each row) in a multithreaded manner, so I wrote the following code:
    public Task Parse(Stream content, Action<Trade> parseCallback)
    {
        // Parent task: reads the stream line by line.
        return Task.Factory.StartNew(() =>
        {
            using (var streamReader = new StreamReader(content))
            {
                string line;
                while ((line = streamReader.ReadLine()) != null)
                {
                    if (String.IsNullOrWhiteSpace(line))
                    {
                        continue;
                    }

                    var tokens = line.Split(TokensSeparator);
                    // Skip rows that do not have exactly six fields.
                    if (tokens.Length != 6)
                    {
                        continue;
                    }

                    // One child task per row; AttachedToParent makes the parent
                    // task complete only after all of its children have finished.
                    Task.Factory.StartNew(() => parseCallback(new Trade
                    {
                        Id = Int32.Parse(tokens[0]),
                        MktPrice = Decimal.Parse(tokens[1], CultureInfo.InvariantCulture),
                        Notional = Decimal.Parse(tokens[2], CultureInfo.InvariantCulture),
                        Quantity = Int64.Parse(tokens[3]),
                        TradeDate = DateTime.Parse(tokens[4], CultureInfo.InvariantCulture),
                        TradeType = tokens[5]
                    }),
                    TaskCreationOptions.AttachedToParent);
                }
            }
        });
    }
Here the parseCallback action applies the business logic to the Trade object created from each data row.
The Parse() method returns a Task, and the calling thread waits for the parent task to complete:
    try
    {
        var parseTask = parser.Parse(fileStream, AddTradeToTradeResult);
        parseTask.Wait();
    }
    catch (AggregateException ae)
    {
        throw new ApplicationException(ae.Flatten().InnerException.Message, ae);
    }
My questions are:
- The tasks in the while loop can obviously be created faster than they are processed. How will the TPL treat these queued tasks? Will they wait until a thread-pool thread picks them up and executes them, or is there a possibility that some of them will be lost?
- The calling thread (parseTask.Wait()) is the main thread of the console application. Will I be able to interact with the console window while a large file is being processed, or will it be blocked?
- I realize that this approach is wrong. How can I improve the solution? For example, I could read the file stream and put the data into a queue on the main thread, then process the queue items with Tasks. Or is there some other approach? Please point me in the right direction.
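To make the queue idea from the last question concrete, here is a rough sketch of what I have in mind. It is only an illustration, not tested against the real file: QueueSketch, Process, handleLine, workerCount and capacity are names I made up, and I am assuming a bounded BlockingCollection<string> is a reasonable queue here. It also assumes handleLine does not throw. If every worker failed, the reader could block forever on a full queue.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class QueueSketch
{
    // Hypothetical helper: reads lines on the calling thread and hands them
    // to a fixed number of worker tasks through a bounded queue.
    public static void Process(TextReader reader, Action<string> handleLine,
                               int workerCount = 4, int capacity = 1000)
    {
        // Bounded capacity: Add() blocks when the workers fall behind,
        // so the whole file is never buffered in memory at once.
        using (var queue = new BlockingCollection<string>(capacity))
        {
            var workers = Enumerable.Range(0, workerCount)
                .Select(_ => Task.Factory.StartNew(() =>
                {
                    // Blocks while the queue is empty; the loop ends once
                    // CompleteAdding() was called and the queue is drained.
                    foreach (var item in queue.GetConsumingEnumerable())
                    {
                        handleLine(item); // assumed not to throw
                    }
                }, TaskCreationOptions.LongRunning))
                .ToArray();

            try
            {
                string line;
                while ((line = reader.ReadLine()) != null)
                {
                    if (!string.IsNullOrWhiteSpace(line))
                    {
                        queue.Add(line);
                    }
                }
            }
            finally
            {
                // Always signal the workers that no more items will arrive.
                queue.CompleteAdding();
            }

            Task.WaitAll(workers);
        }
    }

    public static void Main()
    {
        int count = 0;
        // Three non-blank lines, so the handler runs three times.
        Process(new StringReader("a\n\nb\nc"), _ => Interlocked.Increment(ref count));
        Console.WriteLine(count);
    }
}
```

With this shape the number of concurrently running handlers is fixed by workerCount instead of one task per row. The row parsing (Split, Int32.Parse and so on) would move into handleLine.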