I have around 10,000,000 tasks that each take between 1 and 10 seconds to complete. I am running them on a powerful server using 50 threads, where each thread picks the first task that has not been started yet, runs it, and repeats.

Pseudo-code:

for i = 1 to 50:
    run a new thread:
        while True:
            task = first available task
            if no available tasks: exit thread
            run task

Using this code, I can run all the tasks in parallel on any given number of threads.

In reality, the code uses C#'s Task.WhenAll, and looks like this:

ServicePointManager.DefaultConnectionLimit = threadCount; //Allow more HTTP request simultaneously
var currentIndex = -1;
var threads = new List<Task>(); //List of threads
for (int i = 0; i < threadCount; i++) //Generate the threads
{
    var wc = CreateWebClient();
    threads.Add(Task.Run(() =>
    {
        while (true) //Each thread should loop, picking the first available task, and executing it.
        {
            var index = Interlocked.Increment(ref currentIndex);
            if (index >= tasks.Count) break;
            var task = tasks[index];
            RunTask(conn, wc, task, port);
        }
    }));
}

await Task.WhenAll(threads);

This works just as I wanted it to, but I have a problem: since this code takes a long time to run, I want the user to see some progress. The progress is displayed as a colored bitmap (representing a matrix), which itself takes a few seconds to generate.

Therefore, I want to generate this visualization on a background thread. But this other background thread is never executed. My suspicion is that it is using the same thread pool as the parallel code, and is therefore enqueued, and will not be executed before the parallel code is actually finished. (And that's a bit too late.)

Here's an example of how I generate the progress visualization:

private async void Refresh_Button_Clicked(object sender, RoutedEventArgs e)
{
    var bitmap = await Task.Run(() => // <<< This task is never executed!
    {
        //bla, bla, various database calls, and generating a relatively large bitmap
    });

    //Convert the bitmap into a WPF image, and update the GUI
    VisualizationImage = BitmapToImageSource(bitmap);
}

So, how could I best solve this problem? I could create a list of Tasks, where each Task represents one of my tasks, and run them with Parallel.Invoke, and pick another Thread pool (I think). But then I have to generate 10 million Task objects, instead of just 50 Task objects, running through my array of stuff to do. That sounds like it uses much more RAM than necessary. Any clever solutions to this?

EDIT: As Panagiotis Kanavos suggested in one of his comments, I tried replacing some of my loop logic with ActionBlock, like this:

// Create an ActionBlock<ZoneTask> that performs the work.
var workerBlock = new ActionBlock<ZoneTask>(
t =>
{
    var wc = CreateWebClient(); //This probably generates some unnecessary overhead, but that's a problem I can solve later.
    RunTask(conn, wc, t, port);
},
// Specify a maximum degree of parallelism. 
new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = threadCount
});

foreach (var t in tasks) //Note: the objects in the tasks array are not Task objects
    workerBlock.Post(t);
workerBlock.Complete();

await workerBlock.Completion;

Note: RunTask just executes a web request using the WebClient and parses the results. There is nothing in there that can cause a deadlock.

This seems to work like the old parallelism code, except that it needs a minute or two for the initial foreach loop to post the tasks. Is this delay really worth it?

Nevertheless, my progress task still seems to be blocked. I am ignoring the Progress<T> suggestion for now, since this reduced code still suffers from the same problem:

private async void Refresh_Button_Clicked(object sender, RoutedEventArgs e)
{
    Debug.WriteLine("This happens");
    var bitmap = await Task.Run(() =>
    {
        Debug.WriteLine("This does not!");
        //Still doing some work here, so it's not optimized away.
    });

    VisualizationImage = BitmapToImageSource(bitmap);
}

So it still looks like new tasks are not executed as long as the parallel code is running. I even reduced MaxDegreeOfParallelism from 50 to 5 (on a 24-core server) to see if Peter Ritchie's suggestion was right, but there was no change. Any other suggestions?

ANOTHER EDIT:

The issue seems to have been that I overloaded the thread pool with all my simultaneous blocking I/O calls. I replaced WebClient with HttpClient and its async methods, and now everything seems to be working nicely.
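
For reference, here is a minimal sketch of what such an async worker loop might look like. It is not my exact code: the names AsyncWorkers, ForEachAsync and body are made up for illustration, and body stands in for an async version of RunTask. The idea is that a worker awaiting I/O does not hold a thread, so the thread pool stays free for other work such as the progress task.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class AsyncWorkers
{
    // Runs `body` once per item, using `workerCount` asynchronous worker loops
    // that share a single index, just like the original thread-based loop.
    public static Task ForEachAsync<T>(IReadOnlyList<T> items, int workerCount, Func<T, Task> body)
    {
        var currentIndex = -1;

        async Task RunWorkerAsync()
        {
            while (true)
            {
                // Claim the next unprocessed item atomically.
                var index = Interlocked.Increment(ref currentIndex);
                if (index >= items.Count) break;

                // No thread is blocked while the awaited operation is pending.
                await body(items[index]).ConfigureAwait(false);
            }
        }

        var workers = new List<Task>();
        for (int i = 0; i < workerCount; i++)
        {
            workers.Add(RunWorkerAsync());
        }
        return Task.WhenAll(workers);
    }
}
```

Assuming an async variant of RunTask exists (say, one that awaits HttpClient.GetStringAsync instead of calling WebClient synchronously), it would be passed as body, with threadCount as workerCount.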

Thanks to everyone for the great suggestions! Even though not all of them directly solved the problem, I'm sure they all improved my code. :)

I think this may give you a starting point: stackoverflow.com/questions/548208/… - Alex
.NET already has such a mechanism through Progress<T> and IProgress<T>. - Panagiotis Kanavos
Tasks aren't threads. The TPL itself takes care of using threads to process a task's payload. What does RunTask do, and why don't you just use Task.Run on it? - Panagiotis Kanavos
TPL has degree of parallelism and doesn't try to do too many things at once (typically only one thing per CPU/core at a time). If you don't have 50 CPUs/cores, the TPL is likely throttling how many tasks are running at once. - Peter Ritchie
Thank you, Kanavos and Ritchie. I have tried some of your suggestions; see the edit. - Erlend D.

1 Answer

.NET already provides a mechanism for reporting progress: the IProgress<T> interface and its Progress<T> implementation.

The IProgress<T> interface lets clients publish messages with the Report(T) method without having to worry about threading. The implementation ensures that the messages are processed on the appropriate thread, e.g. the UI thread. By using the simple IProgress<T> interface, the background methods are decoupled from whoever processes the messages.

You can find more information in the Async in 4.5: Enabling Progress and Cancellation in Async APIs article. The cancellation and progress APIs aren't specific to the TPL. They can be used to simplify cancellation and reporting even for raw threads.

Progress<T> processes messages on the synchronization context that was captured when it was created, which means the UI thread if it is constructed there. The handler is supplied either by passing a delegate to the constructor or by subscribing to the ProgressChanged event. Copying from the article:

private async void Start_Button_Click(object sender, RoutedEventArgs e)
{
    //construct Progress<T>, passing ReportProgress as the Action<T> 
    var progressIndicator = new Progress<int>(ReportProgress);
    //call async method
    int uploads = await UploadPicturesAsync(GenerateTestImages(), progressIndicator);
}

where ReportProgress is a method that takes an int parameter. It could also take a more complex type that reports work done, messages, etc.

The asynchronous method only has to call IProgress<T>.Report, e.g.:

async Task<int> UploadPicturesAsync(List<Image> imageList, IProgress<int> progress)
{
    int totalCount = imageList.Count;
    //The lambda must be async because it awaits inside the loop
    int processCount = await Task.Run<int>(async () =>
    {
        int tempCount = 0;
        foreach (var image in imageList)
        {
            //await the processing and uploading logic here
            int processed = await UploadAndProcessAsync(image);
            //Count the image first, so that the last report is 100%
            tempCount++;
            if (progress != null)
            {
                progress.Report(tempCount * 100 / totalCount);
            }
        }

        return tempCount;
    });
    return processCount;
}

This decouples the background method from whoever receives and processes the progress messages.
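
Applied to a worker loop like the one in the question, this might look roughly like the sketch below. It is only an illustration under assumptions: ProgressDemo, RunAsync, work and reportEvery are hypothetical names, and work stands in for whatever RunTask does for a single item. Reporting only every reportEvery items keeps the receiver from being flooded when there are millions of tasks.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class ProgressDemo
{
    // Processes `total` items on `workerCount` workers. The number of finished
    // items is reported through `progress` every `reportEvery` items and once
    // more when everything is done.
    public static Task RunAsync(int total, int workerCount, int reportEvery,
                                Action<int> work, IProgress<int> progress)
    {
        var nextIndex = -1;
        var done = 0;
        var workers = new Task[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            workers[i] = Task.Run(() =>
            {
                while (true)
                {
                    var index = Interlocked.Increment(ref nextIndex);
                    if (index >= total) break;
                    work(index);

                    // The workers never touch the UI; they only call Report.
                    var finished = Interlocked.Increment(ref done);
                    if (progress != null && (finished % reportEvery == 0 || finished == total))
                    {
                        progress.Report(finished);
                    }
                }
            });
        }
        return Task.WhenAll(workers);
    }
}
```

On the UI side, the caller would construct a Progress<int> on the UI thread, with a handler that updates the display, and pass it in as progress.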