Friday, December 6, 2019

Asynchronous streams under the hood in C# 8


You have probably heard about asynchronous streams in C# 8. If not, today I am going to show you how this new feature works under the hood. Starting with C# 8.0, you can create and consume streams asynchronously. A method that returns an asynchronous stream has three properties:
  • It's declared with the async modifier.
  • It returns an IAsyncEnumerable<T>.
  • The method contains yield return statements to return successive elements in the asynchronous stream.
Below is the example from What's new in C# 8.0 - Asynchronous streams. To reproduce it, open Visual Studio 2019 and create a new Console App (.NET Core).
After that, replace the existing Main method with the code below.

static async Task Main(string[] args)
{
    await foreach (var number in GenerateSequence())
    {
        Console.WriteLine(number);
    }
}

public static async System.Collections.Generic.IAsyncEnumerable<int> GenerateSequence()
{
    for (int i = 0; i < 20; i++)
    {
        await Task.Delay(100);
        yield return i;
    }
}

Run this example and you will see the code generate a sequence from 0 to 19, waiting 100 ms before producing each number.
Async streams rely on new interfaces introduced in .NET Standard 2.1. This is a new killer feature from Microsoft that was originally proposed for C# 7; however, because implementing it required a lot of changes in .NET itself, the final pieces shipped with .NET Core 3.0. If you want to try this new feature, you need to set up some prerequisites:
You’ll need to set up your machine to run .NET Core, including the C# 8.0 compiler. The C# 8 compiler is available starting with Visual Studio 2019 version 16.3 or .NET Core 3.0 SDK.
To support asynchronous streams, Microsoft added IAsyncEnumerable<out T> and IAsyncEnumerator<out T>. The code that generates the sequence can now use yield return to return elements in a method that is declared with the async modifier. You can consume an async stream using an await foreach loop just as you consume any sequence using a foreach loop. In addition, a new interface, IAsyncDisposable, was added to implement the disposable pattern for async streams.

namespace System.Collections.Generic
{
    public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
    }

    public interface IAsyncEnumerator<out T> : IAsyncDisposable
    {
        T Current { get; }

        ValueTask<bool> MoveNextAsync();
    }
}

namespace System
{
    public interface IAsyncDisposable
    {
        ValueTask DisposeAsync();
    }
}

These three interfaces should be familiar to most C# developers. They behave in a manner similar to their synchronous counterparts: IEnumerable<T>, IEnumerator<T>, and IDisposable.
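To make the parallel with the synchronous interfaces concrete, here is a minimal sketch of roughly what an await foreach loop does when written by hand. The names ManualEnumerationDemo, CountAsync and CollectAsync are made up for this illustration, and the real compiler output differs in its details.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ManualEnumerationDemo
{
    // Same shape as GenerateSequence from the article, with a configurable length.
    public static async IAsyncEnumerable<int> CountAsync(int count)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Delay(10);
            yield return i;
        }
    }

    // Hand-written equivalent of:
    //   await foreach (var n in source) result.Add(n);
    public static async Task<List<int>> CollectAsync(IAsyncEnumerable<int> source)
    {
        var result = new List<int>();
        IAsyncEnumerator<int> enumerator = source.GetAsyncEnumerator();
        try
        {
            // MoveNextAsync returns ValueTask<bool>: true means Current holds a new item.
            while (await enumerator.MoveNextAsync())
            {
                result.Add(enumerator.Current);
            }
        }
        finally
        {
            // IAsyncDisposable replaces IDisposable for asynchronous cleanup.
            await enumerator.DisposeAsync();
        }
        return result;
    }

    public static async Task Main()
    {
        List<int> numbers = await CollectAsync(CountAsync(3));
        Console.WriteLine(string.Join(", ", numbers)); // 0, 1, 2
    }
}
```

The try/finally mirrors what a synchronous foreach does with IDisposable, only with an awaited DisposeAsync() call.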
Of all these types, probably only one will be new to you. The ValueTask struct was added to .NET Core for performance reasons. ValueTask<TResult> is a structure that can wrap a ready result, a Task<TResult>, or an IValueTaskSource<TResult> instance (the non-generic ValueTask wraps a Task or an IValueTaskSource). If you want to read about the performance issues and optimizations behind this type, I would suggest reading the article Understanding the Whys, Whats, and Whens of ValueTask.
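As a rough illustration of why ValueTask helps, the sketch below returns a ValueTask<string> that wraps a plain value when the answer is already known and falls back to a Task<string> otherwise. The class ValueTaskDemo, its cache, and LoadSlowlyAsync are hypothetical and exist only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ValueTaskDemo
{
    // Illustration only: a plain Dictionary is not thread-safe.
    private static readonly Dictionary<int, string> Cache = new Dictionary<int, string>();

    // Hypothetical slow lookup, standing in for real I/O.
    private static async Task<string> LoadSlowlyAsync(int id)
    {
        await Task.Delay(10);
        string value = "item-" + id;
        Cache[id] = value;
        return value;
    }

    public static ValueTask<string> GetAsync(int id)
    {
        // Cache hit: wrap the value directly, so no Task object is allocated.
        if (Cache.TryGetValue(id, out string cached))
        {
            return new ValueTask<string>(cached);
        }

        // Cache miss: wrap a regular Task<string>.
        return new ValueTask<string>(LoadSlowlyAsync(id));
    }

    public static async Task Main()
    {
        Console.WriteLine(await GetAsync(1));              // item-1 (loaded slowly)

        ValueTask<string> second = GetAsync(1);
        Console.WriteLine(second.IsCompletedSuccessfully); // True (served from the cache)
        Console.WriteLine(await second);                   // item-1
    }
}
```

MoveNextAsync() in the generated code follows the same idea: when the next element is available synchronously, it returns a ValueTask<bool> built from a plain bool.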
So let’s proceed with these new interfaces. Under the hood, the C# compiler generates code very similar to what it generates for async/await with Tasks. (You can find additional information on how async/await works in my article Async и await в C# 6.0, "Async and await in C# 6.0".)
To generate the IL code and check how it works under the hood, I used a tool called dotnet-ildasm. To install this tool, run the command:

dotnet tool install -g dotnet-ildasm

After that, you can generate the IL code as in the example below:

dotnet ildasm ConsoleAppTestCosmosDb.dll -o disassembledAssembly.il

After running this tool, we can find the result in the file disassembledAssembly.il. To tell the truth, when you see this file for the first time, it is nearly impossible to understand how it works. I am not sure that including all the generated IL here would be reasonable, because for our 20 lines of C# the compiler generates 725 lines of IL. Here I will provide only a tiny piece of this code.

.class nested private auto ansi sealed beforefieldinit<GenerateSequence> d__1 extends[System.Runtime] System.Object implements genericinstance, genericinstance, [System.Runtime]System.IAsyncDisposable, genericinstance, [System.Runtime]System.Threading.Tasks.Sources.IValueTaskSource, [System.Runtime]System.Runtime.CompilerServices.IAsyncStateMachine
  {
    .field public int32 '<>1__state'
    .field public [System.Threading.Tasks] System.Runtime.CompilerServices.AsyncIteratorMethodBuilder '<>t__builder'
    .field public genericinstance '<>v__promiseOfValueOrEnd'
    .field private int32 '<>2__current'
    .field private boolean '<>w__disposeMode'
    .field private int32 '<>l__initialThreadId'
    .field private int32 '<i>5__1'
    .field private [System.Runtime] System.Runtime.CompilerServices.TaskAwaiter '<>u__1'

    .method public hidebysig specialname rtspecialname instance void .ctor(int32<>1__state) cil managed
{
      // Code size 37
      .maxstack 8
      IL_0000: ldarg.0
      IL_0001: call instance void [System.Runtime]
    System.Object::.ctor()
      IL_0006: nop
     IL_0007: ldarg.0
      IL_0008: ldarg.1
      IL_0009: stfld int32 ConsoleAppTestCosmosDb.Program/<GenerateSequence>d__1::'<>1__state'
      IL_000e: ldarg.0
      IL_000f: call int32 [System.Runtime.Extensions]
    System.Environment::get_CurrentManagedThreadId()
      IL_0014: stfld int32 ConsoleAppTestCosmosDb.Program/<GenerateSequence>d__1::'<>l__initialThreadId'
      IL_0019: ldarg.0
      IL_001a: call [System.Threading.Tasks]
    System.Runtime.CompilerServices.AsyncIteratorMethodBuilder [System.Threading.Tasks]
    System.Runtime.CompilerServices.AsyncIteratorMethodBuilder::Create()
      IL_001f: stfld [System.Threading.Tasks]
    System.Runtime.CompilerServices.AsyncIteratorMethodBuilder ConsoleAppTestCosmosDb.Program/<GenerateSequence>d__1::'<>t__builder'
      IL_0024: ret
} // End of method System.Void ConsoleAppTestCosmosDb.Program/<GenerateSequence>d__1::.ctor(System.Int32)

    .method private hidebysig newslot virtual final instance void MoveNext() cil managed
{
      // Code size 323
      .maxstack 3
      .locals init(int32 V_0, [System.Runtime]
    System.Runtime.CompilerServices.TaskAwaiter V_1, ConsoleAppTestCosmosDb.Program/<GenerateSequence>d__1 V_2, int32 V_3, boolean V_4, [System.Runtime]
    System.Exception V_5)
      IL_0000: ldarg.0
      IL_0001: ldfld int32 ConsoleAppTestCosmosDb.Program/<GenerateSequence>d__1::'<>1__state'
      IL_0006: stloc.0
      .try
      {
The compiler generated two IAsyncStateMachine classes, one for the Main function and one for the GenerateSequence function.

.class nested private auto ansi sealed beforefieldinit<Main> d__0 extends[System.Runtime] System.Object implements[System.Runtime] System.Runtime.CompilerServices.IAsyncStateMachine
.class nested private auto ansi sealed beforefieldinit<GenerateSequence> d__1 extends[System.Runtime] System.Object implements genericinstance, genericinstance, [System.Runtime]System.IAsyncDisposable, genericinstance, [System.Runtime]System.Threading.Tasks.Sources.IValueTaskSource, [System.Runtime]System.Runtime.CompilerServices.IAsyncStateMachine

As you can see, this code is still not really readable. To better understand what the compiler generates, I have translated this IL code to C# and refactored it. We have two state machines here: the first one was generated for the GenerateSequence method and the second one for the Main method (the await foreach loop). We will start with the first one, GenerateSequence.

[CompilerGenerated]
public sealed class GenerateSequenceMethod : IAsyncEnumerable<int>, IAsyncEnumerator<int>, IAsyncDisposable, IValueTaskSource<bool>, IValueTaskSource, IAsyncStateMachine
{
    public int _state;

    public AsyncIteratorMethodBuilder _builder;

    public ManualResetValueTaskSourceCore<bool> _promiseOfValueOrEnd;

    private int _current;

    private bool _disposeMode;

    private int _initialThreadId;

    private int _number;

    private TaskAwaiter _taskAwaiter;

    [DebuggerHidden]
    public GenerateSequenceMethod(int state)
    {
        _state = state;
        _initialThreadId = Environment.CurrentManagedThreadId;
        _builder = AsyncIteratorMethodBuilder.Create();
    }

    int IAsyncEnumerator<int>.Current
    {
        [DebuggerHidden]
        get
        {
            return _current;
        }
    }


    private void MoveNext()
    {
        int num = _state;
        try
        {
            TaskAwaiter awaiter;
            switch (num)
            {
                default:
                    if (!_disposeMode)
                    {
                        _state = -1;
                        _number = 0;
                        goto LOOP;
                    }
                    goto ENDTRYBLOCK;
                case 0:
                    awaiter = _taskAwaiter;
                    _taskAwaiter = default(TaskAwaiter);
                    _state = -1;
                    break;
                case -4:
                    {
                        _state = -1;
                        if (!_disposeMode)
                        {
                            _number++;
                            goto LOOP;
                        }
                        goto ENDTRYBLOCK;
                    }
                LOOP:
                    if (_number < 20)
                    {
                        awaiter = Task.Delay(100).GetAwaiter();
                        if (!awaiter.IsCompleted)
                        {
                            _state = 0;
                            _taskAwaiter = awaiter;
                            GenerateSequenceMethod stateMachine = this;
                            _builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
                            return;
                        }
                        break;
                    }
                    goto ENDTRYBLOCK;
            }
            awaiter.GetResult();
            _current = _number;
            _state = -4;
            goto COMPLETED;
        ENDTRYBLOCK:;
        }
        catch (Exception exception)
        {
            _state = -2;
            _promiseOfValueOrEnd.SetException(exception);
            return;
        }
        _state = -2;
        _promiseOfValueOrEnd.SetResult(false);
        return;
    COMPLETED:
        _promiseOfValueOrEnd.SetResult(true);
    }

    void IAsyncStateMachine.MoveNext()
    {
        //ILSpy generated this explicit interface implementation from .override directive in MoveNext
        this.MoveNext();
    }

    private void SetStateMachine(IAsyncStateMachine stateMachine)
    {
    }

    void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
    {
        //ILSpy generated this explicit interface implementation from .override directive in SetStateMachine
        this.SetStateMachine(stateMachine);
    }

    IAsyncEnumerator<int> IAsyncEnumerable<int>.GetAsyncEnumerator(CancellationToken cancellationToken)
    {
        GenerateSequenceMethod result;
        if (_state == -2 && _initialThreadId == Environment.CurrentManagedThreadId)
        {
            _state = -3;
            result = this;
            _disposeMode = false;
        }
        else
        {
            result = new GenerateSequenceMethod(-3);
        }
        return result;
    }

    ValueTask<bool> IAsyncEnumerator<int>.MoveNextAsync()
    {
        if (_state == -2)
        {
            return default(ValueTask<bool>);
        }

        _promiseOfValueOrEnd.Reset();
        GenerateSequenceMethod stateMachine = this;
        _builder.MoveNext(ref stateMachine);
        short version = _promiseOfValueOrEnd.Version;
        if (_promiseOfValueOrEnd.GetStatus(version) == ValueTaskSourceStatus.Succeeded)
        {
            return new ValueTask<bool>(_promiseOfValueOrEnd.GetResult(version));
        }
        return new ValueTask<bool>(this, version);
    }

    bool IValueTaskSource<bool>.GetResult(short token)
    {
        return _promiseOfValueOrEnd.GetResult(token);
    }

    ValueTaskSourceStatus IValueTaskSource<bool>.GetStatus(short token)
    {
        return _promiseOfValueOrEnd.GetStatus(token);
    }

    void IValueTaskSource<bool>.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
    {
        _promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags);
    }

    void IValueTaskSource.GetResult(short token)
    {
        _promiseOfValueOrEnd.GetResult(token);
    }

    ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
    {
        return _promiseOfValueOrEnd.GetStatus(token);
    }

    void IValueTaskSource.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
    {
        _promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags);
    }

    ValueTask IAsyncDisposable.DisposeAsync()
    {
        if (_state >= -1)
        {
            throw new NotSupportedException();
        }
        if (_state == -2)
        {
            return default(ValueTask);
        }
        _disposeMode = true;
        _promiseOfValueOrEnd.Reset();
        GenerateSequenceMethod stateMachine = this;
        _builder.MoveNext(ref stateMachine);
        return new ValueTask(this, _promiseOfValueOrEnd.Version);
    }
}

The GenerateSequenceMethod class is not that big and it is not hard to read, because about 90% of its logic lives in the MoveNext() function. MoveNext() implements the for loop with a jump table (state table). This code also contains some magic in the form of a call to AwaitUnsafeOnCompleted. I will describe what this function does later, when I walk through how the state machine works.
The next piece of logic was generated for the Main function (await foreach). I’ve decided to rename this class to MainFunctionClass (sorry about this terrible naming convention) and to refactor it, removing redundant code and giving members more descriptive names.

public sealed class MainFunctionClass : IAsyncStateMachine
{
    public int _state;

    public AsyncTaskMethodBuilder _builder;

    private IAsyncEnumerator<int> _collectionAsyncEnumerator;

    private Exception _exception;

    private int _number;

    private bool _checkResult;

    private ValueTaskAwaiter<bool> _taskAwaiter;

    private ValueTaskAwaiter _firstCallAwaiter;
    private static int i = 0;

    private void MoveNext()
    {
        Console.WriteLine($"MoveNext Calls {i++}");
        int num = _state;
        try
        {
            ValueTaskAwaiter awaiter;
            if (num != 0)
            {
                if (num == 1)
                {
                    awaiter = _firstCallAwaiter;
                    _firstCallAwaiter = default(ValueTaskAwaiter);
                    _state = -1;
                    goto GETFIRSTRUNRESULT;
                }
                _collectionAsyncEnumerator = Program.GenerateSequence().GetAsyncEnumerator(default(CancellationToken));
                _exception = null;
            }
            try
            {
                if (num != 0)
                {
                    goto GETNEXTAWAITER;
                }
                ValueTaskAwaiter<bool> awaiter2 = _taskAwaiter;
                _taskAwaiter = default(ValueTaskAwaiter<bool>);
                _state = -1;
                goto GETRESULT;
            GETNEXTAWAITER:
                awaiter2 = _collectionAsyncEnumerator.MoveNextAsync().GetAwaiter();
                if (!awaiter2.IsCompleted)
                {
                    _state = 0;
                    _taskAwaiter = awaiter2;
                    MainFunctionClass stateMachine = this;
                    _builder.AwaitUnsafeOnCompleted(ref awaiter2, ref stateMachine);
                    return;
                }
            GETRESULT:
                 _checkResult = awaiter2.GetResult();
                if (_checkResult)
                {
                    _number = _collectionAsyncEnumerator.Current;
                    Console.WriteLine(_number);
                    goto GETNEXTAWAITER;
                }
            }
            catch (Exception obj)
            {
                _exception = obj;
            }
            if (_collectionAsyncEnumerator != null)
            {
                awaiter = _collectionAsyncEnumerator.DisposeAsync().GetAwaiter();
                if (!awaiter.IsCompleted)
                {
                    _state = 1;
                    _firstCallAwaiter = awaiter;
                    MainFunctionClass stateMachine = this;
                    _builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
                    return;
                }
                goto GETFIRSTRUNRESULT;
            }
            goto EXCEPTIONCHECK;
        GETFIRSTRUNRESULT:
            awaiter.GetResult();
        EXCEPTIONCHECK:
            if (_exception != null)
            {
                ExceptionDispatchInfo.Capture(_exception).Throw();
            }
            _exception = null;
            _collectionAsyncEnumerator = null;
        }
        catch (Exception exception)
        {
            _state = -2;
            _builder.SetException(exception);
            return;
        }
        _state = -2;
        _builder.SetResult();
    }

    void IAsyncStateMachine.MoveNext()
    {
        //ILSpy generated this explicit interface implementation from .override directive in MoveNext
        this.MoveNext();
    }

    [DebuggerHidden]
    private void SetStateMachine(IAsyncStateMachine stateMachine)
    {
    }

    void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
    {
        //ILSpy generated this explicit interface implementation from .override directive in SetStateMachine
        this.SetStateMachine(stateMachine);
    }
}

MainFunctionClass also has all its logic located in one method called MoveNext(). This method does three different things: first, it initializes the enumerator with Program.GenerateSequence().GetAsyncEnumerator(); second, it calls MoveNextAsync() to request the next element and DisposeAsync() to release resources; and third, which I will call “magic” for now, it simply calls the AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine) method.
I’ve also updated the existing Main method so that it uses these state machine classes and their logic.

public class Program
{
    static Task Main(string[] args)
    {
        MainFunctionClass stateMachine = new MainFunctionClass();
        stateMachine._builder = AsyncTaskMethodBuilder.Create();
        stateMachine._state = -1;
        AsyncTaskMethodBuilder builder = stateMachine._builder;
        builder.Start(ref stateMachine);
        return stateMachine._builder.Task;
    }

    [AsyncIteratorStateMachine(typeof(GenerateSequenceMethod))]
    public static IAsyncEnumerable<int> GenerateSequence()
    {
        return new GenerateSequenceMethod(-2);
    }
}

It’s high time to dive deep into this code to understand what happens under the hood.

MainFunctionClass stateMachine = new MainFunctionClass();
stateMachine._builder = AsyncTaskMethodBuilder.Create();
stateMachine._state = -1;

In the first line we create the state machine for our await foreach loop. If you do not have much experience with state machines, it may be easier to think of this part as the “State” design pattern: all the logic generated by the compiler could be refactored into the State pattern (one of the GoF patterns). In the second line we just create the AsyncTaskMethodBuilder struct.

/// <summary>Initializes a new <see cref="AsyncTaskMethodBuilder"/>.</summary>
/// <returns>The initialized <see cref="AsyncTaskMethodBuilder"/>.</returns>
public static AsyncTaskMethodBuilder Create()
{
    return default(AsyncTaskMethodBuilder);
    // Note: If ATMB<T>.Create is modified to do any initialization, this
    //       method needs to be updated to do m_builder = ATMB<T>.Create().

}

I’ve copied this source code from referencesource.microsoft.com to show you that it is a simple factory method for creating an AsyncTaskMethodBuilder. There are similar builder types that help the compiler drive asynchronous methods. The first one, AsyncTaskMethodBuilder, you can see above (you can think of it as a manager of and a wrapper around Task; AsyncTaskMethodBuilder<TResult> plays the same role for Task<TResult>), while AsyncVoidMethodBuilder provides a builder for asynchronous methods that return void. To put it another way, the compiler uses these builders when you write async void (AsyncVoidMethodBuilder), async Task (AsyncTaskMethodBuilder), or async Task<T> (AsyncTaskMethodBuilder<T>). The next line of source code runs our state machine.

builder.Start(ref stateMachine);

Below you will find the Start method from AsyncTaskMethodBuilder.

/// <summary>Initiates the builder's execution with the associated state machine.</summary>
/// <typeparam name="TStateMachine">Specifies the type of the state machine.</typeparam>
/// <param name="stateMachine">The state machine instance, passed by reference.</param>
[SecuritySafeCritical]
[DebuggerStepThrough]

public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine
{
    // See comment on AsyncMethodBuilderCore.Start
    // AsyncMethodBuilderCore.Start(ref stateMachine);
    if (stateMachine == null) throw new ArgumentNullException("stateMachine");
    Contract.EndContractBlock();

    // Run the MoveNext method within a copy-on-write ExecutionContext scope.
    // This allows us to undo any ExecutionContext changes made in MoveNext,
    // so that they won't "leak" out of the first await.

    ExecutionContextSwitcher ecs = default(ExecutionContextSwitcher);
    RuntimeHelpers.PrepareConstrainedRegions();

    try
    {
        ExecutionContext.EstablishCopyOnWriteScope(ref ecs);
        stateMachine.MoveNext();
    }
    finally
    {
        ecs.Undo();
    }

}

Therefore, the main idea of this method is simply to run MoveNext() within a copy-on-write ExecutionContext scope. Translated to the logic described above, this amounts to a stateMachine.MoveNext() call on the MainFunctionClass instance. Just pay attention to the fact that the initial _state of our MainFunctionClass is set to -1.

stateMachine._state = -1;

To make it easier to see what happens on the first call, I will leave only the logic that actually matters, so that nothing distracts your attention.

private void MoveNext()
{
    int num = _state;

    if (num != 0)
    {
        _collectionAsyncEnumerator = Program.GenerateSequence().GetAsyncEnumerator(default(CancellationToken));
        _exception = null;
    }

    ValueTaskAwaiter<bool>  awaiter2 = _collectionAsyncEnumerator.MoveNextAsync().GetAwaiter();
    if (!awaiter2.IsCompleted)
    {
        _state = 0;
        _taskAwaiter = awaiter2;
        MainFunctionClass stateMachine = this;
        _builder.AwaitUnsafeOnCompleted(ref awaiter2, ref stateMachine);
        return;
    }
}

I’ve removed the GetResult() call from the first run because the original code awaits Task.Delay(100), so the awaiter returned by MoveNextAsync() for the GenerateSequenceMethod state machine cannot be completed yet: IsCompleted is false and we enter the if block.

ValueTaskAwaiter<bool>  awaiter2 = _collectionAsyncEnumerator.MoveNextAsync().GetAwaiter();
    if (!awaiter2.IsCompleted)

The following part has also been removed, because it does not execute during the first run of our state machine:

GETFIRSTRUNRESULT:
     awaiter.GetResult();

So, I would suggest keeping as little code as possible to see the whole picture. Because this is the first run, we call GetAsyncEnumerator() (which returns the IAsyncEnumerator<int> interface) on the GenerateSequenceMethod class.

IAsyncEnumerator<int> IAsyncEnumerable<int>.GetAsyncEnumerator(CancellationToken cancellationToken)
{
    GenerateSequenceMethod result;
    if (_state == -2 && _initialThreadId == Environment.CurrentManagedThreadId)
    {
        _state = -3;
        result = this;
        _disposeMode = false;
    }
    else
    {
        result = new GenerateSequenceMethod(-3);
    }
    return result;
}

Because on the first run the _state field is -2 and _initialThreadId == Environment.CurrentManagedThreadId, this code enters the if block, returns this (the object itself) as the result, and changes _state to -3. The state is -2 because GenerateSequenceMethod has only one constructor, which takes the state as a parameter:

public GenerateSequenceMethod(int state)
{
    _state = state;
    _initialThreadId = Environment.CurrentManagedThreadId;
    _builder = AsyncIteratorMethodBuilder.Create();
}

And -2 is passed as the state parameter because our original call goes through Program.GenerateSequence(). Below you will find how this function has been generated in our code.

[AsyncIteratorStateMachine(typeof(GenerateSequenceMethod))]
public static IAsyncEnumerable<int> GenerateSequence()
{
    return new GenerateSequenceMethod(-2);
}
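The reuse logic in GetAsyncEnumerator() can be observed from ordinary code. The sketch below is only an experiment, with made-up names (EnumeratorReuseDemo, CountAsync, SumAsync). The ReferenceEquals results depend on compiler implementation details, so treat the values mentioned in the comments as what the generated code shown in this article suggests, not as a guarantee.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class EnumeratorReuseDemo
{
    public static async IAsyncEnumerable<int> CountAsync(int count)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    public static async Task<int> SumAsync(IAsyncEnumerable<int> source)
    {
        int sum = 0;
        await foreach (int n in source)
        {
            sum += n;
        }
        return sum;
    }

    public static async Task Main()
    {
        IAsyncEnumerable<int> sequence = CountAsync(4);

        // First call on the creating thread: the generated code shown above
        // returns "this", so the enumerable doubles as its own enumerator.
        IAsyncEnumerator<int> first = sequence.GetAsyncEnumerator();
        // Second call: the state is no longer -2, so a fresh instance is created.
        IAsyncEnumerator<int> second = sequence.GetAsyncEnumerator();

        // Implementation details; True and False are suggested by the decompiled code.
        Console.WriteLine(ReferenceEquals(sequence, first));
        Console.WriteLine(ReferenceEquals(first, second));

        await first.DisposeAsync();
        await second.DisposeAsync();

        // Whichever instance is used, each enumeration starts from the beginning.
        Console.WriteLine(await SumAsync(sequence)); // 6
        Console.WriteLine(await SumAsync(sequence)); // 6
    }
}
```

The point of the optimization is to avoid allocating a second object in the common case where a sequence is enumerated once, on the thread that created it.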

Let’s continue with the current logic and take a closer look at the next piece of code.

ValueTaskAwaiter<bool>  awaiter2 = _collectionAsyncEnumerator.MoveNextAsync().GetAwaiter();
    if (!awaiter2.IsCompleted)
    {
        _state = 0;
        _taskAwaiter = awaiter2;
        MainFunctionClass stateMachine = this;
        _builder.AwaitUnsafeOnCompleted(ref awaiter2, ref stateMachine);
        return;
    }

These lines call the MoveNextAsync() method of the GenerateSequenceMethod class.

ValueTask<bool> IAsyncEnumerator<int>.MoveNextAsync()
{
    if (_state == -2)
    {
        return default(ValueTask<bool>);
    }

    _promiseOfValueOrEnd.Reset();
    GenerateSequenceMethod stateMachine = this;
    _builder.MoveNext(ref stateMachine);
    short version = _promiseOfValueOrEnd.Version;
    if (_promiseOfValueOrEnd.GetStatus(version) == ValueTaskSourceStatus.Succeeded)
    {
        return new ValueTask<bool>(_promiseOfValueOrEnd.GetResult(version));
    }
    return new ValueTask<bool>(this, version);
}

This is completely new functionality, which shipped with .NET Core 3.0 (on .NET Framework it is available through the Microsoft.Bcl.AsyncInterfaces NuGet package). You have already seen a similar piece of logic in the Main function.

GenerateSequenceMethod stateMachine = this;
_builder.MoveNext(ref stateMachine);

We also use two new structures here: ValueTask<bool>, for better performance, and ManualResetValueTaskSourceCore<bool> (the _promiseOfValueOrEnd field), which backs that ValueTask<bool> instead of a Task<TResult>. Under the hood, the ManualResetValueTaskSourceCore<bool> struct hides continuation-scheduling logic such as:

private void InvokeContinuation()
{
    switch (_capturedContext)
    {
        case null:
            if (RunContinuationsAsynchronously)
            {
                Task.Factory.StartNew(_continuation, _continuationState, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
            }
            else
            {
                _continuation(_continuationState);
            }
            break;
        case SynchronizationContext sc:
            sc.Post(s =>
            {
                var state = (Tuple<Action<object>, object>)s;
                state.Item1(state.Item2);
            }, Tuple.Create(_continuation, _continuationState));
            break;
        case TaskScheduler ts:
            Task.Factory.StartNew(_continuation, _continuationState, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
            break;
    }

}

This structure also implements a mechanism similar to the one in the TaskCompletionSource<TResult> class.

public static Task<Socket> AcceptAsync(this Socket socket)
{
    if (socket == null)
        throw new ArgumentNullException("socket");

    var tcs = new TaskCompletionSource<Socket>();

    socket.BeginAccept(asyncResult =>
    {
        try
        {
            var s = asyncResult.AsyncState as Socket;
            var client = s.EndAccept(asyncResult);

            tcs.SetResult(client);
        }
        catch (Exception ex)
        {
            tcs.SetException(ex);
        }

    }, socket);

    return tcs.Task;
}

The difference between ManualResetValueTaskSourceCore<bool> and TaskCompletionSource<TResult> is that ManualResetValueTaskSourceCore can be reset and reused, and it carries additional logic (a version token) to validate the current status of the operation.
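Here is a minimal sketch of how ManualResetValueTaskSourceCore<bool> can back a reusable ValueTask<bool>, in the same way the generated state machine uses _promiseOfValueOrEnd. The ReusableSignal class and its WaitAsync and Complete methods are hypothetical names invented for this example. It is not thread-safe and supports only one pending wait at a time.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

// Hypothetical reusable completion source built on ManualResetValueTaskSourceCore<bool>.
public sealed class ReusableSignal : IValueTaskSource<bool>
{
    // Mutable struct: it must be a non-readonly field.
    private ManualResetValueTaskSourceCore<bool> _core;

    // Starts a new wait. Reset() bumps the version, and the ValueTask remembers it as its token.
    public ValueTask<bool> WaitAsync()
    {
        _core.Reset();
        return new ValueTask<bool>(this, _core.Version);
    }

    // Completes the current wait, like TaskCompletionSource<bool>.SetResult.
    public void Complete(bool value) => _core.SetResult(value);

    bool IValueTaskSource<bool>.GetResult(short token) => _core.GetResult(token);

    ValueTaskSourceStatus IValueTaskSource<bool>.GetStatus(short token) => _core.GetStatus(token);

    void IValueTaskSource<bool>.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
        => _core.OnCompleted(continuation, state, token, flags);
}

public static class SignalDemo
{
    public static async Task Main()
    {
        var signal = new ReusableSignal();

        ValueTask<bool> first = signal.WaitAsync();
        signal.Complete(true);
        Console.WriteLine(await first);  // True

        // Same object, no new allocation, but a new version token.
        ValueTask<bool> second = signal.WaitAsync();
        signal.Complete(false);
        Console.WriteLine(await second); // False

        try
        {
            // Awaiting a ValueTask twice is not supported; done here only to
            // show that the stale token is detected.
            await first;
        }
        catch (InvalidOperationException)
        {
            Console.WriteLine("stale ValueTask rejected");
        }
    }
}
```

A TaskCompletionSource<bool> would need a new instance (and a new Task) for every wait, whereas here a single object serves any number of sequential waits. That matters for an iterator that may be asked for thousands of elements.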
So, we have covered ManualResetValueTaskSourceCore and briefly explained why this structure is needed. I would suggest digging deeper into our code.

GenerateSequenceMethod stateMachine = this;
_builder.MoveNext(ref stateMachine);

Here we run the other MoveNext() method, the one from the GenerateSequenceMethod class. Below you will find the code the compiler generated for it. I’ve just refactored it slightly to make it easier to follow.


private void MoveNext()
{
    int num = _state;
    try
    {
        TaskAwaiter awaiter;
        switch (num)
        {
            default:
                if (!_disposeMode)
                {
                    _state = -1;
                    _number = 0;
                    goto LOOP;
                }
                goto ENDTRYBLOCK;
            case 0:
                awaiter = _taskAwaiter;
                _taskAwaiter = default(TaskAwaiter);
                _state = -1;
                break;
            case -4:
            {
                    _state = -1;
                    if (!_disposeMode)
                    {
                        _number++;
                        goto LOOP;
                    }
                    goto ENDTRYBLOCK;
                }
                LOOP:
                if (_number < 20)
                {
                    awaiter = Task.Delay(100).GetAwaiter();
                    if (!awaiter.IsCompleted)
                    {
                        _state = 0;
                        _taskAwaiter = awaiter;
                        GenerateSequenceMethod stateMachine = this;
                        _builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
                        return;
                    }
                    break;
                }
                goto ENDTRYBLOCK;
        }
        awaiter.GetResult();
        _current = _number;
        _state = -4;
        goto COMPLETED;
        ENDTRYBLOCK:;
    }
    catch (Exception exception)
    {
        _state = -2;
        _promiseOfValueOrEnd.SetException(exception);
        return;
    }
    _state = -2;
    _promiseOfValueOrEnd.SetResult(false);
    return;
    COMPLETED:
    _promiseOfValueOrEnd.SetResult(true);

}

I will leave only the part that runs on the first call.


private void MoveNext()
{
    TaskAwaiter awaiter;
    _state = -1;
    _number = 0;

    if (_number < 20)
    {
        awaiter = Task.Delay(100).GetAwaiter();
        if (!awaiter.IsCompleted)
        {
            _state = 0;
            _taskAwaiter = awaiter;
            GenerateSequenceMethod stateMachine = this;
            _builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
            return;
        }
    }

}

In the original MoveNext(), the switch falls into the default branch, because _state is -3 at this point (it was set by GetAsyncEnumerator()). We set _state to -1 and _number to 0, get the awaiter for the Task.Delay(100) call, and enter the if (!awaiter.IsCompleted) block. There we change _state to 0 and call the magic function AwaitUnsafeOnCompleted:

/// <summary>
/// Schedules the specified state machine to be pushed forward when the specified awaiter completes.
/// </summary>
/// <typeparam name="TAwaiter">Specifies the type of the awaiter.</typeparam>
/// <typeparam name="TStateMachine">Specifies the type of the state machine.</typeparam>
/// <param name="awaiter">The awaiter.</param>
/// <param name="stateMachine">The state machine.</param>
[SecuritySafeCritical]
public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(
    ref TAwaiter awaiter, ref TStateMachine stateMachine)
    where TAwaiter : ICriticalNotifyCompletion
    where TStateMachine : IAsyncStateMachine
{
    try
    {
        AsyncMethodBuilderCore.MoveNextRunner runnerToInitialize = null;
        var continuation = m_coreState.GetCompletionAction(AsyncCausalityTracer.LoggingOn ? this.Task : null, ref runnerToInitialize);
        Contract.Assert(continuation != null, "GetCompletionAction should always return a valid action.");

        // If this is our first await, such that we've not yet boxed the state machine, do so now.
        if (m_coreState.m_stateMachine == null)
        {
            // Force the Task to be initialized prior to the first suspending await so
            // that the original stack-based builder has a reference to the right Task.
            var builtTask = this.Task;
            // Box the state machine, then tell the boxed instance to call back into its own builder,
            // so we can cache the boxed reference.
            Contract.Assert(!Object.ReferenceEquals((object)stateMachine, (object)stateMachine), "Expected an unboxed state machine reference");
            m_coreState.PostBoxInitialization(stateMachine, runnerToInitialize, builtTask);
        }
        awaiter.UnsafeOnCompleted(continuation);
    }
    catch (Exception e)
    {
        AsyncMethodBuilderCore.ThrowAsync(e, targetContext: null);
    }

}

Here we can see that the method builder relies indirectly on AsyncMethodBuilderCore, which makes the execution context flow across the await through its MoveNextRunner helper class. It manages to do this efficiently by storing a single reference to the execution context, rather than one reference for each continuation. (This listing comes from the .NET Framework reference source. .NET Core 3.0 implements the scheduling differently internally, but the idea is the same.)


/// <summary>Provides the ability to invoke a state machine's MoveNext method under a supplied ExecutionContext.</summary>
internal sealed class MoveNextRunner
{
    /// <summary>The context with which to run MoveNext.</summary>
    private readonly ExecutionContext m_context;
    /// <summary>The state machine whose MoveNext method should be invoked.</summary>
    internal IAsyncStateMachine m_stateMachine;

    /// <summary>Initializes the runner.</summary>
    /// <param name="context">The context with which to run MoveNext.</param>
    [SecurityCritical] // Run needs to be SSC to map to Action delegate, so to prevent misuse, we only allow construction through SC
    internal MoveNextRunner(ExecutionContext context, IAsyncStateMachine stateMachine)
    {
        m_context = context;
        m_stateMachine = stateMachine;
    }

    /// <summary>Invokes MoveNext under the provided context.</summary>
    [SecuritySafeCritical]
    internal void Run()
    {
        Contract.Assert(m_stateMachine != null, "The state machine must have been set before calling Run.");

        if (m_context != null)
        {
            try
            {
                // Get the callback, lazily initializing it as necessary
                ContextCallback callback = s_invokeMoveNext;
                if (callback == null) { s_invokeMoveNext = callback = InvokeMoveNext; }

                // Use the context and callback to invoke m_stateMachine.MoveNext.
                ExecutionContext.Run(m_context, callback, m_stateMachine, preserveSyncCtx: true);
            }
            finally { m_context.Dispose(); }
        }
        else
        {
            m_stateMachine.MoveNext();
        }
    }

    /// <summary>Cached delegate used with ExecutionContext.Run.</summary>
    [SecurityCritical]
    private static ContextCallback s_invokeMoveNext; // lazily-initialized due to SecurityCritical attribution

    /// <summary>Invokes the MoveNext method on the supplied IAsyncStateMachine.</summary>
    /// <param name="stateMachine">The IAsyncStateMachine machine instance.</param>
    [SecurityCritical] // necessary for ContextCallback in CoreCLR
    private static void InvokeMoveNext(object stateMachine)
    {
        ((IAsyncStateMachine)stateMachine).MoveNext();
    }

}

Take a look at this MoveNextRunner helper class. The constructor saves the context under which to run MoveNext() and the state machine itself (in our example, GenerateSequenceMethod or MainFunctionClass). The class has only one method, Run(), and all it does is call MoveNext() on our state machine, with or without the saved execution context.


/// <summary>Invokes MoveNext under the provided context.</summary>
[SecuritySafeCritical]
internal void Run()
{
    Contract.Assert(m_stateMachine != null, "The state machine must have been set before calling Run.");

    if (m_context != null)
    {
        try
        {
            // Get the callback, lazily initializing it as necessary
            ContextCallback callback = s_invokeMoveNext;
            if (callback == null) { s_invokeMoveNext = callback = InvokeMoveNext; }

            // Use the context and callback to invoke m_stateMachine.MoveNext.
            ExecutionContext.Run(m_context, callback, m_stateMachine, preserveSyncCtx: true);
        }
        finally { m_context.Dispose(); }
    }
    else
    {
        m_stateMachine.MoveNext();
    }

}

So, let me explain what happens here in simple words. If an execution context was captured (m_context != null), Run() binds the call to that context through ExecutionContext.Run, which invokes the InvokeMoveNext() callback. If no context was captured, it calls m_stateMachine.MoveNext() directly.


/// <summary>Invokes the MoveNext method on the supplied IAsyncStateMachine.</summary>
/// <param name="stateMachine">The IAsyncStateMachine machine instance.</param>
[SecurityCritical] // necessary for ContextCallback in CoreCLR
private static void InvokeMoveNext(object stateMachine)
{
    ((IAsyncStateMachine)stateMachine).MoveNext();

}
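To make "running a callback under a captured context" more concrete, here is a small sketch that uses ExecutionContext.Capture and ExecutionContext.Run directly. The class name ContextFlowDemo, the Ambient field and the method name are made up for this illustration, and AsyncLocal<string> is used only as a convenient way to observe which context is current. It is not what the builder itself does, just the same mechanism in miniature.

```csharp
using System;
using System.Threading;

public static class ContextFlowDemo
{
    // Hypothetical ambient value; AsyncLocal<T> data is stored in the ExecutionContext.
    private static readonly AsyncLocal<string> Ambient = new AsyncLocal<string>();

    // Returns the value seen inside the captured context and the value seen afterwards.
    public static (string InsideRun, string AfterRun) ReadUnderCapturedContext()
    {
        Ambient.Value = "captured";
        // Snapshot the current context, similar to what MoveNextRunner keeps in m_context.
        ExecutionContext context = ExecutionContext.Capture();

        // Change the ambient value after the snapshot was taken.
        Ambient.Value = "changed later";

        string insideRun = null;
        // The callback runs with the snapshot as the current context.
        ExecutionContext.Run(context, _ => insideRun = Ambient.Value, null);

        // Once Run returns, the caller's own context is current again.
        return (insideRun, Ambient.Value);
    }

    public static void Main()
    {
        var result = ReadUnderCapturedContext();
        Console.WriteLine(result.InsideRun); // captured
        Console.WriteLine(result.AfterRun);  // changed later
    }
}
```

The callback sees the value from the moment of capture, which is why a continuation scheduled at an await can still observe the ambient data of the code that awaited.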

I suggest taking one more look at the original AwaitUnsafeOnCompleted call.

GenerateSequenceMethod stateMachine = this;
_builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
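If the awaiter pattern itself still feels abstract, the following sketch drives a TaskAwaiter by hand: check IsCompleted, register a continuation when the task is still running, and call GetResult once it is done. AwaiterDemo and RunManually are hypothetical names, and a TaskCompletionSource stands in for the method builder, so treat it as an approximation of the idea rather than what the compiler emits.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

public static class AwaiterDemo
{
    // Hand-written equivalent of "await task; return <which path was taken>".
    public static Task<string> RunManually(Task task)
    {
        var done = new TaskCompletionSource<string>();
        TaskAwaiter awaiter = task.GetAwaiter();

        if (awaiter.IsCompleted)
        {
            // Fast path: nothing to wait for, continue synchronously.
            awaiter.GetResult();
            done.SetResult("fast path");
        }
        else
        {
            // Slow path: register a continuation and return to the caller,
            // like the "return;" after AwaitUnsafeOnCompleted in MoveNext().
            awaiter.UnsafeOnCompleted(() =>
            {
                awaiter.GetResult(); // would rethrow if the task had failed
                done.SetResult("resumed by continuation");
            });
        }

        return done.Task;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunManually(Task.CompletedTask)); // fast path
        Console.WriteLine(await RunManually(Task.Delay(50)));     // resumed by continuation
    }
}
```

The generated state machine does the same thing, except that the "continuation" is its own MoveNext() method and the place to resume from is remembered in _state.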

So, after digging into this method in such detail, we finally understand how our state machine is bound to the execution context and how MoveNext() is called back once the awaited operation completes. Interesting magic, I think, and I like this implementation. Now let's imagine that we have received this stateMachine.MoveNext() callback, and see what happens next in the GenerateSequenceMethod instance.

private void MoveNext()
{
    int num = _state;
    try
    {
        TaskAwaiter awaiter;
        switch (num)
        {
            case 0:
                awaiter = _taskAwaiter;
                _taskAwaiter = default(TaskAwaiter);
                _state = -1;
                break;
        }
        awaiter.GetResult();
        _current = _number;
        _state = -4;
        goto COMPLETED;
    }
    catch (Exception exception)
    {
        _state = -2;
        _promiseOfValueOrEnd.SetException(exception);
        return;
    }
    COMPLETED:
    _promiseOfValueOrEnd.SetResult(true);
}

Because _state was changed to 0 inside the if (!awaiter.IsCompleted) block (previous step), the switch takes the case 0 branch. There we restore the awaiter from _taskAwaiter, reset that field to its default value, set _state back to -1 and break out of the switch, so execution continues with the statements after the switch block.

awaiter.GetResult();
_current = _number;
_state = -4;
goto COMPLETED;

So here we assign _number to the _current field. Because we are working with a sequence of int, it holds a number (0 on this first pass). In your example it can be anything, because IAsyncEnumerator<out T> is a generic interface.

int IAsyncEnumerator<int>.Current
{
    [DebuggerHidden]
    get
    {
        return _current;
    }
}

The generated code also sets _state to -4 here, so that the next MoveNext() call jumps to the LOOP label. Then we notify the caller (the MainFunctionClass instance) that a value is ready (see the code sample below to understand what the compiler did).

COMPLETED:
    _promiseOfValueOrEnd.SetResult(true);
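To show how a promise like _promiseOfValueOrEnd can hand a bool to whoever awaits MoveNextAsync(), here is a minimal sketch built on ManualResetValueTaskSourceCore<bool>, which, as far as I know, is the type the compiler uses for this field. The class ValueOrEndPromise and its method WaitForNextAsync are hypothetical names for this illustration. The real generated class implements the same interface itself instead of using a separate helper.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

// Hypothetical stand-in for the promise part of the generated iterator class.
public sealed class ValueOrEndPromise : IValueTaskSource<bool>
{
    // Mutable struct: it must not be declared readonly.
    private ManualResetValueTaskSourceCore<bool> _core;

    // Roughly the role of MoveNextAsync(): reset the promise and hand out a ValueTask for it.
    public ValueTask<bool> WaitForNextAsync()
    {
        _core.Reset();
        return new ValueTask<bool>(this, _core.Version);
    }

    // true means "a value is ready", false means "the sequence has ended".
    public void SetResult(bool hasValue) => _core.SetResult(hasValue);

    public bool GetResult(short token) => _core.GetResult(token);

    public ValueTaskSourceStatus GetStatus(short token) => _core.GetStatus(token);

    public void OnCompleted(Action<object> continuation, object state, short token,
        ValueTaskSourceOnCompletedFlags flags)
        => _core.OnCompleted(continuation, state, token, flags);
}

public static class PromiseDemo
{
    public static async Task Main()
    {
        var promise = new ValueOrEndPromise();

        ValueTask<bool> pending = promise.WaitForNextAsync();
        Console.WriteLine(pending.IsCompleted); // False
        promise.SetResult(true);                // like the COMPLETED label
        Console.WriteLine(await pending);       // True

        pending = promise.WaitForNextAsync();
        promise.SetResult(false);               // like the end-of-sequence path
        Console.WriteLine(await pending);       // False
    }
}
```

Because the same promise object is reset and reused for every element, the iterator does not need to allocate a new Task for each MoveNextAsync() call.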

I suggest we finish with this _state = -4 first and look at what happens on the next MoveNext() call before switching back to MainFunctionClass.MoveNext().

private void MoveNext()
{

    TaskAwaiter awaiter;
    _state = -1;
    if (!_disposeMode)
    {
        _number++; //increment our value here
    }
    if (_number < 20)
    {
        awaiter = Task.Delay(100).GetAwaiter();
        if (!awaiter.IsCompleted)
        {
            _state = 0;
            _taskAwaiter = awaiter;
            GenerateSequenceMethod stateMachine = this;
            _builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
            return;
        }
    }
}

In this code I have removed the switch (-4) block and left only the statements that actually run. Once the delay completes, the callback scheduled by our “magic” function AwaitUnsafeOnCompleted runs the same block of code

awaiter.GetResult();
_current = _number;
_state = -4;
goto COMPLETED;

because whenever we get inside the if (!awaiter.IsCompleted) block, we set _state to 0. This cycle repeats until the if (_number < 20) check returns false. Then this last part runs

_state = -2;
_promiseOfValueOrEnd.SetResult(false);
return;

This code lets the caller, MainFunctionClass.MoveNext(), know that the iteration is over. So, let's come back to MainFunctionClass.MoveNext() and take a look at what happens there. Because we have already described how MoveNext() works, I think you will understand it without any difficulty.

if (num != 0)
{
    goto GETNEXTAWAITER;
}
ValueTaskAwaiter<bool> awaiter2 = _taskAwaiter;
_taskAwaiter = default(ValueTaskAwaiter<bool>);
_state = -1;
goto GETRESULT;
GETNEXTAWAITER:
awaiter2 = _collectionAsyncEnumerator.MoveNextAsync().GetAwaiter();
if (!awaiter2.IsCompleted)
{
    _state = 0;
    _taskAwaiter = awaiter2;
    MainFunctionClass stateMachine = this;
    _builder.AwaitUnsafeOnCompleted(ref awaiter2, ref stateMachine);
    return;
}
GETRESULT:
_checkResult = awaiter2.GetResult();
if (_checkResult)
{
    _number = _collectionAsyncEnumerator.Current;
    Console.WriteLine(_number);
    goto GETNEXTAWAITER;
}

The same trick is used here: the _state field is changed to 0.

if (!awaiter2.IsCompleted)
{
    _state = 0;
    _taskAwaiter = awaiter2;
    MainFunctionClass stateMachine = this;
    _builder.AwaitUnsafeOnCompleted(ref awaiter2, ref stateMachine);
    return;
}

And you already know about the “magic” function AwaitUnsafeOnCompleted. The life cycle of the current state machine looks like this:
  1. Set _state to 0
  2. Call AwaitUnsafeOnCompleted
  3. Receive the MoveNext() callback
  4. Change _state to -1
  5. Jump to the GETRESULT label
  6. Save the result to _checkResult (it comes from _promiseOfValueOrEnd.GetResult() in the GenerateSequenceMethod class)
  7. Call WriteLine for the Current element
  8. Go on to the next MoveNextAsync() call
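The steps above are what an await foreach loop boils down to. As a rough sketch, the same loop can be written by hand against IAsyncEnumerator<int>. ManualLoopDemo and ConsumeManually are hypothetical names, the count parameter and the shorter delay were added only to keep the demo quick, and the compiler's real lowering has more details than this.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ManualLoopDemo
{
    // Same shape as GenerateSequence() from the article, with a configurable length.
    public static async IAsyncEnumerable<int> GenerateSequence(int count)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Delay(10);
            yield return i;
        }
    }

    // Hand-written approximation of: await foreach (var n in source) { seen.Add(n); }
    public static async Task<List<int>> ConsumeManually(IAsyncEnumerable<int> source)
    {
        var seen = new List<int>();
        IAsyncEnumerator<int> enumerator = source.GetAsyncEnumerator();
        try
        {
            // Each await here is one GETNEXTAWAITER / GETRESULT round trip.
            while (await enumerator.MoveNextAsync())
            {
                seen.Add(enumerator.Current);
            }
        }
        finally
        {
            // await foreach also disposes the enumerator asynchronously.
            await enumerator.DisposeAsync();
        }
        return seen;
    }

    public static async Task Main()
    {
        List<int> numbers = await ConsumeManually(GenerateSequence(5));
        Console.WriteLine(string.Join(", ", numbers)); // 0, 1, 2, 3, 4
    }
}
```

MoveNextAsync() returning false corresponds to the _promiseOfValueOrEnd.SetResult(false) call in the generator, which is what ends the while loop.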

Is it very complicated to understand what happens under the hood? I do not think so. In addition, I want to show you the final sequence diagram of our state machines. I have kept only the important parts for better understanding.

Conclusions:
This is one of the greatest things Microsoft has implemented in C# 8. I hope you will use these asynchronous streams/iterators in your applications. And even more, I hope my article helps you understand how async streams work under the hood.
