
I have a large (> 1 GB) text file. I need to process that file row by row (applying business logic) in a multithreaded manner, so I wrote the following code:

public Task Parse(Stream content, Action<Trade> parseCallback)
{
    return Task.Factory.StartNew(() =>
    {
        using (var streamReader = new StreamReader(content))
        {
            string line;
            while ((line = streamReader.ReadLine()) != null)
            {
                if (String.IsNullOrWhiteSpace(line))
                {
                    continue;
                }

                var tokens = line.Split(TokensSeparator);
                if (!tokens.Any() || tokens.Count() != 6)
                {
                    continue;
                }

                Task.Factory.StartNew(() => parseCallback(new Trade
                {
                    Id = Int32.Parse(tokens[0]),
                    MktPrice = Decimal.Parse(tokens[1], CultureInfo.InvariantCulture),
                    Notional = Decimal.Parse(tokens[2], CultureInfo.InvariantCulture),
                    Quantity = Int64.Parse(tokens[3]),
                    TradeDate = DateTime.Parse(tokens[4], CultureInfo.InvariantCulture),
                    TradeType = tokens[5]
                }),
                TaskCreationOptions.AttachedToParent);
            }
        }
    });
}

where the Action<Trade> parseCallback applies business logic to a data object created from a data row.
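Because Int32.Parse, Decimal.Parse and DateTime.Parse throw on a malformed field, one bad row in a 1 GB file faults its task. A minimal sketch of a non-throwing row parser follows; the TradeRowParser name, the ',' / '|' separators and the sample row format are assumptions, since the question does not show TokensSeparator or the file layout.

```csharp
using System;
using System.Globalization;

public class Trade
{
    public int Id { get; set; }
    public decimal MktPrice { get; set; }
    public decimal Notional { get; set; }
    public long Quantity { get; set; }
    public DateTime TradeDate { get; set; }
    public string TradeType { get; set; }
}

// Hypothetical helper, not part of the original code.
public static class TradeRowParser
{
    // Assumption: fields are separated by ',' or '|'.
    private static readonly char[] TokensSeparator = { ',', '|' };

    // Returns false (instead of throwing) for blank rows, rows with the wrong
    // number of fields, or rows containing a value that does not parse.
    public static bool TryParseLine(string line, out Trade trade)
    {
        trade = null;
        if (string.IsNullOrWhiteSpace(line)) return false;

        var tokens = line.Split(TokensSeparator);
        if (tokens.Length != 6) return false;

        var inv = CultureInfo.InvariantCulture;
        if (!int.TryParse(tokens[0], NumberStyles.Integer, inv, out var id) ||
            !decimal.TryParse(tokens[1], NumberStyles.Number, inv, out var mktPrice) ||
            !decimal.TryParse(tokens[2], NumberStyles.Number, inv, out var notional) ||
            !long.TryParse(tokens[3], NumberStyles.Integer, inv, out var quantity) ||
            !DateTime.TryParse(tokens[4], inv, DateTimeStyles.None, out var tradeDate))
        {
            return false;
        }

        trade = new Trade
        {
            Id = id, MktPrice = mktPrice, Notional = notional,
            Quantity = quantity, TradeDate = tradeDate, TradeType = tokens[5]
        };
        return true;
    }
}
```

The callback then only ever sees fully parsed Trade objects, and skipped rows can be counted or logged instead of surfacing as an AggregateException.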

The Parse() method returns a Task, and the calling thread waits for the parent task to complete:

try
{
   var parseTask = parser.Parse(fileStream, AddTradeToTradeResult);
   parseTask.Wait();
}
catch (AggregateException ae)
{
   throw new ApplicationException(ae.Flatten().InnerException.Message, ae);
}

My questions are:

  1. Tasks in the while loop can obviously be created faster than they are processed. How will the TPL treat such queued tasks? Will they wait until a thread-pool thread picks them up and executes them, or is there a possibility that they will be lost?
  2. The calling thread (parseTask.Wait()) is the main console application thread. Will I be able to interact with the console app window while the large file is being processed, or will it be blocked?
  3. I realize that this approach is wrong. How can I improve the solution? For example: read the file stream and put the data into some queue on the main thread, then process the queue items with Tasks. Is there another approach? Please give me some direction.
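On question 1: tasks scheduled on the default TaskScheduler go into the thread pool's queues and wait there until a worker thread picks them up; they are not dropped. The practical cost of one task per line is memory, because every queued task (and the tokens array it captures) stays alive until it runs. A minimal sketch to check the "not lost" part empirically (QueuedTasksDemo and RunMany are hypothetical names):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class QueuedTasksDemo
{
    // Starts far more tasks than there are pool threads and counts how many actually ran.
    public static int RunMany(int count)
    {
        int executed = 0;
        var tasks = new Task[count];
        for (int i = 0; i < count; i++)
        {
            // Each task simply waits in the scheduler's queue until a pool thread is free.
            tasks[i] = Task.Run(() => Interlocked.Increment(ref executed));
        }
        Task.WaitAll(tasks);
        return executed;
    }
}
```

Calling RunMany(100000) should return 100000: every queued task eventually runs.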
Is parsing one row so expensive that you need a separate thread for it? If not, I would parse several rows in one thread. - thumbmunkeys
Have you considered Parallel.ForEach to process your file? - Big Daddy
Parsing one row is not expensive. The main problem is processing a large file correctly: in an optimal way from the point of view of processor utilization, as fast as possible, and without blocking the UI. So I tried, but failed :) - Andrey Weber
Yes, Parallel.ForEach is an idea, thank you. - Andrey Weber
Try using a concurrent collection that the producer threads add the data to (msdn.microsoft.com/en-us/library/dd997305%28v=vs.110%29.aspx); the consumer threads can then read the produced data. - thumbmunkeys

3 Answers

0
votes

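You can control the number of concurrent tasks with a semaphore. With the settings below, at most 300 lines (the semaphore's initial count) are processed at the same time; when all slots are taken, reading waits until earlier tasks complete.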

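    using System;
    using System.Collections.Generic;
    using System.Globalization;
    using System.IO;
    using System.Threading;
    using System.Threading.Tasks;

    public class Utility
    {
        // Initial count 300 = at most 300 lines in flight; 320 is only the upper
        // bound that Release() may raise the count to.
        public static SemaphoreSlim semaphore = new SemaphoreSlim(300, 320);
        public static char[] TokensSeparator = "|,".ToCharArray();

        public async Task Parse(Stream content, Action<Trade> parseCallback)
        {
            await Task.Run(async () =>
            {
                var tasks = new List<Task>();
                using (var streamReader = new StreamReader(content))
                {
                    string line;
                    while ((line = streamReader.ReadLine()) != null)
                    {
                        if (String.IsNullOrWhiteSpace(line))
                        {
                            continue;
                        }

                        var tokens = line.Split(TokensSeparator);
                        if (tokens.Length != 6)
                        {
                            continue;
                        }

                        // Wait for a free slot, then start the work WITHOUT awaiting it;
                        // awaiting here would process the lines one at a time.
                        await semaphore.WaitAsync();
                        tasks.Add(Task.Run(() =>
                        {
                            try
                            {
                                var trade = new Trade
                                {
                                    Id = Int32.Parse(tokens[0]),
                                    MktPrice = Decimal.Parse(tokens[1], CultureInfo.InvariantCulture),
                                    Notional = Decimal.Parse(tokens[2], CultureInfo.InvariantCulture),
                                    Quantity = Int64.Parse(tokens[3]),
                                    TradeDate = DateTime.Parse(tokens[4], CultureInfo.InvariantCulture),
                                    TradeType = tokens[5]
                                };
                                parseCallback(trade);
                            }
                            finally
                            {
                                // Free the slot even if parsing or the callback throws.
                                semaphore.Release();
                            }
                        }));
                    }
                }
                // Wait for the tasks still running; their exceptions surface here.
                // Note: the list keeps one Task per line, which adds up for very large files.
                await Task.WhenAll(tasks);
            });
        }
    }

    public class Trade
    {
        public int Id { get; set; }
        public decimal MktPrice { get; set; }
        public decimal Notional { get; set; }
        public long Quantity { get; set; }
        public DateTime TradeDate { get; set; }
        public string TradeType { get; set; }
    }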
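A self-contained sketch of the semaphore throttling idea, useful for checking that it really caps concurrency: each task is started without being awaited, the slot is released in a finally block, and the method records the peak number of tasks running at once. ThrottleDemo, RunThrottled and the Task.Delay stand-in for per-line work are illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Runs `jobs` short tasks but lets at most `limit` of them execute at the same time.
    // Returns the highest number of tasks observed running concurrently.
    public static async Task<int> RunThrottled(int jobs, int limit)
    {
        var semaphore = new SemaphoreSlim(limit);
        var gate = new object();
        int running = 0, maxRunning = 0;
        var tasks = new List<Task>();

        for (int i = 0; i < jobs; i++)
        {
            await semaphore.WaitAsync();               // wait for a free slot
            tasks.Add(Task.Run(async () =>
            {
                lock (gate)
                {
                    running++;
                    maxRunning = Math.Max(maxRunning, running);
                }
                try
                {
                    await Task.Delay(1);               // stand-in for real per-line work
                }
                finally
                {
                    lock (gate) { running--; }
                    semaphore.Release();               // free the slot even on failure
                }
            }));
        }

        await Task.WhenAll(tasks);
        return maxRunning;
    }
}
```

Because a task counts itself as running only after its slot was acquired, and stops counting before releasing it, the returned peak can never exceed `limit`.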
0
votes

2 + 3: If you launch a second thread and let it create the Tasks, the UI won't get blocked. You don't have to do that, though: your main thread can create a List of Tasks and await them all (Task.WhenAll). As you said, creating Tasks in a loop is very fast, so the UI will be blocked only for the time it takes to create them.
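A minimal sketch of the "List of Tasks plus Task.WhenAll" idea, assuming a hypothetical ProcessLine that stands in for parsing a row and calling parseCallback. Note that with one task per line this is only reasonable for a modest number of lines; for a 1 GB file it allocates millions of Task objects.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllDemo
{
    // Hypothetical per-line work; here it just returns the line length.
    private static int ProcessLine(string line) => line.Length;

    // Creates one task per line, then awaits all of them without blocking the caller.
    public static async Task<int> ProcessAllAsync(IEnumerable<string> lines)
    {
        var tasks = new List<Task<int>>();
        foreach (var line in lines)
        {
            tasks.Add(Task.Run(() => ProcessLine(line)));
        }
        int[] results = await Task.WhenAll(tasks);   // results are in the same order as `tasks`
        return results.Sum();
    }
}
```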

EDIT:

I just realized you don't use async at all, which makes my answer irrelevant. Why aren't you using async to read from disk? You could read chunks of data from the disk asynchronously (that's the most time-consuming part of your program, isn't it?) and process them as they arrive.
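One way to read "chunks of data asynchronously and process them as they arrive" is to await TextReader.ReadLineAsync and hand lines over in batches. This is a sketch under assumptions: the batch size and the processBatch callback are hypothetical parameters, not something from the question.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

public static class ChunkedReader
{
    // Reads lines asynchronously and passes them to `processBatch` in groups of `batchSize`.
    // Returns the total number of lines read.
    public static async Task<int> ReadInBatchesAsync(TextReader reader, int batchSize,
                                                     Func<List<string>, Task> processBatch)
    {
        var batch = new List<string>(batchSize);
        int total = 0;
        string line;
        while ((line = await reader.ReadLineAsync()) != null)
        {
            batch.Add(line);
            total++;
            if (batch.Count == batchSize)
            {
                await processBatch(batch);
                batch = new List<string>(batchSize);   // fresh list: the callee may keep the old one
            }
        }
        if (batch.Count > 0)
        {
            await processBatch(batch);                  // final, partial batch
        }
        return total;
    }
}
```

In real use, `reader` would be something like File.OpenText(path), and processBatch could hand each batch to Task.Run so parsing overlaps with the next read.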

EDIT2 :

This sounds like a classic producer-consumer scenario (I hope I'm right). Please review the following example: one thread (the main thread, though it doesn't have to be) asynchronously reads lines from the file and pushes them to a queue. Another thread, the consumer, picks up the lines as they arrive and processes them. I did not test the code, and I don't expect it to work well; it's just an example to start with. Hope it helps.

    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.IO;
    using System.Threading;
    using System.Threading.Tasks;

    class ProducerConsumer
    {
        private readonly BlockingCollection<string> collection;
        private readonly ICollection<Thread> consumers;
        private readonly string fileName;

        public ProducerConsumer(string fileName)
        {
            this.fileName = fileName;
            collection = new BlockingCollection<string>();
            consumers = new List<Thread>();
            var consumer = new Thread(() => Consumer());
            consumers.Add(consumer);
            consumer.Start();
        }

        // Producer: reads every line of the file and pushes it to the queue.
        public async Task StartWork()
        {
            try
            {
                using (TextReader reader = File.OpenText(fileName))
                {
                    string line;
                    while ((line = await reader.ReadLineAsync()) != null)
                    {
                        collection.Add(line);
                    }
                }
            }
            finally
            {
                // Signal that no more lines are coming, so the consumers can finish.
                collection.CompleteAdding();
            }
        }

        private void Consumer()
        {
            // Blocks while the queue is empty and ends once CompleteAdding()
            // has been called and every line has been taken.
            foreach (var line in collection.GetConsumingEnumerable())
            {
                // Do whatever you need with this line. If processing a line takes longer than
                // fetching the next one (that is, the queue length grows too fast), you
                // can always launch an additional consumer thread.
            }
        }
    }

You can launch a dedicated thread (not the main thread) to be the producer. It will then read the file and add items to the queue as fast as it, and your disk, can. If that's too fast for your consumer, just launch an additional one!
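A sketch of the "dedicated producer plus additional consumers" setup, assuming a hypothetical MultiConsumerDemo.Run helper and a thread-safe processLine callback. It uses a bounded BlockingCollection so the producer blocks instead of buffering the whole file when the consumers fall behind; that bound is a design choice added here, not part of the answer above.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class MultiConsumerDemo
{
    // One producer reads lines into a bounded queue; `consumerCount` consumers drain it.
    // `processLine` runs on several threads at once, so it must be thread-safe.
    // Returns how many lines the consumers processed in total.
    public static int Run(TextReader reader, int consumerCount, Action<string> processLine)
    {
        using (var queue = new BlockingCollection<string>(boundedCapacity: 1000))
        {
            int processed = 0;

            // LongRunning hints the scheduler to give each consumer its own thread.
            var consumers = Enumerable.Range(0, consumerCount)
                .Select(_ => Task.Factory.StartNew(() =>
                {
                    foreach (var line in queue.GetConsumingEnumerable())
                    {
                        processLine(line);
                        Interlocked.Increment(ref processed);
                    }
                }, TaskCreationOptions.LongRunning))
                .ToArray();

            var producer = Task.Factory.StartNew(() =>
            {
                try
                {
                    string line;
                    while ((line = reader.ReadLine()) != null)
                    {
                        queue.Add(line);          // blocks while the queue is full
                    }
                }
                finally
                {
                    queue.CompleteAdding();       // lets GetConsumingEnumerable finish
                }
            }, TaskCreationOptions.LongRunning);

            producer.Wait();
            Task.WaitAll(consumers);
            return processed;
        }
    }
}
```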

0
votes

Change Parse so that it returns a lazy IEnumerable<string> of lines. In fact, you can use the built-in File.ReadLines for that and delete most of the code.

Then, use a PLINQ query:

File.ReadLines(path)
    .AsParallel()
    .Where(line => !String.IsNullOrWhiteSpace(line))
    .Select(line => ProcessLine(line));

That's it. This runs with a saner degree of parallelism (DOP); the exact DOP is chosen by the TPL. The heuristic is flaky, so you might want to add .WithDegreeOfParallelism(Environment.ProcessorCount).
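A self-contained sketch of that query with the degree of parallelism pinned to the core count. ProcessLine here is a hypothetical stand-in that returns the trimmed line length; in practice `lines` would be File.ReadLines(path).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PlinqDemo
{
    // Hypothetical per-line work standing in for ProcessLine.
    private static int ProcessLine(string line) => line.Trim().Length;

    public static List<int> ProcessAll(IEnumerable<string> lines)
    {
        return lines
            .AsParallel()
            .WithDegreeOfParallelism(Environment.ProcessorCount)  // cap workers at the core count
            .Where(line => !string.IsNullOrWhiteSpace(line))
            .Select(line => ProcessLine(line))
            .ToList();   // result order is not guaranteed unless AsOrdered() is added
    }
}
```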

If you want to call parseCallback, append .ForAll(parseCallback) (ProcessLine then has to return a Trade).
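A small sketch of ending a PLINQ query with ForAll. To keep it self-contained, each line is parsed to a decimal instead of a Trade; ForAllDemo and the decimal parsing are illustrative assumptions. ForAll invokes the callback on the PLINQ worker threads, so the callback must be thread-safe.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

public static class ForAllDemo
{
    // Parses each non-blank line in parallel and hands every result to `callback`.
    public static void Run(IEnumerable<string> lines, Action<decimal> callback)
    {
        lines.AsParallel()
             .Where(line => !string.IsNullOrWhiteSpace(line))
             .Select(line => decimal.Parse(line, CultureInfo.InvariantCulture))
             .ForAll(callback);   // runs on worker threads, no merge back to the caller
    }
}
```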

Will I be able to interact with console app window during large file processing or it will be blocked?

In order to do that you need to wrap this PLINQ query in Task.Run so that it executes on a background thread.
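A minimal sketch of wrapping a PLINQ query in Task.Run, so the main console thread only waits in short slices and can do other work in between (print progress, check Console.KeyAvailable, and so on). BackgroundPlinq, CountNonBlankAsync and the onTick callback are hypothetical names used for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BackgroundPlinq
{
    // Runs the PLINQ pipeline on a thread-pool thread so the calling thread stays free.
    public static Task<int> CountNonBlankAsync(IEnumerable<string> lines)
    {
        return Task.Run(() => lines
            .AsParallel()
            .Count(line => !string.IsNullOrWhiteSpace(line)));
    }

    // Waits in 100 ms slices; `onTick` runs on the calling thread between slices.
    public static int RunWithProgress(IEnumerable<string> lines, Action onTick)
    {
        var task = CountNonBlankAsync(lines);
        while (!task.Wait(TimeSpan.FromMilliseconds(100)))
        {
            onTick();
        }
        return task.Result;
    }
}
```

For example, `BackgroundPlinq.RunWithProgress(File.ReadLines(path), () => Console.Write("."))` would print dots while the file is processed.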