22.09.2009 at
11:46 am · Saved under
.NET Help
The Task abstractions in .NET 4 run on instances of the TaskScheduler class. Two implementations of TaskScheduler ship as part of the .NET Framework 4. The first is the default scheduler, which is integrated with the .NET 4 ThreadPool and takes advantage of its work-stealing queues. The second is the type of TaskScheduler returned from the static method TaskScheduler.FromCurrentSynchronizationContext.
According to MSDN, SynchronizationContext “provides the basic functionality for propagating a synchronization context in various synchronization models.” What does that really mean? At its core, SynchronizationContext provides two methods, Send and Post, both of which accept a delegate to be executed. Send synchronously invokes the delegate, and Post asynchronously invokes the delegate. That’s it, and the base implementation of SynchronizationContext doesn’t do anything fancier than that:
public virtual void Send(SendOrPostCallback d, object state)
{
d(state);
}
public virtual void Post(SendOrPostCallback d, object state)
{
ThreadPool.QueueUserWorkItem(new WaitCallback(d.Invoke), state);
}
Where things get interesting is when new types are derived from SynchronizationContext, something typically done by a UI framework (though there are non-UI implementations as well). If you’re familiar with Windows Forms or Windows Presentation Foundation (WPF) development, you’re likely aware that UI controls should only be accessed by the thread that created them, almost always the main UI thread. Thus, if a thread doing work in the background wants to update something in the UI, it needs to marshal that work back to the UI thread so that the controls may be accessed safely. Different UI frameworks expose different ways of accomplishing this marshaling. For example, in Windows Forms, one uses the Invoke or BeginInvoke method of the target Control (or at least a Control created on the same thread as the target Control). In WPF, one uses the target thread’s Dispatcher and its corresponding Invoke/BeginInvoke methods. With every UI framework having its own model for marshaling work to a particular “synchronization context”, it becomes difficult to write code that supports this marshaling concept but is agnostic to the particular environment it’s running in.
Enter SynchronizationContext. A new type may be derived from SynchronizationContext such that its Send method synchronously marshals a delegate to the right thread for execution, and Post does the same but asynchronously. If you look at the implementations of the SynchronizationContexts provided by Windows Forms and WPF, that’s exactly what they do, delegating to the relevant Invoke/BeginInvoke methods from Send and Post to marshal the work correctly.
To make it easy to get at the right SynchronizationContext, a UI framework like Windows Forms will publish an instance of its SynchronizationContext-derived class to SynchronizationContext.Current. Code can then grab SynchronizationContext.Current and use it to marshal work, without having to know whether it’s being used from Windows Forms or Windows Presentation Foundation or another similar model.
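As a rough illustration of that framework-agnostic pattern, the following sketch captures SynchronizationContext.Current before starting background work and then uses Post to deliver the result. The ContextDemo class and its ComputeThenReport helper are hypothetical names invented for this example. It assumes a console application, where no context has been published, so it falls back to the base SynchronizationContext, whose Post simply queues to the ThreadPool.

```csharp
using System;
using System.Threading;

public static class ContextDemo
{
    // Hypothetical helper: runs 'work' on a ThreadPool thread, then reports
    // the result through whatever SynchronizationContext the caller had.
    public static void ComputeThenReport(Func<int> work, Action<int> report)
    {
        // Capture the caller's context. Fall back to the base implementation
        // (which posts to the ThreadPool) when none has been published.
        SynchronizationContext context =
            SynchronizationContext.Current ?? new SynchronizationContext();

        ThreadPool.QueueUserWorkItem(_ =>
        {
            int result = work();
            // Post asynchronously marshals the callback to the captured context.
            context.Post(state => report((int)state), result);
        });
    }

    public static void Main()
    {
        using (var done = new ManualResetEventSlim(false))
        {
            ComputeThenReport(() => 6 * 7, value =>
            {
                Console.WriteLine("Result: " + value);
                done.Set();
            });
            done.Wait(); // keep the process alive until the callback has run
        }
    }
}
```

Called from the UI thread of a Windows Forms or WPF application, the same helper would be expected to deliver the callback on that thread, with no framework-specific code in the helper itself.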
“TaskScheduler.FromCurrentSynchronizationContext” should now make more sense. This method creates a TaskScheduler that wraps the SynchronizationContext returned from SynchronizationContext.Current. Thus, this gives you a TaskScheduler that will execute Tasks on the current SynchronizationContext. Why is that useful? It means you can create Tasks that are able to access UI controls safely, simply by running them on the right scheduler.
Let’s say that I wanted to load three images from some data source. When those images have been loaded, I want to blend them all together, and then I want to display the result into a PictureBox on my UI. Using Tasks and TaskScheduler.FromCurrentSynchronizationContext, I could write code like the following:
private void Button1_Click(…)
{
var ui = TaskScheduler.FromCurrentSynchronizationContext();
var tf = Task.Factory;
// Load the three images asynchronously
var imageOne = tf.StartNew(() => LoadFirstImage());
var imageTwo = tf.StartNew(() => LoadSecondImage());
var imageThree = tf.StartNew(() => LoadThirdImage());
// When they’ve been loaded, blend them
var blendedImage = tf.ContinueWhenAll(
new [] { imageOne, imageTwo, imageThree }, _ =>
BlendImages(imageOne.Result, imageTwo.Result, imageThree.Result));
// When we’re done blending, display the blended image
blendedImage.ContinueWith(_ =>
{
pictureBox1.Image = blendedImage.Result;
}, ui);
}
This code runs three tasks to load the three input images asynchronously. When all of them have completed, a continuation (also running asynchronously) calls a BlendImages method that takes the three inputs and returns the blended image. Finally, once that’s done, another continuation renders the blended image by storing it into a PictureBox on the UI. Since this modifies a UI control, it must run on the UI thread. Thus, we pass a TaskScheduler to the ContinueWith method; this scheduler targets the UI’s SynchronizationContext and causes the Task to execute on the UI thread.
TaskScheduler.FromCurrentSynchronizationContext is provided for convenience and because this is a very common need. However, due to TaskScheduler’s extensibility, it’s actually possible to implement this behavior yourself, and in doing so you could modify it to suit your own needs however you see fit.
Let’s say you did want to develop a new SynchronizationContextTaskScheduler. It might look something like this:
public class SynchronizationContextTaskScheduler : TaskScheduler
{
private ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>();
private SynchronizationContext _context;
public SynchronizationContextTaskScheduler() :
this(SynchronizationContext.Current) { }
public SynchronizationContextTaskScheduler(
SynchronizationContext context)
{
if (context == null) throw new ArgumentNullException("context");
_context = context;
}
protected override void QueueTask(Task task)
{
// Add the task to the collection
_tasks.Enqueue(task);
// Queue up a delegate that will dequeue and execute a task
_context.Post(delegate
{
Task toExecute;
if (_tasks.TryDequeue(out toExecute)) TryExecuteTask(toExecute);
}, null);
}
protected override bool TryExecuteTaskInline(
Task task, bool taskWasPreviouslyQueued)
{
return SynchronizationContext.Current == _context &&
TryExecuteTask(task);
}
public override int MaximumConcurrencyLevel { get { return 1; } }
protected override IEnumerable<Task> GetScheduledTasks()
{
return _tasks.ToArray();
}
}
Not a lot of code for a fairly powerful thing.
- The constructors simply accept the target SynchronizationContext and store it, also initializing a thread-safe queue that will store the tasks to be executed.
- The QueueTask method is called whenever the system is providing a Task for this scheduler to execute: this scheduler handles it by storing that Task into a queue, and then Post’ing to the SynchronizationContext a delegate that will pull the next Task from the queue and execute it.
- The TryExecuteTaskInline method is invoked any time the system wants to run a Task inline on the current thread (either from a call to RunSynchronously or from a Wait attempt): we need to make sure that the call is coming from the same SynchronizationContext as the target; otherwise, we may end up running in the wrong context.
- We only intend to support SynchronizationContexts that represent a single thread of execution (that’s the most common case), so we return 1 from MaximumConcurrencyLevel.
- And we want the debugger to be able to display tasks scheduled to this scheduler, so we override GetScheduledTasks to return an array of the tasks queued. (This is why we need to explicitly store the queue of tasks; otherwise, we could have simply relied on lambda closures to capture each task to be executed.)
![Task Scheduler From Current Synchronization Context]()
30.08.2009 at
2:35 am · Saved under
.NET Help
Every System.Threading.Tasks.Task instance goes through a lifecycle, and it only makes this journey once. To provide insight into where in that lifecycle a given Task is, the Task class provides an instance Status property. That property returns a value from the TaskStatus enumeration that reflects the current point in the lifecycle.
“Current” is the operative word here. We’re dealing with a concurrent system, and many of the states are transient: by the time you’ve received the value of the status, the status may already have changed. However, the lifecycle does not contain cycles (as ironic as that may sound). There’s a directionality to the various states, and once a Task has left a given state, it won’t return to that state or to any that came before it. Additionally, there are three “final” states; once a Task is in one of them, it is no longer executing and will no longer change state.
Here are the states and an informal description of what they imply:
| TaskStatus Enumeration Value | Informal Description |
| --- | --- |
| Created | This is the starting state for tasks created through a Task constructor. Tasks in this state will not leave the state until Start or RunSynchronously is called on the instance or until the Task is canceled. |
| WaitingForActivation | This is the starting state for tasks created through methods like ContinueWith, ContinueWhenAll, ContinueWhenAny, and FromAsync, as well as from a TaskCompletionSource<TResult>. The task isn’t currently scheduled, and won’t be until some dependent operation has completed (some tasks may never be scheduled, such as those created by a TaskCompletionSource<TResult> that have nothing relevant to be scheduled). For example, a task created by ContinueWith won’t be scheduled until the antecedent task (the one off of which ContinueWith was called) completes execution. |
| WaitingToRun | The task has been scheduled to a TaskScheduler and is waiting to be picked up by that scheduler and run. This is the starting state for tasks created through TaskFactory.StartNew; by the time the Task is returned from StartNew, it will already have been scheduled, and thus the state will be at least WaitingToRun (“at least”, since by the time StartNew returns, the Task could of course have already started or even completed executing). |
| Running | The Task is currently executing. |
| WaitingForChildrenToComplete | A Task isn’t considered complete until its attached children have completed. If a Task has finished executing its code body, it will leave the Running state, and if it’s then implicitly waiting for its children to complete, it will enter this state. |
| RanToCompletion | One of the three final states. A Task in this state has successfully completed execution, running to the end of its body without cancellation and without throwing an unhandled exception. |
| Canceled | One of the three final states. A Task in this state completed execution, but it did so through cancellation. To end in the Canceled state, a Task must either have cancellation requested prior to starting execution, or it must acknowledge a cancellation request during its execution. |
| Faulted | One of the three final states. A Task in this state completed execution due to an unhandled exception in its body or due to one of its attached children completing in this state. |
Several of these status values are also communicated through helper properties on the Task class. For a Task t:
- t.IsCanceled returns true if and only if t.Status equals TaskStatus.Canceled.
- t.IsFaulted returns true if and only if t.Status equals TaskStatus.Faulted. Additionally, t.Exception will be non-null if and only if t.IsFaulted is true.
- t.IsCompleted returns true if and only if t.Status equals TaskStatus.RanToCompletion, TaskStatus.Canceled, or TaskStatus.Faulted (i.e. one of the three final states). Note that IsCompleted does not just correspond to RanToCompletion, but rather to all final states.
Several operations in the Task Parallel Library are tied to the concept of a final state, or more generally to IsCompleted. For example, a call to the Wait method on a Task will not return until that Task’s IsCompleted is true. And the continuations for a Task will not fire until that same condition is met.
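To make a few of these states concrete, here is a small sketch that reads Status only at points where the value is deterministic (before Start, after Wait, and on a TaskCompletionSource task). The TaskStatusDemo class and its Observe method are names made up for this illustration; the transient states such as WaitingToRun and Running are deliberately not sampled, since they can change at any moment.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskStatusDemo
{
    // Returns the statuses observed at a few deterministic points.
    public static TaskStatus[] Observe()
    {
        // Constructor-created tasks start in Created and stay there until started.
        var t = new Task(() => { });
        TaskStatus beforeStart = t.Status;      // Created
        t.Start();
        t.Wait();                               // returns once IsCompleted is true
        TaskStatus afterWait = t.Status;        // RanToCompletion

        // A TaskCompletionSource task has nothing to schedule.
        var tcs = new TaskCompletionSource<int>();
        TaskStatus pending = tcs.Task.Status;   // WaitingForActivation
        tcs.SetCanceled();
        TaskStatus canceled = tcs.Task.Status;  // Canceled

        // An unhandled exception in the body leads to Faulted.
        Action fail = () => { throw new InvalidOperationException("boom"); };
        Task faulted = Task.Factory.StartNew(fail);
        try { faulted.Wait(); }
        catch (AggregateException) { /* expected: Wait rethrows the failure */ }
        TaskStatus afterFault = faulted.Status; // Faulted

        return new[] { beforeStart, afterWait, pending, canceled, afterFault };
    }

    public static void Main()
    {
        foreach (TaskStatus status in Observe()) Console.WriteLine(status);
    }
}
```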
![The meaning of TaskStatus The meaning of TaskStatus]()
20.07.2009 at
6:32 pm · Saved under
.NET Help
Update: The survey is now closed. Thanks to all that participated!
We’ve heard our customers’ frustrations with asynchronous programming and their call for improved support. We are hoping to better understand why and how you and your customers use asynchronous programming in .NET and how the support we provide for it can be improved in the future.
We invite you to complete a short survey on the Asynchronous Programming Model (APM) as well as on a theoretical language construct called “Asynchronous Methods.” Completing this survey shouldn’t take more than 5-10 minutes of your time, and doing so will help us to better understand where the APM is lacking, how we can make asynchronous programming better and, ultimately, how we can increase your productivity and the scalability and reliability of your code. We encourage you to forward this message on to others who may also have had experience with asynchronous programming.
http://deploy.ztelligence.com/start/index.jsp?PIN=15TMLTB8M3X9C
Thank you!
Josh Phillips
![Asynchronous programming in .NET survey Asynchronous programming in .NET survey]()
10.07.2009 at
6:31 pm · Saved under
.NET Help
On Wednesday, Microsoft announced the Imagine Cup 2009 worldwide winners:
http://www.microsoft.com/presspass/press/2009/jul09/07-07ImagineCup2009WinnersPR.mspx
Congratulations to Team Biollel, which took first place in the Parallel Computing Award for their “Parallel implementation of maximum likelihood method of phylogenetic tree construction for protein amino acid sequences using Task Parallel Library”!
10.07.2009 at
6:12 pm · Saved under
.NET Help
If you’re reading this post, you most likely have an interest in parallel or distributed computing, writing concurrent software, and the like. Take that interest a step further, and help us make the manycore era a successful reality by coming to work on the Parallel Computing Platform team at Microsoft.
We currently have several positions available:
If you’re interested in any of these, please submit your resume through the “Submit Resume” link on the relevant page. We look forward to receiving it!
24.06.2009 at
5:39 pm · Saved under
.NET Help
In concurrent programs, race conditions are a fact of life but they aren’t all bad. Sometimes, race conditions are benign, as is often the case with lazy initialization.
The problem with racing to set a value, however, is that it can result in multiple objects being instantiated when only one is needed. Take the LazyInitializer class that shipped in Visual Studio 2010 Beta 1, for example. The method EnsureInitialized(ref T target, Func<T> valueFactory) accepts a ByRef value and a function that produces a value. If target is null, EnsureInitialized will execute valueFactory and set target to the return value. If multiple threads happen to overlap calls to EnsureInitialized, which is likely to be a rare occurrence, these threads may both initially see target as null and then execute valueFactory. One thread will win the race and will get to set target.
In Beta 1, we thought we’d be really smart by disposing of the objects that were created by threads that lost the race, if they implemented IDisposable. Turns out that’s a pretty bad thing to do. With lazy initialization, we assumed that most of the time valueFactory would be creating a new value, but that isn’t always the case. It’s quite possible that a valueFactory could return an object that has already been created, in which case we’d be disposing of an object we didn’t own.
We’re making a couple of changes in the Parallel Extensions to ensure that we generally don’t dispose of objects we aren’t sure we own and, moving forward, any new APIs that eagerly instantiate objects that might not be used will not dispose of said objects.
Now you might say, “Why? Ninety percent of the time I am creating a new object with my value factory, and it might be a really heavy object like a file or wait handle!” We hear you, but typically, once an object is disposed, there is no bringing it back, so by disposing of these objects we wouldn’t allow anyone to opt out of that behavior if need be. If we don’t dispose, the GC will still clean up your unused objects, and if you really need to dispose of an object manually, there is a workaround, for example:
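ManualResetEvent theEvent = null;
// Must be definitely assigned: it is read after the call even if the factory never runs
ManualResetEvent tmpEvent = null;
LazyInitializer.EnsureInitialized(ref theEvent,
    () => { return tmpEvent = new ManualResetEvent(false); });
// Close our instance only if we created one and it lost the race
if (tmpEvent != null && tmpEvent != theEvent) tmpEvent.Close();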
In fact, you could easily create a small wrapper function to do the work for you, i.e.
public static T EnsureInitializedWithDispose<T>(ref T target, Func<T> function)
    where T : class, IDisposable
{
    T tmp = null;
    T actual = LazyInitializer.EnsureInitialized(ref target, () => tmp = function());
    // tmp stays null when target was already initialized and the factory never ran
    if (tmp != null && !ReferenceEquals(actual, tmp)) tmp.Dispose();
    return actual;
}
And the issue isn’t just about race conditions. In general, we should never dispose of an object we didn’t explicitly create. BlockingCollection<T> in Beta 1, for example, will dispose of its underlying collection if the BlockingCollection<T> itself is disposed. Again, this is bad news if you were only using the BlockingCollection<T> as a temporary wrapper. In future releases of Visual Studio 2010, this will be corrected.
So when using these APIs, and any other API that may consume or create an IDisposable object, think about what that API is doing with the object and if you need to clean up resources manually. If you’re not sure, the finalizer should take care of most of your problems. In the rare situations where immediate cleanup is important, make sure you keep track of the objects and dispose of them properly. Also, when designing your own libraries, don’t dispose of objects you don’t own!
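One way a library type can follow that last piece of advice is to let the caller state explicitly whether the wrapper owns the object it is given. The sketch below is only an illustration of that idea, not how any Parallel Extensions type is implemented; TrackedResource, ResourceWrapper, and the ownsInner parameter are all hypothetical names.

```csharp
using System;

// Hypothetical resource that records whether Dispose was called.
public sealed class TrackedResource : IDisposable
{
    public bool IsDisposed { get; private set; }
    public void Dispose() { IsDisposed = true; }
}

// Hypothetical wrapper: disposes the inner object only when told it owns it.
public sealed class ResourceWrapper : IDisposable
{
    private readonly IDisposable _inner;
    private readonly bool _ownsInner;

    public ResourceWrapper(IDisposable inner, bool ownsInner)
    {
        if (inner == null) throw new ArgumentNullException("inner");
        _inner = inner;
        _ownsInner = ownsInner;
    }

    public void Dispose()
    {
        // Leave objects we were merely handed for the caller to clean up.
        if (_ownsInner) _inner.Dispose();
    }
}

public static class OwnershipDemo
{
    public static void Main()
    {
        var shared = new TrackedResource();
        using (new ResourceWrapper(shared, ownsInner: false)) { }
        Console.WriteLine(shared.IsDisposed);   // False: the caller still owns it

        var owned = new TrackedResource();
        using (new ResourceWrapper(owned, ownsInner: true)) { }
        Console.WriteLine(owned.IsDisposed);    // True: ownership was transferred
    }
}
```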
![Don’t dispose of objects that you don’t own Don’t dispose of objects that you don’t own]()
24.06.2009 at
3:26 pm · Saved under
.NET Help
In a previous post, it was demonstrated how for loops with very small loop bodies could be parallelized by creating an iterator over ranges, and then using Parallel.ForEach over those ranges. A similar technique can be used to write parallel loops over iteration spaces of non-integers. For example, let’s say I wanted to parallelize the following loop, where the iteration range is based on doubles:
for(double d = 0.0; d < 1.0; d += .001)
{
Process(d);
}
Parallel.For only provides overloads where the iteration variable is an Int32 or an Int64. To accommodate doubles, one approach would be to translate the range into an integer-based range in order to use Parallel.For, and then within the body of the loop translate the integer back into a double. As an example, the previously shown loop could be rewritten as:
Parallel.For(0, 1000, i =>
{
double d = i / 1000.0;
Process(d);
});
Due to floating point arithmetic, this may not be exactly the same, but it may be close enough. Another approach is to implement an iterator like the following:
private static IEnumerable<double> Iterate(
double fromInclusive, double toExclusive, double step)
{
for(double d = fromInclusive; d < toExclusive; d += step) yield return d;
}
With that Iterate method, now I can parallelize the sequential loop using Parallel.ForEach:
Parallel.ForEach(Iterate(0.0, 1.0, .001), d =>
{
Process(d);
});
This same technique can be applied to a wide variety of scenarios. Keep in mind, however, that the IEnumerator<T> interface isn’t thread-safe, which means that Parallel.ForEach needs to take locks when accessing the data source. While ForEach internally uses some smarts to try to amortize the cost of such locks over the processing, this is still overhead that needs to be overcome by more work in the body of the ForEach in order for good speedups to be achieved.
Parallel.ForEach has optimizations used when working on indexable data sources, such as lists and arrays, and in those cases the need for locking is decreased. Thus, performance may actually be improved in some cases by transforming the iteration space into a list or an array, which can be done using LINQ, even though there is both a time and a memory cost associated with creating an array from an enumerable. For example:
Parallel.ForEach(Iterate(0.0, 1.0, .001).ToArray(), d =>
{
Process(d);
});
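Returning to the earlier remark that the integer-based rewrite may not produce exactly the same values as the accumulating loop, the following sketch measures the difference rather than assuming it. DoubleRangeDemo, CountAccumulated, and MaxGap are hypothetical names for this illustration; the program prints what it finds, and no particular output is claimed here.

```csharp
using System;

public static class DoubleRangeDemo
{
    // Counts the iterations of the original loop, which accumulates 'step'.
    public static int CountAccumulated(double from, double to, double step)
    {
        int count = 0;
        for (double d = from; d < to; d += step) count++;
        return count;
    }

    // Largest difference between the i-th accumulated value and i / 1000.0.
    public static double MaxGap()
    {
        double accumulated = 0.0, maxGap = 0.0;
        for (int i = 0; i < 1000; i++)
        {
            maxGap = Math.Max(maxGap, Math.Abs(accumulated - i / 1000.0));
            accumulated += .001;
        }
        return maxGap;
    }

    public static void Main()
    {
        // Rounding error in the accumulated value can shift the loop's exit
        // point, so the count is measured instead of being assumed.
        Console.WriteLine("Accumulating loop iterations: " +
            CountAccumulated(0.0, 1.0, .001));
        Console.WriteLine("Integer-based iterations:     1000");
        Console.WriteLine("Largest per-value difference: " + MaxGap());
    }
}
```

If the two iteration counts differ on your machine, the integer-based form is the one whose trip count is known in advance.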
Happy coding.
22.06.2009 at
4:00 am · Saved under
.NET Help
One of the great features that spans all of the Parallel Extensions types is a consistent approach to cancellation (see http://blogs.msdn.com/pfxteam/archive/2009/05/22/9635790.aspx). In this post we explore some of the ways cancellation is used in Parallel Extensions and explain the guidance we developed.
The new cancellation system is a cooperative approach based on two new types: CancellationTokenSource, which initiates cancellation requests, and CancellationToken which communicates a cancellation request to asynchronous operations and to long-running and blocking method calls.
If you are experimenting with Parallel Extensions, you might like to keep an eye out for the methods that accept a CancellationToken and test them out.
Blocking calls
Some of the Parallel Extension types introduce blocking methods, for example BlockingCollection.Take() and Task.Wait(). The default overloads for these methods will block indefinitely if the condition they are waiting for never occurs. One solution used by these APIs, and many others like them, is to provide a timeout overload so that they may return after some duration and report that the condition did not occur. However, a timeout isn’t particularly convenient if you only want to stop waiting if a specific activity occurs, such as a user clicking a ‘cancel’ button. One possibility is to wait for some amount of time such as 100 milliseconds, check if the button was pressed, and if not then go back to waiting, but this puts the burden on the call-site and must be re-implemented repeatedly; and depending on the frequency of the polling, it may also add unnecessary cost. The types in Parallel Extensions use the new cancellation system and provide overloads that accept a CancellationToken which can cause early termination of blocking methods in response to a specific cancellation request.
Here are some examples of the blocking calls in Parallel Extensions that accept a CancellationToken:
BlockingCollection:
void Add(T item, CancellationToken cancellationToken)
T Take(CancellationToken cancellationToken)
ManualResetEventSlim:
void Wait(CancellationToken cancellationToken)
Task:
void Wait(CancellationToken cancellationToken)
void WaitAll(Task[] tasks, CancellationToken cancellationToken)
In each case, you supply a CancellationToken to the method, and if the CancellationToken is signaled via a call to the associated CancellationTokenSource.Cancel(), then the method will wake up and throw an OperationCanceledException. This frees you from the need to use the timeouts if you are happy to wait until the condition is true or a specific cancellation request occurs. Many of these methods also have overloads that accept both a CancellationToken and a timeout, so that both needs may be accommodated.
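As a minimal sketch of this behavior, the example below blocks in Take on an empty BlockingCollection<int> and is woken by a cancellation request issued from another thread. CancelTakeDemo and TakeUntilCanceled are names invented for the example, and the 200 ms sleep is just a stand-in for a user eventually clicking a ‘cancel’ button.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class CancelTakeDemo
{
    // Returns true if the blocked Take call was woken by cancellation.
    public static bool TakeUntilCanceled()
    {
        using (var collection = new BlockingCollection<int>())
        using (var cts = new CancellationTokenSource())
        {
            // Stand-in for a user clicking 'cancel' a little later.
            Task canceler = Task.Factory.StartNew(() =>
            {
                Thread.Sleep(200);
                cts.Cancel();
            });

            try
            {
                // Nothing is ever added, so without the token this would block forever.
                collection.Take(cts.Token);
                return false;
            }
            catch (OperationCanceledException)
            {
                return true;
            }
            finally
            {
                // Make sure Cancel has finished before cts is disposed.
                canceler.Wait();
            }
        }
    }

    public static void Main()
    {
        Console.WriteLine("Take canceled: " + TakeUntilCanceled());
    }
}
```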
Long-running calls
There are also a variety of methods that may simply take a while to complete, particularly if large amounts of data or complex processing is taking place. For example, the following call is likely to take some time, even if individual calls to DoFunc(x) are relatively fast:
Parallel.For(1, 1000000000, (x) => DoFunc(x));
To assist with this, the Parallel APIs have overloads that accept a ParallelOptions instance that holds a CancellationToken. The machinery inside Parallel.For(…) observes the supplied CancellationToken and will exit early if it sees that the token has been signaled (for more information on exiting loops early, see http://blogs.msdn.com/pfxteam/archive/2009/05/27/9645023.aspx). For example,
CancellationTokenSource cts = new CancellationTokenSource();
ParallelOptions options = new ParallelOptions
{CancellationToken = cts.Token};
Parallel.For(1, 100, options, (x) => DoSlowFunc(x));
// from another thread, call cts.Cancel() to request cancellation.
As you can see, the approach for canceling blocking calls and long-running calls is identical. In both cases you supply a CancellationToken to a specific method call and use the associated CancellationTokenSource to signal a cancellation request.
Digression: Internal cancellation
In certain situations in Parallel Extensions and in other systems, it is necessary to wake up a blocked method for reasons that aren’t due to explicit cancellation by a user. For example, if one thread is blocked on blockingCollection.Take() due to the collection being empty and another thread subsequently calls blockingCollection.CompleteAdding(), then the first call should wake up and throw an InvalidOperationException to represent an incorrect usage. A great way to implement this is to use an internal CancellationTokenSource that can wake things up due to internal concerns, and to link it to a second, external CancellationTokenSource that has been supplied by the user. For example, the following captures the essential details of how CompleteAdding() and Take(CancellationToken) are implemented:
class BlockingCollection<T>
{
    CancellationTokenSource internalCTS = new CancellationTokenSource();

    public void CompleteAdding()
    {
        //…
        // Wake up any blocked Take calls
        internalCTS.Cancel();
    }

    public T Take(CancellationToken externalToken)
    {
        // The linked source is signaled when either token is signaled
        using (CancellationTokenSource linkedTokenSource =
            CancellationTokenSource.CreateLinkedTokenSource(
                internalCTS.Token, externalToken))
        {
            try
            {
                return InternalTake(linkedTokenSource.Token);
            }
            catch (OperationCanceledException oce)
            {
                // Inspect the original tokens to find out why we were woken
                if (internalCTS.Token.IsCancellationRequested)
                    throw new InvalidOperationException("..msg..");
                else if (externalToken.IsCancellationRequested)
                    throw new OperationCanceledException(externalToken);
                else throw;
            }
        }
    }
}
In this fragment, a linked CancellationTokenSource is created and its token passed to the InternalTake() method. This simplifies the implementation for InternalTake() but still allows it to be woken up due to either the external token being signaled or an incorrect concurrent call to CompleteAdding(). If the InternalTake() method throws an OperationCanceledException, we can test the individual tokens to determine what the cause was and take appropriate action. Note that oce.CancellationToken will be equal to linkedTokenSource.Token as this is the token that was actually being observed, but we can look at the original tokens to determine the cause for the linked token being signaled. There is a minor risk of confusion if both sources are signaled simultaneously, but this confusion is benign as it is reasonable to behave as though either source was responsible, and a particular implementation may choose to prioritize one mechanism over the other.
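The cause-detection step can be tried in isolation. The following sketch, using the hypothetical names LinkedTokenDemo and WhoCanceled, links two sources, cancels one of them, and then inspects the original tokens to decide which one was responsible, giving priority to the internal source in the same way as the fragment above.

```csharp
using System;
using System.Threading;

public static class LinkedTokenDemo
{
    // Cancels one of two sources and reports which one signaled the linked token.
    public static string WhoCanceled(bool cancelInternal)
    {
        using (var internalCts = new CancellationTokenSource())
        using (var externalCts = new CancellationTokenSource())
        using (var linked = CancellationTokenSource.CreateLinkedTokenSource(
                   internalCts.Token, externalCts.Token))
        {
            if (cancelInternal) internalCts.Cancel();
            else externalCts.Cancel();

            // The linked token is signaled whichever source fired.
            if (!linked.Token.IsCancellationRequested) return "none";

            // Check the original tokens, internal first, to find the cause.
            return internalCts.Token.IsCancellationRequested ? "internal" : "external";
        }
    }

    public static void Main()
    {
        Console.WriteLine(WhoCanceled(true));   // internal
        Console.WriteLine(WhoCanceled(false));  // external
    }
}
```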
Cooperative cancellation with user-code
Both TPL and PLINQ provide infrastructure that calls back to user code. If the user code is long-running or blocking, then it is very useful for the user code to be able to respond to cancellation in a cooperative fashion. TPL and PLINQ achieve this by tracking a supplied CancellationToken, and if they see some user code throw an OperationCanceledException that mentions this specific token, then it is treated as cooperative acknowledgement of cancellation. This means that both TPL and PLINQ understand that the user code did not suffer some catastrophic exceptional condition, but rather that it is simply responding to cancellation. As a result, TPL and PLINQ constructs will simply exit with an OperationCanceledException of their own, rather than aggregating exceptions from all the participating worker threads and throwing an AggregateException.
PLINQ follows this basic plan; for example, the following query will throw a single OperationCanceledException if the CancellationTokenSource becomes signaled. It doesn’t matter whether PLINQ or the user delegate sees the cancellation request first, as the user-delegate exception will be correctly understood by the PLINQ execution engine.
CancellationTokenSource cts = new CancellationTokenSource();
var resultArray =
data.AsParallel()
.WithCancellation(cts.Token)
.Select((x) =>
{
if (cts.Token.IsCancellationRequested)
throw new OperationCanceledException(cts.Token);
// …
})
.ToArray();
In both PLINQ and TPL, it is interesting to note that the external cancellation token is not supplied back to the user-delegate via a parameter as may be expected. This would have bloated the number of method overloads considerably and, as shown above, the external token can be conveniently accessed via closures or equivalent techniques.
If the user code throws an OperationCanceledException but one or more “normal” exceptions occur on other threads, then all of the exceptions will be collated into an AggregateException, including the OperationCanceledException. Hence, if only cancellation occurred then a single OperationCanceledException is thrown, but an actual failure will always result in an AggregateException.
PLINQ Queries
PLINQ offers deep support for cancellation and endeavors to ensure that cancellation will be enacted swiftly for any query. For queries that involve only simple and fast user delegates, simply supply a CancellationToken via the WithCancellation() method and PLINQ takes care of the rest. If your query involves long-running or blocking user code, then you should use the cooperative cancellation pattern described in the preceding section. For reasons that are discussed in the next section, PLINQ may not check for cancellation every time it calls a user-delegate, and so long running delegates should check cancellation frequently (as a working rule of thumb, PLINQ will check for cancellation approximately once per one hundred calls to a user delegate). For example, if a delegate call runs for 50ms but does not perform any cancellation checks of its own, then PLINQ may not detect the cancellation request itself for up to 5 seconds, and this could significantly affect a user’s experience. For this reason, we recommend that long-running delegates called from PLINQ queries should check for cancellation as frequently as possible. A good lower bound is to check cancellation once per few thousand IL instructions executed, and an upper bound is to check no less frequently than once per 10ms, but once per 1ms is preferred to ensure snappy response.
The following example demonstrates a user delegate that may run for a while, and so checks cancellation itself every few thousand IL instructions. This provides for timely cancellation support with a negligible cost to performance.
// Assume an external CancellationToken ‘token’ has been supplied.
int[] data = …;
var query =
data.AsParallel()
.WithCancellation(token)
.Select((x) =>
{
int result = 0;
for ( int i = 0; i < x; i++ ) {
for (int j = 0; j < 1000 * x; j++)
{
result += SimpleFunc(i,j);
}
token.ThrowIfCancellationRequested();
}
return result;
})
.ToArray();
[Update: the call to token.ThrowIfCancellationRequested shown above is not available in Beta1 (but will appear in subsequent releases). This behavior can be achieved via a manual test-and-throw as shown in earlier examples, or via the extension-method approach described in the comments. -Mike]
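A manual test-and-throw can be wrapped in a small extension method, roughly as sketched below. The names CancellationTokenHelpers and ThrowIfCanceled are hypothetical (the method is deliberately not called ThrowIfCancellationRequested, so that it cannot clash with the built-in method in later releases), and the sketch assumes an OperationCanceledException constructor that accepts the token, as in the released .NET 4.

```csharp
using System;
using System.Threading;

public static class CancellationTokenHelpers
{
    // Hypothetical helper: test the token and throw if cancellation
    // has been requested.
    public static void ThrowIfCanceled(this CancellationToken token)
    {
        if (token.IsCancellationRequested)
        {
            // Passing the token lets the caller see which token
            // triggered the cancellation.
            throw new OperationCanceledException(token);
        }
    }
}
```

Inside the delegate above, token.ThrowIfCanceled() would then take the place of token.ThrowIfCancellationRequested().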
Summary
The Parallel Extension types use the new cancellation system to provide consistent and rich cancellation features. All of the long-running and blocking methods in Parallel Extensions provide overloads that take a CancellationToken, and any callbacks made to user-code can participate in the process. Application code can also make use of the same ideas and facilities to ensure that cancellation can be a fully supported feature in any situation.
19.06.2009 at
1:43 pm · Saved under
.NET Help
As has been discussed previously, one of the new features in the Task Parallel Library is TaskCompletionSource<TResult>, which enables the creation of a Task<TResult> that represents any other asynchronous operation.
There are a wide variety of sources in the .NET Framework for asynchronous work. One comes from components that implement the Asynchronous Programming Model (APM) pattern, which we discussed here. Another includes types that implement the Event-based Asynchronous Pattern (EAP). For some synchronous method Xyz, the EAP provides an asynchronous counterpart XyzAsync. Calling this method launches the asynchronous work, and when the work completes, a corresponding XyzCompleted event is raised. (That’s an oversimplification of the pattern, but it provides a grounding.)
For many situations, the EAP is quite straightforward and simple to use. However, there are cases where you’d like to be able to do things like join across multiple EAP asynchronous invocations, such as to download three different web pages asynchronously, and only when all three have completed do something else. Tasks in .NET 4.0 make this kind of operation easy, through Task.Factory.ContinueWhenAll (or ContinueWhenAny if you want to do something when any one of the items completes rather than when all of them do). However, these methods operate on Tasks, so to use them with components that implement the EAP, you first need to create Tasks from the EAP operations. TaskCompletionSource<TResult> can be used to do exactly that.
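To make the join concrete, here is a small sketch of ContinueWhenAll over several Task<byte[]> instances, such as three downloads. The class and method names (JoinExample, TotalLengthWhenAll) are hypothetical, and how the input tasks are produced is left open.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class JoinExample
{
    // Returns a task that completes only after every download has completed,
    // and whose result is the combined size of all downloaded payloads.
    public static Task<int> TotalLengthWhenAll(params Task<byte[]>[] downloads)
    {
        return Task.Factory.ContinueWhenAll<byte[], int>(
            downloads,
            completed => completed.Sum(t => t.Result.Length));
    }
}
```

If any of the antecedent tasks faulted, reading its Result inside the continuation rethrows, so the returned task would fault as well.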
First, let’s create two small helper functions that we can reuse over and over for multiple EAP-to-Task implementations:
private static TaskCompletionSource<T> CreateSource<T>(object state)
{
return new TaskCompletionSource<T>(
state, TaskCreationOptions.DetachedFromParent);
}
private static void TransferCompletion<T>(
TaskCompletionSource<T> tcs, AsyncCompletedEventArgs e,
Func<T> getResult)
{
if (e.UserState == tcs)
{
if (e.Cancelled) tcs.TrySetCanceled();
else if (e.Error != null) tcs.TrySetException(e.Error);
else tcs.TrySetResult(getResult());
}
}
The first helper, CreateSource, simply creates an instance of TaskCompletionSource<T> with the right type and settings. I’ve included TaskCreationOptions.DetachedFromParent under the assumption that tasks created for the EAP pattern aren’t necessarily meant to participate in parent/child relationships, but you could certainly change this if your scenarios needed that functionality.
The second helper, TransferCompletion, will be used whenever an EAP event signals completion of an asynchronous operation. It takes the AsyncCompletedEventArgs provided through the XyzCompleted event and uses it to determine whether the operation completed due to cancellation or due to an unhandled exception. Both of those pieces of data are standard to the base AsyncCompletedEventArgs class. However, each EAP operation typically comes with its own type derived from AsyncCompletedEventArgs: to be able to use this one helper function with any of those to mine the result of the operation, TransferCompletion also accepts a Func<T> that will return the results from the derived instance.
With those two helpers, writing a Task wrapper for an EAP operation is a cinch. Consider System.Net.WebClient, which provides support for downloading and uploading to and from URIs, with methods like DownloadData. We can write a Task-based wrapper for DownloadData in just a few lines (in this case, as an extension method):
public static Task<byte[]> DownloadDataTask(
this WebClient webClient, Uri address)
{
var tcs = CreateSource<byte[]>(address);
webClient.DownloadDataCompleted +=
(sender, e) => TransferCompletion(tcs, e, () => e.Result);
webClient.DownloadDataAsync(address, tcs);
return tcs.Task;
}
We first create a TaskCompletionSource<byte[]> whose Task<byte[]> will be returned from the method. We then register with the DownloadDataCompleted event handler, such that when the event is raised, the downloaded data (or exception or cancellation information) will be transferred to the returned Task. And then we start the operation. Piece of cake.
One interesting thing to note about the EAP is that some implementations support multiple asynchronous operations concurrently on the same instance. In such cases, a user-supplied token is needed to correlate the asynchronous invocations to the asynchronous completions (something that’s provided in Task implicitly by having a Task reference returned from and serving as a reference for an asynchronous operation). For that purpose, we pass the created TaskCompletionSource<TResult> as the user-supplied token, and in TransferCompletion, we only transfer the results if the token received matches the target completion source. If it doesn’t match, this is an event completion for another operation and should be ignored.
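The behavior of TransferCompletion, including the token check, can be exercised without any real EAP component by constructing AsyncCompletedEventArgs by hand. The sketch below repeats the helper's logic and adds a hypothetical Simulate method; the class name, Simulate, and the placeholder result 42 are all illustrative assumptions.

```csharp
using System;
using System.ComponentModel;
using System.Threading.Tasks;

public static class TransferCompletionDemo
{
    // Same logic as the TransferCompletion helper shown earlier.
    public static void TransferCompletion<T>(
        TaskCompletionSource<T> tcs, AsyncCompletedEventArgs e, Func<T> getResult)
    {
        // Ignore completions that belong to a different operation.
        if (ReferenceEquals(e.UserState, tcs))
        {
            if (e.Cancelled) tcs.TrySetCanceled();
            else if (e.Error != null) tcs.TrySetException(e.Error);
            else tcs.TrySetResult(getResult());
        }
    }

    // Hypothetical driver: fakes a completion event and returns the task.
    public static Task<int> Simulate(Exception error, bool cancelled, bool tokenMatches)
    {
        var tcs = new TaskCompletionSource<int>();
        object token = tokenMatches ? (object)tcs : new object();
        var args = new AsyncCompletedEventArgs(error, cancelled, token);
        TransferCompletion(tcs, args, () => 42); // 42 is a placeholder result
        return tcs.Task;
    }
}
```

With a matching token, the task ends up canceled, faulted, or completed according to the event arguments. With a non-matching token, the completion is ignored and the task stays incomplete.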
One potential issue with the previously shown implementation (and with the EAP pattern in general) is that, if they’re used over and over, the XyzCompleted event handlers will start to build up. Delegates are being registered with the event but not released. To fix that, we can also remove the event handler in the delegate handling the event. For example, DownloadDataTask can be rewritten as:
public static Task<byte[]> DownloadDataTask(
this WebClient webClient, Uri address)
{
var tcs = CreateSource<byte[]>(address);
DownloadDataCompletedEventHandler handler = null;
handler = (sender, e) => {
TransferCompletion(tcs, e, () => e.Result);
webClient.DownloadDataCompleted -= handler;
};
webClient.DownloadDataCompleted += handler;
try { webClient.DownloadDataAsync(address, tcs); }
catch {
webClient.DownloadDataCompleted -= handler;
tcs.TrySetCanceled();
throw;
}
return tcs.Task;
}
The Beta 1 samples available at http://code.msdn.microsoft.com/ParExtSamples already include Task-based extensions for WebClient, as well as extensions for other EAP implementations like SmtpClient and Ping. Download and enjoy!
06.06.2009 at
3:27 pm · Saved under
.NET Help
The Parallel class represents a significant advancement in parallelizing managed loops. For many common scenarios, it just works, resulting in terrific speedups. However, while ideally Parallel.For could be all things to all people, such things rarely work out, and we’ve had to prioritize certain scenarios over others.
One area where Parallel.For may fall a bit short is in attempts to use it with very small loop bodies, such as:
int [] array = new int[100000000];
Parallel.For(0, array.Length, i=>
{
array[i] = i*i*i;
});
Such an operation should be readily parallelizable; after all, every iteration is completely independent of every other iteration, and we’re dealing with a big data parallel problem. The devil is in the details, however. To handle typical scenarios where the time it takes to complete each iteration ends up being non-uniform, Parallel.For’s implementation takes a lot of care to load balance across all threads participating in the loop, and that load balancing comes at a small performance cost. This cost is typically trumped by the performance benefits that come from doing the load balancing; however, when the body of the loop is as tiny as it is in the example above, even small overheads add up. Another overhead that also contributes is the delegate invocation required to invoke the loop body. It can be easy to forget when looking at a Parallel.For call that Parallel.For is really just a method, accepting as a parameter a delegate to be invoked for every iteration. That invocation isn’t free, and in the above case may even be more expensive than the body of the loop itself.
Fear not, however, as there exist ways to still achieve good speedups on such cases. One way is based on creating larger chunks of work for Parallel to operate on: as the chunk size increases, the overhead costs start to pale in comparison, and speedups are realized.
Consider a new ForRange method you could implement:
public static ParallelLoopResult ForRange(
int fromInclusive, int toExclusive, Action<int, int> body);
Unlike For, which invokes the body once per iteration, ForRange will invoke the body with a start and end of a range. Thus, given an initial sequential loop like the following:
for(int i=0; i<N; i++)
{
DoWork(i);
}
with For it would be parallelized by replacing the for loop with a Parallel.For:
Parallel.For(0, N, i=>
{
DoWork(i);
});
and with ForRange, it would be parallelized by wrapping the for loop with a ForRange:
ForRange(0, N, (from,to) =>
{
for(int i=from; i<to; i++)
{
DoWork(i);
}
});
There are several ways we can now implement ForRange. The first is simply by doing a little math. We can calculate the boundaries of each range and use a Parallel.For to run the user-supplied body action for each range, e.g.
public static ParallelLoopResult ForRange(
int fromInclusive, int toExclusive, Action<int, int> body)
{
int numberOfRanges = Environment.ProcessorCount;
int range = toExclusive - fromInclusive;
int stride = range / numberOfRanges;
if (range <= 0) numberOfRanges = 0;
return Parallel.For(0, numberOfRanges, i => {
int start = fromInclusive + i * stride;
int end = (i == numberOfRanges - 1) ? toExclusive : start + stride;
body(start, end);
});
}
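When doing this kind of range math by hand, it's worth checking that every index is visited exactly once, including when the loop doesn't start at zero. The sketch below is one way to do that. It uses a variant of the math-based ForRange that offsets each range by fromInclusive and caps the number of ranges at the number of iterations; the cap, the RangeLoops class, and the CountVisits helper are illustrative assumptions.

```csharp
using System;
using System.Threading.Tasks;

public static class RangeLoops
{
    // Math-based ForRange variant: each range starts at an offset
    // from fromInclusive, and the last range absorbs any remainder.
    public static ParallelLoopResult ForRange(
        int fromInclusive, int toExclusive, Action<int, int> body)
    {
        int range = toExclusive - fromInclusive;
        int numberOfRanges =
            range <= 0 ? 0 : Math.Min(range, Environment.ProcessorCount);
        int stride = numberOfRanges == 0 ? 0 : range / numberOfRanges;
        return Parallel.For(0, numberOfRanges, i =>
        {
            int start = fromInclusive + i * stride;
            int end = (i == numberOfRanges - 1) ? toExclusive : start + stride;
            body(start, end);
        });
    }

    // Hypothetical check: counts how many times each index is visited.
    // Ranges are disjoint, so no two threads touch the same slot.
    public static int[] CountVisits(int fromInclusive, int toExclusive)
    {
        var visits = new int[Math.Max(0, toExclusive - fromInclusive)];
        ForRange(fromInclusive, toExclusive, (start, end) =>
        {
            for (int i = start; i < end; i++) visits[i - fromInclusive]++;
        });
        return visits;
    }
}
```

Every element of the array returned by CountVisits should be 1, whatever the bounds and processor count.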
Another way is actually by using Parallel.ForEach under the covers. Rather than doing the math as was done above, we can write an iterator in C# that yields the ranges, and then Parallel.ForEach over those ranges, e.g.
public static ParallelLoopResult ForRange(
int fromInclusive, int toExclusive, Action<int, int> body)
{
int rangeSize = (toExclusive - fromInclusive) / Environment.ProcessorCount;
if (rangeSize == 0) rangeSize = 1;
return Parallel.ForEach(
CreateRanges(fromInclusive, toExclusive, rangeSize), range =>
{
body(range.Item1, range.Item2);
});
}
private static IEnumerable<Tuple<int,int>> CreateRanges(
int fromInclusive, int toExclusive, int rangeSize)
{
// Enumerate all of the ranges
for (int i = fromInclusive; i < toExclusive; i += rangeSize)
{
int from = i, to = i + rangeSize;
if (to > toExclusive) to = toExclusive;
yield return Tuple.Create(from, to);
}
}
(You can download an implementation of ForRange as part of the Beta 1 samples at http://code.msdn.microsoft.com/ParExtSamples.)
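As a related option, and to the best of our knowledge of the released .NET 4 (not Beta 1), System.Collections.Concurrent.Partitioner.Create(int, int) produces ranges as Tuple<int,int> values that Parallel.ForEach can consume directly. The sketch below applies it to the earlier array example. The class and method names are hypothetical, and length must be greater than zero because Partitioner.Create rejects empty ranges.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class RangePartitionerExample
{
    // Fills an array with cubes, processing one range per delegate call
    // rather than one element per delegate call.
    public static int[] Cubes(int length)
    {
        var array = new int[length];
        Parallel.ForEach(Partitioner.Create(0, length), range =>
        {
            // range.Item1 is inclusive, range.Item2 is exclusive.
            for (int i = range.Item1; i < range.Item2; i++)
            {
                array[i] = i * i * i;
            }
        });
        return array;
    }
}
```

As with ForRange, the delegate and load-balancing overheads are paid once per range rather than once per element.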
In general, we expect the design and implementation of Parallel.For will be right for the vast majority of scenarios. However, solutions like those above can be used to accommodate cases that don’t quite fit the mold.