"Parallelism in .NET" Series by Reed Copsey, Jr.

Reed Copsey, Jr. has been writing a great series of articles on parallelism with the .NET Framework 4. The articles provide the insights of an expert developer who has been using parallelism with .NET to speed up real-world programs. Recommended reading.


Parallelism in .NET

  • Introduction
  • Part 1, Decomposition
  • Part 2, Simple Imperative Data Parallelism
  • Part 3, Imperative Data Parallelism: Early Termination
  • Part 4, Imperative Data Parallelism: Aggregation
  • Part 5, Partitioning of Work
  • Part 6, Declarative Data Parallelism
  • Part 7, Some Differences between PLINQ and LINQ to Objects
  • Part 8, PLINQ’s ForAll Method

Updated Beta 2 samples for parallel programming

We’ve refreshed our Beta 2 samples for parallel programming with the .NET Framework 4.  Thanks to the gracious assistance of the fabulous Lisa Feigenbaum and others on the Visual Basic team, in this refresh the majority of the samples are now available not only in C# but also in Visual Basic. The samples are available for download at http://code.msdn.microsoft.com/ParExtSamples.


Enjoy!

The meaning of TaskStatus

Every System.Threading.Tasks.Task instance goes through a lifecycle, and it only makes this journey once.  To provide insight into where in that lifecycle a given Task is, the Task class provides an instance Status property.  That property returns a value from the TaskStatus enumeration that reflects the current point in the lifecycle.

“Current” is the operative word here.  Many of the states are transient, and because we’re dealing with a concurrent system, by the time you’ve received the value of the status, the status may have changed.  However, the lifecycle does not contain cycles (as ironic as that may sound): there’s a directionality to the various states, and once a given state has been reached, the Task won’t return to that state or to any that came before it.  Additionally, there are three “final” states; once a Task reaches one of them, it is no longer executing and will no longer change state.

Here are the states and an informal description of what they imply:

  • Created: This is the starting state for tasks created through a Task constructor.  Tasks in this state will not leave the state until Start or RunSynchronously is called on the instance or until the Task is canceled.
  • WaitingForActivation: This is the starting state for tasks created through methods like ContinueWith, ContinueWhenAll, ContinueWhenAny, and FromAsync, as well as from a TaskCompletionSource<TResult>.  The task isn’t currently scheduled, and won’t be until some dependent operation has completed (some tasks may never be scheduled, such as those created by a TaskCompletionSource<TResult>, which have nothing relevant to be scheduled).  For example, a task created by ContinueWith won’t be scheduled until the antecedent task (the one on which ContinueWith was called) completes execution.
  • WaitingToRun: The task has been scheduled to a TaskScheduler and is waiting to be picked up by that scheduler and run.  This is the starting state for tasks created through TaskFactory.StartNew; by the time the Task is returned from StartNew, it will already have been scheduled, and thus the state will be at least WaitingToRun (“at least”, since by the time StartNew returns, the Task could of course have already started or even completed executing).
  • Running: The Task is currently executing.
  • WaitingForChildrenToComplete: A Task isn’t considered complete until its attached children have completed.  If a Task has finished executing its code body, it will leave the Running state, and if it’s then implicitly waiting for its children to complete, it will enter this state.
  • RanToCompletion: One of the three final states.  A Task in this state has successfully completed execution, running to the end of its body without cancellation and without throwing an unhandled exception.
  • Canceled: One of the three final states.  A Task in this state completed execution, but it did so through cancellation.  To end in the Canceled state, a Task must either have had cancellation requested before it started executing, or it must acknowledge a cancellation request during its execution.
  • Faulted: One of the three final states.  A Task in this state completed execution due to an unhandled exception in its body or due to one of its attached children completing in this state.

Several of these status values are also communicated through helper properties on the Task class.  For a Task t:

  • t.IsCanceled returns true if and only if t.Status equals TaskStatus.Canceled. 
  • t.IsFaulted returns true if and only if t.Status equals TaskStatus.Faulted.  Additionally, t.Exception will be non-null if and only if t.IsFaulted is true.
  • t.IsCompleted returns true if and only if t.Status equals TaskStatus.RanToCompletion, TaskStatus.Canceled, or TaskStatus.Faulted (i.e. one of the three final states). Note that IsCompleted does not just correspond to RanToCompletion, but rather to all final states.

Several operations in the Task Parallel Library are tied to the concept of a final state, or more generally to IsCompleted.  For example, a call to the Wait method on a Task will not return until that Task’s IsCompleted is true.  And the continuations for a Task will not fire until that same condition is met.
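
The lifecycle described above can be observed directly.  The following is a minimal sketch, written as C# 9 top-level statements against the released .NET TPL API (it uses CancellationTokenSource, which isn’t mentioned in the text above, and the Thread.Sleep and exception message are illustrative only).  It walks tasks through Created, WaitingForActivation, RanToCompletion, Faulted, and Canceled, and checks the helper properties.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Created: a task built with the constructor stays here until Start is called.
var task = new Task(() => Thread.Sleep(50));
TaskStatus initial = task.Status;

// WaitingForActivation: a continuation isn't scheduled until its antecedent completes.
Task continuation = task.ContinueWith(_ => { });
TaskStatus continuationInitial = continuation.Status;

task.Start();
continuation.Wait(); // returns only once the continuation's IsCompleted is true
TaskStatus afterWait = task.Status;

// Faulted: the body threw an unhandled exception.
Action fail = () => throw new InvalidOperationException("boom");
Task faulted = Task.Factory.StartNew(fail);
try { faulted.Wait(); } catch (AggregateException) { /* expected */ }

// Canceled: cancellation was requested before the task could start executing.
var cts = new CancellationTokenSource();
cts.Cancel();
Task canceled = Task.Factory.StartNew(() => { }, cts.Token);
try { canceled.Wait(); } catch (AggregateException) { /* expected */ }

Console.WriteLine($"task: {initial} -> {afterWait}; continuation started as {continuationInitial}");
Console.WriteLine($"faulted: {faulted.Status}, IsFaulted={faulted.IsFaulted}, " +
                  $"Exception set={faulted.Exception != null}, IsCompleted={faulted.IsCompleted}");
Console.WriteLine($"canceled: {canceled.Status}, IsCanceled={canceled.IsCanceled}, " +
                  $"IsCompleted={canceled.IsCompleted}");
```

The statuses are sampled only where they are stable (before Start, or after a Wait has observed completion); sampling a task that is still in flight could return any of the transient states.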

TaskCreationOptions.PreferFairness

One of the ways in which the Task Parallel Library achieves good performance is through “work-stealing”.  Work-stealing is supported in the .NET 4 ThreadPool for access through the Task Parallel Library and its default scheduler.  This manifests as every thread in the ThreadPool having its own queue for work; when that thread creates Tasks, by default these tasks are enqueued into the thread’s local queue, rather than into the global queue that a call to ThreadPool.QueueUserWorkItem would typically target.  When a thread goes in search of work to be executed, it starts with its local queue, an operation that enables some extra efficiencies due to improved cache locality, minimized contention, and so forth.  However, this logic also affects fairness.

A typical thread pool will have a single queue that maintains all of the work to be executed.  When threads in the pool are ready to process another work item, they’ll dequeue the work from the head of the queue, and when new work arrives to be executed by the pool, it’ll be enqueued onto the queue’s tail.  This provides a level of fairness between work items, in that work items that arrive first are more likely to be picked off and start executing first.

Work-stealing perturbs that fairness.  Threads external to the pool could be queuing up work, but if the threads in the pool are also generating work, the work generated by the pool will be preferred over the other work items, based on the search order employed by the threads in the pool (which start searching for work first with their local queues, only proceeding on to the global queue and then on to other threads’ queues if work wasn’t available locally).  This behavior is typically expected and even desired, in that if a work item being executed is generating more work, that generated work is typically considered part of the overall operation being processed, and thus it makes sense it would be preferred over other unrelated work.  For example, imagine a quicksort operation, where each recursive sort invocation potentially results in several further recursive calls; those calls, which in a parallel implementation are likely individual tasks, are part of the all-up sort operation.

Still, there are some scenarios where this default behavior is inappropriate, where fairness should be maintained between particular work items generated by threads in the pool and work items generated by other threads.  This is often the case for long chains of continuations, where the work generated isn’t considered part of the current work but rather a follow-on to the current work.  In those cases, you may want that follow-on work to be prioritized in a fair manner with other work in the system.  This is where TaskCreationOptions.PreferFairness can prove useful.

When a Task is scheduled to the default scheduler, the scheduler looks to see whether the current thread from which the Task is being queued is a ThreadPool thread that has its own local queue.  If it isn’t, the work item will be queued to the global queue.  If it is, the scheduler will also check whether the TaskCreationOptions value for the Task includes the PreferFairness flag, which is not on by default.  If the flag is set, even if the thread does have its own local queue, the scheduler will still enqueue the Task to the global queue rather than to the local queue.  In this manner, that Task will be considered fairly along with all other work items queued globally.
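
As a sketch of how the flag is requested (C# top-level statements, released TPL API; the Console.WriteLine bodies are placeholders for real work), a task running on a pool thread below creates one task with default options, one with PreferFairness, and a continuation that asks for the same treatment through TaskContinuationOptions.PreferFairness.  The code only shows that the option is recorded in each task’s CreationOptions; whether it actually changes queueing is up to the scheduler, as described in this section.

```csharp
using System;
using System.Threading.Tasks;

// The outer task runs on a ThreadPool thread, so under the default scheduler
// tasks it creates normally go to that thread's local queue.
var outer = Task.Factory.StartNew(() =>
{
    // Work that is part of the same overall operation: default options.
    Task related = Task.Factory.StartNew(() => Console.WriteLine("related work"));

    // Follow-on work that should compete fairly with everything else:
    // PreferFairness asks the scheduler not to favor it just for being local.
    Task followOn = Task.Factory.StartNew(
        () => Console.WriteLine("follow-on work"),
        TaskCreationOptions.PreferFairness);

    // Continuations accept the same request via TaskContinuationOptions.
    Task next = followOn.ContinueWith(
        _ => Console.WriteLine("next link in the chain"),
        TaskContinuationOptions.PreferFairness);

    return new[] { related, followOn, next };
});

Task[] created = outer.Result;
Task.WaitAll(created);

// The option is just recorded on the task; the scheduler decides what it means.
Console.WriteLine(created[0].CreationOptions); // None
Console.WriteLine(created[1].CreationOptions); // PreferFairness
```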

What was just described is the current implementation of the PreferFairness flag in the default scheduler.  The implementation could of course change, but what won’t change is the purpose of the flag: by specifying PreferFairness, you’re telling the system that this Task shouldn’t be prioritized just because it’s coming from a local queue, and that you want the system to do its best to prioritize this Task on a first-come, first-served basis.

One other thing to note is that Task itself knows nothing of this flag; it’s just a flag set as an option on the Task.  It’s the scheduler that decides how it wants to handle this particular option, just as with TaskCreationOptions.LongRunning.  The default scheduler handles it as described above, but another scheduler (such as one you write) may do with this flag whatever it wants, including ignoring it.  Hence the naming “Prefer” rather than something stricter like “Guarantee”.

Asynchronous methods, C# iterators, and Tasks

More and more, developers are realizing the significant scalability advantages that asynchronous programming can provide, especially as it relates to I/O.

Consider an application that needs to copy data from one stream to another stream, such as is being done in the following synchronous implementation:

static void CopyStreamToStream(Stream input, Stream output)
{
    // Buffer space for the data to be read and written
    byte [] buffer = new byte[0x2000];

    // While there’s data to be read and written
    while(true)
    {
        // Read data. If we weren’t able to read any, bail.
        // Otherwise, write it out and start over again.
        int numRead = input.Read(buffer, 0, buffer.Length);
        if (numRead == 0) break;
        output.Write(buffer, 0, numRead);
    }
}

In many cases like this, such as if the Streams are FileStream instances representing files on disk, or NetworkStream instances representing remote data, the operations being performed require little-to-no computational power, as the majority of the time is spent waiting on the I/O subsystems and devices. If only one such operation is being performed at a time, that waiting isn’t such a big deal. But in a synchronous implementation like the one above, that waiting blocks a thread, rendering the thread useless for anything else in the meantime. Threads by default take up a sizeable chunk of memory as well as kernel resources, so if multiple concurrent calls to CopyStreamToStream are executed, multiple threads may be wasted. Because of that cost, we try to limit the number of threads in an application at any one time, such as by using a thread pool, and this synchronous style of programming I/O can lead to scalability bottlenecks, especially in server components where we want to process as many user requests concurrently as the machine’s resources will allow.

One solution to this problem is through compiler support. A compiler could recognize this synchronous pattern and translate it into an asynchronous one. Such a transformation of that same snippet might look like the following (note that this is hand-generated rather than the actual output of any particular compiler; it’s my attempt to write this out concisely while keeping it human-understandable):

static void CopyStreamToStreamAsync(
    Stream input, Stream output, Action<Exception> completed)
{
    // Buffer space for the data to be read and written
    byte[] buffer = new byte[0x2000];

    // The read/write loop. The parameter is the IAsyncResult of the
    // last read operation, which still needs to be completed with a call
    // to EndRead. If the parameter is null, that means a new read needs
    // to be started.
    Action<IAsyncResult> readWriteLoop = null;
    readWriteLoop = iar =>
    {
        try
        {
            // Determine whether to start with a BeginRead or 
            // an EndRead/BeginWrite, based on whether iar is null.
            // Then, as long as the loop continues, alternate between
            // reading and writing.
            for (bool isRead = iar == null; ; isRead = !isRead)
            {
                switch (isRead)
                {
                    // Do BeginRead(…)
                    case true:
                        // Start the asynchronous read
                        iar = input.BeginRead(buffer, 0, buffer.Length, readResult =>
                        {
                            // If the read completed synchronously, immediately
                            // return from the callback, as the processing of the
                            // read will be handled synchronously
                            // from the same thread that called BeginRead
                            if (readResult.CompletedSynchronously) return;

                            // The read completed asynchronously, so we need 
                            // to run the processing loop,
                            // starting with EndRead/BeginWrite.
                            readWriteLoop(readResult);
                        }, null);

                        // If the read is completing asynchronously, bail, as
                        // there’s nothing more to do.
                        // If it completed synchronously, loop around to do
                        // the EndRead/BeginWrite synchronously.
                        if (!iar.CompletedSynchronously) return;
                        break;

                    // Do BeginWrite(…)
                    case false:
                        // Complete the previous read. If there’s no more data
                        // to be read/written, bail.
                        int numRead = input.EndRead(iar);
                        if (numRead == 0)
                        {
                            completed(null);
                            return;
                        }

                        // Now that we know how much data was read, write it
                        // out asynchronously
                        iar = output.BeginWrite(buffer, 0, numRead, writeResult =>
                        {
                            // If the write completed synchronously, allow
                            // the thread that called BeginWrite
                            // to handle it.
                            if (writeResult.CompletedSynchronously) return;

                            // Otherwise, complete the asynchronous write
                            // and launch the read/write loop
                            // to continue all over again. This callback runs
                            // outside the loop's try block, so EndWrite failures
                            // must be caught and reported here.
                            try { output.EndWrite(writeResult); }
                            catch (Exception ex) { completed(ex); return; }
                            readWriteLoop(null);
                        }, null);

                        // If the write is completing asynchronously, bail,
                        // as there’s nothing more to do.
                        // Otherwise, complete the write synchronously and
                        // loop around.
                        if (!iar.CompletedSynchronously) return;
                        output.EndWrite(iar);
                        break;
                }
            }
        }
        catch (Exception e) { completed(e); }
    };

    // Start the whole process off with a read.
    readWriteLoop(null);
}

The Axum compiler, available on DevLabs, is actually capable of these kinds of transformations for asynchronous programming, and you could imagine such functionality being baked into a mainstream language like C#. Here’s how the code could be written asynchronously with Axum:

static asynchronous void CopyStreamToStream(Stream input, Stream output)
{
    byte [] buffer = new byte[0x2000];
    while(true)
    {
        int numRead = input.Read(buffer, 0, buffer.Length);
        if (numRead == 0) break;
        output.Write(buffer, 0, numRead);
    }
}

That’s quite lovely. Comparatively, the hand-written code is mind-numbing, and it’s not something you want to have to write each and every time you need to perform some kind of repeating operation like this asynchronously.

As such, some developers have started to take advantage of C# iterators for writing asynchronous code. While not originally designed for this purpose, the compiler transformations employed for C# iterators are similar to what’s necessary for writing asynchronous code, and thus with a bit of library-based support, it’s possible to write an iterator that looks sequential but that takes advantage of asynchrony. Several libraries have been based on this approach, including the Concurrency & Coordination Runtime (CCR) from Microsoft Robotics, Jeffrey Richter’s AsyncEnumerator, and others.

The key to taking advantage of this pattern is yielding something from an iterator that can invoke a callback when an operation completes. The pattern then becomes:

IEnumerable<ThingThatHasCallbackWhenCompletes> AsyncMethod()
{
    …
    yield return SomethingThatReturnsThingThatHasCallbackWhenCompletes();
    … // code here executes when the 
        // yielded ThingThatHasCallbackWhenCompletes completes
}

The idea is that the iterator method returns an IEnumerable of instances that represent a piece of an asynchronous operation. A utility function is used to invoke the iterator method, and iterates over the resulting enumerator. Each time the utility function gets the next instance from the enumerator, it registers some code to monitor the operation for completion, and when the operation completes, it moves next on the enumerator. Moving next on the enumerator results in re-entering the iterator method at the code location after the last yield point, thus allowing another asynchronous operation to be yielded. In this fashion, the iterator method can in effect yield asynchronous operations, and by using “yield return” to instrument the code with those async points, you as the developer can write an asynchronous method in a manner that looks largely sequential.
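
To make the “move next when the operation completes” loop concrete, here is a deliberately tiny sketch in C# top-level statements.  It is not taken from any of the libraries mentioned above: the yielded “operations” are hypothetical Action<Action> delegates that complete immediately by invoking the callback they are handed, and the hypothetical Run driver calls MoveNext from inside that callback.  The log shows that the code after each yield return runs only when the driver re-enters the iterator.

```csharp
using System;
using System.Collections.Generic;

var log = new List<string>();

// Hypothetical stand-in for "something that can invoke a callback when an
// operation completes": an operation is a delegate handed the callback to run.
IEnumerable<Action<Action>> AsyncMethod()
{
    log.Add("iterator: start op 1");
    yield return done => { log.Add("op 1 completes"); done(); };
    log.Add("iterator: resumed after op 1");
    yield return done => { log.Add("op 2 completes"); done(); };
    log.Add("iterator: resumed after op 2");
}

// Hypothetical driver: get the next operation, hand it a callback, and when
// that callback fires, move next again (re-entering after the last yield).
void Run(IEnumerator<Action<Action>> e)
{
    if (e.MoveNext()) e.Current(() => Run(e));
    else log.Add("driver: iterator finished");
}

Run(AsyncMethod().GetEnumerator());
log.ForEach(Console.WriteLine);
```

Here every operation completes synchronously, so the whole run happens on one thread; a real driver registers its callback so that completion can arrive later, on another thread.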

Now, think about the description above for the kind of object that needs to be yielded: “something that can invoke a callback when an operation completes.” Sound familiar? The System.Threading.Tasks.Task class in .NET 4 provides this exact functionality. A Task represents an asynchronous operation, and it has a ContinueWith method that enables a callback to be invoked when that asynchronous operation completes. Thus, we should be able to yield Task instances from an iterator in order to write an asynchronous method. Here’s the same CopyStreamToStream example implemented asynchronously in this fashion:

static IEnumerable<Task> CopyStreamToStreamAsync(
    Stream input, Stream output)
{
    // Buffer space for the data to be read and written
    byte [] buffer = new byte[0x2000];

    // While there’s data to be read and written
    while(true)
    {
        // Read data asynchronously. When the operation completes,
        // if no data could be read, we’re done.
        var read = Task<int>.Factory.FromAsync(
            input.BeginRead, input.EndRead, buffer, 0, buffer.Length, null,
            TaskCreationOptions.DetachedFromParent);
        yield return read;
        if (read.Result == 0) break;

        // Write the data asynchronously
        yield return Task.Factory.FromAsync(
            output.BeginWrite, output.EndWrite, buffer, 0, read.Result, null,
            TaskCreationOptions.DetachedFromParent);
    }
}

Much simpler. Where we were previously doing synchronous reads and writes, now we’re yielding the result of calling the built-in Task.Factory.FromAsync method, which creates Tasks that represent asynchronous reads and writes following the APM pattern. We could of course simplify this code further by using a few helper extension methods to hide some of the asynchronous details (these helpers are part of the Beta 1 samples at http://code.msdn.microsoft.com/ParExtSamples):

public static Task<int> ReadTask(this Stream stream,
    byte [] buffer, int offset, int count)
{
    return Task<int>.Factory.FromAsync(stream.BeginRead, stream.EndRead,
        buffer, offset, count, null, TaskCreationOptions.DetachedFromParent);
}

public static Task WriteTask(this Stream stream,
    byte [] buffer, int offset, int count)
{
    return Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite,
        buffer, offset, count, null, TaskCreationOptions.DetachedFromParent);
}

This then enables the previous code to be simplified to:

static IEnumerable<Task> CopyStreamToStreamAsync(
    Stream input, Stream output)
{
    byte [] buffer = new byte[0x2000];
    while(true)
    {
        var read = input.ReadTask(buffer, 0, buffer.Length);
        yield return read;
        if (read.Result == 0) break;
        yield return output.WriteTask(buffer, 0, read.Result);
    }
}

That looks a lot like the synchronous version, and is *much* easier and less error-prone to write than the manual version shown earlier.

Of course, now we need a mechanism for iterating over the asynchronous iterator. As mentioned, we can take advantage of ContinueWith for the main body of the operation (as with the earlier helpers, the following method and several variants of it are available in the Beta 1 samples):

public static Task Iterate(this TaskFactory factory,
    IEnumerable<Task> asyncIterator)
{
    // Validate parameters
    if (factory == null) throw new ArgumentNullException("factory");
    if (asyncIterator == null)
        throw new ArgumentNullException("asyncIterator");

    // Get the scheduler to use, either the one provided by the factory
    // or the current one if the factory didn’t have one specified.
    var scheduler = factory.TaskScheduler ?? TaskScheduler.Current;

    // Get an enumerator from the enumerable
    var enumerator = asyncIterator.GetEnumerator();
    if (enumerator == null) throw new InvalidOperationException();

    // Create the task to be returned to the caller. And ensure
    // that when everything is done, the enumerator is cleaned up.
    var trs = new TaskCompletionSource<object>(factory.CreationOptions);
    trs.Task.ContinueWith(_ => enumerator.Dispose(),
        TaskContinuationOptions.DetachedFromParent, scheduler);

    // This will be called every time more work can be done.
    Action<Task> recursiveBody = null;
    recursiveBody = antecedent =>
    {
        try
        {
            // If the previous task completed with any exceptions, bail
            if (antecedent != null && antecedent.IsFaulted)
                trs.TrySetException(antecedent.Exception);

            // If the user requested cancellation, bail.
            else if (trs.Task.IsCancellationRequested) trs.TrySetCanceled();

            // If we should continue iterating and there’s more to iterate
            // over, create a continuation to continue processing. We only
            // want to continue processing once the current Task (as yielded
            // from the enumerator) is complete.
            else if (enumerator.MoveNext())
                enumerator.Current.ContinueWith(recursiveBody,
                    TaskContinuationOptions.DetachedFromParent, scheduler).
                        IgnoreExceptions();

            // Otherwise, we’re done!
            else trs.TrySetResult(null);
        }
        // If MoveNext throws an exception, propagate that to the user
        catch (Exception exc) { trs.TrySetException(exc); }
    };

    // Get things started by launching the first task
    factory.StartNew(() => recursiveBody(null),
        TaskCreationOptions.DetachedFromParent, scheduler).
            IgnoreExceptions();

    // Return the representative task to the user
    return trs.Task;
}

Using this implementation, we can now run “asynchronous methods” that return IEnumerable<Task>, as did our CopyStreamToStreamAsync method:

var asyncOperation = Task.Factory.Iterate(
    CopyStreamToStreamAsync(input, output));

Note that the Iterate implementation shown includes a few handy additions on top of the previous hand-coded solution. First, the Iterate method returns a Task, which can be used to track the entire asynchronous operation. Second, it supports cancellation, meaning a caller can request that the asynchronous iteration shut down early, even if it hasn’t completed yet. Third, the run logic is now separated out into its own method, which means that we no longer need all of that goop in the actual target asynchronous method.

On top of all that, this implementation is now based on TaskFactory, which means we can do things like ensure that the code runs on a certain scheduler, such as a scheduler that targets the UI. As an example of where that is handy, consider a method that asynchronously reads from a long, remote stream and stores the resulting data into a TextBox as it’s available:

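// An instance method (rather than static) so that it can reach the
// form's myTextBox field.
IEnumerable<Task> ReadStreamIntoTextBox(Stream stream)
{
    byte [] buffer = new byte[0x2000];
    // A Decoder keeps state between calls, so a multi-byte UTF-8 character
    // split across two reads is still decoded correctly.
    Decoder decoder = Encoding.UTF8.GetDecoder();
    while(true)
    {
        var read = stream.ReadTask(buffer, 0, buffer.Length);
        yield return read;
        if (read.Result == 0) break;
        char [] chars = new char[decoder.GetCharCount(buffer, 0, read.Result)];
        decoder.GetChars(buffer, 0, read.Result, chars, 0);
        myTextBox.Text += new string(chars);
    }
}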

As previously shown, I could invoke this method as follows:

public void button1_Click(object sender, EventArgs e)
{
    Task.Factory.Iterate(ReadStreamIntoTextBox(inputStream)); // buggy
}

However, accessing myTextBox.Text from a thread other than the thread that created myTextBox is a no-no, and yet that’s potentially what will happen in the above. To address that, I want to ensure that the actual code from the iterator is executed on the UI thread (but I still don’t want the asynchronous operations to block the UI thread). To accomplish that, I can create a TaskFactory that will run tasks on the UI thread, as I do in the following code:

public void button1_Click(object sender, EventArgs e)
{
    var uiFactory = new TaskFactory(
        TaskScheduler.FromCurrentSynchronizationContext());
    uiFactory.Iterate(ReadStreamIntoTextBox(inputStream));
}
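
To put the pieces together in something runnable today: the Iterate method and the ReadTask/WriteTask helpers above were written against the Beta 1 samples, and some of what they rely on is not available in the released .NET 4 API (TaskCreationOptions.DetachedFromParent, for example, did not ship, and IgnoreExceptions is not a method on the Framework’s Task type).  The following is a simplified sketch under those assumptions, written as C# top-level statements: the helpers become plain local functions rather than extension methods, and this Iterate omits cancellation and the TaskFactory/scheduler plumbing, always continuing on TaskScheduler.Default.  It copies one MemoryStream into another to exercise the loop.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

// Stand-ins for the ReadTask/WriteTask extension methods, as plain local functions.
Task<int> ReadTask(Stream s, byte[] buffer, int offset, int count) =>
    Task<int>.Factory.FromAsync(s.BeginRead, s.EndRead, buffer, offset, count, null);

Task WriteTask(Stream s, byte[] buffer, int offset, int count) =>
    Task.Factory.FromAsync(s.BeginWrite, s.EndWrite, buffer, offset, count, null);

// The copy iterator from above, calling the local helpers.
IEnumerable<Task> CopyStreamToStreamAsync(Stream input, Stream output)
{
    var buffer = new byte[0x2000];
    while (true)
    {
        var read = ReadTask(input, buffer, 0, buffer.Length);
        yield return read;
        if (read.Result == 0) break;
        yield return WriteTask(output, buffer, 0, read.Result);
    }
}

// Simplified Iterate: no cancellation, always continues on the default scheduler.
Task Iterate(IEnumerable<Task> asyncIterator)
{
    var tcs = new TaskCompletionSource<object>();
    var enumerator = asyncIterator.GetEnumerator();
    tcs.Task.ContinueWith(_ => enumerator.Dispose(), TaskScheduler.Default);

    void Step(Task antecedent)
    {
        try
        {
            if (antecedent != null && antecedent.IsFaulted)
                tcs.TrySetException(antecedent.Exception.InnerExceptions);
            else if (antecedent != null && antecedent.IsCanceled)
                tcs.TrySetCanceled();
            else if (enumerator.MoveNext())
                // Re-enter the iterator only once the yielded task has completed.
                enumerator.Current.ContinueWith(Step, TaskScheduler.Default);
            else
                tcs.TrySetResult(null);
        }
        catch (Exception e) { tcs.TrySetException(e); } // e.g. MoveNext threw
    }

    Step(null); // the first MoveNext runs on the calling thread
    return tcs.Task;
}

// Usage: copy 50,000 pseudo-random bytes between two MemoryStreams.
var source = new byte[50_000];
new Random(1).NextBytes(source);
var output = new MemoryStream();
Iterate(CopyStreamToStreamAsync(new MemoryStream(source), output)).Wait();
Console.WriteLine($"copied {output.Length} bytes");
```

Because each step is queued through ContinueWith (without ExecuteSynchronously) rather than called directly, a long run of synchronously completing reads and writes should not keep growing the stack.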

Of course, while just being able to yield individual operations is useful, things get more interesting when you start considering multi-task continuations, as exposed through ContinueWhenAny and ContinueWhenAll. For example, in our previous copy stream example, we’re reading, then writing, then reading, then writing, and so forth. But we should be able to write the previously read bits while reading the next chunk, thereby achieving better speeds by overlapping latencies. Writing the code to do that using manual asynchrony would be a nightmare… with iterators and tasks, it’s manageable, almost fun:

static IEnumerable<Task> CopyStreamToStreamAsync(
    Stream input, Stream output)
{
    const int BUFFER_SIZE = 0x2000; // same size as the earlier examples
    byte[][] buffers = new byte[2][] {
        new byte[BUFFER_SIZE], new byte[BUFFER_SIZE] };
    int filledBufferNum = 0;
    Task writeTask = null;

    while (true)
    {
        var readTask = input.ReadTask(
            buffers[filledBufferNum], 0, buffers[filledBufferNum].Length);
        yield return writeTask == null ?
            readTask :
            Task.Factory.ContinueWhenAll(new[] { readTask, writeTask },
                tasks => Task.WaitAll(tasks), // to propagate exceptions
                TaskContinuationOptions.DetachedFromParent);
        if (readTask.Result == 0) break;

        writeTask = output.WriteTask(
            buffers[filledBufferNum], 0, readTask.Result);
        filledBufferNum = filledBufferNum == 0 ? 1 : 0;
    }
}

Here, ContinueWhenAll is used to ensure that the iterator isn’t re-entered until both the pending read and any pending write have completed.
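
To see why the continuation passed to ContinueWhenAll calls Task.WaitAll, compare a join whose body ignores its antecedents with one that waits on them.  This sketch uses hypothetical stand-in tasks (a “read” that returns 42 and a “write” that throws) and is written as C# top-level statements against the released API.  ContinueWhenAll runs its continuation whether or not the antecedents faulted, so only the join that calls WaitAll ends up Faulted, which is what lets an Iterate-style driver notice the failed write.

```csharp
using System;
using System.Threading.Tasks;

// Stand-ins for the copy loop's operations: a read that succeeds and a write that fails.
Task<int> readTask = Task.Factory.StartNew(() => 42);
Action failingWrite = () => throw new InvalidOperationException("write failed");
Task writeTask = Task.Factory.StartNew(failingWrite);
Task[] pending = { readTask, writeTask };

// A join that ignores its antecedents still runs to completion, hiding the failure.
Task silentJoin = Task.Factory.ContinueWhenAll(pending, tasks => { });

// Calling Task.WaitAll inside the continuation rethrows any antecedent failure,
// so the join itself ends up Faulted.
Task propagatingJoin = Task.Factory.ContinueWhenAll(pending, tasks => Task.WaitAll(tasks));

silentJoin.Wait();
try { propagatingJoin.Wait(); } catch (AggregateException) { /* expected */ }

Console.WriteLine($"silent join: {silentJoin.Status}");           // RanToCompletion
Console.WriteLine($"propagating join: {propagatingJoin.Status}"); // Faulted
Console.WriteLine(propagatingJoin.Exception.Flatten().InnerException.Message); // write failed
```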

There are of course many variations to this code that you could implement. At the end of the day, I find it quite interesting to see how the Task primitive can be used to enable such scenarios.

Parallel For Loops over Non-Integral Types

In a previous post, it was demonstrated how for loops with very small loop bodies could be parallelized by creating an iterator over ranges, and then using Parallel.ForEach over those ranges.  A similar technique can be used to write parallel loops over iteration spaces of non-integers.  For example, let’s say I wanted to parallelize the following loop, where the iteration range is based on doubles:



for(double d = 0.0; d < 1.0; d += .001)
{
    Process(d);
}


Parallel.For only provides overloads where the iteration variable is an Int32 or an Int64.  To accommodate doubles, one approach is to translate the range into an integer-based range in order to use Parallel.For, and then, within the body of the loop, translate each integer back into a double.  As an example, the previously shown loop could be rewritten as:



Parallel.For(0, 1000, i =>
{
    double d = i / 1000.0;
    Process(d);
});


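Because the sequential loop accumulates rounding error with each d += .001, whereas this version computes each value directly as i / 1000.0, the two loops may not visit exactly the same values, but they may be close enough.  Another approach is to implement an iterator like the following: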



private static IEnumerable<double> Iterate(
    double fromInclusive, double toExclusive, double step)
{
    for(double d = fromInclusive; d < toExclusive; d += step) yield return d;
}


With that Iterate method, now I can parallelize the sequential loop using Parallel.ForEach:



Parallel.ForEach(Iterate(0.0, 1.0, .001), d =>
{
    Process(d);
});


This same technique can be applied to a wide variety of scenarios.  Keep in mind, however, that enumerators (IEnumerator<T> implementations) generally aren’t thread-safe, which means that Parallel.ForEach needs to take locks when accessing the data source.  While ForEach internally uses some smarts to try to amortize the cost of such locks over the processing, this is still overhead that needs to be overcome by more work in the body of the ForEach in order for good speedups to be achieved.


Parallel.ForEach has optimizations for indexable data sources, such as lists and arrays, and in those cases the need for locking is decreased.  Thus, performance may actually be improved in some cases by transforming the iteration space into a list or an array, which can be done with LINQ’s ToList or ToArray methods, even though there is both a time and a memory cost associated with creating an array from an enumerable.  For example:



Parallel.ForEach(Iterate(0.0, 1.0, .001).ToArray(), d =>
{
    Process(d);
});
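
The three approaches above can be compared side by side.  In this sketch (C# top-level statements), Process is replaced by adding each value to a ConcurrentBag<double>, an assumption made purely so the results can be inspected.  The code checks that both ForEach variants visit exactly the values the sequential loop visits, and measures how far the accumulated d += .001 values drift from the directly computed i / 1000.0 values used by the Parallel.For version.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// The Iterate method from above.
IEnumerable<double> Iterate(double fromInclusive, double toExclusive, double step)
{
    for (double d = fromInclusive; d < toExclusive; d += step) yield return d;
}

// Reference: the values the original sequential loop visits.
List<double> sequential = Iterate(0.0, 1.0, .001).ToList();

// Approach 1: integer range, mapped to a double inside the body.
var mapped = new ConcurrentBag<double>();
Parallel.For(0, 1000, i => mapped.Add(i / 1000.0));

// Approach 2: Parallel.ForEach directly over the iterator.
var viaIterator = new ConcurrentBag<double>();
Parallel.ForEach(Iterate(0.0, 1.0, .001), d => viaIterator.Add(d));

// Approach 3: materialize to an array first so ForEach can index into it.
var viaArray = new ConcurrentBag<double>();
Parallel.ForEach(Iterate(0.0, 1.0, .001).ToArray(), d => viaArray.Add(d));

// How far do the accumulated values drift from the directly computed ones?
double maxDrift = sequential.Take(1000).Select((d, i) => Math.Abs(d - i / 1000.0)).Max();

Console.WriteLine($"sequential visited {sequential.Count} values; Parallel.For visited {mapped.Count}");
Console.WriteLine($"largest drift from i / 1000.0: {maxDrift:E2}");
Console.WriteLine(viaIterator.OrderBy(x => x).SequenceEqual(sequential)); // True
Console.WriteLine(viaArray.OrderBy(x => x).SequenceEqual(sequential));    // True
```

The drift is tiny here, but it is why the integer-mapped loop and the accumulating loop aren’t guaranteed to visit identical values, or even, in general, the same number of them.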


Happy coding.

Tasks and the Event-based Asynchronous Pattern

As has been discussed previously, one of the new features in the Task Parallel Library is TaskCompletionSource<TResult>, which enables the creation of a Task<TResult> that represents any other asynchronous operation. 

There is a wide variety of sources in the .NET Framework for asynchronous work.  One comes from components that implement the Asynchronous Programming Model (APM) pattern, which we discussed previously.  Another includes types that implement the Event-based Asynchronous Pattern (EAP).  For some synchronous method Xyz, the EAP provides an asynchronous counterpart XyzAsync.  Calling this method launches the asynchronous work, and when the work completes, a corresponding XyzCompleted event is raised. (That’s an oversimplification of the pattern, but it provides a grounding.)

For many situations, the EAP is quite straightforward and simple to use.  However, there are cases where you’d like to be able to do things like join across multiple EAP asynchronous invocations, such as to download three different web pages asynchronously, and only when all three have completed do something else.  Tasks in .NET 4.0 make this kind of operation easy, through Task.Factory.ContinueWhenAll (or ContinueWhenAny if you want to do something when any one of the items completes rather than when all of them do).  However, in order to use these methods, you need Tasks, thus to use them with components that implement the EAP, you need to create Tasks from EAP.  TaskCompletionSource<TResult> can be used to do exactly that.
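
A minimal sketch of that join, using TaskCompletionSource<string> instances as stand-ins for three EAP downloads (the strings and their lengths are made up for illustration): Task.Factory.ContinueWhenAll schedules the continuation only after all three tasks have completed, regardless of the order in which they finish.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Three stand-ins for independent asynchronous operations (e.g., three page downloads).
var sources = Enumerable.Range(0, 3)
    .Select(_ => new TaskCompletionSource<string>())
    .ToArray();
Task<string>[] pages = sources.Select(s => s.Task).ToArray();

// Runs only after every task in 'pages' has completed.
Task<int> totalLength = Task.Factory.ContinueWhenAll(pages,
    completed => completed.Sum(t => t.Result.Length));

// Simulate the operations finishing on other threads, in no particular order.
for (int i = 0; i < sources.Length; i++)
{
    int index = i;
    Task.Factory.StartNew(() => sources[index].SetResult(new string('x', (index + 1) * 10)));
}

Console.WriteLine(totalLength.Result); // 10 + 20 + 30 = 60
```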

First, let’s create two small helper functions that we can reuse over and over for multiple EAP-to-Task implementations:

private static TaskCompletionSource<T> CreateSource<T>(object state)
{
    return new TaskCompletionSource<T>(
        state, TaskCreationOptions.DetachedFromParent);
}

private static void TransferCompletion<T>(
    TaskCompletionSource<T> tcs, AsyncCompletedEventArgs e,
    Func<T> getResult)
{
    if (e.UserState == tcs)
    {
        if (e.Cancelled) tcs.TrySetCanceled();
        else if (e.Error != null) tcs.TrySetException(e.Error);
        else tcs.TrySetResult(getResult());
    }
}

The first helper, CreateSource, simply creates an instance of TaskCompletionSource<T> with the right type and settings.  I’ve included TaskCreationOptions.DetachedFromParent under the assumption that tasks created for the EAP pattern aren’t necessarily meant to participate in parent/child relationships, but you could certainly change this if your scenarios needed that functionality.

The second helper, TransferCompletion, will be used whenever an EAP event signals completion of an asynchronous operation.  It takes the AsyncCompletedEventArgs provided through the XyzCompleted event and uses it to determine whether the operation completed due to cancellation or due to an unhandled exception.  Both of those pieces of data are standard to the base AsyncCompletedEventArgs class. However, each EAP operation typically comes with its own type derived from AsyncCompletedEventArgs: to be able to use this one helper function with any of those to mine the result of the operation, TransferCompletion also accepts a Func<T> that will return the results from the derived instance. 
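
To see what TransferCompletion does with each outcome, the sketch below feeds it hand-built AsyncCompletedEventArgs instances. It reproduces the two helpers as local functions, with one assumed change: CreateSource omits TaskCreationOptions.DetachedFromParent, because, as far as I know, that Beta-era option was dropped from the released TaskCreationOptions enumeration, where tasks are detached by default.

```csharp
using System;
using System.ComponentModel;
using System.Threading.Tasks;

// Same logic as the helpers above; DetachedFromParent omitted (see lead-in).
static TaskCompletionSource<T> CreateSource<T>(object state) =>
    new TaskCompletionSource<T>(state);

static void TransferCompletion<T>(
    TaskCompletionSource<T> tcs, AsyncCompletedEventArgs e, Func<T> getResult)
{
    if (e.UserState == tcs)
    {
        if (e.Cancelled) tcs.TrySetCanceled();
        else if (e.Error != null) tcs.TrySetException(e.Error);
        else tcs.TrySetResult(getResult());
    }
}

var ok = CreateSource<int>(null);
var failed = CreateSource<int>(null);
var canceled = CreateSource<int>(null);
var other = CreateSource<int>(null);

// AsyncCompletedEventArgs(error, cancelled, userState); the user state carries the tcs.
TransferCompletion(ok, new AsyncCompletedEventArgs(null, false, ok), () => 42);
TransferCompletion(failed, new AsyncCompletedEventArgs(new InvalidOperationException("boom"), false, failed), () => 0);
TransferCompletion(canceled, new AsyncCompletedEventArgs(null, true, canceled), () => 0);
// Token mismatch: this completion belongs to another operation and is ignored.
TransferCompletion(other, new AsyncCompletedEventArgs(null, false, ok), () => 7);

// RanToCompletion Faulted Canceled WaitingForActivation
Console.WriteLine($"{ok.Task.Status} {failed.Task.Status} {canceled.Task.Status} {other.Task.Status}");
```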

With those two helpers, writing a Task wrapper for an EAP operation is a cinch.  Consider System.Net.WebClient, which provides support for downloading and uploading to and from URIs, with methods like DownloadData.  We can write a Task-based wrapper for DownloadData in just a few lines (in this case, as an extension method):

public static Task<byte[]> DownloadDataTask(
    this WebClient webClient, Uri address)
{
    var tcs = CreateSource<byte[]>(address);
    webClient.DownloadDataCompleted +=
        (sender, e) => TransferCompletion(tcs, e, () => e.Result);
    webClient.DownloadDataAsync(address, tcs);
    return tcs.Task;
}

We first create a TaskCompletionSource<byte[]> whose Task<byte[]> will be returned from the method.  We then register a handler with the DownloadDataCompleted event, such that when the event is raised, the downloaded data (or exception or cancellation information) will be transferred to the returned Task.  And then we start the operation, passing the TaskCompletionSource as the user-supplied token.  Piece of cake.

One interesting thing to note about the EAP is that some implementations support multiple asynchronous operations concurrently on the same instance.  In such cases, a user-supplied token is needed to correlate the asynchronous invocations with their completions (a Task provides this implicitly, since the Task returned from the call is itself the reference to that asynchronous operation).  For that purpose, we pass the created TaskCompletionSource<TResult> as the user-supplied token, and in TransferCompletion, we only transfer the results if the token received matches the target completion source.  If it doesn’t match, this is an event completion for another operation and should be ignored.
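
The token check matters as soon as two operations are outstanding on one instance. The sketch below uses a hypothetical FakeDownloader (an in-memory EAP component invented for illustration, not a Framework type) that raises DownloadCompleted on a thread-pool thread; every subscribed handler sees both completions, and the UserState comparison routes each result to the right task.

```csharp
using System;
using System.ComponentModel;
using System.Threading;
using System.Threading.Tasks;

var downloader = new FakeDownloader();

// Two operations outstanding on the same instance at once.
Task<string> a = DownloadTask(downloader, "a");
Task<string> b = DownloadTask(downloader, "b");
Task.WaitAll(a, b);
Console.WriteLine($"{a.Result} | {b.Result}"); // content of a | content of b

static Task<string> DownloadTask(FakeDownloader d, string name)
{
    var tcs = new TaskCompletionSource<string>(name);
    d.DownloadCompleted += (sender, e) =>
    {
        // Every handler sees every completion; the token says which one is ours.
        if (e.UserState != tcs) return;
        if (e.Cancelled) tcs.TrySetCanceled();
        else if (e.Error != null) tcs.TrySetException(e.Error);
        else tcs.TrySetResult(e.Result);
    };
    d.DownloadAsync(name, tcs);   // pass the TaskCompletionSource as the user token
    return tcs.Task;
}

// Hypothetical EAP component: DownloadAsync(name, userState) raises DownloadCompleted
// later on a thread-pool thread, and several operations may be in flight at once.
class FakeDownloader
{
    public event EventHandler<FakeDownloadCompletedEventArgs> DownloadCompleted;

    public void DownloadAsync(string name, object userState) =>
        ThreadPool.QueueUserWorkItem(_ => DownloadCompleted?.Invoke(this,
            new FakeDownloadCompletedEventArgs("content of " + name, null, false, userState)));
}

class FakeDownloadCompletedEventArgs : AsyncCompletedEventArgs
{
    private readonly string _result;

    public FakeDownloadCompletedEventArgs(string result, Exception error, bool cancelled, object userState)
        : base(error, cancelled, userState) => _result = result;

    // Like the Framework's EAP args, Result rethrows if the operation failed or was canceled.
    public string Result { get { RaiseExceptionIfNecessary(); return _result; } }
}
```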

One potential issue with the previously shown implementation (and with the EAP pattern in general) is that, if the wrapper is called over and over on the same instance, the XyzCompleted event handlers will start to build up: delegates are registered with the event but never removed.  To fix that, the delegate handling the event can also remove itself from the event.  For example, DownloadDataTask can be rewritten as:

public static Task<byte[]> DownloadDataTask(
    this WebClient webClient, Uri address)
{
    var tcs = CreateSource<byte[]>(address);

    DownloadDataCompletedEventHandler handler = null;
    handler = (sender, e) => {
        // Ignore completions that belong to other operations on this WebClient.
        if (e.UserState != tcs) return;
        TransferCompletion(tcs, e, () => e.Result);
        webClient.DownloadDataCompleted -= handler;
    };

    webClient.DownloadDataCompleted += handler;
    try { webClient.DownloadDataAsync(address, tcs); }
    catch {
        webClient.DownloadDataCompleted -= handler;
        tcs.TrySetCanceled();
        throw;
    }

    return tcs.Task;
}
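
Using the same hypothetical FakeDownloader, extended with an assumed SubscriberCount property for inspection, this sketch calls a self-unsubscribing wrapper 100 times on one instance and then checks that no handlers remain. It differs from the version above in one deliberate way: the handler removes itself before completing the task, so code that waits on the task never observes a handler that is still attached. It also ignores completions carrying another operation's token instead of unsubscribing on them.

```csharp
using System;
using System.ComponentModel;
using System.Threading;
using System.Threading.Tasks;

var downloader = new FakeDownloader();

// Call the wrapper many times on one instance; each call subscribes exactly once.
for (int i = 0; i < 100; i++)
    DownloadTask(downloader, "item" + i).Wait();

Console.WriteLine(downloader.SubscriberCount); // 0: every handler removed itself

static Task<string> DownloadTask(FakeDownloader d, string name)
{
    var tcs = new TaskCompletionSource<string>(name);
    EventHandler<FakeDownloadCompletedEventArgs> handler = null;
    handler = (sender, e) =>
    {
        if (e.UserState != tcs) return;   // another operation's completion: keep listening
        d.DownloadCompleted -= handler;    // unsubscribe before completing the task
        if (e.Cancelled) tcs.TrySetCanceled();
        else if (e.Error != null) tcs.TrySetException(e.Error);
        else tcs.TrySetResult(e.Result);
    };
    d.DownloadCompleted += handler;
    try { d.DownloadAsync(name, tcs); }
    catch
    {
        d.DownloadCompleted -= handler;    // the operation never started
        throw;
    }
    return tcs.Task;
}

// Hypothetical EAP component (illustrative only), with a subscriber count for inspection.
class FakeDownloader
{
    public event EventHandler<FakeDownloadCompletedEventArgs> DownloadCompleted;

    public int SubscriberCount => DownloadCompleted?.GetInvocationList().Length ?? 0;

    public void DownloadAsync(string name, object userState) =>
        ThreadPool.QueueUserWorkItem(_ => DownloadCompleted?.Invoke(this,
            new FakeDownloadCompletedEventArgs("content of " + name, null, false, userState)));
}

class FakeDownloadCompletedEventArgs : AsyncCompletedEventArgs
{
    private readonly string _result;

    public FakeDownloadCompletedEventArgs(string result, Exception error, bool cancelled, object userState)
        : base(error, cancelled, userState) => _result = result;

    public string Result { get { RaiseExceptionIfNecessary(); return _result; } }
}
```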

The Beta 1 samples available at http://code.msdn.microsoft.com/ParExtSamples already include Task-based extensions for WebClient, as well as extensions for other EAP implementations like SmtpClient and Ping.  Download and enjoy!


How PLINQ processes an IEnumerable<T> on multiple cores

As Ed Essey explained in Partitioning in PLINQ, partitioning is an important step in PLINQ execution. Partitioning splits up a single input sequence into multiple sequences that can be processed in parallel. This post further explains chunk partitioning, the most general partitioning scheme that works on any IEnumerable<T>.


Chunk partitioning appears in two places in Parallel Extensions. First, it is one of the algorithms that PLINQ uses under the hood to execute queries in parallel. Second, chunk partitioning is available as a standalone algorithm through the Partitioner.Create() method.
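
As a minimal sketch of the standalone form, Partitioner.Create can wrap a plain iterator (one that exposes only MoveNext and Current) and hand the resulting partitioner to Parallel.ForEach; a PLINQ query over the same kind of source partitions it in chunks under the hood. The Numbers iterator is a hypothetical stand-in for any IEnumerable<T> of unknown length.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// An IEnumerable<T> with no Count and no indexer: only MoveNext/Current.
static IEnumerable<int> Numbers(int count)
{
    for (int i = 0; i < count; i++) yield return i;
}

// Standalone chunk partitioning over a plain enumerable, used as a ForEach source.
OrderablePartitioner<int> partitioner = Partitioner.Create(Numbers(10_000));

long sum = 0;
Parallel.ForEach(partitioner, n => Interlocked.Add(ref sum, n));

// PLINQ over the same kind of source.
long plinqSum = Numbers(10_000).AsParallel().Sum(n => (long)n);

Console.WriteLine($"{sum} {plinqSum}"); // 49995000 49995000
```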


To explain the design of the chunk partitioning algorithm, let’s walk through the possible ways of processing an IEnumerable<T> with multiple worker threads, finally arriving at the solution used in PLINQ (Approach 4).



Approach 1: Load the input sequence into an intermediate array


As a simple solution, we could walk over the input sequence and store all elements into an array. Then, we can split up the array into ranges, and assign each range to a different worker.


The disadvantage of this approach is that we need to allocate an array large enough to store all input elements. If the input sequence is long, this algorithm leads to unnecessarily large memory consumption. Also, we need to wait until the entire input sequence is ready before the workers can start executing.
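
A minimal sketch of Approach 1, with a hypothetical ProcessViaArray helper and summation standing in for real per-element work. Note that ToArray must drain the entire input before any worker starts, which is exactly the memory and latency cost described above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Approach 1 sketch: buffer the whole input, then give each worker one contiguous range.
static long ProcessViaArray(IEnumerable<int> source, int workers)
{
    int[] buffer = source.ToArray();          // reads and stores everything up front
    long total = 0;
    var threads = new Task[workers];
    int rangeSize = (buffer.Length + workers - 1) / workers;

    for (int w = 0; w < workers; w++)
    {
        int start = Math.Min(w * rangeSize, buffer.Length);
        int end = Math.Min(start + rangeSize, buffer.Length);
        threads[w] = Task.Factory.StartNew(() =>
        {
            long local = 0;
            for (int i = start; i < end; i++) local += buffer[i];   // "process" = sum
            Interlocked.Add(ref total, local);
        });
    }

    Task.WaitAll(threads);
    return total;
}

long result = ProcessViaArray(Enumerable.Range(1, 1000), 4);
Console.WriteLine(result); // 500500
```

Because the ranges are fixed before any work starts, a worker that finishes its range early has nothing further to do.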



Approach 2: Hand out elements to threads on demand


An entirely different approach is to have all worker threads share one input enumerator. When a worker is ready to process the next input element, it takes a shared lock, gets the next element from the input enumerator, and releases the lock.


This algorithm has a fairly large overhead because processing every element requires locking. Also, handing out elements individually is prone to poor cache behavior.


This approach does have an interesting advantage over Approach 1, though: since workers receive data on demand, the workers that finish faster will come back to request more work. In contrast, Approach 1 splits up all work ahead of time, and a worker that is done early simply goes away.
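
A minimal sketch of Approach 2 (the ProcessOnDemand helper and the per-worker totals are illustrative, not part of PLINQ): every element costs one lock acquisition, and a worker that finishes its element early simply loops back for the next one.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Approach 2 sketch: all workers share one enumerator and take one element per lock.
static long[] ProcessOnDemand(IEnumerable<int> source, int workers, Func<int, long> process)
{
    var results = new long[workers];     // per-worker totals; slot w is written only by worker w
    object gate = new object();
    using IEnumerator<int> e = source.GetEnumerator();

    var tasks = Enumerable.Range(0, workers).Select(w => Task.Factory.StartNew(() =>
    {
        while (true)
        {
            int item;
            lock (gate)                           // one lock acquisition per element
            {
                if (!e.MoveNext()) return;
                item = e.Current;
            }
            results[w] += process(item);          // fast workers come back for more sooner
        }
    })).ToArray();

    Task.WaitAll(tasks);
    return results;
}

long[] perWorker = ProcessOnDemand(Enumerable.Range(1, 1000), 4, x => x);
Console.WriteLine(perWorker.Sum()); // 500500
```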



Approach 3: Hand out elements in chunks


To mitigate the two drawbacks of Approach 2 (synchronization cost and cache behavior), we can hand out elements to threads in “chunks”. When a thread is ready to process more inputs, it takes, say, 64 elements from the input enumerator.


Unfortunately, while this approach nicely amortizes the synchronization cost over multiple elements, it does not work well for short inputs. For example, if the input contains 50 elements and the chunk size is 64, all inputs will go into a single partition. Even if the work per element is large, we will not be able to benefit from parallelism, since one worker gets all the work.


And since IEnumerable<T> in general does not declare its length, we cannot simply tune the chunk size based on the input sequence length.
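
Approach 3 changes only how much each lock acquisition takes. In this illustrative sketch (CountPerWorker is a hypothetical helper), feeding 50 elements with a chunk size of 64 reproduces the problem described above: the first worker to take the lock drains the whole input and the others get nothing.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Approach 3 sketch: each lock acquisition pulls up to chunkSize elements at once.
static int[] CountPerWorker(IEnumerable<int> source, int workers, int chunkSize)
{
    var counts = new int[workers];
    object gate = new object();
    using IEnumerator<int> e = source.GetEnumerator();

    var tasks = Enumerable.Range(0, workers).Select(w => Task.Factory.StartNew(() =>
    {
        var chunk = new List<int>(chunkSize);
        while (true)
        {
            chunk.Clear();
            lock (gate)                                   // one lock per chunk, not per element
            {
                while (chunk.Count < chunkSize && e.MoveNext())
                    chunk.Add(e.Current);
            }
            if (chunk.Count == 0) return;                 // input exhausted
            counts[w] += chunk.Count;                     // stand-in for processing the chunk
        }
    })).ToArray();

    Task.WaitAll(tasks);
    return counts;
}

// 50 elements, chunks of 64: whichever worker gets the lock first takes all of them.
int[] perWorkerCounts = CountPerWorker(Enumerable.Range(0, 50), 4, 64);
Console.WriteLine(string.Join(",", perWorkerCounts));
```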



Approach 4: Hand out elements in chunks of increasing size


A solution to the problem with small inputs is to use chunks of a growing size. The first chunk assigned to each thread is of size 1 and subsequent chunks are gradually larger, until a specific threshold is reached.


Our solution doubles the chunk size every few chunks. So, each thread first receives a few chunks of size 1, then a few chunks of size 2, then 4, and so forth. Once the chunk size reaches a certain threshold, it remains constant.
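
A sketch of Approach 4 under stated assumptions: each worker follows its own schedule of chunk sizes that starts at 1, doubles after every three chunks, and stops growing at 512. Those constants are invented for illustration; the post does not give PLINQ's actual values. The 1 ms sleep per element is a hypothetical workload that gives other workers time to join, so how the 50 elements split across workers varies from run to run.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Per-worker chunk-size schedule: 1,1,1,2,2,2,4,4,4,... capped at MaxChunkSize.
// ChunksPerSize and MaxChunkSize are illustrative assumptions, not PLINQ's real values.
static IEnumerable<int> ChunkSizes()
{
    const int ChunksPerSize = 3, MaxChunkSize = 512;
    for (int size = 1; ; size = Math.Min(size * 2, MaxChunkSize))
        for (int i = 0; i < ChunksPerSize; i++)
            yield return size;
}

// Workers share one enumerator; each lock acquisition takes the worker's next chunk size.
static int[] Partition(IEnumerable<int> source, int workers, Action<int> process)
{
    var counts = new int[workers];
    object gate = new object();
    using IEnumerator<int> e = source.GetEnumerator();

    var tasks = Enumerable.Range(0, workers).Select(w => Task.Factory.StartNew(() =>
    {
        using IEnumerator<int> sizes = ChunkSizes().GetEnumerator();
        var chunk = new List<int>();
        while (sizes.MoveNext())                        // the schedule never ends
        {
            chunk.Clear();
            lock (gate)
            {
                while (chunk.Count < sizes.Current && e.MoveNext())
                    chunk.Add(e.Current);
            }
            if (chunk.Count == 0) return;               // input exhausted
            foreach (int item in chunk) process(item);
            counts[w] += chunk.Count;
        }
    })).ToArray();

    Task.WaitAll(tasks);
    return counts;
}

Console.WriteLine(string.Join(",", ChunkSizes().Take(12))); // 1,1,1,2,2,2,4,4,4,8,8,8

// 50 slow elements: small first chunks let several workers share a short input.
int[] perWorker = Partition(Enumerable.Range(0, 50), 4, _ => Thread.Sleep(1));
Console.WriteLine(string.Join(",", perWorker));
```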


This chunking strategy ensures that if the input is short, it will still get split up fairly among the cores. But, the chunk size also grows fairly quickly, and the per-chunk overheads are small for large inputs. Also, the algorithm is quite good at load-balancing, so if one worker is taking longer to process its inputs, other workers will process more elements to decrease the overall processing time.


One interesting consequence of the chunk partitioning algorithm is that multiple threads will call MoveNext() on the input enumerator. The worker threads will use a lock to ensure mutual exclusion, but the enumerator must not assume that MoveNext() will be called from a particular thread (e.g., it should not use thread-local storage, manipulate UI, etc.).


The current implementation of both PLINQ chunk partitioning and Partitioner.Create() follows Approach 4 fairly closely. Now you know how it behaves and why!


Tasks and the APM Pattern

The Asynchronous Programming Model (APM) in the .NET Framework has been around since .NET 1.0 and is the most common pattern for asynchrony in the Framework.  Even if you’re not familiar with the name, you’re likely familiar with the core of the pattern.  For a given synchronous operation Xyz, the asynchronous version manifests as BeginXyz and EndXyz:  BeginXyz starts the operation and the EndXyz method joins with it (completes it).  There are several mechanisms by which the results can be joined with, such as by polling for completion using the IsCompleted property on the IAsyncResult returned from BeginXyz, blocking until the operation has completed by waiting on the IAsyncResult’s AsyncWaitHandle, simply calling EndXyz and passing it the IAsyncResult (which will block until the operation has completed), or passing to BeginXyz a callback which will be invoked when the operation has completed: that callback should then call EndXyz to retrieve the results.
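
The four ways of joining with an APM operation can be sketched against MemoryStream, whose BeginRead/EndRead pair follows the pattern. This is an illustration only; each read requests two bytes from an eight-byte stream, so every join should report 2.

```csharp
using System;
using System.IO;
using System.Threading;

var stream = new MemoryStream(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 });
var buffer = new byte[2];

// 1. Poll IsCompleted, then call End to get the result.
IAsyncResult ar1 = stream.BeginRead(buffer, 0, buffer.Length, null, null);
while (!ar1.IsCompleted) Thread.Yield();
int n1 = stream.EndRead(ar1);

// 2. Block on the AsyncWaitHandle, then call End.
IAsyncResult ar2 = stream.BeginRead(buffer, 0, buffer.Length, null, null);
ar2.AsyncWaitHandle.WaitOne();
int n2 = stream.EndRead(ar2);

// 3. Call End directly; it blocks until the operation completes.
int n3 = stream.EndRead(stream.BeginRead(buffer, 0, buffer.Length, null, null));

// 4. Pass a callback; it calls End when the operation completes.
int n4 = -1;
using var done = new ManualResetEventSlim();
stream.BeginRead(buffer, 0, buffer.Length, ar =>
{
    n4 = stream.EndRead(ar);
    done.Set();
}, null);
done.Wait();

Console.WriteLine($"{n1} {n2} {n3} {n4}"); // 2 2 2 2
```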

This pattern is so common that we’ve opted to incorporate it as a first-class citizen into the Task Parallel Library.  One way we’ve done this is by having the Task class itself implement IAsyncResult: this means that Task can be used as the core of a Begin/End implementation, easing the implementation for common scenarios.  One feature new to .NET 4 Beta 1 since previous CTPs, however, is that we now support the inverse, easy creation of tasks from an implementation of the APM pattern.  This new functionality shows up through the FromAsync methods on TaskFactory and TaskFactory<TResult>.
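
A minimal sketch of using Task as the core of a Begin/End pair. BeginSquare and EndSquare are hypothetical names; the point is that a Task<int> can be returned directly as the IAsyncResult, its AsyncState carries the caller's state object, and a continuation can invoke the callback. A production implementation would also take care with CompletedSynchronously and unwrap the AggregateException that Result throws on failure.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical APM pair built on Task.
static IAsyncResult BeginSquare(int x, AsyncCallback callback, object state)
{
    // Task<int> implements IAsyncResult; the state object becomes Task.AsyncState.
    var task = Task.Factory.StartNew(s => x * x, state);
    if (callback != null) task.ContinueWith(t => callback(t));
    return task;
}

static int EndSquare(IAsyncResult asyncResult) =>
    ((Task<int>)asyncResult).Result;   // blocks if needed; failures surface as AggregateException

IAsyncResult ar = BeginSquare(7, null, "my state");
int squared = EndSquare(ar);
Console.WriteLine($"{squared} {ar.AsyncState}"); // 49 my state
```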

Under the covers, FromAsync utilizes the TaskCompletionSource<TResult> type. Let’s say you wanted to create a Task that represents an asynchronous read on a Stream.  You could write something like the following:

public static Task<int> ReadTask(this Stream stream,
    byte [] buffer, int offset, int count, object state)
{
    var tcs = new TaskCompletionSource<int>(state);
    stream.BeginRead(buffer, offset, count, ar =>
    {
        try { tcs.SetResult(stream.EndRead(ar)); }
        catch(Exception exc) { tcs.SetException(exc); }
    }, state);
    return tcs.Task;
}
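
Here is the ReadTask wrapper exercised against a MemoryStream. It is written as a plain static method rather than an extension method (local functions cannot be extension methods), and it passes state to the TaskCompletionSource constructor so the returned task's AsyncState matches what the caller supplied; both are adaptations for this sketch.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

static Task<int> ReadTask(Stream stream, byte[] buffer, int offset, int count, object state)
{
    var tcs = new TaskCompletionSource<int>(state);
    stream.BeginRead(buffer, offset, count, ar =>
    {
        // Complete the task with the byte count, or with the exception EndRead throws.
        try { tcs.SetResult(stream.EndRead(ar)); }
        catch (Exception exc) { tcs.SetException(exc); }
    }, state);
    return tcs.Task;
}

var input = new MemoryStream(new byte[] { 10, 20, 30 });
var target = new byte[8];
Task<int> read = ReadTask(input, target, 0, target.Length, "read #1");
Console.WriteLine(read.Result); // 3: the stream held only three bytes
```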

You could of course write a similar wrapper for every Begin/End pair you wanted to utilize, but that could get tedious and error-prone.  To solve that, we’ve provided the FromAsync method, which takes advantage of the common structure of the APM pattern, along with delegate inference, to provide generic overloads that work with most APM implementations. 

For example, let’s say I have a Stream and a byte buffer, and I want to read from that stream into the buffer.  Synchronously, I could do something like:

int bytesRead = stream.Read(buffer, 0, buffer.Length);

Creating a Task that does the same thing asynchronously:

Task<int> bytesRead = Task<int>.Factory.FromAsync(
    stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);

Under the covers, we follow a pattern very similar to the one shown earlier as a specific implementation for Stream.Read. Combine this support with the ability to do Task.WaitAll, Task.WaitAny, Task.Factory.ContinueWhenAll, and Task.Factory.ContinueWhenAny, and you can achieve some very useful coordination functionality in very little code.
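
As a small illustration of that coordination, the sketch below starts two FromAsync reads on separate in-memory streams and uses Task.Factory.ContinueWhenAll to total the bytes read once both have finished. The streams and buffer sizes are arbitrary.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

// Two independent streams, read asynchronously via FromAsync.
var streams = new[]
{
    new MemoryStream(new byte[] { 1, 2, 3 }),
    new MemoryStream(new byte[] { 4, 5, 6, 7, 8 }),
};
var buffers = streams.Select(_ => new byte[16]).ToArray();

Task<int>[] reads = streams.Select((s, i) =>
    Task<int>.Factory.FromAsync(s.BeginRead, s.EndRead, buffers[i], 0, buffers[i].Length, null))
    .ToArray();

// Runs once both reads have completed.
Task<int> total = Task.Factory.ContinueWhenAll(reads, done => done.Sum(t => t.Result));
Console.WriteLine(total.Result); // 3 + 5 = 8
```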

Of course, we don’t have any magic at our disposal; all of this functionality is available through standard .NET libraries.  That means that we achieve the above with a method with the following signature:

public Task<TResult> FromAsync<TArg1, TArg2, TArg3>(
    Func<TArg1, TArg2, TArg3, AsyncCallback,
        object, IAsyncResult> beginMethod,
    Func<IAsyncResult, TResult> endMethod,
    TArg1 arg1, TArg2 arg2, TArg3 arg3,
    object state);

You’ll notice that this overload is coded specifically for APM implementations that take three input parameters (of types TArg1, TArg2, and TArg3).  The vast majority of the APM implementations in the .NET Framework take three or fewer input parameters, with only a trickle accepting more than that.  As such, we’ve included overloads in the Task Parallel Library that follow this pattern for up to and including three parameters.  Of course, as with all corner cases, there will certainly be scenarios that require usage with more than three parameters, or with slightly different forms.  For those cases, we’ve also added overloads that accept an IAsyncResult as the first parameter, rather than accepting a beginMethod delegate.  This way, you can pass in any IAsyncResult you want, and we’ll call back to the endMethod when we find that the IAsyncResult has completed.  This approach typically isn’t as efficient as the beginMethod approach, but it can be quite handy in a pinch.
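
A sketch of the IAsyncResult-based overload: start the operation yourself, then pass its IAsyncResult and the matching end method to FromAsync. Stream.BeginRead stands in here for an APM method with more parameters than the beginMethod overloads cover; the MemoryStream and buffer are arbitrary.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

var stream = new MemoryStream(new byte[] { 1, 2, 3, 4 });
var buffer = new byte[4];

// Start the APM operation directly, then wrap its IAsyncResult in a Task.
IAsyncResult ar = stream.BeginRead(buffer, 0, buffer.Length, null, null);
Task<int> read = Task<int>.Factory.FromAsync(ar, stream.EndRead);

Console.WriteLine(read.Result); // 4
```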
