04.08.2009 at
4:55 pm · Saved under
.NET Help
In this post, we’ll investigate some ways that Parallel Extensions can be used to introduce parallelism and asynchrony to I/O scenarios.
Here’s a simple scenario. I want to retrieve data from a number of web resources.
static string[] Resources = new string[]
{
    "http://www.microsoft.com", "http://www.msdn.com",
    "http://www.msn.com", "http://www.bing.com"
};
Using the WebClient class, I might end up with the following.
var data = new List<byte[]>();
var wc = new WebClient();
foreach (string resource in Resources)
{
    data.Add(wc.DownloadData(resource));
}
// Use the data.
However, these days, downloading data from the web usually utilizes only a small fraction of my available bandwidth. So there are potential performance gains here, and with TPL’s parallel ForEach loop, they are easily had.
var data = new ConcurrentBag<byte[]>();
Parallel.ForEach(Resources, resource =>
{
    data.Add((new WebClient()).DownloadData(resource));
});
// Use the data.
Note that WebClient instances do not support multiple pending asynchronous operations (and the class is not thread-safe), so I need a separate instance for each operation. Also, since the normal BCL collections (List<T>, etc.) are not thread-safe, I need something like ConcurrentBag<T> to store the results. Of course, storing all the data in a collection assumes the scenario requires that all retrieval operations complete before processing. If this were not the case, I could start processing each data chunk inside the loop as soon as it was obtained, exploiting more parallelism. However, for the purposes of this investigation, I wanted to determine the possible performance gains in the absence of CPU-intensive work.
As it turns out, the above often yields linear speedup against sequential, with some variation due to the inconsistent nature of web site response times. And it was pretty straightforward. However, things would have been even easier had I started out with a “LINQ” frame of mind. First, I can convert my original sequential code to a LINQ query. Then, I can turn it into PLINQ using the AsParallel method and use WithDegreeOfParallelism to control the number of concurrent retrievals.
var data =
    from resource in Resources
        .AsParallel()
        .WithDegreeOfParallelism(numConcurrentRetrievals)
    select (new WebClient()).DownloadData(resource);
// Sometime later…
foreach (byte[] result in data) { }
(As an aside, it’s worth noting that WithDegreeOfParallelism causes PLINQ to use exactly numConcurrentRetrievals Tasks. This differs from the MaxDegreeOfParallelism option that I could have used with my previous Parallel.ForEach code, because that option sets the maximum; the actual number of threads still depends on the ThreadPool’s thread-adjusting logic.)
This code offers enhanced readability and makes storing the data easier. In addition, I can continue on the main thread, as PLINQ queries do not execute until the data they represent is accessed – that is, when MoveNext is called on the relevant enumerator. However, in this particular case, I don’t want to delay my query’s execution until I need the data; I actually want to execute my query while continuing on the main thread. To do so, I can wrap my query in a Task and force its immediate execution using ToArray.
var t = Task.Factory.StartNew(() =>
{
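    // Parenthesize the whole query so that ToArray forces it to execute,
    // rather than being applied to each downloaded byte[].
    return
        (from resource in Resources
            .AsParallel()
            .WithDegreeOfParallelism(numConcurrentRetrievals)
         select (new WebClient()).DownloadData(resource)).ToArray();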
});
// Sometime later…
foreach (byte[] result in t.Result) { }
// OR, use a continuation
t.ContinueWith(dataTask =>
{
    foreach (byte[] result in dataTask.Result) { }
});
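To see why the ToArray call matters, it helps to watch deferred execution happen. The sketch below is a stand-in written for illustration: DeferredQueryDemo and CountSelectorCalls are hypothetical names, and a cheap string-length computation replaces the download so that it runs without a network. It counts how many times the select body has run before and after the query is forced.

```csharp
using System;
using System.Linq;
using System.Threading;

public static class DeferredQueryDemo
{
    // Returns (calls made before enumeration, calls made after ToArray).
    public static Tuple<int, int> CountSelectorCalls(string[] resources)
    {
        int calls = 0;

        // Stand-in for the download: counts invocations, returns a cheap value.
        Func<string, int> measure = resource =>
        {
            Interlocked.Increment(ref calls);
            return resource.Length;
        };

        // Building the query does not run 'measure'.
        var query =
            from resource in resources.AsParallel()
            select measure(resource);

        int before = Volatile.Read(ref calls);

        // ToArray enumerates the query, which is what runs the select body.
        query.ToArray();

        int after = Volatile.Read(ref calls);
        return Tuple.Create(before, after);
    }
}
```

Item1 should come back as 0 and Item2 as the number of resources, which is the behavior the Task-wrapped version relies on ToArray to trigger.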
Now, I’ve got asynchrony, and I still get similar speedup. However, there’s still something about this code that is not ideal. The work (sending off download requests and blocking) requires almost no CPU, but it is being done by ThreadPool threads since I’m using the default scheduler. Ideally, threads should only be used for CPU-bound work (when there’s actually work to do). Of course, this probably won’t matter much for most typical client applications, but in scenarios where resources are tight, it could be a serious issue. Therefore, it’s worth investigating how we might reduce the number of blocked threads, perhaps by not using threads at all where possible.
To achieve this, I’ll be using ideas from a previous post: Tasks and the Event-based Asynchronous Pattern. That article explained how to create a Task<TResult> from any type that implements the EAP, and it presented an extension method for WebClient (available along with many others in the ParallelExtensionsExtras):
public static Task<byte[]> DownloadDataTask(
    this WebClient webClient, Uri address);
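A method with this signature could be put together roughly as follows. This is a minimal sketch of the general EAP-to-Task technique, not the ParallelExtensionsExtras source; the name DownloadDataTaskSketch is mine, chosen to avoid implying otherwise. It relies only on WebClient’s DownloadDataAsync method and DownloadDataCompleted event.

```csharp
using System;
using System.Net;
using System.Threading.Tasks;

public static class WebClientTaskSketch
{
    // Hypothetical name; illustrates wrapping an EAP operation in a Task.
    public static Task<byte[]> DownloadDataTaskSketch(
        this WebClient webClient, Uri address)
    {
        var tcs = new TaskCompletionSource<byte[]>();

        DownloadDataCompletedEventHandler handler = null;
        handler = (sender, e) =>
        {
            // One-shot handler: unsubscribe before completing the task.
            webClient.DownloadDataCompleted -= handler;

            // Check Error and Cancelled first; e.Result throws otherwise.
            if (e.Error != null) tcs.TrySetException(e.Error);
            else if (e.Cancelled) tcs.TrySetCanceled();
            else tcs.TrySetResult(e.Result);
        };

        webClient.DownloadDataCompleted += handler;
        try
        {
            // Starts the operation; no thread blocks while it is pending.
            webClient.DownloadDataAsync(address);
        }
        catch
        {
            // The operation never started, so the event will not fire.
            webClient.DownloadDataCompleted -= handler;
            throw;
        }
        return tcs.Task;
    }
}
```

The returned task completes only when the event fires, so nothing is scheduled onto the ThreadPool just to wait for the response.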
The key point is that this method produces a Task<TResult> by integrating WebClient’s EAP implementation with a TaskCompletionSource<TResult>, and I can use it to rewrite my scenario.
var tasks = new Queue<Task<byte[]>>();
foreach (string resource in Resources)
{
    WebClient wc = new WebClient();
    tasks.Enqueue(wc.DownloadDataTask(new Uri(resource)));
}
// Sometime later…
while (tasks.Count > 0)
{
    byte[] result = tasks.Dequeue().Result;
}
// OR, use a continuation
Task.Factory.ContinueWhenAll(tasks.ToArray(), dataTasks =>
{
    foreach (var dataTask in dataTasks)
    {
        byte[] result = dataTask.Result;
    }
});
With this, I’ve got a solution that uses parallelism for speed-up, is asynchronous, and does not burn more threads than necessary!
To recap, in this post, we considered a typical I/O scenario. First, we saw how easy it was to arrive at solutions that are better than the sequential one. Then, we delved deeper to discover a more complex solution (integrating EAP with Tasks) that offers even more benefits.
24.06.2009 at
5:39 pm · Saved under
.NET Help
In concurrent programs, race conditions are a fact of life but they aren’t all bad. Sometimes, race conditions are benign, as is often the case with lazy initialization.
The problem with racing to set a value, however, is that it can result in multiple objects being instantiated when only one is needed. Take the LazyInitializer class that shipped in Visual Studio 2010 Beta 1, for example. The method EnsureInitialized(ref T target, Func<T> valueFactory) accepts a ByRef value and a function that produces a value. If target is null, EnsureInitialized will execute valueFactory and set target to the return value. If multiple threads happen to overlap calls to EnsureInitialized, which is likely to be a rare occurrence, these threads may both initially see target as null and then execute valueFactory. One thread will win the race and will get to set target.
In Beta 1, we thought we’d be really smart by disposing of the objects that were created by threads that lost the race, if they implemented IDisposable. Turns out that’s a pretty bad thing to do. With lazy initialization, we assumed that most of the time valueFactory would be creating a new value, but that isn’t always the case. It’s quite possible that a valueFactory could return an object that has already been created, in which case we’d be disposing of an object we didn’t own.
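To make the problem concrete, here is a small sketch of a valueFactory that returns an existing object rather than creating one. SharedInstanceDemo, Shared, and GetEvent are hypothetical names used only for illustration.

```csharp
using System;
using System.Threading;

public static class SharedInstanceDemo
{
    // Created elsewhere and shared; the lazy-initialization code does not own it.
    public static readonly ManualResetEvent Shared = new ManualResetEvent(false);

    private static ManualResetEvent _target;

    public static ManualResetEvent GetEvent()
    {
        // The factory hands back the existing instance instead of a new one.
        return LazyInitializer.EnsureInitialized(ref _target, () => Shared);
    }
}
```

If the value produced by a losing thread’s factory were disposed here, the object being disposed would be Shared itself, the same instance the winning thread stored in the target, and every other holder of Shared would be left with a dead handle.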
We’re making a couple of changes in the Parallel Extensions to ensure that we generally don’t dispose of objects we aren’t sure we own. Moving forward, any new APIs that eagerly instantiate objects that might not be used will not dispose of those objects.
Now you might say, “Why? Ninety percent of the time I am creating a new object with my value factory, and it might be a really heavy object like a file or wait handle!” We hear you, but typically, once an object is disposed, there is no bringing it back, so if we disposed of these objects, no one could opt out of that behavior when they needed to. If we don’t dispose, the GC will still clean up your unused objects, and if you really need to dispose of an object manually, there is a workaround, i.e.
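ManualResetEvent theEvent = null;
ManualResetEvent tmpEvent = null;
LazyInitializer.EnsureInitialized(ref theEvent,
    () => { return tmpEvent = new ManualResetEvent(false); });
// tmpEvent is non-null only if this thread ran the factory; close it if it lost the race.
if (tmpEvent != null && tmpEvent != theEvent) tmpEvent.Close();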
In fact, you could easily create a small wrapper function to do the work for you, i.e.
public static T EnsureInitializedWithDispose<T>(ref T target, Func<T> function)
    where T : class, IDisposable
{
    T tmp = default(T);
    T actual = LazyInitializer.EnsureInitialized(ref target, () => tmp = function());
    // tmp stays null when target was already initialized and the factory never ran.
    if (tmp != null && actual != tmp) tmp.Dispose();
    return actual;
}
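As a rough check of how such a wrapper behaves under contention, the sketch below races several callers against one field and counts how many objects were created and how many were disposed. Tracked, DisposeLosersDemo, and Race are hypothetical names of mine, and the Thread.Sleep in the factory exists only to make overlapping calls more likely. The wrapper is repeated, with a null check for callers whose factory never ran, so that the sketch compiles on its own.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class DisposeLosersDemo
{
    // Hypothetical disposable that counts creations and disposals.
    public sealed class Tracked : IDisposable
    {
        public static int Created, Disposed;
        public Tracked() { Interlocked.Increment(ref Created); }
        public void Dispose() { Interlocked.Increment(ref Disposed); }
    }

    public static T EnsureInitializedWithDispose<T>(ref T target, Func<T> function)
        where T : class, IDisposable
    {
        T tmp = null;
        T actual = LazyInitializer.EnsureInitialized(ref target, () => tmp = function());
        // Dispose only an object this call created and that lost the race.
        if (tmp != null && !ReferenceEquals(actual, tmp)) tmp.Dispose();
        return actual;
    }

    private static Tracked _shared;

    // Lets 'callers' parallel iterations race to initialize the same field.
    public static Tracked Race(int callers)
    {
        Parallel.For(0, callers, i =>
        {
            EnsureInitializedWithDispose(ref _shared, () =>
            {
                Thread.Sleep(10); // widen the window for overlapping calls
                return new Tracked();
            });
        });
        return _shared;
    }
}
```

Whatever the interleaving, exactly one Tracked instance should remain undisposed: the one stored in the field.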
And the issue isn’t just about race conditions. In general, we should never dispose of an object we didn’t explicitly create. BlockingCollection<T> in Beta 1, for example, will dispose of its underlying collection if the BlockingCollection<T> itself is disposed. Again, this is bad news if you were only using the BlockingCollection<T> as a temporary wrapper. In future releases of Visual Studio 2010, this will be corrected.
So when using these APIs, and any other API that may consume or create an IDisposable object, think about what that API is doing with the object and if you need to clean up resources manually. If you’re not sure, the finalizer should take care of most of your problems. In the rare situations where immediate cleanup is important, make sure you keep track of the objects and dispose of them properly. Also, when designing your own libraries, don’t dispose of objects you don’t own!
24.06.2009 at
3:26 pm · Saved under
.NET Help
In a previous post, it was demonstrated how for loops with very small loop bodies could be parallelized by creating an iterator over ranges, and then using Parallel.ForEach over those ranges. A similar technique can be used to write parallel loops over iteration spaces of non-integers. For example, let’s say I wanted to parallelize the following loop, where the iteration range is based on doubles:
for(double d = 0.0; d < 1.0; d += .001)
{
    Process(d);
}
Parallel.For only provides overloads in which the iteration variable is an Int32 or an Int64. To accommodate doubles, one approach would be to translate the range into an integer-based range in order to use Parallel.For, and then translate each integer back into a double within the body of the loop. As an example, the previously shown loop could be rewritten as:
Parallel.For(0, 1000, i =>
{
    double d = i / 1000.0;
    Process(d);
});
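The same translation can be wrapped in a small helper. The following is a sketch under my own assumptions: DoubleLoops.For is a hypothetical name, not a TPL API, and it requires a positive step. It computes the iteration count up front and derives each double from the integer index.

```csharp
using System;
using System.Threading.Tasks;

public static class DoubleLoops
{
    // Hypothetical helper: calls body(fromInclusive + i * step) in parallel
    // for every i whose computed value is below toExclusive.
    public static void For(
        double fromInclusive, double toExclusive, double step, Action<double> body)
    {
        if (step <= 0) throw new ArgumentOutOfRangeException("step");

        // Upper bound on the number of iterations; zero or negative means no work.
        long count = (long)Math.Ceiling((toExclusive - fromInclusive) / step);

        Parallel.For(0L, count, i =>
        {
            double d = fromInclusive + i * step;
            // Guard against rounding pushing the last value past the bound.
            if (d < toExclusive) body(d);
        });
    }
}
```

With this in place, the loop above could be written as DoubleLoops.For(0.0, 1.0, .001, Process).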
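Due to floating-point arithmetic, this may not be exactly the same (the sequential loop accumulates rounding error with each d += .001, while i / 1000.0 computes each value independently), but it may be close enough. Another approach is to implement an iterator like the following: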
private static IEnumerable<double> Iterate(
    double fromInclusive, double toExclusive, double step)
{
    for (double d = fromInclusive; d < toExclusive; d += step) yield return d;
}
With that Iterate method, now I can parallelize the sequential loop using Parallel.ForEach:
Parallel.ForEach(Iterate(0.0, 1.0, .001), d =>
{
    Process(d);
});
This same technique can be applied to a wide variety of scenarios. Keep in mind, however, that the IEnumerator<T> interface isn’t thread-safe, which means that Parallel.ForEach needs to take locks when accessing the data source. While ForEach internally uses some smarts to try to amortize the cost of such locks over the processing, this is still overhead that must be outweighed by the work done in the body of the ForEach for good speedups to be achieved.
Parallel.ForEach has optimizations that apply when working on indexable data sources, such as lists and arrays, and in those cases the need for locking is decreased. Thus, performance may actually be improved in some cases by transforming the iteration space into a list or an array, which can be done using LINQ, even though there are both time and memory costs associated with creating an array from an enumerable. For example:
Parallel.ForEach(Iterate(0.0, 1.0, .001).ToArray(), d =>
{
    Process(d);
});
Happy coding.