Wednesday, January 15, 2014

Использование Reactive Extensions

В предыдущей статье речь шла об использовании паттерна наблюдатель (Observer). Поэтому мы пойдем далее и рассмотрим библиотеку Rx (Reactive Extensions), которая используется для написания асинхронных и основанных на событиях (event-based) программ с использованием интерфейса IObservable, а также возможности использования LINQ для построения запросов. Почему-то многие разработчики считают использование Rx ненужным. В основном тезисы о ненадобности данной библиотеки приводятся в виде: "Какой смысл использовать асинхронную модель с Rx, если есть уже TPL (Task Parallel Library)?" или: "Перекрутили IEnumerable с ног на голову". Это высказывания разработчиков, лишь поверхностно знакомых с библиотекой Rx. Но давайте смотреть на вещи реально. Пытаться использовать данную библиотеку как замену библиотеке TPL не имеет смысла, но Rx имеет право на существование как дополнение и расширение библиотеки TLP . Хотя, как могут заметить скептики, смысл какой-то библиотеки для расширения функционала TLP иногда не стоит того, чтобы для этих целей загружать отдельную библиотеку. Здесь нам на помощь приходит второй мощный механизм, который нам предоставляет Rx, - это работа с событиями с использованием интерфейса IObservable. По сути, это использования паттерна Наблюдатель.
Примечание: Если Вы не знакомы с принципом работы паттерна наблюдатель, рекомендую ознакомиться с этим паттерном (ссылка на статью) для понимания принципа работы библиотеки Rx.
Я также скептически отношусь к разным новинкам как компании Майкрософт, так и других компаний по разработке ПО. Поскольку моим пристрастием является программирование на WPF, то Rx в этом плане мне не очень понравился, так как при программировании на WPF я стараюсь придерживаться использования паттерна MVVM, поэтому в основном работаю с командами (интерфейс ICommand) вместо обычных событий. Да и те обертки, которые написаны над Routed Event для Rx (посмотрите в NuGet Packages пакеты XAML Support Library, WPF Helpers и др.) совсем не впечатляют. Возможно, Вы сможете найти им применение, но я пока воздержусь от использования данной библиотеки в своих WPF-приложениях. Но у этой библиотеки есть огромный плюс: она прекрасно ложится на модель событий .NET. Поэтому если Вы пишете приложение на Windows Forms или пишете библиотеку, в которой активно используете модель событий, посмотрите в сторону Rx, возможно, она позволит упростить Вам работу и начать писать код намного проще и понятнее для сопровождения и поддержки.
Рассмотрим, как использовать данную библиотеку и какие задачи она позволяет решать. Пример выполнения асинхронного кода с помощью Rx:
static void Main(string[] args)
{
    Action action = async () =>
    {
        var o = Observable.Start(() =>
        {
            //This starts on a background thread.
            Console.WriteLine("From background thread. Does not block main thread.");
            Console.WriteLine("Calculating...");
            Thread.Sleep(1000);
            Console.WriteLine("Background work completed.");
        });
        await o.FirstAsync();
        Console.WriteLine("Main thread completed.");
    };
    action.Invoke();
    Console.ReadLine();
}
Вот как предлагает нам асинхронно выполнять код библиотека Rx. Но зачем такая возможность этой библиотеки, если такой же код можно переписать на использование TPL, и никакой разницы не будет. Пример с использованием TPL:
static void Main(string[] args)
{
    Action action = async () =>
    {
        var task = Task.Factory.StartNew(() =>
        {
            //This starts on a background thread.
            Console.WriteLine("From background thread. Does not block main thread.");
            Console.WriteLine("Calculating...");
            Thread.Sleep(1000);
            Console.WriteLine("Background work completed.");
        });
        await task;
        Console.WriteLine("Main thread completed.");
    };
    action.Invoke();
    Console.ReadLine();
}
Как видим, пока преимущества по сравнению с TPL особо не заметно. Давайте допустим, что у нас есть некая синхронная операция, которую мы хотим сделать асинхронной.
public IObservable<int> FactorialAsync(int number)
{
    return Observable.Create<int>(
        o => Observable.ToAsync<int, int>(Factorial)(number).Subscribe(o)
    );
}

public static int Factorial(int number)
{
    if (number <= 1)
        return 1;
    return number * Factorial(number - 1);
}
Пример этого же примера с использование тасков:
public Task<int> FactorialTaskAsync(int number)
{
    var task = Task.Factory.StartNew(value => Factorial((int) value), number);
    return task;
}

public static int Factorial(int number)
{
    if (number <= 1)
        return 1;
    return number * Factorial(number - 1);
}  
Вы не находите, что TPL для простых асинхронных операций выглядит лучше, чем Rx? Пожалуй, пока Вы не слишком разочаровались в этой библиотеке, рассмотрим ее сильные стороны. Одна из них - это то, как Rx работает с событиями.
class Program
{
    public static event EventHandler<EventArgs> SimpleEvent;

    static void Main(string[] args)
    {
        IObservable<EventPattern<EventArgs>> eventAsObservable = Observable.FromEventPattern<EventArgs>(
            ev => SimpleEvent += ev,
            ev => SimpleEvent -= ev);

        var observer1 = eventAsObservable.Subscribe(arg => Console.WriteLine("Received event for observer1 subscriber"));
        var observer2 =
            eventAsObservable.Subscribe(arg => Console.WriteLine("Received event for observer2 subscriber"));
        SimpleEvent(null, EventArgs.Empty);
        Console.ReadLine();
    }
}
Как видим из примера, из модели, основанной на событиях, мы создаем модель, основанную на паттерне "Observer". Одно из преимуществ данного подхода - в том, что беспокоиться насчёт того, что Вы не отпишетесь от события, не нужно. Это уже есть огромным плюсом данной библиотеки. Рассмотрим, как реализован интерфейс IObservable в .NET Framework.
 public interface IObserver<in T>
    {
        // Summary:
        //     Notifies the observer that the provider has finished sending push-based notifications.
        void OnCompleted();
        //
        // Summary:
        //     Notifies the observer that the provider has experienced an error condition.
        //
        // Parameters:
        //   error:
        //     An object that provides additional information about the error.
        void OnError(Exception error);
        //
        // Summary:
        //     Provides the observer with new data.
        //
        // Parameters:
        //   value:
        //     The current notification information.
        void OnNext(T value);
    }
Посмотрите, как работает Observable в Rx.
С помощью библиотеки Rx мы можем создавать паттерн наблюдатель на лету и использовать его где угодно. Ниже приведены несколько примеров использования данного паттерна. Что если нам нужно выполнять некоторые действия асинхронно через некоторый промежуток времени? Использование для таких целей класса Timer - решение неплохое, но нужно подписаться на метод Tick, а также не забыть от него отписаться, создать нужные функции в асинхронном режиме (например, через таски) и т.д. Очень много кропотливой лишней работы. Данная библиотека делает это с помощью нескольких строк кода.
static void Main(string[] args)
{
    var observable = Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp();

    using (observable.Subscribe(
        x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
    {
        Console.WriteLine("Press any key to unsubscribe");
        Console.ReadKey();
    }

    Console.WriteLine("Press any key to exit");
           
    Console.ReadLine();
}
Вот более интересный пример с использованием событий Windows Forms:
static void Main(string[] args)
{
    var form = new Form();
    var textBox = new TextBox();
    form.Controls.Add(textBox);

    var tx = Observable.FromEventPattern<EventArgs>(textBox, "TextChanged");
    var res = from e in tx
                select ((TextBox) e.Sender).Text;
    using (res.Subscribe(Console.WriteLine))
    {
        Application.Run(form);
    }
}
Выше приведена простенькая программа, которая транслирует изменения текста в программе Windows в консольное окно. Довольно неплохо, учитывая тот факт, что нам, помимо всего, не нужно заботится о том, чтобы отписаться от события. Стоит вспомнить об одном из главных принципов Rx, это двойственность.
var collection1 = Enumerable.Range(1, 3).ToObservable();

var collection2 = Observable.Range(1, 3).ToEnumerable();
Вы можете привести вашу коллекцию для выполнения в асинхронном режиме с использованием extension метода ToObservable, так и с observable collection в синхронную коллекцию с помощью метода ToEnumerable. Чтобы коллекции получить доступ к элементам интерфейса, нужно делать синхронизацию с интерфейсом пользователя через метод ObserveOn. Если Вы помните, то таски из библиотеки TPL также не могут обратиться к визуальным элементам управления. Их также нужно синхронизировать через метод TaskScheduler.FromCurrentSynchronizationContext(), который возвращает текущий контекст GUI потока. Метод ObserveOn работает по такому же принципу. Рассмотрим пример Drag&Drop контролов на форме Windows Form, чтобы более наглядно посмотреть, как можно комбинировать использование событий с помощью Rx.
static void Main(string[] args)
{
    var form = new Form
    {
        Controls =
        {
            new Label {Text = "label1", BorderStyle = BorderStyle.FixedSingle},                      
            new Button {Text = "button1"},
            new Label {Text = "label2", BorderStyle = BorderStyle.FixedSingle},
        }
    };

    Func<Control, IObservable<EventPattern<MouseEventArgs>>> mouseMove = c =>
        Observable.FromEventPattern<MouseEventArgs>(c, "MouseMove");

    Func<Control, IObservable<EventPattern<MouseEventArgs>>> mouseUp = c =>
        Observable.FromEventPattern<MouseEventArgs>(c, "MouseUp");

    Func<Control, IObservable<EventPattern<MouseEventArgs>>> mouseDown = c =>
        Observable.FromEventPattern<MouseEventArgs>(c, "MouseDown");
    var q =
        from Control con in form.Controls
        select
        (
            from d in mouseDown(con)
            from u in mouseMove(con).TakeUntil(mouseUp(con))
            select u
        );

    q.Merge().Subscribe(arg =>
    {
        var control = arg.Sender as Control;
        control.Location = new Point(
            new Size(arg.EventArgs.X, arg.EventArgs.Y));
    });

    Application.Run(form);
}
Использование Reactive Extensions дает нам определенную гибкость при работе с событиями. К сожалению, для себя применения Observable вместо Task<TResult> я не нашел. Как по мне,то стандартная модель TPL выполняет все возложенные на нее возможности (а с выходом .NET 4.5 почти для всех основных функций для получения некоторых данных есть аналогичный метод, который возвращает таск). Но для модели событий использование Rx очень даже оправдано. Мне приходилось видеть проекты, в которых для такой роли использовали Prism, с которого, кроме модели событий, использовался только Bootstraper для определения, какой IoC контейнер использовать в программе. А в данном случае у нас полнофункциональная библиотека, которая именно эту роль и исполняет. Рекомендую к просмотру DevCamp 2010 Keynote - Rx: Curing your asynchronous programming blues, в котором Барт де Смет рассказывает о возможностях данной библиотеки. Если Вам будет не лень, посмотрите пример в конце выступления: Вы увидите пример мощного варианта использования данной библиотеки. В конце статьи приведен список литературы с материалами для того, чтобы освоить данную библиотеку с ее возможностями в кратчайшие строки.

Список литературы:

No comments:

Post a Comment