Today I want to show you a few tricks for working with async in C#.
Run sync methods asynchronously:
var t1 = Task.Factory.StartNew(() => Console.WriteLine("Task One"));
var t2 = Task.Run(() => Console.WriteLine("Task Two"));
In this example I run two synchronous methods asynchronously. Under the hood, both calls queue the delegate to the ThreadPool, so the work runs on a pool thread and does not block your UI thread. (The TPL can use either the ThreadPool or a dedicated Thread; the logic that decides how your operation is run lives in the TaskScheduler class. By default .NET uses ThreadPoolTaskScheduler.)
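To see the ThreadPool claim for yourself, here is a minimal sketch. The class and method names (ThreadPoolDemo, RunsOnThreadPool) are mine and purely illustrative; the only thing it relies on is Thread.CurrentThread.IsThreadPoolThread.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ThreadPoolDemo
{
    // Returns true when the delegate passed to Task.Run executed on a thread pool thread.
    public static bool RunsOnThreadPool()
    {
        return Task.Run(() => Thread.CurrentThread.IsThreadPoolThread)
                   .GetAwaiter().GetResult();
    }

    public static void Main()
    {
        Console.WriteLine("Task.Run on pool thread: " + RunsOnThreadPool());

        // Task.Factory.StartNew schedules on TaskScheduler.Current, which is the
        // default (thread pool) scheduler when called outside of a running task.
        bool viaStartNew = Task.Factory
            .StartNew(() => Thread.CurrentThread.IsThreadPoolThread)
            .GetAwaiter().GetResult();
        Console.WriteLine("StartNew on pool thread: " + viaStartNew);
    }
}
```

Keep in mind that StartNew called from inside a task running on a custom scheduler would use that scheduler instead, which is one reason Task.Run is usually the safer default.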
Run async method synchronously:
AsyncHelper
There are a few ways to run your async methods synchronously. The first variant is borrowed from the AsyncHelper class. I suppose you can use this class as needed. You just have to change the access modifier from internal to public.
public static class AsyncHelper
{
    private static readonly TaskFactory _myTaskFactory = new TaskFactory(CancellationToken.None,
        TaskCreationOptions.None, TaskContinuationOptions.None, TaskScheduler.Default);

    public static TResult RunSync<TResult>(Func<Task<TResult>> func)
    {
        var cultureUi = CultureInfo.CurrentUICulture;
        var culture = CultureInfo.CurrentCulture;
        return _myTaskFactory.StartNew(() =>
        {
            Thread.CurrentThread.CurrentCulture = culture;
            Thread.CurrentThread.CurrentUICulture = cultureUi;
            return func();
        }).Unwrap().GetAwaiter().GetResult();
    }

    public static void RunSync(Func<Task> func)
    {
        var cultureUi = CultureInfo.CurrentUICulture;
        var culture = CultureInfo.CurrentCulture;
        _myTaskFactory.StartNew(() =>
        {
            Thread.CurrentThread.CurrentCulture = culture;
            Thread.CurrentThread.CurrentUICulture = cultureUi;
            return func();
        }).Unwrap().GetAwaiter().GetResult();
    }
}
And here is an example of how to use AsyncHelper.
class Program
{
    static void Main(string[] args)
    {
        AsyncHelper.RunSync(() => DoAsyncStuff());
    }

    public static async Task DoAsyncStuff()
    {
        await Task.Factory.StartNew(() => Console.WriteLine("Task One"));
    }
}
ConfigureAwait(false)
Another way to make synchronous calls to async methods safe is to use ConfigureAwait(false) inside those methods to discard the captured context. Without it, you will very often get a deadlock when you block on an async method from a thread that has a SynchronizationContext (a UI thread, for example).
When you await an async method that returns a Task or a Task<T>, the TaskAwaiter returned by the Task.GetAwaiter method implicitly captures the current SynchronizationContext. Once the awaited operation completes, the TaskAwaiter attempts to marshal the continuation (basically the rest of the method after the await keyword) onto the captured SynchronizationContext (using SynchronizationContext.Post). If the thread that owns that context is blocked, waiting on that same method to finish, you have a deadlock.
public static async Task DoAsyncStuff()
{
    await Task.Factory.StartNew(() => Console.WriteLine("Task One")).ConfigureAwait(false);
}
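The capture can be made visible with a small experiment. This is only a sketch under my own assumptions: CountingContext is a hypothetical SynchronizationContext that counts Post calls and then forwards to the base implementation (which queues to the thread pool), so blocking on it does not deadlock the way a real UI context would.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical context: counts continuations posted to it, then lets the
// base class queue them to the thread pool.
public sealed class CountingContext : SynchronizationContext
{
    public int Posts;

    public override void Post(SendOrPostCallback d, object state)
    {
        Interlocked.Increment(ref Posts);
        base.Post(d, state);
    }
}

public static class ConfigureAwaitDemo
{
    static async Task Capturing() { await Task.Delay(50); }
    static async Task NotCapturing() { await Task.Delay(50).ConfigureAwait(false); }

    // Returns how many continuations were posted to the context.
    public static int CountPosts(bool capture)
    {
        var previous = SynchronizationContext.Current;
        var context = new CountingContext();
        SynchronizationContext.SetSynchronizationContext(context);
        try
        {
            Task t = capture ? Capturing() : NotCapturing();
            // Blocking is safe here only because this context never needs the blocked thread.
            t.GetAwaiter().GetResult();
            return context.Posts;
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }

    public static void Main()
    {
        Console.WriteLine("Posts with captured context: " + CountPosts(true));
        Console.WriteLine("Posts with ConfigureAwait(false): " + CountPosts(false));
    }
}
```

With the context captured, the continuation goes through Post once; with ConfigureAwait(false) it never touches the context. On a single-threaded UI context, that one Post is exactly what gets stuck behind the blocked thread.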
RunSynchronously
Ordinarily, tasks are executed asynchronously on a thread pool thread and do not block the calling thread. Tasks executed by calling the RunSynchronously() method are associated with the current TaskScheduler and are run on the calling thread. If the target scheduler does not support running this task on the calling thread, the task will be scheduled for execution on the scheduler, and the calling thread will block until the task has completed execution (MSDN).
var syncTask = new Task<long>(() => {
    Console.WriteLine("Task {0} (syncTask) executing on Thread {1}",
        Task.CurrentId,
        Thread.CurrentThread.ManagedThreadId);
    long sum = 0;
    for (int ctr = 1; ctr <= 1000000; ctr++)
        sum += ctr;
    return sum;
});
syncTask.RunSynchronously();
RunSynchronously asks the scheduler to run the task synchronously, but the scheduler may well ignore the hint and run it on a thread pool thread or a dedicated thread, in which case your current thread blocks until the task completes. The scheduler does not have to run it on the current thread and does not have to run it immediately, although I think that is what happens with the common schedulers (ThreadPoolTaskScheduler and the common UI schedulers). RunSynchronously also throws an InvalidOperationException if the task has already been started or has completed/faulted (this means you cannot use it on tasks returned by async methods).
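Both behaviours are easy to check. The sketch below uses illustrative names of my own (RunSynchronouslyDemo and its two methods) and assumes it is called from a thread where the default scheduler is current.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RunSynchronouslyDemo
{
    // True when a not-yet-started task executed on the thread that called RunSynchronously.
    public static bool RunsOnCallingThread()
    {
        int caller = Thread.CurrentThread.ManagedThreadId;
        var cold = new Task<int>(() => Thread.CurrentThread.ManagedThreadId);
        cold.RunSynchronously();
        return cold.Result == caller;
    }

    // True when RunSynchronously rejects a task that was already started.
    public static bool ThrowsForStartedTask()
    {
        Task started = Task.Run(() => { });
        try
        {
            started.RunSynchronously();
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    public static void Main()
    {
        Console.WriteLine("Ran on calling thread: " + RunsOnCallingThread());
        Console.WriteLine("Started task rejected: " + ThrowsForStartedTask());
    }
}
```

The second check is the reason RunSynchronously is of no help with async methods: the task they return is already running by the time you get hold of it.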
Unwrap “Magic”
The Unwrap function creates a proxy Task that represents the asynchronous operation of a Task<Task<TResult>>. Let’s look at the example below, which starts with the usual case: a method returns a Task<string> and the caller reads its Result.
static Task<string> DoWorkAsync()
{
    return Task<String>.Factory.StartNew(() =>
    {
        //...
        return "Work completed.";
    });
}

static void StartTask()
{
    Task<String> t = DoWorkAsync();
    t.Wait();
    Console.WriteLine(t.Result);
}
However, in some scenarios, you might want to create a task within another task, and then return the nested task. In this case, the TResult of the enclosing task is itself a task. In the following example, task is a Task<Task<string>>, so its Result property is a Task<string>.
var task = Task<String>.Factory.StartNew(() => "Work completed.")
    .ContinueWith(s => Task<String>.Factory.StartNew(() => "More Work completed."));
// Outputs: System.Threading.Tasks.Task`1[System.String]
Console.WriteLine(task.Result);
So, if we want to get the result “More Work completed.”, we can use the Unwrap method.
var task = Task<String>.Factory.StartNew(() => "Work completed.")
    .ContinueWith(s => Task<String>.Factory.StartNew(() => "More Work completed.")).Unwrap();
// Outputs "More Work completed."
Console.WriteLine(task.Result);
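A related detail worth knowing: Task.Run has overloads that accept a Func<Task<TResult>> and unwrap for you, while Task.Factory.StartNew does not. Here is a small sketch of the difference; InnerAsync and the UnwrapDemo class are hypothetical names used only for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class UnwrapDemo
{
    // Hypothetical async operation used as the nested task.
    static async Task<string> InnerAsync()
    {
        await Task.Delay(10);
        return "inner result";
    }

    public static string ViaStartNew()
    {
        // StartNew treats the returned Task<string> as a plain result value,
        // so the outer task is a Task<Task<string>> and needs Unwrap.
        Task<Task<string>> nested = Task.Factory.StartNew(() => InnerAsync());
        return nested.Unwrap().GetAwaiter().GetResult();
    }

    public static string ViaRun()
    {
        // Task.Run picks the Func<Task<TResult>> overload and returns a flat Task<string>.
        Task<string> flat = Task.Run(() => InnerAsync());
        return flat.GetAwaiter().GetResult();
    }

    public static void Main()
    {
        Console.WriteLine(ViaStartNew());
        Console.WriteLine(ViaRun());
    }
}
```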
Below is a useful example that combines Unwrap with the Lazy class: a lazy-loading implementation for tasks.
public class LazyAsync<T> : Lazy<Task<T>>
{
    public LazyAsync(Func<T> func) : base(() => Task.Run(func)) { }
    public LazyAsync(Func<Task<T>> func) : base(() => Task.Factory.StartNew(() => func()).Unwrap()) { }
}
And here is an example of how you can use it in your code.
// Returns Task rather than void so that callers can await it and observe exceptions.
static async Task StartTask()
{
    LazyAsync<string> lazy = new LazyAsync<string>(async () => await Task.Factory.StartNew(() => "Hello World"));
    var result = await lazy.Value;
    //Outputs: Hello World
    Console.WriteLine(result);
}
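What makes this lazy is that the factory delegate runs at most once, no matter how many times Value is awaited. The sketch below repeats a trimmed copy of LazyAsync (only the async constructor) so that it compiles on its own; the Calls counter and the LazyAsyncDemo class are my additions for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Trimmed copy of the class above: only the async constructor is kept.
public class LazyAsync<T> : Lazy<Task<T>>
{
    public LazyAsync(Func<Task<T>> func)
        : base(() => Task.Factory.StartNew(() => func()).Unwrap()) { }
}

public static class LazyAsyncDemo
{
    public static int Calls; // how many times the factory delegate actually ran

    public static async Task<string> TwoReadsAsync()
    {
        Calls = 0;
        var lazy = new LazyAsync<string>(async () =>
        {
            Interlocked.Increment(ref Calls);
            await Task.Delay(10);
            return "Hello World";
        });

        string first = await lazy.Value;  // starts the work
        string second = await lazy.Value; // reuses the same, already completed task
        return first + "|" + second + "|" + Calls;
    }

    public static void Main()
    {
        Console.WriteLine(TwoReadsAsync().GetAwaiter().GetResult());
    }
}
```

Because Lazy<T> caches the Task itself, a faulted task is cached too. If you need a retry after failure, you have to build that yourself.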
Making sync operation Task compatible.
Sometimes, when all of our code is implemented with asynchronous methods and we want to handle everything the same way, it is necessary to wrap other kinds of operations so that they return Task or Task<TResult>. Here is how I actually did this for .NET Framework 4.0, using a TaskCompletionSource around the event-based WebClient API.
private Task<byte[]> DownloadFileTask()
{
    var op = new TaskCompletionSource<byte[]>();
    var webClient = new WebClient();
    // Translate the completion event into the state of the returned task.
    webClient.DownloadDataCompleted += (sender, args) =>
    {
        if (args.Error != null)
            op.SetException(args.Error);
        else if (args.Cancelled)
            op.SetCanceled();
        else
            op.SetResult(args.Result);
    };
    webClient.DownloadDataAsync(new Uri(""));
    return op.Task;
}
For .NET Framework 4.5 and higher we can use the code below instead. (Note that Task.FromCanceled and Task.FromException were only added in .NET Framework 4.6.)
public static class TaskAsyncHelper<T>
{
    public static Task<T> WrapSync(Func<T> func)
    {
        var op = new TaskCompletionSource<T>();
        Task.Run(() => {
            try
            {
                var result = func();
                op.SetResult(result);
            }
            catch (Exception ex)
            {
                op.SetException(ex);
            }
        });
        return op.Task;
    }

    public static Task WrapSync(Action action, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return Task.FromCanceled(cancellationToken);
        }
        try
        {
            action();
            return Task.FromResult(0);
        }
        catch (Exception e)
        {
            return Task.FromException(e);
        }
    }
}
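Both of these variants run your synchronous operation under the hood. The first one uses TaskCompletionSource<T> as a wrapper over the sync function and runs it on a thread pool thread via Task.Run. The second one runs the action on the calling thread and merely returns an already completed Task, so it only simulates a Task-based operation.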
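The practical difference between the two approaches is where the work runs and when the caller gets control back. Here is a compact sketch; WrapDemo, ViaTaskRun and Inline are hypothetical names of mine, and Task.FromException<T> assumes .NET Framework 4.6 or later.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WrapDemo
{
    // Variant 1: the delegate runs on a pool thread; the caller gets a task right away.
    public static Task<T> ViaTaskRun<T>(Func<T> func)
    {
        var tcs = new TaskCompletionSource<T>();
        Task.Run(() =>
        {
            try { tcs.SetResult(func()); }
            catch (Exception ex) { tcs.SetException(ex); }
        });
        return tcs.Task;
    }

    // Variant 2: the delegate runs on the caller's thread before the method returns.
    public static Task<T> Inline<T>(Func<T> func)
    {
        try { return Task.FromResult(func()); }
        catch (Exception ex) { return Task.FromException<T>(ex); }
    }

    public static void Main()
    {
        int caller = Thread.CurrentThread.ManagedThreadId;

        Task<int> inline = Inline(() => Thread.CurrentThread.ManagedThreadId);
        Console.WriteLine("Inline completed on return: " + inline.IsCompleted);
        Console.WriteLine("Inline ran on caller thread: " + (inline.Result == caller));

        bool onPool = ViaTaskRun(() => Thread.CurrentThread.IsThreadPoolThread)
            .GetAwaiter().GetResult();
        Console.WriteLine("Task.Run variant ran on pool thread: " + onPool);
    }
}
```

So the inline variant is only appropriate when the operation is cheap; for anything slow it blocks the caller just like a plain synchronous call would.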
Pattern “WaitAllOneByOne”
This pattern processes tasks one by one as they complete while keeping a fixed number of them in flight, for example one per core on a multiprocessor machine. Instead of the Task.WaitAll method, WaitAllOneByOne uses the Task.WaitAny method.
This pattern works as follows. First, we create an empty list of tasks. Then we fill this list with the tasks to be performed. In a loop, while the list is not empty, we wait for the first task to complete and remove it from the list. How it works:
var tasks = new List<Task>();
// Total number of tasks to run
int taskCount = 100;
// Start at most one task per core up front
int started = Math.Min(Environment.ProcessorCount, taskCount);
for (int i = 0; i < started; i++)
{
    var task = Task.Factory.StartNew(DoSomeLongWork, TaskCreationOptions.LongRunning);
    tasks.Add(task);
}
while (tasks.Count > 0)
{
    int index = Task.WaitAny(tasks.ToArray());
    tasks.RemoveAt(index);
    // Replace the finished task until exactly taskCount tasks have been started
    if (started < taskCount)
    {
        var task = Task.Factory.StartNew(DoSomeLongWork, TaskCreationOptions.LongRunning);
        tasks.Add(task);
        started++;
    }
}
In this sample, WaitAllOneByOne keeps every core of a multiprocessor machine busy. Alternatively, you can use Task.WhenAny instead of Task.WaitAny to work with the list without blocking a thread. Below is a WaitAllOneByOne class that implements the same pattern in a more object-oriented way.
public class WaitAllOneByOne<TResult>
{
    private readonly List<Task<TResult>> _tasks;
    public event Action<TResult> ProcessResultEvent;
    public event Action<Exception> HandleExceptionEvent;

    public WaitAllOneByOne(List<Func<TResult>> actions) : this(actions.Select(act => new Task<TResult>(act)).ToList()) { }

    public WaitAllOneByOne(List<Task<TResult>> tasks)
    {
        _tasks = tasks;
    }

    public async Task StartTasks()
    {
        // Start only tasks that have not been started yet; Start() throws otherwise.
        _tasks.ForEach(t => { if (t.Status == TaskStatus.Created) t.Start(); });
        while (_tasks.Count > 0)
        {
            var t = await Task.WhenAny(_tasks);
            _tasks.Remove(t);
            try
            {
                var result = await t;
                // ?. skips the call when no handler is subscribed.
                ProcessResultEvent?.Invoke(result);
            }
            catch (OperationCanceledException) { }
            catch (Exception exc) { HandleExceptionEvent?.Invoke(exc); }
        }
    }
}
If you do not like events such as the ones in my class, you can easily replace them with delegates and pass those as parameters to the WaitAllOneByOne constructor.
NB! Functionally, this is fine, and as long as the number of tasks is small, the performance should be fine as well. However, if the number of tasks is large, this could result in non-negligible performance overhead. What we’ve effectively created here is an O(N²) algorithm: for each task, we search the list for the task to remove it, which is an O(N) operation, and on each iteration Task.WhenAny registers a continuation with every remaining task, which is also an O(N) operation.
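One way around the quadratic cost, sketched here under my own naming (ProcessAsCompleted, ObserveAsync and ForEachCompletedAsync are hypothetical), is to attach a single async continuation to each task and wait for all of them with Task.WhenAll. Each task is then touched once, so the overall work is linear.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ProcessAsCompleted
{
    // Awaits one task and routes its outcome to the matching callback.
    static async Task ObserveAsync<T>(Task<T> task, Action<T> onResult, Action<Exception> onError)
    {
        try { onResult(await task.ConfigureAwait(false)); }
        catch (OperationCanceledException) { }
        catch (Exception ex) { onError(ex); }
    }

    // Completes when every task has finished and its callback has run.
    public static Task ForEachCompletedAsync<T>(
        IEnumerable<Task<T>> tasks, Action<T> onResult, Action<Exception> onError)
    {
        return Task.WhenAll(tasks.Select(t => ObserveAsync(t, onResult, onError)));
    }

    // Demo: three delayed tasks whose results are summed as they complete.
    public static int SumOfResults()
    {
        var tasks = new[] { 30, 10, 20 }
            .Select(async ms => { await Task.Delay(ms); return ms; })
            .ToList();
        int sum = 0;
        ForEachCompletedAsync(tasks, r => Interlocked.Add(ref sum, r), ex => { })
            .GetAwaiter().GetResult();
        return sum;
    }

    public static void Main()
    {
        Console.WriteLine("Sum of results: " + SumOfResults());
    }
}
```

The trade-off is that the callbacks may now run concurrently on different threads, so they have to be thread-safe (hence Interlocked.Add in the demo), whereas the WhenAny loop handles results strictly one at a time.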
Long running Tasks
Let’s take a look at the code below and try to guess what is wrong with it:
var task = Task.Factory.StartNew(DoSomeLongWork);
To understand what is wrong, it helps to know what happens on the inside. By default, the Task class uses ThreadPoolTaskScheduler to run our work. Here is a piece of code from referencesource:
/// <summary>
/// Schedules a task to the ThreadPool.
/// </summary>
/// <param name="task">The task to schedule.</param>
[SecurityCritical]
protected internal override void QueueTask(Task task)
{
    if ((task.Options & TaskCreationOptions.LongRunning) != 0)
    {
        // Run LongRunning tasks on their own dedicated thread.
        Thread thread = new Thread(s_longRunningThreadWork);
        thread.IsBackground = true; // Keep this thread from blocking process shutdown
        thread.Start(task);
    }
    else
    {
        // Normal handling for non-LongRunning tasks.
        bool forceToGlobalQueue = ((task.Options & TaskCreationOptions.PreferFairness) != 0);
        ThreadPool.UnsafeQueueCustomWorkItem(task, forceToGlobalQueue);
    }
}
So, here you can see that when we create a task with TaskCreationOptions.LongRunning, ThreadPoolTaskScheduler runs it on a dedicated Thread; otherwise it uses the ThreadPool.
So the question remains: why is it wrong to use the ThreadPool for a long-running operation? I would suggest taking a deeper look at the ThreadPool. A ThreadPool thread is cheap to use because it is reused: as soon as one operation is done, the ThreadPool can hand the same thread to the next request. You can also control the minimum thread count (ThreadPool.SetMinThreads()), so that those threads stay alive until new requests come in. This makes the pool a good fit for many short, lightweight operations, for instance when you would otherwise create a new thread every few seconds. Once the minimum number of threads is reached, the thread pool aims to limit the number of threads being created to one per 500 milliseconds. This is an intelligent mechanism that avoids the expensive cost of creating a new thread when several thread pool threads may be released within that time period. A long-running operation works against this design: it occupies a pool thread for its whole duration, and because replacement threads are added slowly, short work items queued behind it have to wait.
If you need to run a long-running operation with a Task, please do not forget to pass TaskCreationOptions.LongRunning when you create the task.
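The effect of the option can be observed directly. This is a minimal sketch with illustrative names of my own (LongRunningDemo, RunsOnPoolThread); it passes TaskScheduler.Default explicitly so the result does not depend on where it is called from.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LongRunningDemo
{
    // Returns true when a task created with the given options ran on a thread pool thread.
    public static bool RunsOnPoolThread(TaskCreationOptions options)
    {
        return Task.Factory.StartNew(
                () => Thread.CurrentThread.IsThreadPoolThread,
                CancellationToken.None,
                options,
                TaskScheduler.Default)
            .GetAwaiter().GetResult();
    }

    public static void Main()
    {
        Console.WriteLine("None        -> pool thread: " + RunsOnPoolThread(TaskCreationOptions.None));
        Console.WriteLine("LongRunning -> pool thread: " + RunsOnPoolThread(TaskCreationOptions.LongRunning));
    }
}
```

With the default scheduler shown above, the LongRunning task gets its own background thread, so the pool's threads stay available for short work.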
Conclusion
I hope this article helps you implement your asynchronous solutions in a more efficient way, or maybe you have found some interesting facts about tasks and the async/await keywords. The idea of the article is to share my experience and source code to make your job easier. Please do not hesitate to share your feedback about this article and what could be improved in it.