Sunday, June 15, 2014

Использование TPL совместно с Reactive Extensions

В этой статье мы рассмотрим, как можно комбинировать таски с библиотеки Task Parallel Library (TPL) совместно с библиотекой Rx (Reactive Extensions). Эта статья будет небольшой разминкой между статьями по Prism 5. Сама по себе библиотека реактивных решений, или, иными словами, Reactive Extensions, или же просто Rx, позволяет нам работать с событиями, асинхронными вызовами в композиционном стиле. Благодаря набору библиотек Rx у нас есть возможность работать с нашим кодом в асинхронном режиме. Основная идеология данной библиотеки заключается том, что она позволяет нам работать с так называемыми push-коллекциями – это когда в коллекции происходят какие-то изменения, то она нас сама об этом информирует, нам только нужно подписаться на изменения этих данных. В противоположность push-коллекциям существуют так званные pull-коллекции, когда мы сами бегам по коллекции и берем оттуда данные. Примером такой коллекции может быть использование интерфейсов IEnumerable и IEnumerator, который с помощью метода MoveNext() (интерфейс IEnumerator) позволяет обойти коллекцию. Библиотеки реактивных решений построены вокруг новых интерфейсов IObserver и IObservable, и, по сути, реализуют паттерн наблюдатель (Observer). Более детально о начале работы с библиотекой Rx вы можете почерпнуть из моей статьи "Использование Reactive Extensions".
Как же Rx нам мог помочь с тасками? Одной из самых популярных возможностей, которые нам давал Rx для 4-го .NET Framework, – это возможность строить выполнение тех же тасков последовательно. В 4.5 фреймворке ситуация немного улучшилась, и мы можем выполнить несколько тасков с помощью ключевых слов async/await. Давайте приведем несколько примеров. Для того чтобы выполнить несколько разных тасков, нам нужно было написать код, подобный коду ниже.
Action action = () =>
    {
        var res1 = Task<string>.Run(() =>
        {
            Thread.Sleep(500);
            return "Hello";
        });
        var res2 = Task<string>.Run(() =>
        {
            Thread.Sleep(500);
            return "World";
        });

        Task.WaitAll(res1, res2);

        Console.WriteLine(res1.Result + " " + res2.Result);
    };
action();
Если нам нужно сделать выполнение тасков последовательно, то у нас уже проявлялась проблема, так как сделать это мы могли только используя метод ContinueWith.
Action action = () =>
    {
        var res1 = Task<string>.Run(() =>
        {
            Thread.Sleep(500);
            return "Hello";
        });

        var res2 = res1.ContinueWith((task) =>
            {
                var res3 = Task<string>.Run(() =>
                {
                    Thread.Sleep(500);
                    return "World";
                });

                Task.WaitAll(res3);

                return task.Result + " " + res3.Result;
            });

        Task.WaitAll(res2);
        Console.WriteLine(res2.Result);
    };
action();
Код получился страшноватым. Давайте посмотрим, как мы могли бы переписать этот код, используя возможности фреймворка 4.5.
Action a = async () =>
    {
        var res1 = await Task<string>.Run(async () =>
        {
            await Task.Delay(500);
            return "Hello";
        });
        var res2 = await Task<string>.Run(async () =>
        {
            await Task.Delay(500);
            return "World";
        });

        Console.WriteLine(res1 + " " + res2);
    };
a();
Код стал намного компактнее и синтаксис стал более понятен. Теперь давайте установим библиотеку Rx (точнее, набор библиотек) и посмотрим, как можно переписать код выше с возможностями, которые нам дают реактивные решения. Для удобства я буду использовать возможности .Net Framework 4.5 с использованием методов Delay, async/await и т.д.
После того как мы добавим в созданное нами новое консольное приложение библиотеку Rx, мы можем посмотреть, как использовать ее возможности. Например, последний пример мы можем переписать следующим образом через Rx.
Action a = async () =>
    {
        IObservable<string> observable1 = Task<string>.Run(async () =>
            {
                await Task.Delay(500);
                return "Hello";
            }).ToObservable();

        IObservable<string> observable2 = Task<string>.Run(async () =>
        {
            await Task.Delay(500);
            return "World";
        }).ToObservable();

        var result = await observable1.Zip(observable2, (x1, x2) => string.Join(" ", x1, x2));
        Console.WriteLine(result);
    };
a();
Для того чтобы мы могли использовать Extension MethodToObservable(), нам нужно добавить ссылку на пространство имен:
using System.Reactive.Threading.Tasks;
Результат очень простой. С помощью функции Zip мы склеили результат, полученный с наших тасков. А как быть, если нам нужно вывести тот результат, который будет выполнен первым? Давайте посмотрим, как это можно сделать с помощью async/await и метода Task.WhenAny.
Action action = async () =>
{
    var task1 = Task<string>.Run(async () =>
        {
            await Task.Delay(500);
            return "Hello";
        });

    var task2 = Task<string>.Run(async () =>
        {
            await Task.Delay(1000);
            return "World";
        });

    var result = await Task.WhenAny(task1, task2).Result;
    Console.WriteLine(result);
};
action();
В методе, который возвращает результатом слово "World", я поставил время ожидания 1 секунду. Теперь посмотрим, как это же можно сделать через Rx. Это можно сделать двумя способами.
Первый способ:
Action action = async () =>
{
    var observable1 = Task<string>.Run(async () =>
        {
            await Task.Delay(500);
            return "Hello";
        }).ToObservable();

    var observable2 = Task<string>.Run(async () =>
        {
            await Task.Delay(1000);
            return "World";
        }).ToObservable();

    var result = await observable1.Merge(observable2).FirstAsync();
    Console.WriteLine(result);
};
action();
Второй способ:
Action action = async () =>
{
    var observable1 = Task<string>.Run(async () =>
        {
            await Task.Delay(500);
            return "Hello";
        }).ToObservable();

    var observable2 = Task<string>.Run(async () =>
        {
            await Task.Delay(1000);
            return "World";
        }).ToObservable();

    var result = await observable1.Amb(observable2);
    Console.WriteLine(result);
};
action();
Следующим примером мы рассмотрим, как можно сделать конкатенацию для IObservable коллекций.
Для того чтобы не дублировать код получения строк "Hello" и "World", я вынес их в отдельные методы.
private static Task<string> GetWorldString()
{
    return Task<string>.Run(async () =>
    {
        await Task.Delay(1000);
        return "World";
    });
}

private static Task<string> GetHelloString()
{
    return Task<string>.Run(async () =>
    {
        await Task.Delay(500);
        return "Hello";
    });
}
Теперь нам будет проще понимать саму конкатенацию:
Action action = async () =>
{
    var observable1 = GetHelloString().ToObservable();

    var observable2 = GetWorldString().ToObservable();

    var resultObservable = Observable.Concat(observable1, observable2);

    await resultObservable.ForEachAsync(x => Console.WriteLine(x));
};
action();
Мы могли поступить немного проще и после приведения нашего таска к наблюдаемому через метод ToObservable() – просто сделать конкатенацию на первой коллекции.
Action action = async () =>
{
    var observable1 = GetHelloString().ToObservable()
        .Concat(GetWorldString().ToObservable());

    await observable1.ForEachAsync(x => Console.WriteLine(x));
};
action();
Предыдущий вариант вывода мы можем переписать еще одним методом, дописав еще одну подписку.
var observable1 = GetHelloString().ToObservable()
    .Concat(GetWorldString().ToObservable())
    .Subscribe(x =>
        {
            Console.WriteLine(x);
        }
    );
Либо вот таким способом через упомянутый ранее метод ForEachAsync:
var observable1 = GetHelloString().ToObservable()
    .Concat(GetWorldString().ToObservable())
    .ForEachAsync(x => Console.WriteLine(x));

В этой небольшой статье мы с вами рассмотрели возможность использования реактивных решений совместно с тасками. Если же вас интересует вопрос, когда лучше использовать таски, а когда реактивные решение (IObservable), то вы можете посмотреть статью "Task<T> vs IObservable<T>: when to use what?".  

No comments:

Post a Comment