09.02.2010 at
2:14 pm · Saved under
.NET Help
The aim of this post is to help developers writing applications in which operations may need to be performed and then later undone due to a subsequent failure. It shows a pattern for how to maintain such a consistent application state by utilizing functionality from the Task Parallel Library (TPL) in the .NET Framework 4.
For the purposes of this blog post, a program/routine is a state machine where each state is equally valid and important and should be handled correctly regardless of whether it belongs to a happy path or not. Specifically, a step that rolls back an incomplete state transition is equally important as a step that makes a forward state transition. A lot could be written about the advantages and disadvantages of exceptions vs. error codes, and such a discussion is not the goal of this post: for the purpose of this article, it is only worth mentioning that throwing and catching exceptions is focused on the happy path (it keeps the code clean and easy to comprehend), while returning and checking error codes is focused on detecting the places of the code (program states) where deviations from the happy path may occur.
In general, a routine that consists of n atomic forward steps, where each forward step may fail and then needs to be undone, could be modeled as the following state machine:

Assuming a step is atomic means we don’t have to undo that particular step if it fails. That is why the undo sequences in the diagram start with the last successful step.
Note: It is also assumed that undoing cannot fail. If failures can occur during rollback, then you should strongly consider using a real transactional resource manager like a database server.
Now that we’ve stated the problem, what is the best way to approach it? Obviously, there are solutions through either error checking or through exception handling, but both of those come at the expense of convoluting the code. Notice that the number of state transition paths is about three times the number of forward steps. Ideally, we are looking for a solution that has a clean happy path as well as clean undo paths without any if statements or separate Exception types for each forward step. In other words, we are looking for a way to describe a program as a state machine.
That’s exactly what the TPL does through tasks (as states) and continuations (as state transitions).
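As a minimal sketch of that idea (not the sandwich routine itself), the snippet below models one state with two outgoing transitions: a forward transition that fires only when the step succeeds and an undo transition that fires only when it fails. The names TransitionSketch, Run, "forward" and "undo" are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class TransitionSketch
{
    // Runs one forward step and returns the names of the states visited, in order.
    public static string[] Run(bool fail)
    {
        var visited = new ConcurrentQueue<string>();

        // The state: a single step that may throw.
        Task step = Task.Factory.StartNew(() =>
        {
            visited.Enqueue("step");
            if (fail) throw new InvalidOperationException("step failed");
        });

        // Transition taken only when the step ran to completion.
        Task forward = step.ContinueWith(
            _ => visited.Enqueue("forward"),
            TaskContinuationOptions.OnlyOnRanToCompletion);

        // Transition taken only when the step faulted.
        Task undo = step.ContinueWith(
            t =>
            {
                GC.KeepAlive(t.Exception); // read the exception so it counts as observed
                visited.Enqueue("undo");
            },
            TaskContinuationOptions.OnlyOnFaulted);

        // Exactly one continuation runs; the other one ends up canceled,
        // which WaitAll reports through an AggregateException.
        try { Task.WaitAll(forward, undo); }
        catch (AggregateException) { }

        return visited.ToArray();
    }
}
```

Calling TransitionSketch.Run(false) visits "step" and then "forward", while TransitionSketch.Run(true) visits "step" and then "undo"; no if statement decides which transition to take.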
Let’s take a concrete application as an example and write some real code. Let’s say we have a system that manages the process of making a sandwich. In a terminal state our resources, bread and ham, are in the fridge. (For the sake of simplicity, we produce one sandwich at a time.) First, we take a slice of bread out of the fridge. Second, we take a slice of ham out of the fridge. Third, we assemble the bread and the ham into a sandwich. If an operation fails, we have to return whatever ingredients are currently on the line back to the fridge so they don’t rot:
And here is the code of our routine:
// HAPPY PATH
Task retrieveBread = Task.Factory.StartNew(RetrieveBreadFromFridge);
Task retrieveHam = retrieveBread.ContinueWith(_ => RetrieveHamFromFridge(),
TaskContinuationOptions.OnlyOnRanToCompletion);
Task assembleSandwich = retrieveHam.ContinueWith(_ => AssembleSandwich(),
TaskContinuationOptions.OnlyOnRanToCompletion);
// RESET STATE
TaskCompletionSource<bool> reset = new TaskCompletionSource<bool>();
assembleSandwich.ContinueWith(_ => reset.SetCanceled(),
TaskContinuationOptions.OnlyOnRanToCompletion);
// UNDO PATH
Task returnBread = new Task(ReturnBreadToFridge);
returnBread.ContinueWith(_ => reset.SetResult(true),
TaskContinuationOptions.OnlyOnRanToCompletion);
Task returnHam = new Task(ReturnHamToFridge);
returnHam.ContinueWith(_ => returnBread.Start(),
TaskContinuationOptions.OnlyOnRanToCompletion);
// HAPPY PATH -> UNDO PATH
// On a failure in bread retrieval – signal “reset”.
retrieveBread.ContinueWith(_ => reset.SetResult(true),
TaskContinuationOptions.OnlyOnFaulted);
// On a failure in ham retrieval – return bread
retrieveHam.ContinueWith(_ => returnBread.Start(),
TaskContinuationOptions.OnlyOnFaulted);
// On a failure in sandwich assembly- return ham and bread
assembleSandwich.ContinueWith(_ => returnHam.Start(),
TaskContinuationOptions.OnlyOnFaulted);
// WAIT
// Log the execution of the HAPPY PATH tasks.
// Additionally, wait on the reset state to get signaled.
Task[] loggedTasks = new Task[]
{retrieveBread, retrieveHam, assembleSandwich, reset.Task};
Task.Factory.ContinueWhenAll(loggedTasks,LogErrors)
.Wait();
As you can see, there are no if statements, nor are there any Exception types customized to provide information about which step failed.
Finally, I added a logging task that effectively waits for all the state machine tasks to finish, and then traverses the given task array and logs any error messages. (Notice that I still have to wait for that task to finish.) The body of that method is straightforward:
private void LogErrors(Task[] tasks)
{
    Console.WriteLine("\n\tERRORS:");
    foreach (Task task in tasks)
    {
        if (task.IsFaulted)
        {
            // Use the InnerException since it is wrapped in an AggregateException
            Console.WriteLine("\t{0}", task.Exception.InnerException.Message);
        }
    }
}
In conclusion, TPL enables writing state-sensitive applications on the .NET platform without convoluting the application code with many if statements or customized exception types. While the code is still a little verbose, it closely resembles the state machine’s diagram. The attached zip file contains the complete C# project. Feel free to download it and play with it. Your feedback is welcome.
08.02.2010 at
10:40 am · Saved under
.NET Help
Igor Ostrovsky is a developer on the Parallel Extensions team. On his blog, he’s documented a great set of examples of how caches can affect application performance; this is important to think through when writing parallel applications, but as Igor demonstrates, it applies equally to serial applications. Check out his post.
30.08.2009 at
2:35 am · Saved under
.NET Help
Every System.Threading.Tasks.Task instance goes through a lifecycle, and it only makes this journey once. To provide insight into where in that lifecycle a given Task is, the Task class provides an instance Status property. That property returns a value from the TaskStatus enumeration that reflects the current point in the lifecycle.
“Current” is the operative word here. Many of the states are transient, meaning that we’re dealing with a concurrent system, and by the time you’ve received the value of the status, the status may have changed. However, the lifecycle does not contain cycles (as ironic as that may sound), meaning that there’s a directionality to the various states, and once a given state has been reached, the Task won’t return back to that state again or to any that came before it. Additionally, there are three “final” states, where once in those states, the Task is no longer executing and will no longer change state.
Here are the states and an informal description of what they imply:
| TaskStatus Enumeration Value | Informal Description |
| --- | --- |
| Created | This is the starting state for tasks created through a Task constructor. Tasks in this state will not leave the state until Start or RunSynchronously is called on the instance or until the Task is canceled. |
| WaitingForActivation | This is the starting state for tasks created through methods like ContinueWith, ContinueWhenAll, ContinueWhenAny, and FromAsync, as well as from a TaskCompletionSource<TResult>. The task isn’t currently scheduled, and won’t be until some dependent operation has completed (some tasks may never be scheduled, such as those created by a TaskCompletionSource<TResult> that have nothing relevant to be scheduled). For example, a task created by ContinueWith won’t be scheduled until the antecedent task (the one off of which ContinueWith was called) completes execution. |
| WaitingToRun | The task has been scheduled to a TaskScheduler and is waiting to be picked up by that scheduler and run. This is the starting state for tasks created through TaskFactory.StartNew; by the time the Task is returned from StartNew, it will already have been scheduled, and thus the state will be at least WaitingToRun (“at least”, since by the time StartNew returns, the Task could of course have already started or even completed executing). |
| Running | The Task is currently executing. |
| WaitingForChildrenToComplete | A Task isn’t considered complete until its attached children have completed. If a Task has finished executing its code body, it will leave the Running state, and if it’s then implicitly waiting for its children to complete, it will enter this state. |
| RanToCompletion | One of the three final states. A Task in this state has successfully completed execution, running to the end of its body without cancellation and without throwing an unhandled exception. |
| Canceled | One of the three final states. A Task in this state completed execution, but it did so through cancellation. To end in the Canceled state, a Task must either have cancellation requested prior to starting execution, or it must acknowledge a cancellation request during its execution. |
| Faulted | One of the three final states. A Task in this state completed execution due to an unhandled exception in its body or due to one of its attached children completing in this state. |
Several of these status values are also communicated through helper properties on the Task class. For a Task t:
- t.IsCanceled returns true if and only if t.Status equals TaskStatus.Canceled.
- t.IsFaulted returns true if and only if t.Status equals TaskStatus.Faulted. Additionally, t.Exception will be non-null if and only if t.IsFaulted is true.
- t.IsCompleted returns true if and only if t.Status equals TaskStatus.RanToCompletion, TaskStatus.Canceled, or TaskStatus.Faulted (i.e. one of the three final states). Note that IsCompleted does not just correspond to RanToCompletion, but rather to all final states.
Several operations in the Task Parallel Library are tied to the concept of a final state, or more generally to IsCompleted. For example, a call to the Wait method on a Task will not return until that Task’s IsCompleted is true. And the continuations for a Task will not fire until that same condition is met.
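A small sketch can make a few of these states observable. The class and method names below (TaskStatusSketch and its Observe* methods) are invented for illustration; only states that can be observed deterministically are shown, since the transient ones (WaitingToRun, Running) may change before they can be read.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskStatusSketch
{
    // Status of a constructor-created task before Start and after Wait.
    public static TaskStatus[] ObserveCreatedThenCompleted()
    {
        var t = new Task(() => { });
        TaskStatus before = t.Status; // nothing has started it yet
        t.Start();
        t.Wait();                     // returns once IsCompleted is true
        return new[] { before, t.Status };
    }

    // Status of a task whose body throws an unhandled exception.
    public static TaskStatus ObserveFaulted()
    {
        var t = new Task(() => { throw new InvalidOperationException("boom"); });
        t.Start();
        try { t.Wait(); }
        catch (AggregateException) { } // Wait rethrows the wrapped exception
        return t.Status;
    }

    // Status of a TaskCompletionSource task before and after cancellation.
    public static TaskStatus[] ObserveCompletionSource()
    {
        var tcs = new TaskCompletionSource<int>();
        TaskStatus before = tcs.Task.Status; // nothing to schedule yet
        tcs.SetCanceled();
        return new[] { before, tcs.Task.Status };
    }
}
```

Under these assumptions, the first method reports Created followed by RanToCompletion, the second reports Faulted, and the third reports WaitingForActivation followed by Canceled. In all three final states IsCompleted is true.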
[Figure: The meaning of TaskStatus]
04.08.2009 at
4:55 pm · Saved under
.NET Help
In this post, we’ll investigate some ways that Parallel Extensions can be used to introduce parallelism and asynchrony to I/O scenarios.
Here’s a simple scenario. I want to retrieve data from a number of web resources.
static string[] Resources = new string[]
{
    "http://www.microsoft.com", "http://www.msdn.com",
    "http://www.msn.com", "http://www.bing.com"
};
Using the WebClient class, I might end up with the following.
var data = new List<byte[]>();
var wc = new WebClient();
foreach (string resource in Resources)
{
data.Add(wc.DownloadData(resource));
}
// Use the data.
However, these days, downloading data from the web usually utilizes only a small fraction of my available bandwidth. So there are potential performance gains here, and with TPL’s parallel ForEach loop, they are easily had.
var data = new ConcurrentBag<byte[]>();
Parallel.ForEach(Resources, resource =>
{
data.Add((new WebClient()).DownloadData(resource));
});
// Use the data.
Note that WebClient instances do not support multiple pending asynchronous operations (and the class is not thread-safe), so I need a separate instance for each operation. Also, since the normal BCL collections (List<T>, etc.) are not thread-safe, I need something like ConcurrentBag<T> to store the results. Of course, storing all the data in a collection assumes the scenario requires that all retrieval operations complete before processing. If this were not the case, I could start processing each data chunk inside the loop as soon as it is obtained, exploiting more parallelism. However, for the purposes of this investigation, I wanted to determine the possible performance gains in the absence of CPU-intensive work.
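To try the collection side of this without touching the network, here is a rough sketch in which a made-up FakeDownload method stands in for WebClient.DownloadData. ParallelCollectSketch, FakeDownload and DownloadAll are illustrative names, not part of any library.

```csharp
using System;
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Tasks;

public static class ParallelCollectSketch
{
    // Hypothetical stand-in for WebClient.DownloadData: returns the
    // resource name as bytes instead of performing a real request.
    public static byte[] FakeDownload(string resource)
    {
        return Encoding.UTF8.GetBytes(resource);
    }

    // Collects one result per resource from concurrently running loop bodies.
    public static ConcurrentBag<byte[]> DownloadAll(string[] resources)
    {
        // ConcurrentBag<T> tolerates concurrent Add calls; List<T> does not.
        var data = new ConcurrentBag<byte[]>();
        Parallel.ForEach(resources, resource =>
        {
            data.Add(FakeDownload(resource));
        });
        return data;
    }
}
```

A ConcurrentBag<T> does not preserve insertion order, so this shape only fits when the results are processed as an unordered set.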
As it turns out, the Parallel.ForEach version often yields linear speedup over the sequential one, with some variation due to the inconsistent nature of web site response times. And it was pretty straightforward. However, things would have been even easier had I started out with a “LINQ” frame of mind. First, I can convert my original sequential code to a LINQ query. Then, I can turn it into PLINQ using the AsParallel method and use WithDegreeOfParallelism to control the number of concurrent retrievals.
var data =
from resource in Resources
.AsParallel()
.WithDegreeOfParallelism(numConcurrentRetrievals)
select (new WebClient()).DownloadData(resource);
// Sometime later…
foreach (byte[] result in data) { }
(As an aside, it’s worth noting that WithDegreeOfParallelism causes PLINQ to use exactly numConcurrentRetrievals Tasks. This differs from the MaxDegreeOfParallelism option that I could have used with my previous Parallel.ForEach code, because that option sets the maximum; the actual number of threads still depends on the ThreadPool’s thread-adjusting logic.)
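For comparison, the MaxDegreeOfParallelism option mentioned in the aside is passed to Parallel.ForEach through a ParallelOptions instance. The sketch below measures how many loop bodies overlap under a given cap; DegreeSketch and MaxObservedConcurrency are invented names, and the Thread.Sleep call merely simulates a blocking download.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class DegreeSketch
{
    // Returns the highest number of loop bodies observed running at once.
    public static int MaxObservedConcurrency(int cap, int items)
    {
        object gate = new object();
        int current = 0;
        int max = 0;

        // The cap is an upper bound, not a guaranteed level of concurrency.
        var options = new ParallelOptions { MaxDegreeOfParallelism = cap };

        Parallel.ForEach(Enumerable.Range(0, items), options, item =>
        {
            int now = Interlocked.Increment(ref current);
            lock (gate) { if (now > max) max = now; } // record the high-water mark
            Thread.Sleep(5);                          // simulated blocking work
            Interlocked.Decrement(ref current);
        });

        return max;
    }
}
```

The value returned never exceeds the cap, but it may be lower, depending on how many threads the ThreadPool chooses to supply.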
The PLINQ query offers enhanced readability and makes storing the data easier. In addition, I can continue on the main thread, as PLINQ queries do not execute until the data they represent is accessed – that is, when MoveNext is called on the relevant enumerator. However, in this particular case, I don’t want to delay my query’s execution until I need the data; I actually want to execute my query while continuing on the main thread. To do so, I can wrap my query in a Task and force its immediate execution using ToArray.
var t = Task.Factory.StartNew(() =>
{
return
(from resource in Resources
.AsParallel()
.WithDegreeOfParallelism(numConcurrentRetrievals)
select (new WebClient()).DownloadData(resource)).ToArray();
});
// Sometime later…
foreach (byte[] result in t.Result) { }
// OR, use a continuation
t.ContinueWith(dataTask =>
{
foreach (byte[] result in dataTask.Result) { }
});
Now, I’ve got asynchrony, and I still get similar speedup. However, there’s still something about this code that is not ideal. The work (sending off download requests and blocking) requires almost no CPU, but it is being done by ThreadPool threads since I’m using the default scheduler. Ideally, threads should only be used for CPU-bound work (when there’s actually work to do). Of course, this probably won’t matter much for most typical client applications, but in scenarios where resources are tight, it could be a serious issue. Therefore, it’s worth investigating how we might reduce the number of blocked threads, perhaps by not using threads at all where possible.
To achieve this, I’ll be using ideas from a previous post: Tasks and the Event-based Asynchronous Pattern. That article explained how to create a Task<TResult> from any type that implements the EAP, and it presented an extension method for WebClient (available along with many others in the ParallelExtensionsExtras):
public static Task<byte[]> DownloadDataTask(
this WebClient webClient, Uri address);
The key point is that this method produces a Task<TResult> by integrating WebClient’s EAP implementation with a TaskCompletionSource<TResult>, and I can use it to rewrite my scenario.
var tasks = new Queue<Task<byte[]>>();
foreach (string resource in Resources)
{
WebClient wc = new WebClient();
tasks.Enqueue(wc.DownloadDataTask(new Uri(resource)));
}
// Sometime later…
while (tasks.Count > 0)
{
byte[] result = tasks.Dequeue().Result;
}
// OR, use a continuation
Task.Factory.ContinueWhenAll(tasks.ToArray(), dataTasks =>
{
foreach (var dataTask in dataTasks)
{
byte[] result = dataTask.Result;
}
});
With this, I’ve got a solution that uses parallelism for speed-up, is asynchronous, and does not burn more threads than necessary!
To recap, in this post, we considered a typical I/O scenario. First, we saw how easy it was to arrive at solutions that are better than the sequential one. Then, we delved deeper to discover a more complex solution (integrating EAP with Tasks) that offers even more benefits.
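The EAP-to-Task integration mentioned above can be sketched roughly as follows. This is not the ParallelExtensionsExtras implementation: FakeDownloader is a made-up event-based component standing in for WebClient, and EapToTaskSketch.DownloadTask is a hypothetical helper that shows only the TaskCompletionSource<TResult> wiring.

```csharp
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical event-based component standing in for WebClient.
public sealed class FakeDownloader
{
    public event Action<byte[], Exception> DownloadCompleted;

    // Starts the "download" and raises DownloadCompleted when it is done.
    // The thread pool is used here only to simulate an asynchronous callback.
    public void DownloadAsync(string address)
    {
        ThreadPool.QueueUserWorkItem(_ =>
        {
            Action<byte[], Exception> handler = DownloadCompleted;
            if (handler != null) handler(Encoding.UTF8.GetBytes(address), null);
        });
    }
}

public static class EapToTaskSketch
{
    // Wraps one event-based operation in a Task<byte[]>.
    public static Task<byte[]> DownloadTask(FakeDownloader downloader, string address)
    {
        var tcs = new TaskCompletionSource<byte[]>();

        // The completion event moves the task into a final state.
        downloader.DownloadCompleted += (data, error) =>
        {
            if (error != null) tcs.TrySetException(error);
            else tcs.TrySetResult(data);
        };

        downloader.DownloadAsync(address);
        return tcs.Task; // WaitingForActivation until the event fires
    }
}
```

The returned task has no delegate of its own to run, so no thread is blocked on its behalf while the operation is pending; it simply completes when the event fires.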
05.07.2009 at
4:39 pm · Saved under
.NET Help
In many data-parallel scenarios, all of the data to be processed is available immediately. This blog post addresses the opposite scenario: the inputs arrive gradually (as if in a stream), and we want to start producing results even before reading the last element of the input sequence.
There is a variety of scenarios in which inputs become available gradually rather than all at once. The inputs could arrive as requests across the network, inputs entered by a user, data read from an I/O device, results computed from another computation, and so on. We want to process the inputs in parallel, and make the partial results available as they are computed.
Simple PLINQ queries support streaming. A query that only consists of simple Select and Where operators will do streaming:
var q = inputSrc.AsParallel().Where(x => Foo(x)).Select(x => ExpensiveComputation(x));
foreach(var x in q)
{
Process(x);
}
In this example, Process(x) will get called on some input elements even before all elements of the input have been read.
On the other hand, not all queries support streaming. In fact, some queries cannot possibly support streaming. Consider this query that contains an OrderBy() operator:
var q = src.AsParallel().Select(x => Foo(x)).OrderBy(x => x);
We need to compute Foo(x) for all input elements, and only then can we yield the smallest element. Thus, a query that contains an OrderBy operator does not (and cannot) run in a streaming fashion.
So, what kinds of PLINQ queries do run in a streaming fashion? Generally, they are simple queries. The query can contain any number of Select and Where operators, except for the special positional overloads. By positional overloads, we mean the overloads of Select and Where that accept a delegate that accesses both element values and their positions in the sequence. A positional Select or Where operator prevents streaming if there is a Where operator anywhere prior to it in the query.
A streaming query can produce either ordered or unordered results. As is typical in PLINQ, the results are unordered by default, but you can opt into ordering by using the AsOrdered() operator:
var q = src.AsParallel().AsOrdered().Select(x => Foo(x)).Where(y => Bar(y));
In the above query, PLINQ will read the elements from the src enumerable, distribute them into different partitions, process them on multiple threads, and rearrange results into a correctly-ordered output sequence. All of these stages are happening concurrently, so first results will start getting produced while the inputs are still getting read (assuming that the input sequence is sufficiently long).
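As a quick check of the ordering guarantee, the sketch below runs a query of the same shape over a range of integers and consumes it with foreach. OrderedStreamingSketch and Run are invented names, and the lambdas stand in for Foo and Bar.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class OrderedStreamingSketch
{
    // Runs an ordered PLINQ query and returns the results in the order
    // in which the foreach loop received them.
    public static List<int> Run(int count)
    {
        var q = Enumerable.Range(0, count)
            .AsParallel()
            .AsOrdered()              // preserve the source order in the output
            .Select(x => x * 2)       // stands in for Foo
            .Where(y => y % 3 == 0);  // stands in for Bar

        var received = new List<int>();
        foreach (int value in q)
        {
            received.Add(value);
        }
        return received;
    }
}
```

With AsOrdered in place, the received sequence matches what the equivalent sequential LINQ query produces; without it, the same elements may arrive in a different order.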
One important point about streaming PLINQ queries is that the algorithms we use are optimized for throughput rather than latency. PLINQ uses buffers internally, so a particular result may sit in an output buffer until a certain number of results have been produced. Obviously, that is undesirable if one of the results needs to be sent back to a client as quickly as possible.
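When latency matters more than throughput, one knob that may help is PLINQ’s WithMergeOptions operator, which accepts a ParallelMergeOptions value such as NotBuffered. The following is only a sketch of how it is applied (MergeOptionsSketch and Run are invented names); whether it helps in a given scenario is something to measure.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class MergeOptionsSketch
{
    // Runs an unordered PLINQ query that asks for results to be handed
    // to the consumer as soon as they are produced.
    public static List<int> Run(int count)
    {
        var q = Enumerable.Range(0, count)
            .AsParallel()
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .Select(x => x * x);

        var received = new List<int>();
        foreach (int value in q)
        {
            received.Add(value); // arrival order is not guaranteed here
        }
        return received;
    }
}
```

The query still yields every result exactly once; only the way results are batched on their way to the consumer changes.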
One major benefit of streaming is that the data does not all have to be loaded into memory at once. For example, if the query reads its inputs from one file and writes the outputs to another file, the inputs will not necessarily all be in memory at the same time. Keep this use case in mind, and when you come across it in your development, use PLINQ to easily get your code to scale on multi-core machines.
30.06.2009 at
11:09 am · Saved under
.NET Help
More and more, developers are realizing the significant scalability advantages that asynchronous programming can provide, especially as it relates to I/O.
Consider an application that needs to copy data from one stream to another stream, such as is being done in the following synchronous implementation:
static void CopyStreamToStream(Stream input, Stream output)
{
// Buffer space for the data to be read and written
byte [] buffer = new byte[0x2000];
// While there’s data to be read and written
while(true)
{
// Read data. If we weren’t able to read any, bail.
// Otherwise, write it out and start over again.
int numRead = input.Read(buffer, 0, buffer.Length);
if (numRead == 0) break;
output.Write(buffer, 0, numRead);
}
}
In many cases like this, such as if the Streams are FileStream instances representing files on disk, or NetworkStream instances representing remote data, the operations being performed require little-to-no computational power, as the majority of the time is spent waiting on the I/O subsystems and devices. If one such operation is being performed at a time, that waiting isn’t such a big deal. But in a synchronous implementation like the one above, that waiting ends up blocking a thread, leaving the thread unable to do anything else while waiting. Threads by default take up a sizeable chunk of memory as well as kernel resources. Thus, if multiple concurrent calls to CopyStreamToStream are executed, multiple threads may be wasted. Since threads consume a non-negligible amount of resources, we try to limit the number of threads in an application at any one time, such as by using a thread pool, and this synchronous style of programming I/O can lead to scalability bottlenecks, especially in server components where we desire to process as many user requests concurrently as the machine’s resources will possibly allow.
One solution to this problem is through compiler support. A compiler could recognize this synchronous pattern and translate it into an asynchronous one. Such a transformation of that same snippet might look like the following (note that this is hand-generated rather than the actual output of any particular compiler; it is my attempt to write this out concisely while keeping it human-readable):
static void CopyStreamToStreamAsync(
Stream input, Stream output, Action<Exception> completed)
{
// Buffer space for the data to be read and written
byte[] buffer = new byte[0x2000];
// The read/write loop. The parameter is the IAsyncResult of the
// last read operation that still needs to be completed with a call
// to EndRead. If the parameter is null, that means a new read needs
// to be started.
Action<IAsyncResult> readWriteLoop = null;
readWriteLoop = iar =>
{
try
{
// Determine whether to start with a BeginRead or
// an EndRead/BeginWrite, based on whether iar is null.
// Then, as long as the loop continues, alternate between
// reading and writing.
for (bool isRead = iar == null; ; isRead = !isRead)
{
switch (isRead)
{
// Do BeginRead(…)
case true:
// Start the asynchronous read
iar = input.BeginRead(buffer, 0, buffer.Length, readResult =>
{
// If the read completed synchronously, immediately
// return from the callback, as the processing of the
// read will be handled synchronously
// from the same thread that called BeginRead
if (readResult.CompletedSynchronously) return;
// The read completed asynchronously, so we need
// to run the processing loop,
// starting with EndRead/BeginWrite.
readWriteLoop(readResult);
}, null);
// If the read is completing asynchronously, bail, as
// there’s nothing more to do.
// If it completed synchronously, loop around to do
// the EndRead/BeginWrite synchronously.
if (!iar.CompletedSynchronously) return;
break;
// Do BeginWrite(…)
case false:
// Complete the previous read. If there’s no more data
// to be read/written, bail.
int numRead = input.EndRead(iar);
if (numRead == 0)
{
completed(null);
return;
}
// Now that we know how much data was read, write it
// out asynchronously
iar = output.BeginWrite(buffer, 0, numRead, writeResult =>
{
// If the write completed synchronously, allow
// the thread that called BeginWrite
// to handle it.
if (writeResult.CompletedSynchronously) return;
// Otherwise, complete the asynchronous write
// and launch the read/write loop
// to continue all over again.
output.EndWrite(writeResult);
readWriteLoop(null);
}, null);
// If the write is completing asynchronously, bail,
// as there’s nothing more to do.
// Otherwise, complete the write synchronously and
// loop around.
if (!iar.CompletedSynchronously) return;
output.EndWrite(iar);
break;
}
}
} catch(Exception e) { completed(e); }
};
// Start the whole process off with a read.
readWriteLoop(null);
}
The Axum compiler, available on DevLabs, is actually capable of these kinds of transformations for asynchronous programming, and you could imagine such functionality being baked into a mainstream language like C#. Here’s how the code could be written asynchronously with Axum:
static asynchronous void CopyStreamToStream(Stream input, Stream output)
{
byte [] buffer = new byte[0x2000];
while(true)
{
int numRead = input.Read(buffer, 0, buffer.Length);
if (numRead == 0) break;
output.Write(buffer, 0, numRead);
}
}
That’s quite lovely. Comparatively, the hand-written code is mind-numbing, and it’s not something you want to have to write each and every time you need to perform some kind of repeating operation like this asynchronously.
As such, some developers have started to take advantage of C# iterators for writing asynchronous code. While not originally designed for this purpose, the compiler transformations employed for C# iterators are similar to what’s necessary for writing asynchronous code, and thus with a bit of library-based support, it’s possible to write an iterator that looks sequential but that takes advantage of asynchrony. Several libraries have been based on this approach, including the Concurrency & Coordination Runtime (CCR) from Microsoft Robotics, Jeffrey Richter’s AsyncEnumerator, and others.
The key to taking advantage of this pattern is yielding something from an iterator that can invoke a callback when an operation completes. The pattern then becomes:
IEnumerable<ThingThatHasCallbackWhenCompletes> AsyncMethod()
{
…
yield return SomethingThatReturnsThingThatHasCallbackWhenCompletes();
… // code here executes when the
// yielded ThingThatHasCallbackWhenCompletes completes
}
The idea is that the iterator method returns an IEnumerable of instances that represent a piece of an asynchronous operation. A utility function is used to invoke the iterator method, and iterates over the resulting enumerator. Each time the utility function gets the next instance from the enumerator, it registers some code to monitor the operation for completion, and when the operation completes, it moves next on the enumerator. Moving next on the enumerator results in re-entering the iterator method at the code location after the last yield point, thus allowing another asynchronous operation to be yielded. In this fashion, the iterator method can in effect yield asynchronous operations, and by using “yield return” to instrument the code with those async points, you as the developer can write an asynchronous method in a manner that looks largely sequential.
Now, think about the description above for the kind of object that needs to be yielded: “something that can invoke a callback when an operation completes.” Sound familiar? The System.Threading.Tasks.Task class in .NET 4 provides this exact functionality. A Task represents an asynchronous operation, and it has a ContinueWith method that enables a callback to be invoked when that asynchronous operation completes. Thus, we should be able to yield Task instances from an iterator in order to write an asynchronous method. Here’s the same CopyStreamToStream example implemented asynchronously in this fashion:
static IEnumerable<Task> CopyStreamToStreamAsync(
Stream input, Stream output)
{
// Buffer space for the data to be read and written
byte [] buffer = new byte[0x2000];
// While there’s data to be read and written
while(true)
{
// Read data asynchronously. When the operation completes,
// if no data could be read, we’re done.
var read = Task<int>.Factory.FromAsync(
input.BeginRead, input.EndRead, buffer, 0, buffer.Length, null,
TaskCreationOptions.DetachedFromParent);
yield return read;
if (read.Result == 0) break;
// Write the data asynchronously
yield return Task.Factory.FromAsync(
output.BeginWrite, output.EndWrite, buffer, 0, read.Result, null,
TaskCreationOptions.DetachedFromParent);
}
}
Much simpler. Where we were previously doing synchronous reads and writes, now we’re yielding the result of calling the built-in Task.Factory.FromAsync method, which creates Tasks that represent asynchronous reads and writes following the APM pattern. We could of course simplify this code further by using a few helper extension methods to hide some of the asynchronous details (these helpers are part of the Beta 1 samples at http://code.msdn.microsoft.com/ParExtSamples):
public static Task<int> ReadTask(this Stream stream,
byte [] buffer, int offset, int count)
{
return Task<int>.Factory.FromAsync(stream.BeginRead, stream.EndRead,
buffer, offset, count, null, TaskCreationOptions.DetachedFromParent);
}
public static Task WriteTask(this Stream stream,
byte [] buffer, int offset, int count)
{
return Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite,
buffer, offset, count, null, TaskCreationOptions.DetachedFromParent);
}
This then enables the previous code to be simplified to:
static IEnumerable<Task> CopyStreamToStreamAsync(
Stream input, Stream output)
{
byte [] buffer = new byte[0x2000];
while(true)
{
var read = input.ReadTask(buffer, 0, buffer.Length);
yield return read;
if (read.Result == 0) break;
yield return output.WriteTask(buffer, 0, read.Result);
}
}
That looks a lot like the synchronous version, and is *much* easier and less error-prone to write than the manual version shown earlier.
Of course, now we need a mechanism for iterating over the asynchronous iterator. As mentioned, we can take advantage of ContinueWith for the main body of the operation (as with the earlier helpers, the following method and several variants of it are available in the Beta 1 samples):
public static Task Iterate(this TaskFactory factory,
IEnumerable<Task> asyncIterator)
{
// Validate parameters
if (factory == null) throw new ArgumentNullException("factory");
if (asyncIterator == null)
throw new ArgumentNullException("asyncIterator");
// Get the scheduler to use, either the one provided by the factory
// or the current one if the factory didn’t have one specified.
var scheduler = factory.TaskScheduler ?? TaskScheduler.Current;
// Get an enumerator from the enumerable
var enumerator = asyncIterator.GetEnumerator();
if (enumerator == null) throw new InvalidOperationException();
// Create the task to be returned to the caller. And ensure
// that when everything is done, the enumerator is cleaned up.
var trs = new TaskCompletionSource<object>(factory.CreationOptions);
trs.Task.ContinueWith(_ => enumerator.Dispose(),
TaskContinuationOptions.DetachedFromParent, scheduler);
// This will be called every time more work can be done.
Action<Task> recursiveBody = null;
recursiveBody = antecedent =>
{
try
{
// If the previous task completed with any exceptions, bail
if (antecedent != null && antecedent.IsFaulted)
trs.TrySetException(antecedent.Exception);
// If the user requested cancellation, bail.
else if (trs.Task.IsCancellationRequested) trs.TrySetCanceled();
// If we should continue iterating and there’s more to iterate
// over, create a continuation to continue processing. We only
// want to continue processing once the current Task (as yielded
// from the enumerator) is complete.
else if (enumerator.MoveNext())
enumerator.Current.ContinueWith(recursiveBody,
TaskContinuationOptions.DetachedFromParent, scheduler).
IgnoreExceptions();
// Otherwise, we’re done!
else trs.TrySetResult(null);
}
// If MoveNext throws an exception, propagate that to the user
catch (Exception exc) { trs.TrySetException(exc); }
};
// Get things started by launching the first task
factory.StartNew(() => recursiveBody(null),
TaskCreationOptions.DetachedFromParent, scheduler).
IgnoreExceptions();
// Return the representative task to the user
return trs.Task;
}
Using this implementation, we can now run “asynchronous methods” that return IEnumerable<Task>, as did our CopyStreamToStreamAsync method:
var asyncOperation = Task.Factory.Iterate(
CopyStreamToStreamAsync(input, output));
Note that the Iterate implementation shown includes a few handy additions on top of the previous hand-coded solution. First, the Iterate method returns a Task, which can be used to track the entire asynchronous operation. Second, it supports cancellation, meaning a caller can request that the asynchronous iteration shut down early, even if it hasn’t completed yet. Third, we’ve now separated out the run logic into a separate method, which means that we no longer need all of that goop in the actual target asynchronous method.
On top of all that, this implementation is now based on TaskFactory, which means we can do things like ensure that the code runs on a certain scheduler, such as a scheduler that targets the UI. As an example of where that is handy, consider a method that asynchronously reads from a long, remote stream and stores the resulting data into a TextBox as it’s available:
static IEnumerable<Task> ReadStreamIntoTextBox(Stream stream)
{
    byte[] buffer = new byte[0x2000];
    Encoding enc = new UTF8Encoding();
    while (true)
    {
        // Start the asynchronous read and yield until it completes.
        var read = stream.ReadTask(buffer, 0, buffer.Length);
        yield return read;
        if (read.Result == 0) break;
        myTextBox.Text += enc.GetString(buffer, 0, read.Result);
    }
}
As previously shown, I could invoke this method as follows:
public void button1_Click(object sender, EventArgs e)
{
Task.Factory.Iterate(ReadStreamIntoTextBox(inputStream)); // buggy
}
However, accessing myTextBox.Text from a thread other than the thread that created myTextBox is a no-no, and yet that’s potentially what will happen in the above. To address that, I want to ensure that the actual code from the iterator is executed on the UI thread (but I still don’t want the asynchronous operations to block the UI thread). To accomplish that, I can create a TaskFactory that will run tasks on the UI thread, as I do in the following code:
public void button1_Click(object sender, EventArgs e)
{
var uiFactory = new TaskFactory(
TaskScheduler.FromCurrentSynchronizationContext());
uiFactory.Iterate(ReadStreamIntoTextBox(inputStream));
}
Of course, while just being able to yield individual operations is useful, things get more interesting when you start considering multi-task continuations, as exposed through ContinueWhenAny and ContinueWhenAll. For example, in our previous copy stream example, we’re reading, then writing, then reading, then writing, and so forth. But we should be able to write the previously read bits while reading the next chunk, thereby achieving better speeds by overlapping latencies. Writing the code to do that using manual asynchrony would be a nightmare… with iterators and tasks, it’s manageable, almost fun:
static IEnumerable<Task> CopyStreamToStreamAsync(
Stream input, Stream output)
{
byte[][] buffers = new byte[2][] {
new byte[BUFFER_SIZE], new byte[BUFFER_SIZE] };
int filledBufferNum = 0;
Task writeTask = null;
while (true)
{
var readTask = input.ReadTask(
buffers[filledBufferNum], 0, buffers[filledBufferNum].Length);
yield return writeTask == null ?
readTask :
Task.Factory.ContinueWhenAll(new[] { readTask, writeTask },
tasks => Task.WaitAll(tasks), // to propagate exceptions
TaskContinuationOptions.DetachedFromParent);
if (readTask.Result == 0) break;
writeTask = output.WriteTask(
buffers[filledBufferNum], 0, readTask.Result);
filledBufferNum = filledBufferNum == 0 ? 1 : 0;
}
}
Here, ContinueWhenAll is used to ensure that the iterator isn’t re-entered until both the current read and any pending write have completed.
There are of course many variations to this code that you could implement. At the end of the day, I find it quite interesting to see how the Task primitive can be used to enable such scenarios.
24.06.2009 at
5:39 pm · Saved under
.NET Help
In concurrent programs, race conditions are a fact of life but they aren’t all bad. Sometimes, race conditions are benign, as is often the case with lazy initialization.
The problem with racing to set a value, however, is that it can result in multiple objects being instantiated when only one is needed. Take the LazyInitializer class that shipped in Visual Studio 2010 Beta 1, for example. The method EnsureInitialized(ref T target, Func<T> valueFactory) accepts a ByRef value and a function that produces a value. If target is null, EnsureInitialized will execute valueFactory and set target to the return value. If multiple threads happen to overlap calls to EnsureInitialized, which is likely to be a rare occurrence, these threads may both initially see target as null and then execute valueFactory. One thread will win the race and will get to set target.
In Beta 1, we thought we’d be really smart by disposing of the objects that were created by threads that lost the race, if they implemented IDisposable. Turns out that’s a pretty bad thing to do. With lazy initialization, we assumed that most of the time valueFactory would be creating a new value, but that isn’t always the case. It’s quite possible that a valueFactory could return an object that has already been created, in which case we’d be disposing of an object we didn’t own.
We’re making a couple of changes in the Parallel Extensions to ensure that we generally don’t dispose of objects we aren’t sure we own and, moving forward, any new APIs that do eagerly instantiate objects that might not be used will not dispose of said objects.
Now you might say, “why? Ninety-percent of the time I am creating a new object with my value factory and it might be a really heavy object like a file or wait handle!” We hear you, but typically, once an object is disposed, there is no bringing it back, and so by disposing of these objects we don’t allow anyone to opt out of that behavior if need be. If we don’t dispose, the GC will still clean up your unused objects, and if you really need to dispose of an object manually, there is a workaround, i.e.
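ManualResetEvent theEvent = null;
// Must be initialized: the lambda may never run if theEvent is already set.
ManualResetEvent tmpEvent = null;
LazyInitializer.EnsureInitialized(ref theEvent,
    () => { return tmpEvent = new ManualResetEvent(false); });
// Close only an event that this thread created and that lost the race.
if (tmpEvent != null && tmpEvent != theEvent) tmpEvent.Close();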
In fact, you could easily create a small wrapper function to do the work for you, i.e.
public static T EnsureInitializedWithDispose<T>(ref T target, Func<T> function)
    where T : class, IDisposable
{
    T tmp = null;
    T actual = LazyInitializer.EnsureInitialized(ref target, () => tmp = function());
    // tmp stays null if the factory never ran; dispose only a losing instance.
    if (tmp != null && !ReferenceEquals(actual, tmp)) tmp.Dispose();
    return actual;
}
And the issue isn’t just about race conditions. In general, we should never dispose of an object we didn’t explicitly create. BlockingCollection<T> in Beta 1, for example, will dispose of its underlying collection if the BlockingCollection<T> itself is disposed. Again, this is bad news if you were only using the BlockingCollection<T> as a temporary wrapper. In future releases of Visual Studio 2010, this will be corrected.
So when using these APIs, and any other API that may consume or create an IDisposable object, think about what that API is doing with the object and if you need to clean up resources manually. If you’re not sure, the finalizer should take care of most of your problems. In the rare situations where immediate cleanup is important, make sure you keep track of the objects and dispose of them properly. Also, when designing your own libraries, don’t dispose of objects you don’t own!
22.06.2009 at
4:00 am · Saved under
.NET Help
One of the great features that spans all of the Parallel Extensions types is a consistent approach to cancellation (see http://blogs.msdn.com/pfxteam/archive/2009/05/22/9635790.aspx). In this post we explore some of the ways cancellation is used in Parallel Extensions and explain the guidance we developed.
The new cancellation system is a cooperative approach based on two new types: CancellationTokenSource, which initiates cancellation requests, and CancellationToken, which communicates a cancellation request to asynchronous operations and to long-running and blocking method calls.
If you are experimenting with Parallel Extensions, you might like to keep an eye out for the methods that accept a CancellationToken and test them out.
Blocking calls
Some of the Parallel Extension types introduce blocking methods, for example BlockingCollection.Take() and Task.Wait(). The default overloads for these methods will block indefinitely if the condition they are waiting for never occurs. One solution used by these APIs, and many others like them, is to provide a timeout overload so that they may return after some duration and report that the condition did not occur. However, a timeout isn’t particularly convenient if you only want to stop waiting if a specific activity occurs, such as a user clicking a ‘cancel’ button. One possibility is to wait for some amount of time such as 100 milliseconds, check if the button was pressed, and if not then go back to waiting, but this puts the burden on the call-site and must be re-implemented repeatedly; and depending on the frequency of the polling, it may also add unnecessary cost. The types in Parallel Extensions use the new cancellation system and provide overloads that accept a CancellationToken which can cause early termination of blocking methods in response to a specific cancellation request.
Here are some examples of the blocking calls in Parallel Extensions that accept a CancellationToken:
BlockingCollection:
void Add(T item, CancellationToken cancellationToken)
T Take(CancellationToken cancellationToken)
ManualResetEventSlim:
void Wait(CancellationToken cancellationToken)
Task:
void Wait(CancellationToken cancellationToken)
void WaitAll(Task[] tasks, CancellationToken cancellationToken)
In each case, you supply a CancellationToken to the method, and if the CancellationToken is signaled via a call to the associated CancellationTokenSource.Cancel(), then the method will wake up and throw an OperationCanceledException. This frees you from the need to use the timeouts if you are happy to wait until the condition is true or a specific cancellation request occurs. Many of these methods also have overloads that accept both a CancellationToken and a timeout, so that both needs may be accommodated.
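As a rough illustration of the behavior just described, the following sketch blocks on an empty BlockingCollection<int> and is woken by a cancellation request. It assumes the released .NET API rather than Beta 1; the class name BlockingTakeDemo is made up for this example, and CancellationTokenSource.CancelAfter (added in .NET 4.5) simply stands in for a user clicking a 'cancel' button.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class BlockingTakeDemo
{
    // Returns true if the blocked Take call was woken by cancellation.
    public static bool TakeWasCanceled()
    {
        using (var collection = new BlockingCollection<int>())
        using (var cts = new CancellationTokenSource())
        {
            // Assumption: a timer-driven cancel after ~100 ms plays the
            // role of the user's cancel request.
            cts.CancelAfter(100);
            try
            {
                // The collection is empty and nothing is ever added,
                // so this call blocks until the token is signaled.
                collection.Take(cts.Token);
                return false;
            }
            catch (OperationCanceledException)
            {
                // Take woke up in response to the cancellation request.
                return true;
            }
        }
    }
}
```

No polling loop or timeout appears at the call site; the token alone is enough to end the wait.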
Long-running calls
There are also a variety of methods that may simply take a while to complete, particularly if large amounts of data or complex processing is taking place. For example, the following call is likely to take some time, even if individual calls to DoFunc(x) are relatively fast:
Parallel.For(1, 1000000000, (x) => DoFunc(x));
To assist with this, the Parallel APIs have overloads that accept a ParallelOptions class instance that holds a CancellationToken. The machinery inside Parallel.For(…) observes the supplied CancellationToken and will exit early if it sees that the token has been signaled (for more information on exiting loops early, see http://blogs.msdn.com/pfxteam/archive/2009/05/27/9645023.aspx). For example,
CancellationTokenSource cts = new CancellationTokenSource();
ParallelOptions options = new ParallelOptions
{CancellationToken = cts.Token};
Parallel.For(1, 100, options, (x) => DoSlowFunc(x));
// from another thread, call cts.Cancel() to request cancellation.
As you can see, the approach for canceling blocking calls and long-running calls is identical. In both cases you supply a CancellationToken to a specific method call and use the associated CancellationTokenSource to signal a cancellation request.
Digression: Internal cancellation
In certain situations in Parallel Extensions and in other systems, it is necessary to wake up a blocked method for reasons that aren’t due to explicit cancellation by a user. For example, if one thread is blocked on blockingCollection.Take() due to the collection being empty and another thread subsequently calls blockingCollection.CompleteAdding(), then the first call should wake up and throw an InvalidOperationException to represent an incorrect usage. A great way to implement this is to use an internal CancellationTokenSource that can wake things up due to internal concerns, and to link it to a second, external CancellationTokenSource that has been supplied by the user. For example, the following captures the essential details of how CompleteAdding() and Take(CancellationToken) are implemented:
class BlockingCollection<T>
{
    CancellationTokenSource internalCTS = new CancellationTokenSource();
    public void CompleteAdding()
    {
        //…
        // Wake up any blocked Take() calls.
        internalCTS.Cancel();
    }
    public T Take(CancellationToken externalToken)
    {
        // The linked source is signaled when either original token is signaled.
        using (CancellationTokenSource linkedTokenSource =
            CancellationTokenSource.CreateLinkedTokenSource(
                internalCTS.Token, externalToken))
        {
            try
            {
                return InternalTake(linkedTokenSource.Token);
            }
            catch (OperationCanceledException oce)
            {
                // Inspect the original tokens to find out who woke us up.
                if (internalCTS.Token.IsCancellationRequested)
                    throw new InvalidOperationException("..msg..");
                else if (externalToken.IsCancellationRequested)
                    throw new OperationCanceledException(externalToken);
                else throw;
            }
        }
    }
}
In this fragment, a linked CancellationTokenSource is created and its token passed to the InternalTake() method. This simplifies the implementation for InternalTake() but still allows it to be woken up due to either the external token being signaled or an incorrect concurrent call to CompleteAdding(). If the InternalTake() method throws an OperationCanceledException, we can test the individual tokens to determine what the cause was and take appropriate action. Note that oce.CancellationToken will be equal to linkedTokenSource.Token as this is the token that was actually being observed, but we can look at the original tokens to determine the cause for the linked token being signaled. There is a minor risk of confusion if both sources are signaled simultaneously, but this confusion is benign as it is reasonable to behave as though either source was responsible, and a particular implementation may choose to prioritize one mechanism over the other.
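The same idea can be tried outside of BlockingCollection<T>. The sketch below is not the library's implementation; it waits on a ManualResetEventSlim that is never set and then inspects the two original tokens to report which one ended the wait. The class and method names are hypothetical, and the code assumes the released CancellationTokenSource.CreateLinkedTokenSource API.

```csharp
using System;
using System.Threading;

public static class LinkedTokenDemo
{
    // Blocks on an event that is never set, then reports which
    // cancellation source ended the wait: "internal" or "external".
    public static string WaitAndReportCause(
        CancellationTokenSource internalCts, CancellationToken externalToken)
    {
        using (var neverSet = new ManualResetEventSlim(false))
        using (var linked = CancellationTokenSource.CreateLinkedTokenSource(
            internalCts.Token, externalToken))
        {
            try
            {
                // Only the linked token is observed by the blocking call.
                neverSet.Wait(linked.Token);
                return "signaled";
            }
            catch (OperationCanceledException)
            {
                // Check the original tokens; the internal source is given
                // priority if both happen to be signaled.
                if (internalCts.Token.IsCancellationRequested) return "internal";
                if (externalToken.IsCancellationRequested) return "external";
                throw;
            }
        }
    }
}
```

Checking the internal source first is one way of choosing a priority when both sources are signaled at about the same time.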
Cooperative cancellation with user-code
Both TPL and PLINQ provide infrastructure that calls back to user code. If the user code is long running or blocking, then it is very useful for the user code to be able to respond to cancellation in a cooperative fashion. TPL and PLINQ achieve this by tracking a supplied CancellationToken, and if they see some user code throw an OperationCanceledException that mentions this specific token, then it is treated as cooperative acknowledgement of cancellation. This means that both TPL and PLINQ understand that the user code did not suffer some catastrophic exceptional condition, but rather that it is simply responding to cancellation. As a result, TPL and PLINQ constructs will simply exit with an OperationCanceledException of their own, rather than aggregating exceptions from all the participating worker threads and throwing an AggregateException.
PLINQ follows this basic plan; for example, the following query will throw a single OperationCanceledException if the CancellationTokenSource becomes signaled. It doesn’t matter whether PLINQ or the user delegate sees the cancellation request first, as the user-delegate exception will be correctly understood by the PLINQ execution engine.
CancellationTokenSource cts = new CancellationTokenSource();
var resultArray =
data.AsParallel()
.WithCancellation(cts.Token)
.Select((x) =>
{
if (cts.Token.IsCancellationRequested)
throw new OperationCanceledException(cts.Token);
// …
})
.ToArray();
In both PLINQ and TPL, it is interesting to note that the external cancellation token is not supplied back to the user-delegate via a parameter as may be expected. This would have bloated the number of method overloads considerably and, as shown above, the external token can be conveniently accessed via closures or equivalent techniques.
If the user code throws an OperationCanceledException but one or more “normal” exceptions occur on other threads, then all of the exceptions will be collated into an AggregateException, including the OperationCanceledException. Hence, if only cancellation occurred then a single OperationCanceledException is thrown, but an actual failure will always result in an AggregateException.
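The two pure cases can be observed with a small experiment. This is a sketch against the released PLINQ API, not a statement about Beta 1 behavior; the class and method names are invented, and the delegate cancels its own source on element 0 purely to simulate a cancellation request arriving mid-query. The mixed case (cancellation plus an ordinary failure on another thread) is not exercised here.

```csharp
using System;
using System.Linq;
using System.Threading;

public static class PlinqCancellationDemo
{
    // The delegate acknowledges cancellation cooperatively; returns the
    // name of the exception type that the query surfaced.
    public static string CooperativeCancel()
    {
        using (var cts = new CancellationTokenSource())
        {
            try
            {
                Enumerable.Range(0, 1000).AsParallel()
                    .WithCancellation(cts.Token)
                    .Select(x =>
                    {
                        if (x == 0) cts.Cancel(); // simulated cancel request
                        cts.Token.ThrowIfCancellationRequested();
                        return x;
                    })
                    .ToArray();
                return "none";
            }
            catch (OperationCanceledException) { return "OperationCanceledException"; }
            catch (AggregateException) { return "AggregateException"; }
        }
    }

    // The delegate fails with an ordinary exception and no cancellation.
    public static string OrdinaryFailure()
    {
        try
        {
            Enumerable.Range(0, 1000).AsParallel()
                .Select(x =>
                {
                    if (x == 0) throw new InvalidOperationException("boom");
                    return x;
                })
                .ToArray();
            return "none";
        }
        catch (OperationCanceledException) { return "OperationCanceledException"; }
        catch (AggregateException) { return "AggregateException"; }
    }
}
```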
PLINQ Queries
PLINQ offers deep support for cancellation and endeavors to ensure that cancellation will be enacted swiftly for any query. For queries that involve only simple and fast user delegates, simply supply a CancellationToken via the WithCancellation() method and PLINQ takes care of the rest. If your query involves long-running or blocking user code, then you should use the cooperative cancellation pattern described in the preceding section. For reasons that are discussed in the next section, PLINQ may not check for cancellation every time it calls a user-delegate, and so long running delegates should check cancellation frequently (as a working rule of thumb, PLINQ will check for cancellation approximately once per one hundred calls to a user delegate). For example, if a delegate call runs for 50ms but does not perform any cancellation checks of its own, then PLINQ may not detect the cancellation request itself for up to 5 seconds, and this could significantly affect a user’s experience. For this reason, we recommend that long-running delegates called from PLINQ queries should check for cancellation as frequently as possible. A good lower bound is to check cancellation once per few thousand IL instructions executed, and an upper bound is to check no less frequently than once per 10ms, but once per 1ms is preferred to ensure snappy response.
The following example demonstrates a user delegate that may run for a while, and so checks cancellation itself every few thousand IL instructions. This provides for timely cancellation support with a negligible cost to performance.
// Assume an external CancellationToken ‘token’ has been supplied.
int[] data = …;
var query =
data.AsParallel()
.WithCancellation(token)
.Select((x) =>
{
int result = 0;
for ( int i = 0; i < x; i++ ) {
for (int j = 0; j < 1000 * x; j++)
{
result += SimpleFunc(i,j);
}
token.ThrowIfCancellationRequested();
}
return result;})
.ToArray();
[Update: the call to token.ThrowIfCancellationRequested shown above is not available in Beta1 (but will appear in subsequent releases). This behavior can be achieved via a manual test-and-throw as shown in earlier examples, or via the extension-method approach described in the comments. -Mike]
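For reference, a test-and-throw helper along those lines might look like the following. The original comments are not reproduced here, so this is only a guess at the shape of such an extension method; the name ThrowIfCanceled is hypothetical and chosen so that it does not collide with the ThrowIfCancellationRequested method that later releases provide.

```csharp
using System;
using System.Threading;

public static class CancellationTokenExtensions
{
    // Hypothetical helper: throws an OperationCanceledException that
    // carries the token, so TPL and PLINQ can recognize it as a
    // cooperative acknowledgement of cancellation.
    public static void ThrowIfCanceled(this CancellationToken token)
    {
        if (token.IsCancellationRequested)
            throw new OperationCanceledException(token);
    }
}
```

Inside a user delegate the call site then reads token.ThrowIfCanceled(), which keeps the check to one line.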
Summary
The Parallel Extension types use the new cancellation system to provide consistent and rich cancellation features. All of the long-running and blocking methods in Parallel Extensions provide overloads that take a CancellationToken, and any callbacks made to user-code can participate in the process. Application code can also make use of the same ideas and facilities to ensure that cancellation can be a fully supported feature in any situation.
19.06.2009 at
1:43 pm · Saved under
.NET Help
As has been discussed previously, one of the new features in the Task Parallel Library is TaskCompletionSource<TResult>, which enables the creation of a Task<TResult> that represents any other asynchronous operation.
There are a wide variety of sources in the .NET Framework for asynchronous work. One comes from components that implement the Asynchronous Programming Model (APM) pattern, which we discussed here. Another includes types that implement the Event-based Asynchronous Pattern (EAP). For some synchronous method Xyz, the EAP provides an asynchronous counterpart XyzAsync. Calling this method launches the asynchronous work, and when the work completes, a corresponding XyzCompleted event is raised. (That’s an oversimplification of the pattern, but it provides a grounding.)
For many situations, the EAP is quite straightforward and simple to use. However, there are cases where you’d like to be able to do things like join across multiple EAP asynchronous invocations, such as to download three different web pages asynchronously, and only when all three have completed do something else. Tasks in .NET 4.0 make this kind of operation easy, through Task.Factory.ContinueWhenAll (or ContinueWhenAny if you want to do something when any one of the items completes rather than when all of them do). However, in order to use these methods, you need Tasks, thus to use them with components that implement the EAP, you need to create Tasks from EAP. TaskCompletionSource<TResult> can be used to do exactly that.
First, let’s create two small helper functions that we can reuse over and over for multiple EAP-to-Task implementations:
private static TaskCompletionSource<T> CreateSource<T>(object state)
{
return new TaskCompletionSource<T>(
state, TaskCreationOptions.DetachedFromParent);
}
private static void TransferCompletion<T>(
TaskCompletionSource<T> tcs, AsyncCompletedEventArgs e,
Func<T> getResult)
{
if (e.UserState == tcs)
{
if (e.Cancelled) tcs.TrySetCanceled();
else if (e.Error != null) tcs.TrySetException(e.Error);
else tcs.TrySetResult(getResult());
}
}
The first helper, CreateSource, simply creates an instance of TaskCompletionSource<T> with the right type and settings. I’ve included TaskCreationOptions.DetachedFromParent under the assumption that tasks created for the EAP pattern aren’t necessarily meant to participate in parent/child relationships, but you could certainly change this if your scenarios needed that functionality.
The second helper, TransferCompletion, will be used whenever an EAP event signals completion of an asynchronous operation. It takes the AsyncCompletedEventArgs provided through the XyzCompleted event and uses it to determine whether the operation completed due to cancellation or due to an unhandled exception. Both of those pieces of data are standard to the base AsyncCompletedEventArgs class. However, each EAP operation typically comes with its own type derived from AsyncCompletedEventArgs: to be able to use this one helper function with any of those to mine the result of the operation, TransferCompletion also accepts a Func<T> that will return the results from the derived instance.
With those two helpers, writing a Task wrapper for an EAP operation is a cinch. Consider System.Net.WebClient, which provides support for downloading and uploading to and from URIs, with methods like DownloadData. We can write a Task-based wrapper for DownloadData in just a few lines (in this case, as an extension method):
public static Task<byte[]> DownloadDataTask(
this WebClient webClient, Uri address)
{
var tcs = CreateSource<byte[]>(address);
webClient.DownloadDataCompleted +=
(sender, e) => TransferCompletion(tcs, e, () => e.Result);
webClient.DownloadDataAsync(address, tcs);
return tcs.Task;
}
We first create a TaskCompletionSource<byte[]> whose Task<byte[]> will be returned from the method. We then register with the DownloadDataCompleted event handler, such that when the event is raised, the downloaded data (or exception or cancellation information) will be transferred to the returned Task. And then we start the operation. Piece of cake.
One interesting thing to note about the EAP is that some implementations support multiple asynchronous operations concurrently on the same instance. In such cases, a user-supplied token is needed to correlate the asynchronous invocations to the asynchronous completions (something that’s provided in Task implicitly by having a Task reference returned from and serving as a reference for an asynchronous operation). For that purpose, we pass the created TaskCompletionSource<TResult> as the user-supplied token, and in TransferCompletion, we only transfer the results if the token received matches the target completion source. If it doesn’t match, this is an event completion for another operation and should be ignored.
One potential issue with the previously shown implementation (and with the EAP pattern in general) is that, if they’re used over and over, the XyzCompleted event handlers will start to build up. Delegates are being registered with the event but not released. To fix that, we can also remove the event handler in the delegate handling the event. For example, DownloadDataTask can be rewritten as:
public static Task<byte[]> DownloadDataTask(
this WebClient webClient, Uri address)
{
var tcs = CreateSource<byte[]>(address);
DownloadDataCompletedEventHandler handler = null;
handler = (sender, e) => {
TransferCompletion(tcs, e, () => e.Result);
webClient.DownloadDataCompleted -= handler;
};
webClient.DownloadDataCompleted += handler;
try { webClient.DownloadDataAsync(address, tcs); }
catch {
webClient.DownloadDataCompleted -= handler;
tcs.TrySetCanceled();
throw;
}
return tcs.Task;
}
The Beta 1 samples available at http://code.msdn.microsoft.com/ParExtSamples already include Task-based extensions for WebClient, as well as extensions for other EAP implementations like SmtpClient and Ping. Download and enjoy!
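To tie this back to the join scenario mentioned at the start of this post, here is a self-contained sketch that wraps a toy EAP component and joins several operations with ContinueWhenAll. LengthCalculator, GetLengthAsync, GetLengthCompleted and LengthCompletedEventArgs are all hypothetical stand-ins for a real component such as WebClient, and the code uses the released TaskCompletionSource<T> API rather than the Beta 1 options shown above.

```csharp
using System;
using System.ComponentModel;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical EAP-style event args with an operation-specific result.
public class LengthCompletedEventArgs : AsyncCompletedEventArgs
{
    private readonly int _result;
    public LengthCompletedEventArgs(int result, Exception error,
        bool cancelled, object userState) : base(error, cancelled, userState)
    { _result = result; }
    public int Result { get { RaiseExceptionIfNecessary(); return _result; } }
}

// Hypothetical EAP component: XyzAsync method plus XyzCompleted event.
public class LengthCalculator
{
    public event EventHandler<LengthCompletedEventArgs> GetLengthCompleted;

    public void GetLengthAsync(string text, object userState)
    {
        ThreadPool.QueueUserWorkItem(_ =>
        {
            var handler = GetLengthCompleted;
            if (handler != null)
                handler(this, new LengthCompletedEventArgs(
                    text.Length, null, false, userState));
        });
    }
}

public static class EapJoinDemo
{
    // Task-based wrapper: the completion source doubles as the user token.
    public static Task<int> GetLengthTask(this LengthCalculator calc, string text)
    {
        var tcs = new TaskCompletionSource<int>();
        EventHandler<LengthCompletedEventArgs> handler = null;
        handler = (sender, e) =>
        {
            // Ignore completions that belong to other operations.
            if (!ReferenceEquals(e.UserState, tcs)) return;
            calc.GetLengthCompleted -= handler;
            if (e.Cancelled) tcs.TrySetCanceled();
            else if (e.Error != null) tcs.TrySetException(e.Error);
            else tcs.TrySetResult(e.Result);
        };
        calc.GetLengthCompleted += handler;
        calc.GetLengthAsync(text, tcs);
        return tcs.Task;
    }

    // Starts one operation per input and joins them all.
    public static int TotalLength(params string[] texts)
    {
        var calc = new LengthCalculator();
        var tasks = new Task<int>[texts.Length];
        for (int i = 0; i < texts.Length; i++)
            tasks[i] = calc.GetLengthTask(texts[i]);
        Task<int> total = Task.Factory.ContinueWhenAll<int, int>(tasks, done =>
        {
            int sum = 0;
            foreach (var t in done) sum += t.Result;
            return sum;
        });
        return total.Result; // blocks; acceptable for a console-style demo
    }
}
```

In this sketch the handler checks the user token before unsubscribing, so a completion raised for a different concurrent operation leaves it registered.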
13.06.2009 at
4:39 am · Saved under
.NET Help
As Ed Essey explained in Partitioning in PLINQ, partitioning is an important step in PLINQ execution. Partitioning splits up a single input sequence into multiple sequences that can be processed in parallel. This post further explains chunk partitioning, the most general partitioning scheme that works on any IEnumerable<T>.
Chunk partitioning appears in two places in Parallel Extensions. First, it is one of the algorithms that PLINQ uses under the hood to execute queries in parallel. Second, chunk partitioning is available as a standalone algorithm through the Partitioner.Create() method.
To explain the design of the chunk partitioning algorithm, let’s walk through the possible ways of processing an IEnumerable<T> with multiple worker threads, finally arriving at the solution used in PLINQ (approach 4).
Approach 1: Load the input sequence into an intermediate array
As a simple solution, we could walk over the input sequence and store all elements into an array. Then, we can split up the array into ranges, and assign each range to a different worker.
The disadvantage of this approach is that we need to allocate an array large enough to store all input elements. If the input sequence is long, this algorithm leads to unnecessarily large memory consumption. Also, we need to wait until the entire input sequence is ready before the workers can start executing.
Approach 2: Hand out elements to threads on demand
An entirely different approach is to have all worker threads share one input enumerator. When a worker is ready to process the next input element, it takes a shared lock, gets the next element from the input enumerator, and releases the lock.
This algorithm has a fairly large overhead because processing every element requires locking. Also, handing out elements individually is prone to poor cache behavior.
This approach does have an interesting advantage over Approach 1, though: since workers receive data on demand, the workers that finish faster will come back to request more work. In contrast, Approach 1 splits up all work ahead of time, and a worker that is done early simply goes away.
Approach 3: Hand out elements in chunks
To mitigate the two drawbacks of Approach 2 (synchronization cost and cache behavior), we can hand out elements to threads in “chunks”. When a thread is ready to process more inputs, it will take, say, 64 elements from the input enumerator.
Unfortunately, while this approach nicely amortizes the synchronization cost over multiple elements, it does not work well for short inputs. For example, if the input contains 50 elements and the chunk size is 64, all inputs will go into a single partition. Even if the work per element is large, we will not be able to benefit from parallelism, since one worker gets all the work.
And since IEnumerable<T> in general does not declare its length, we cannot simply tune the chunk size based on the input sequence length.
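A minimal sketch of Approach 3 makes the short-input problem concrete. This is illustrative code, not PLINQ's implementation; the ChunkedSource<T> type and its TakeChunk method are invented for this example.

```csharp
using System;
using System.Collections.Generic;

public sealed class ChunkedSource<T> : IDisposable
{
    private readonly IEnumerator<T> _enumerator;
    private readonly object _lock = new object();
    private readonly int _chunkSize;

    public ChunkedSource(IEnumerable<T> source, int chunkSize)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (chunkSize < 1) throw new ArgumentOutOfRangeException("chunkSize");
        _enumerator = source.GetEnumerator();
        _chunkSize = chunkSize;
    }

    // Called by a worker whenever it is ready for more input.
    // Returns an empty list once the source is exhausted.
    public List<T> TakeChunk()
    {
        var chunk = new List<T>(_chunkSize);
        lock (_lock) // one lock acquisition per chunk, not per element
        {
            while (chunk.Count < _chunkSize && _enumerator.MoveNext())
                chunk.Add(_enumerator.Current);
        }
        return chunk;
    }

    public void Dispose() { _enumerator.Dispose(); }
}
```

With a 50-element input and a chunk size of 64, the first worker to call TakeChunk receives all 50 elements and every later call returns an empty list.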
Approach 4: Hand out elements in chunks of increasing size
A solution to the problem with small inputs is to use chunks of a growing size. The first chunk assigned to each thread is of size 1 and subsequent chunks are gradually larger, until a specific threshold is reached.
Our solution doubles the chunk size every few chunks. So, each thread first receives a few chunks of size 1, then a few chunks of size 2, then 4, and so forth. Once the chunk size reaches a certain threshold, it remains constant.
This chunking strategy ensures that if the input is short, it will still get split up fairly among the cores. But, the chunk size also grows fairly quickly, and the per-chunk overheads are small for large inputs. Also, the algorithm is quite good at load-balancing, so if one worker is taking longer to process its inputs, other workers will process more elements to decrease the overall processing time.
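The growth schedule can be sketched as a small generator. The post does not state how many chunks are handed out at each size or what the threshold is, so chunksPerSize and maxChunkSize below are assumed parameters, and GrowingChunks is a made-up name rather than anything in the library.

```csharp
using System;
using System.Collections.Generic;

public static class GrowingChunks
{
    // Yields the size of each successive chunk handed to one worker:
    // chunksPerSize chunks of size 1, then of size 2, then 4, and so on,
    // holding steady once maxChunkSize is reached. The sequence is
    // unbounded, so callers stop enumerating when the input runs out.
    public static IEnumerable<int> Sizes(int chunksPerSize, int maxChunkSize)
    {
        if (chunksPerSize < 1) throw new ArgumentOutOfRangeException("chunksPerSize");
        if (maxChunkSize < 1) throw new ArgumentOutOfRangeException("maxChunkSize");
        int size = 1;
        while (true)
        {
            for (int i = 0; i < chunksPerSize; i++)
                yield return size;
            // Double the size, but never exceed the threshold.
            if (size < maxChunkSize)
                size = Math.Min(size * 2, maxChunkSize);
        }
    }
}
```

For example, with chunksPerSize = 3 and maxChunkSize = 4, the sizes begin 1, 1, 1, 2, 2, 2, 4, 4, 4 and then stay at 4.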
One interesting consequence of the chunk partitioning algorithm is that multiple threads will call MoveNext() on the input enumerator. The worker threads will use a lock to ensure mutual exclusion, but the enumerator must not assume that MoveNext() will be called from a particular thread (e.g., it should not use thread-local storage, manipulate UI, etc).
The current implementation of both PLINQ chunk partitioning and Partitioner.Create() follows approach 4 fairly closely. Now you know how it behaves and why!
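As a usage sketch, the standalone algorithm can be driven through Parallel.ForEach. The example assumes the released Partitioner.Create(IEnumerable<T>) and Parallel.ForEach overloads; PartitionerDemo and its Numbers iterator are names invented here, with the iterator standing in for any input whose length is not known up front.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class PartitionerDemo
{
    // An input sequence that does not declare its length.
    private static IEnumerable<int> Numbers(int count)
    {
        for (int i = 1; i <= count; i++) yield return i;
    }

    public static long SumInParallel(int count)
    {
        long total = 0;
        // Partitioner.Create hands elements from the shared enumerator
        // to the worker threads on demand.
        var partitioner = Partitioner.Create(Numbers(count));
        Parallel.ForEach(partitioner, n =>
        {
            // Workers run concurrently, so the update must be atomic.
            Interlocked.Add(ref total, n);
        });
        return total;
    }
}
```

Because several workers pull from the same iterator, the body of Numbers may resume on different threads from one element to the next, which is the consequence noted above.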