Tuesday, February 25, 2020

C# Async Tricks


Today I want to show you a few tricks for working with async in C#.

Run sync methods asynchronously:

var t1 = Task.Factory.StartNew(() => Console.WriteLine("Task One"));

var t2 = Task.Run(() => Console.WriteLine("Task Two"));

In this example I run two synchronous methods asynchronously. Under the hood, both calls use the ThreadPool to run your synchronous methods on worker threads, so they do not block your UI thread. (The TPL can use either the ThreadPool or a dedicated Thread, because all the logic that decides how an operation runs lives in the TaskScheduler class. By default, .NET uses ThreadPoolTaskScheduler. Note that Task.Run always uses the default scheduler, while Task.Factory.StartNew uses TaskScheduler.Current.)
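As a quick check of the thread pool claim, here is a minimal sketch that asks each task whether it is running on a thread pool thread. The class and method names are mine and purely illustrative. It assumes the calls are made from a thread where TaskScheduler.Current is the default scheduler, such as the main thread of a console application.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ThreadPoolCheck
{
    // True when the delegate started by Task.Run executes on a pool thread.
    public static bool RunUsesPool() =>
        Task.Run(() => Thread.CurrentThread.IsThreadPoolThread).Result;

    // Same check for Task.Factory.StartNew, which uses TaskScheduler.Current
    // (assumed here to be the default, thread-pool based scheduler).
    public static bool StartNewUsesPool() =>
        Task.Factory.StartNew(() => Thread.CurrentThread.IsThreadPoolThread).Result;

    public static void Main()
    {
        Console.WriteLine($"Task.Run on pool thread: {RunUsesPool()}");
        Console.WriteLine($"StartNew on pool thread: {StartNewUsesPool()}");
    }
}
```

Under the stated assumption, both lines should report True.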

Run async method synchronously:

AsyncHelper

There are a few ways to run your async methods synchronously. The first variant is borrowed from the AsyncHelper class. You can use this class as needed; you just have to change its access modifier from internal to public.

public static class AsyncHelper
{
    private static readonly TaskFactory _myTaskFactory = new TaskFactory(CancellationToken.None,
        TaskCreationOptions.None, TaskContinuationOptions.None, TaskScheduler.Default);

    public static TResult RunSync<TResult>(Func<Task<TResult>> func)
    {
        var cultureUi = CultureInfo.CurrentUICulture;
        var culture = CultureInfo.CurrentCulture;
        return _myTaskFactory.StartNew(() =>
        {
            Thread.CurrentThread.CurrentCulture = culture;
            Thread.CurrentThread.CurrentUICulture = cultureUi;
            return func();
        }).Unwrap().GetAwaiter().GetResult();
    }

    public static void RunSync(Func<Task> func)
    {
        var cultureUi = CultureInfo.CurrentUICulture;
        var culture = CultureInfo.CurrentCulture;
        _myTaskFactory.StartNew(() =>
        {
            Thread.CurrentThread.CurrentCulture = culture;
            Thread.CurrentThread.CurrentUICulture = cultureUi;
            return func();
        }).Unwrap().GetAwaiter().GetResult();
    }
}

Here is an example of how to use AsyncHelper.

class Program
{
    static void Main(string[] args)
    {
        AsyncHelper.RunSync(() => DoAsyncStuff());
    }

    public static async Task DoAsyncStuff()
    {
        await Task.Factory.StartNew(() => Console.WriteLine("Task One"));
    }
}

ConfigureAwait(false)

Another way to call your async method synchronously is to use ConfigureAwait(false) inside it to discard the captured context. Without it, you will very often get a deadlock when you block on an async method from a thread that has a synchronization context (for example, a UI thread).
When you await an async method that returns a Task or a Task<T>, the SynchronizationContext is implicitly captured by the TaskAwaiter returned from the Task.GetAwaiter method.

Once that sync context is captured and the async method call completes, the TaskAwaiter attempts to marshal the continuation (basically the rest of the method after the first await keyword) onto the previously captured SynchronizationContext (using SynchronizationContext.Post). If the calling thread is blocked, waiting on that same method to finish, you have a deadlock.

public static async Task DoAsyncStuff()
{
    await Task.Factory.StartNew(() => Console.WriteLine("Task One")).ConfigureAwait(false);
}
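To make the capture visible, here is a small sketch. CountingContext is a hypothetical SynchronizationContext written only for this demo: it counts Post calls and forwards each callback to the thread pool, so blocking on the task cannot deadlock here. A real UI context would instead need the blocked thread to run the continuation.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical context for this demo only: counts Post calls and runs the
// callback on the thread pool, so the blocked caller is never needed.
public sealed class CountingContext : SynchronizationContext
{
    private int _posts;
    public int Posts => Volatile.Read(ref _posts);

    public override void Post(SendOrPostCallback d, object state)
    {
        Interlocked.Increment(ref _posts);
        ThreadPool.QueueUserWorkItem(_ => d(state));
    }
}

public static class ContextDemo
{
    private static async Task WithCapture() => await Task.Delay(50);

    private static async Task WithoutCapture() =>
        await Task.Delay(50).ConfigureAwait(false);

    // Runs one of the async methods with a CountingContext installed and
    // returns how many continuations were posted back to that context.
    public static int CountPosts(bool capture)
    {
        var previous = SynchronizationContext.Current;
        var context = new CountingContext();
        SynchronizationContext.SetSynchronizationContext(context);
        try
        {
            Task task = capture ? WithCapture() : WithoutCapture();
            task.Wait(); // safe only because Post does not need this thread
            return context.Posts;
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }

    public static void Main()
    {
        Console.WriteLine($"await:                       {CountPosts(true)} post(s)");
        Console.WriteLine($"await ConfigureAwait(false): {CountPosts(false)} post(s)");
    }
}
```

The plain await should report one post and the ConfigureAwait(false) version none, which is exactly the difference that removes the deadlock when the captured context belongs to a blocked thread.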

RunSynchronously

Ordinarily, tasks are executed asynchronously on a thread pool thread and do not block the calling thread. Tasks executed by calling the RunSynchronously() method are associated with the current TaskScheduler and are run on the calling thread. If the target scheduler does not support running this task on the calling thread, the task will be scheduled for execution on the scheduler, and the calling thread will block until the task has completed execution (MSDN).

var syncTask = new Task<long>(() => {
    Console.WriteLine("Task {0} (syncTask) executing on Thread {1}",
                    Task.CurrentId,
                    Thread.CurrentThread.ManagedThreadId);
    long sum = 0;
    for (int ctr = 1; ctr <= 1000000; ctr++)
        sum += ctr;
    return sum;
});
syncTask.RunSynchronously();

RunSynchronously asks the scheduler to run the task synchronously, but the scheduler may ignore the hint and run it on a thread pool thread or a dedicated thread, while your current thread blocks until it has completed.
The scheduler does not have to run it on the current thread and does not have to run it immediately, although I think that is what happens with the common schedulers (ThreadPoolTaskScheduler and the common UI schedulers).
RunSynchronously also throws an InvalidOperationException if the task has already been started or has completed/faulted. This means you cannot use it on tasks returned by async methods, because those are already started.
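The two behaviours described above can be checked with a short sketch. The names are illustrative, and the first check assumes the default scheduler, which is expected to run the task inline on the calling thread.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RunSynchronouslyDemo
{
    // A cold task (created with the constructor, never started) can be run
    // inline. With the default scheduler it is expected to use the caller's thread.
    public static bool ColdTaskRunsOnCallingThread()
    {
        int callerId = Thread.CurrentThread.ManagedThreadId;
        var cold = new Task<int>(() => Thread.CurrentThread.ManagedThreadId);
        cold.RunSynchronously();
        return cold.Result == callerId;
    }

    // A task that has already been started is rejected.
    public static bool HotTaskThrows()
    {
        Task<int> hot = Task.Run(() => 42);
        try
        {
            hot.RunSynchronously();
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    public static void Main()
    {
        Console.WriteLine($"Cold task ran on calling thread: {ColdTaskRunsOnCallingThread()}");
        Console.WriteLine($"Started task threw: {HotTaskThrows()}");
    }
}
```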

Unwrap “Magic”

The Unwrap function creates a proxy Task that represents the asynchronous operation of a nested task, that is, a Task<Task> or a Task<Task<TResult>>. Let’s start with an ordinary task, as in the example below.

static Task<string> DoWorkAsync()
{
    return Task<String>.Factory.StartNew(() =>
    {
        //...
        return "Work completed.";
    });
}

static void StartTask()
{
    Task<String> t = DoWorkAsync();
    t.Wait();
    Console.WriteLine(t.Result);
}

However, in some scenarios, you might want to create a task within another task, and then return the nested task. In this case, the TResult of the enclosing task is itself a task. In the following example, task is a Task<Task<string>>, so its Result property is a Task<string>.

var task = Task<String>.Factory.StartNew(() => "Work completed.")
    .ContinueWith(s => Task<String>.Factory.StartNew(() => "More Work completed."));
// Outputs: System.Threading.Tasks.Task`1[System.String]
Console.WriteLine(task.Result);

So, if we want to get the result “More Work completed.”, we can use the Unwrap method.

var task = Task<String>.Factory.StartNew(() => "Work completed.")
    .ContinueWith(s => Task<String>.Factory.StartNew(() => "More Work completed.")).Unwrap();
// Outputs "More Work completed."
Console.WriteLine(task.Result);
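The same nesting appears whenever an async delegate is passed to Task.Factory.StartNew. The sketch below uses illustrative names (ComputeAsync is a stand-in for any async method). It shows the nested task, the Unwrap fix, and Task.Run, which has overloads that unwrap automatically.

```csharp
using System;
using System.Threading.Tasks;

public static class UnwrapDemo
{
    // Stand-in for any async method.
    private static async Task<int> ComputeAsync()
    {
        await Task.Delay(10);
        return 42;
    }

    // StartNew treats the delegate as returning a Task<int>, so the outer
    // task completes as soon as ComputeAsync hands back its inner task.
    public static Task<Task<int>> Nested() =>
        Task.Factory.StartNew(() => ComputeAsync());

    // Unwrap turns Task<Task<int>> into a Task<int> that tracks the inner task.
    public static Task<int> Unwrapped() => Nested().Unwrap();

    // Task.Run accepts Func<Task<T>> and unwraps automatically.
    public static Task<int> ViaTaskRun() => Task.Run(() => ComputeAsync());

    public static void Main()
    {
        Console.WriteLine(Unwrapped().Result);   // 42
        Console.WriteLine(ViaTaskRun().Result);  // 42
    }
}
```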

Below is a useful example that combines the Unwrap function with the Lazy class: a lazy-loading implementation for tasks.

public class LazyAsync<T> : Lazy<Task<T>>
{
    public LazyAsync(Func<T> func) : base(() => Task.Run(func)){ }

    public LazyAsync(Func<Task<T>> func) : base(() => Task.Factory.StartNew(() => func()).Unwrap()) { }
}

Here is an example of how you can use it in your code.

static async Task StartTask()
{
    LazyAsync<string> lazy = new LazyAsync<string>(async () => await Task.Factory.StartNew(() => "Hello World"));

    var result = await lazy.Value;

    //Outputs: Hello World
    Console.WriteLine(result);
}
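The point of the lazy wrapper is that the factory runs once, no matter how many times the value is awaited. The sketch below uses Lazy<Task<string>> directly so that it is self-contained. The counter and the names are only there for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LazyTaskDemo
{
    // Awaits the same lazy task twice and returns how often the work ran.
    public static async Task<int> CountFactoryCallsAsync()
    {
        int calls = 0;
        var lazy = new Lazy<Task<string>>(() => Task.Run(() =>
        {
            Interlocked.Increment(ref calls);
            return "Hello World";
        }));

        string first = await lazy.Value;   // starts the task on first access
        string second = await lazy.Value;  // reuses the same, already started task
        Console.WriteLine($"{first} / {second}");
        return calls;
    }

    public static void Main()
    {
        Console.WriteLine($"Factory calls: {CountFactoryCallsAsync().Result}");
    }
}
```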

Making a sync operation Task-compatible

Sometimes, when all our code is implemented with asynchronous methods and we want to handle everything the same way, it is necessary to wrap our synchronous methods so that they return Task or Task<TResult>. Here is how I did this for .NET Framework 4.0, where an event-based operation is exposed as a Task through TaskCompletionSource.

private Task<byte[]> DownloadFileTask()
{
    var op = new TaskCompletionSource<byte[]>();
    var webClient = new WebClient();
    webClient.DownloadDataCompleted += (sender, args) =>
        {
            if(args.Error != null)
                op.SetException(args.Error);
            else if(args.Cancelled)
                op.SetCanceled();
            else
                op.SetResult(args.Result);
        };

    webClient.DownloadDataAsync(new Uri(""));
    return op.Task;
}

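For .NET Framework 4.5 and higher, we can use the code below instead. Note that Task.FromCanceled and Task.FromException, used in the second method, require .NET Framework 4.6 or later.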

public static class TaskAsyncHelper<T>
{
    public static Task<T> WrapSync(Func<T> func)
    {
        var op = new TaskCompletionSource<T>();

        Task.Run(() => {
            try
            {
                var result = func();
                op.SetResult(result);
            }
            catch (Exception ex)
            {
                op.SetException(ex);
            }
        });

        return op.Task;
    }

    public static Task WrapSync(Action action, CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return Task.FromCanceled(cancellationToken);
        }

        try
        {
            action();
            return Task.FromResult(0);
        }
        catch (Exception e)
        {
            return Task.FromException(e);
        }
    }
}

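Both variants run your synchronous operation under the hood. The first one uses TaskCompletionSource<T> as a wrapper over a synchronous function that is executed with Task.Run. The second one just simulates a Task-based operation: it runs the action on the calling thread and returns an already completed task.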
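One practical consequence of such wrappers is that a failure shows up as a faulted task instead of an exception thrown at the call site. Here is a minimal sketch of that idea. FromSync is a hypothetical helper written for this illustration, not part of the framework, and it runs the function on the calling thread.

```csharp
using System;
using System.Threading.Tasks;

public static class SyncWrapDemo
{
    // Hypothetical helper: runs func on the calling thread and reports the
    // outcome through a TaskCompletionSource instead of throwing.
    public static Task<T> FromSync<T>(Func<T> func)
    {
        var tcs = new TaskCompletionSource<T>();
        try { tcs.SetResult(func()); }
        catch (Exception ex) { tcs.SetException(ex); }
        return tcs.Task;
    }

    public static void Main()
    {
        Task<int> ok = FromSync(() => 21 * 2);
        Task<int> failed = FromSync<int>(() => throw new InvalidOperationException("boom"));

        Console.WriteLine($"{ok.Status}: {ok.Result}");
        Console.WriteLine($"{failed.Status}: {failed.Exception.InnerException.Message}");
    }
}
```

The caller can then treat both outcomes like any other task, for example by awaiting them inside a try/catch block.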

Pattern “WaitAllOneByOne”

This pattern is used to execute tasks in a more productive way, for example on a multiprocessor machine. Its benefit is that, instead of the Task.WaitAll method, WaitAllOneByOne uses the Task.WaitAny method, so each task can be handled as soon as it completes.
The pattern works as follows. First, we create an empty list of tasks. Then we fill this list with the tasks to be performed. In a loop, we check whether the list is empty and, as long as it is not, wait for the first task to complete and remove it from the list. Here is how it looks:

var tasks = new List<Task>();
// Total number of tasks still to be started
int taskCount = 100;

// Start at most one task per core
var numberCores = Environment.ProcessorCount;
for (int i = 0; i < numberCores && taskCount > 0; i++)
{
    var task = Task.Factory.StartNew(DoSomeLongWork, TaskCreationOptions.LongRunning);
    tasks.Add(task);
    taskCount--;
}

while (tasks.Count > 0)
{
    // Wait for any task to finish and remove it from the list
    int index = Task.WaitAny(tasks.ToArray());
    tasks.RemoveAt(index);

    // Start the next task, if any are left
    if (taskCount > 0)
    {
        var task = Task.Factory.StartNew(DoSomeLongWork, TaskCreationOptions.LongRunning);
        tasks.Add(task);
        taskCount--;
    }
}

In this sample, WaitAllOneByOne is tuned for a multiprocessor environment: roughly one task per core is in flight at a time. As an alternative, you can use Task.WhenAny instead of Task.WaitAny to work with the list. Below is a WaitAllOneByOne class that implements the same pattern in a more object-oriented way.

public class WaitAllOneByOne<TResult>
{
    private readonly List<Task<TResult>> _tasks;

    public event Action<TResult> ProcessResultEvent;
    public event Action<Exception> HandleExceptionEvent;

    public WaitAllOneByOne(List<Func<TResult>> actions) : this(actions.Select(act => new Task<TResult>(act)).ToList()){ }
    public WaitAllOneByOne(List<Task<TResult>> tasks)
    {
        _tasks = tasks;
    }

    public async Task StartTasks()
    {
        _tasks.ForEach(t => t.Start());

        while (_tasks.Count > 0)
        {
            var t = await Task.WhenAny(_tasks);
            _tasks.Remove(t);

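            try
            {
                // Await first so exceptions are observed even without subscribers
                var result = await t;
                ProcessResultEvent?.Invoke(result);
            }
            catch (OperationCanceledException) { }
            catch (Exception exc) { HandleExceptionEvent?.Invoke(exc); }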
        }
    }
}

If you do not like events as used in my class, you can easily replace them with delegates and pass those as parameters to the WaitAllOneByOne constructor.

NB! Functionally, this is fine, and as long as the number of tasks is small, the performance should be fine as well. However, if the number of tasks is large, this could result in non-negligible performance overhead. What we’ve effectively created here is an O(N²) algorithm: for each task, we search the list for the task to remove it, which is an O(N) operation, and we register a continuation with each task, which is also an O(N) operation.
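One possible way around the quadratic cost, shown here as a sketch and not as a drop-in replacement for the class above, is to give every task its own small async observer and then wait for all observers with Task.WhenAll. Each task then gets exactly one continuation and nothing is searched or removed. All names are illustrative. Note that the callbacks may run concurrently on different threads.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ProcessAsCompleted
{
    // Awaits a single task and routes its outcome to the matching callback.
    private static async Task ObserveAsync<T>(
        Task<T> task, Action<T> onResult, Action<Exception> onError)
    {
        try { onResult(await task.ConfigureAwait(false)); }
        catch (OperationCanceledException) { /* ignored, as in the class above */ }
        catch (Exception ex) { onError(ex); }
    }

    // One continuation per task: results are handled as each task finishes.
    public static Task RunAsync<T>(
        IEnumerable<Task<T>> tasks, Action<T> onResult, Action<Exception> onError) =>
        Task.WhenAll(tasks.Select(t => ObserveAsync(t, onResult, onError)));

    public static void Main()
    {
        // Five tasks with different delays, so they finish at different times.
        var tasks = Enumerable.Range(1, 5)
            .Select(i => Task.Run(async () =>
            {
                await Task.Delay(50 * (6 - i));
                return i;
            }))
            .ToList();

        RunAsync(tasks,
                 value => Console.WriteLine($"Finished: {value}"),
                 error => Console.WriteLine($"Failed: {error.Message}"))
            .Wait();
    }
}
```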

Long running Tasks

Let’s take a look at the code below and try to guess what is wrong here:

var task = Task.Factory.StartNew(DoSomeLongWork);

To understand what is wrong here, it helps to know what happens on the inside. By default, the Task class uses ThreadPoolTaskScheduler to run our tasks. Here is a piece of code from referencesource:

/// <summary>
/// Schedules a task to the ThreadPool.
/// </summary>
/// <param name="task">The task to schedule.</param>
[SecurityCritical]
protected internal override void QueueTask(Task task)
{
    if ((task.Options & TaskCreationOptions.LongRunning) != 0)
    {
        // Run LongRunning tasks on their own dedicated thread.
        Thread thread = new Thread(s_longRunningThreadWork);
        thread.IsBackground = true; // Keep this thread from blocking process shutdown
        thread.Start(task);
    }
    else
    {
        // Normal handling for non-LongRunning tasks.
        bool forceToGlobalQueue = ((task.Options & TaskCreationOptions.PreferFairness) != 0);
        ThreadPool.UnsafeQueueCustomWorkItem(task, forceToGlobalQueue);
    }
}

So, here you can see that when we run a long-running operation and create the task with the TaskCreationOptions.LongRunning option, ThreadPoolTaskScheduler uses a dedicated Thread; otherwise it uses the ThreadPool.
So the question remains: why is it wrong to use the ThreadPool for a long-running operation? I suggest taking a deeper look at the ThreadPool. A ThreadPool thread is lightweight and cheap because it can be reused: as soon as one operation is done, the ThreadPool can reschedule the same thread for another operation. You can also set the minimum number of threads (ThreadPool.SetMinThreads()), and those threads stay alive until new requests come in. So this is a good solution for many lightweight operations, for instance when you would otherwise need to create a new thread every few seconds. Once the minimum number of threads is reached, the thread pool aims to limit the number of threads being created to one per 500 milliseconds. This is an intelligent mechanism that avoids the expensive cost of creating a new thread when multiple thread pool threads may be released within that time period. A long-running operation, however, occupies its pool thread the whole time, so other queued work may have to wait for this throttled thread creation.
If you need to run a long-running operation with Task, please do not forget to pass the TaskCreationOptions.LongRunning option when creating the task.
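The effect of the option can be observed directly. The following sketch (illustrative names, default scheduler passed explicitly) starts one task with and one without TaskCreationOptions.LongRunning and reports whether each runs on a thread pool thread.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LongRunningDemo
{
    // Reports whether a task created with the given options, scheduled on the
    // default scheduler, executes on a thread pool thread.
    public static bool RunsOnPoolThread(TaskCreationOptions options) =>
        Task.Factory.StartNew(
            () => Thread.CurrentThread.IsThreadPoolThread,
            CancellationToken.None,
            options,
            TaskScheduler.Default).Result;

    public static void Main()
    {
        Console.WriteLine($"None:        pool thread = {RunsOnPoolThread(TaskCreationOptions.None)}");
        Console.WriteLine($"LongRunning: pool thread = {RunsOnPoolThread(TaskCreationOptions.LongRunning)}");
    }
}
```

With the default scheduler, the first line is expected to report True and the second False, which matches the QueueTask code shown earlier.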

Conclusion
I hope this article helps you implement your asynchronous solutions in a more efficient way, or that you at least found some interesting facts about tasks and the async/await keywords. The idea of the article is to share my experience and source code to make your job easier. Please do not hesitate to share your feedback about this article and what can be improved in it.