Monday, November 11, 2013

Использование тасков в C#

В этой статье я затрону одну из моих любимых тем, а именно − использование Task в C#. Класс Task представляет собой обертку над потоками для выполнения асинхронных операций. Он очень удобен тем что программисту нужно перестать мыслить о том как создать поток, когда его создать, когда запустить , как уничтожить поток. За вас всю эту роботу выполняют таски. Ну что же приступим. Есть несколько способов создать экземпляр класса Task. Наиболее распространенный подход с использованием свойства Factory задачи для извлечения экземпляра TaskFactory, который можно использовать для создания задачи для нескольких целей. Например, для создания задачи Task, выполняющей действие, можно использовать фабричный метод StartNew. В основном часто используемым является типизированный возврат значений из таска с помощью Task<TResult> . В .Net Framework 4.0 и выше основой асинхронного и параллельного программирования есть Tasks и TPL (Task Parallel Library). 
Другой подход разработки асинхронных программ основан на:
  • Threads;
  • Async Programing Model (APM представлена с помощью вызова асинхронных методов делегата BeginInvoke/EndInvoke);
  • Event-based Asynchronous Pattern (примером может служить реализация BackgroundWorker);
  • ThreadPool.QueueUserWorkItem.
Приступим к началу работы с тасками. Рассмотрим создание тасков, а также их использование. Класс Task расположен в пространстве имен
using System.Threading.Tasks;
Основная конструкция работы с классом таск выглядит следующим образом:
var task = new Task(() => { });
task.Start();
В конструктор класс Task принимает делегат Action, то есть вместо него Вы можете подставить любую функцию, которая возвращает void. Для Task<TResult> первым параметром идет делегат Func<>. С принципами работы этих делегатов можете ознакомиться в моей статье о делегатах. Для примера я подставил анонимный метод. Вы же можете подставить свое решение.
var task = new Task(code);
После вызова метода Start() выполнения метода, переданного в класс Task, выполняется немедленно.

Более эффективным есть создание Task и запуск его одним методом, не вызывая метод Start. Для этого можно воспользоваться свойством Factory класса Task, о котором я упоминал выше. Вот как выглядит такой код:
var task = Task.Factory.StartNew(code);
где code − это функция пользователя, которая выполняет некие действия параллельно. 
Интересная особенность класса Task заключается в том, что он спроектирован так, чтобы распределять нагрузку по выполняемым задачам равномерно по процессорам в системе.

Рисунок схематически демонстрирует распараллеливания задач по процессорам.
Существует еще один способ создания тасков, который также очень удобен, но о котором почему-то не встречается частых упоминаний. Это так называемая надстройка Task над существующим кодом.
var op = new TaskCompletionSource<long>();
//Code here
return op.Task;
По приведенному примеру не совсем понятно преимущество данного подхода. Поэтому я постараюсь продемонстрировать пример, в котором данный подход может пригодиться.
private Task<byte[]> DownloadFileTask()
{
    var op = new TaskCompletionSource<byte[]>();
    var webClint = new WebClient();
    webClint.DownloadDataCompleted += (sender, args) =>
        {
            if(args.Error != null)
                op.SetException(args.Error);
            else if(args.Cancelled)
                op.SetCanceled();
            else
                op.SetResult(args.Result);
        };

    webClint.DownloadDataAsync(new Uri(""));
    return op.Task;
}
Пример является решением при загрузке байтового массива данных через WebClient. Этот подход удобно использовать для того, чтобы построить модель поведения на тасках, а не на APM.  

Примечание: В .Net Framework 4.5 часть операций уже имеет аналогичные функции, которые возвращают Task. Например, рассмотренный выше WebClient имеет методы DownloadDataTaskAsync и DownloadFileTaskAsync и т.д. Если Вы получаете данные через WCF клиент или используете OData, у которых моделью поведения выступает APM, то можете написать свою обертку для таких функций или посмотреть, как такая обертка реализуется, и написать свою.  Для того, чтобы использовать Task вместо APM с ее методами Begin/End, нужно использовать TaskFactory.FromAsync, который над этими методами создаст Task.

Операции WaitAll/WaitAny над тасками:

Для того, чтобы дождаться выполнения нескольких тасков, можно написать ожидание для каждого таска через метод Wait, как в примере ниже:
var task1 = Task.Factory.StartNew(Method1);
task1.Wait();
var task2 = Task.Factory.StartNew(Method2);
task2.Wait();
var task3 = Task.Factory.StartNew(Method3);
task3.Wait();
Можно также использовать статические методы класса Task: Task.WaitAll или Task.WaitAny. Первый метод дожидается выполнения всех запущенных тасков, которые переданы как параметр в метод, а второй  возвращает индекс первого таска, который закончил выполнение.
var task1 = Task.Factory.StartNew(Method1);
var task2 = Task.Factory.StartNew(Method2);
var task3 = Task.Factory.StartNew(Method3);
Task.WaitAll(task1, task2, task3);
Для тасков, которые возвращают результат, мы можем получить данные через свойство Result. Как это выглядит, Вы можете посмотреть на рисунке ниже.

 Свойство Result неявно вызывает метод Wait для ожидания результата.

Паттерн «WaitAllOneByOne»:

Для того, чтобы использовать таски более продуктивно, например, на многопроцессорной машине, для выделения на каждую задачу отдельного процесса используют паттерн WailAllOneByOne. Его особенность в том, чтобы вместо метода Task.WaitAll, использовать метод Task.WaitAny. Как это выглядит:

Этот паттерн работает следующим образом. Сначала создадим пустой список тасков. Затем заполняем этот список задачами, которые будут выполняться. Следующим шагом в цикле проверяем список на пустоту, и пока он не пустой, дожидаемся выполнения первого задания и удаляем это задание из списка. Этот паттерн стоит использовать:
   1.   Если результат выполнения выбросил исключение: прекратить выполнение.
   2. Для перекрытия вычислений с результатом обработки.
Данный паттерн не оптимизирован под работу с количеством процессором. И, по сути, может создать много задач, которые все время будут между собой делить процессорное время и ресурсы системы. Это говорит о том, что данный подход несколько неверный. Поэтому данный паттерн нуждается в небольшой доработке.
var tasks = new List<Task>();
//Количество создаваемых задач
int taskCount = 100;

var numberCores = Environment.ProcessorCount;
for (int i = 0; i < numberCores; i++)
{
    var task = Task.Factory.StartNew(DoSomeLongWork, TaskCreationOptions.LongRunning);
    tasks.Add(task);
}

while (tasks.Count > 0)
{
    int index = Task.WaitAny(tasks.ToArray());
    tasks.RemoveAt(index);

    taskCount--;

    if (taskCount > 0)
    {
        var task = Task.Factory.StartNew(DoSomeLongWork, TaskCreationOptions.LongRunning);
        tasks.Add(task);
    }
}
Как видно из модифицированного варианта, у нас добавилась привязка к количеству ядер машины. Сразу создается столько задач, сколько ядер в машине, затем как только первая задача будет выполнена, вместо нее создается другая задача. И так до тех пор, пока переменная taskCount не станет равна нулю. То есть будет создано 100 задач, которые будут долго выполняться в системе. Этот паттерн Вы можете изменять под свои нужды как вам будет угодно. Единственное, за чем вам нужно следить,  так это одна задача  одно ядро. Так Вы достигнете максимальной производительности системы.

Операции ContinueWith над тасками

Иногда нужно выполнить одну задачу сразу после выполнения предыдущей. Поэтому вместо кода
var task1 = Task.Factory.StartNew(Method1);
task1.Wait();
в тасках существует метод ContinueWith, который продолжает выполнение другого таска сразу после завершения предыдущего. Данный метод неявно вызывает метод Wait для завершения работы предыдущего задания.  
var task1 = Task.Factory.StartNew<int>(Method1);
var task2 = task1.ContinueWith((previous) =>
    {

    });
В отличие от явного вызова метода Wait, .Net оптимизирует планирование. Использование ContinueWith очень удобно, если задания выполнялись в отдельном потоке, а нужно обновить данные в GUI потоке, − мы можем написать такой код:
var task1 = Task.Factory.StartNew<int>(Method1);
var task2 = task1.ContinueWith((previous) =>
    {
        //Доступ к UI контрлам
    }, TaskScheduler.FromCurrentSynchronizationContext());
Метод TaskScheduler.FromCurrentSynchronizationContext() возвращает текущий контекст GUI потока. При использовании Silverlight Вам такое решение должно понравиться. Поскольку в Silverlight нельзя блокировать основной поток, пока не выполнится некая длительная операция, с помощью тасков мы избегаем этой проблемы. Единственная задача, которая останется за Вами,  это сделать красивый контрол, который будет показывать процесс выполнения асинхронной операции. Затем Вы просто отобразите полученный результат.
Если мы работаем не с одним таском, а с несколькими, то для них существуют методы Task.Factory.ContinueWhenAll и Task.Factory.ContinueWhenAny. Пример:
var task1 = Task.Factory.StartNew<int>(Method1);
var task2 = Task.Factory.StartNew<int>(Method2);
var task3 = Task.Factory.StartNew<int>(Method3);
var tasks = new Task[] { task1, task2, task3 };

Task.Factory.ContinueWhenAll(tasks, (tasksResult) =>
    {
        //TODO code
    });
Task.Factory.ContinueWhenAny(tasks, (task) =>
    {
        //TODO code
    });
Эти методы работаю по аналогии с Task.WaitAll и Task.WaitAny, только после их выполнения продолжается следующая задача.  

Обработка ошибок при использовании тасков:

Если таск бросил ошибку, то выполнение таска прекращено. Ошибка таска сохраняется как часть AggregateException (AE) и хранится в объекте таска Exception (E) проперти. AE пробрасывается на функции .Wait, .Result или .WaitAll.
Приведем пример кода обработки ошибок.
var task1 = Task.Factory.StartNew<int>(Method1);
var result = task1.Result;
В примере метод task1.Result выбросит Exception то его нужно перехватить
try
{
    var task1 = Task.Factory.StartNew<int>(Method1);
    var result = task1.Result;
}
catch (AggregateException ae)
{
    Console.WriteLine(ae.InnerException.Message);
}
Для того, чтобы отобразить все ошибки, которые привели к выбросу ошибки, нужно вызвать метод Flatten().
try
{
    var task1 = Task.Factory.StartNew<int>(Method1);
    var result = task1.Result;
}
catch (AggregateException ae)
{
    var exceptions = ae.Flatten();
    foreach (var exception in exceptions.InnerExceptions)
    {
        Console.WriteLine(exception.Message);
    }
}
Правила работы с ошибками при использовании тасков (Exception handling design):

  • при использовании методов .Wait, .Result или .WaitAll нужно обрамлять в блок try/catch
  • нужно выполнить подписку на TaskScheduler.UnobservedTaskException для того чтобы отловить необработанные критические ошибки при вызове тасков.
Отмена задач (Task cancellation)

При выполнении длительных задач для некоторых пользователей необходима возможность прерывать этот процесс. Это необходимо для обеспечения User Friendly интерфейсу взаимодействия программы и пользователя. Для этого при создании длительного таска мы создаем CancellationToken и передаем его как параметр в задание. Затем в задании мониторим токен, и если пользователь отменил действие очищаем и пробрасываем ошибку с помощью метода ThrowIfCancellationRequested(). Как это работает:
var tokenSource2 = new CancellationTokenSource();
CancellationToken ct = tokenSource2.Token;
var task = Task.Factory.StartNew(() =>
{
    // Were we already canceled?
    ct.ThrowIfCancellationRequested();
    bool moreToDo = true;
    while (moreToDo)
    {
        // Poll on this property if you have to do
        // other cleanup before throwing.
        if (ct.IsCancellationRequested)
        {
            // Clean up here, then...
            ct.ThrowIfCancellationRequested();
        }
    }
}, tokenSource2.Token); // Pass same token to StartNew.
tokenSource2.Cancel();

// Just continue on this thread, or Wait/WaitAll with try-catch:
try
{
    task.Wait();
}
catch (AggregateException e)
{
    foreach (var v in e.InnerExceptions)
        Console.WriteLine(e.Message + " " + v.Message);
}
Пример взят с MSDN (Task Cancellation). Если при получении результата мы получим OperationCanceledException, то это значит, что пользователь прекратил выполнение задания сам. Вот как будет выглядеть переписанная обработка ошибок:
try
{
    task.Wait();
}
catch (AggregateException ae)
{
    var exceptions = ae.Flatten();
    foreach (var exception in exceptions.InnerExceptions)
    {
        if (exception is OperationCanceledException)
        {
            //ignore cancel
        }
        else
            Console.WriteLine(exception.Message);
    }
}

Дочерние задания

Для тасков можно создавать дополнительные дочерние задания, у которых будет родитель, который дожидается их выполнения.
var parent = Task.Factory.StartNew(() =>
{
    var child1 = Task.Factory.StartNew(() => { }, TaskCreationOptions.AttachedToParent);
    var child2 = Task.Factory.StartNew(() => { }, TaskCreationOptions.AttachedToParent);
});

Вот как выглядит это решение. TaskCreationOptions задает флаги, которые управляют необязательным поведением создания и выполнения задач. Существуют такие флаги TaskCreationOptions:

Имя члена
Описание
AttachedToParent
Указывает, что задача присоединена к родительской задаче в иерархии задач.
DenyChildAttach
Указывает, что при попытке вложить дочернюю задачу в созданную задачу возникнет исключение InvalidOperationException
HideScheduler
Не позволяет видеть внешний планировщик как текущий планировщик в созданной задаче. Это означает, что такие операции, как StartNew или ContinueWith, которые выполняются в созданной задаче, в качестве текущего планировщика будут видеть свойство Default.
LongRunning
Указывает, что задача будет долгой, широкослойной операцией, включая большие, чем детализированные компоненты, системы. Предоставляет сведения для TaskScheduler, что следует ожидать избыточной подписки. Превышение лимита подписки позволяет создать больше потоков, чем количество доступных аппаратных потоков.
None
Указывает, что следует использовать поведение по умолчанию.
PreferFairness
Рекомендация для TaskScheduler по планированию задач максимально прямым способом, то есть задачи, запланированные раннее, будут выполняться ранее, а более поздние - позже.

Итоги: 

В данной статье рассмотрены принципы работы с тасками (задачами). А также представлена обработка ошибок, ожидание завершение таска, прерывание выполнения задачи и паттерн “WaitAllOneByOne”.

Источники:

No comments:

Post a Comment