You would probably hear about asynchronous stream in
C# 8? If no, today I am going to show you how this new feature in C# 8.0 works
under the hood. Starting with C# 8.0, you can create and consume streams
asynchronously. A method that returns an asynchronous stream has three
properties:
- It's declared with the async modifier.
- It returns an IAsyncEnumerable<T>.
- The method contains yield return statements to return successive elements in the asynchronous stream.
Below
you will see the example from What's
new in C# 8.0 - Asynchronous streams.
For reproducing this functionality, please run Visual Studio 2019 and create a
new Console App (.NET Core)
After
that, replace existing Main method to the code below.
static async Task Main(string[] args)
{
await foreach (var
number in GenerateSequence())
{
Console.WriteLine(number);
}
}
public static async
System.Collections.Generic.IAsyncEnumerable<int>
GenerateSequence()
{
for (int i = 0; i
< 20; i++)
{
await
Task.Delay(100);
yield return i;
}
}
You
can run this example and see how following code generates a sequence from 0 to
19, waiting 100 ms between generating each number:
Async
streams rely on new interfaces introduced in .NET Standard 2.1 and this is a
new killer feature from Microsoft which was proposes to be in C# 7, however
because implementing this feature required a lot of changes in .net framework,
these final changes were implemented in .NET Core 3.0. If you want to try this
new feature, you need to setup some prerequisites:
You’ll
need to set up your machine to run .NET Core, including the C# 8.0 compiler.
The C# 8 compiler is available starting with Visual
Studio 2019 version 16.3 or .NET
Core 3.0 SDK.
Fur
supporting asynchronous
streams Microsoft added IAsyncEnumerable<out T> and
IAsyncEnumerator<out T> . The code that generates the sequence can now
use yield return to return elements
in a method that was declared with the async
modifier. You can consume an async stream using an await foreach loop just as you consume any sequence using a foreach loop. In addition, they added a
new interface IAsyncDisposable
for implementing disposable pattern for async streams.
namespace
System.Collections.Generic
{
public interface IAsyncEnumerable<out T>
{
IAsyncEnumerator<T>
GetAsyncEnumerator(CancellationToken cancellationToken = default);
}
public interface IAsyncEnumerator<out T> :
IAsyncDisposable
{
T Current { get; }
ValueTask<bool>
MoveNextAsync();
}
}
namespace System
{
public interface IAsyncDisposable
{
ValueTask DisposeAsync();
}
}
These
three interfaces should be familiar to most C# developers. They behave in a
manner similar to their synchronous counterparts:
From
all this structure probably only one type will be new for you. This ValueTask
Struct was added to .NET Core for
performance reasons. The ValueTask is a structure that can wrap either a Task
or a IValueTaskSource
instance. If you want to read about performance issue and optimization with
this new ValueTask type, I would suggest reading this article Understanding
the Whys, Whats, and Whens of ValueTask.
So
let’s proceed with these new interfaces. Under the hood .net core compiler
generates very similar functionality as it generates for async\await for Tasks.
(Additional information how async\await works you can find in my article Async
и await в C# 6.0).
For
generating IL code and check how this code works under the hood, I have used
tool called dotnet-ildasm. For install, this tool you can run command:
dotnet tool install -g dotnet-ildasm
After
that, you can generate IL code as in example bellow:
dotnet ildasm ConsoleAppTestCosmosDb.dll -o
disassembledAssembly.il
After
running this tool, we can find expected result in file disassembledAssembly.il.
To tell the truth when you see this file the first time, it is not possible to
understand how does it work. I am not sure that adding all generated IL code here
would be reasonable, because .net core 3.0 for our C# code in 20 lines
generates ….. 725 lines of source code. Here I will provide only tiny piece of
this code.
.class nested private auto ansi sealed
beforefieldinit<GenerateSequence> d__1 extends[System.Runtime] System.Object
implements genericinstance, genericinstance,
[System.Runtime]System.IAsyncDisposable, genericinstance,
[System.Runtime]System.Threading.Tasks.Sources.IValueTaskSource,
[System.Runtime]System.Runtime.CompilerServices.IAsyncStateMachine
{
.field public int32 '<>1__state'
.field public
[System.Threading.Tasks]
System.Runtime.CompilerServices.AsyncIteratorMethodBuilder '<>t__builder'
.field public
genericinstance '<>v__promiseOfValueOrEnd'
.field private int32 '<>2__current'
.field private boolean '<>w__disposeMode'
.field private int32 '<>l__initialThreadId'
.field private int32 '<i>5__1'
.field private
[System.Runtime] System.Runtime.CompilerServices.TaskAwaiter '<>u__1'
.method public hidebysig
specialname rtspecialname instance void .ctor(int32<>1__state)
cil managed
{
// Code size 37
.maxstack 8
IL_0000: ldarg.0
IL_0001: call instance void
[System.Runtime]
System.Object::.ctor()
IL_0006: nop
IL_0007: ldarg.0
IL_0008: ldarg.1
IL_0009: stfld int32
ConsoleAppTestCosmosDb.Program/<GenerateSequence>d__1::'<>1__state'
IL_000e: ldarg.0
IL_000f: call int32
[System.Runtime.Extensions]
System.Environment::get_CurrentManagedThreadId()
IL_0014: stfld int32
ConsoleAppTestCosmosDb.Program/<GenerateSequence>d__1::'<>l__initialThreadId'
IL_0019: ldarg.0
IL_001a: call [System.Threading.Tasks]
System.Runtime.CompilerServices.AsyncIteratorMethodBuilder
[System.Threading.Tasks]
System.Runtime.CompilerServices.AsyncIteratorMethodBuilder::Create()
IL_001f: stfld [System.Threading.Tasks]
System.Runtime.CompilerServices.AsyncIteratorMethodBuilder
ConsoleAppTestCosmosDb.Program/<GenerateSequence>d__1::'<>t__builder'
IL_0024: ret
} // End of
method System.Void
ConsoleAppTestCosmosDb.Program/<GenerateSequence>d__1::.ctor(System.Int32)
.method private hidebysig
newslot virtual final instance void MoveNext() cil
managed
{
// Code size 323
.maxstack 3
.locals init(int32 V_0, [System.Runtime]
System.Runtime.CompilerServices.TaskAwaiter
V_1, ConsoleAppTestCosmosDb.Program/<GenerateSequence>d__1 V_2, int32
V_3, boolean V_4, [System.Runtime]
System.Exception V_5)
IL_0000: ldarg.0
IL_0001: ldfld int32
ConsoleAppTestCosmosDb.Program/<GenerateSequence>d__1::'<>1__state'
IL_0006: stloc.0
.try
{
.Net
core generated two IAsyncStateMachine classes for Main and GenerateSequence
function.
.class nested private auto
ansi sealed
beforefieldinit<Main> d__0 extends[System.Runtime] System.Object
implements[System.Runtime] System.Runtime.CompilerServices.IAsyncStateMachine
.class nested private auto
ansi sealed
beforefieldinit<GenerateSequence> d__1 extends[System.Runtime]
System.Object implements genericinstance, genericinstance,
[System.Runtime]System.IAsyncDisposable, genericinstance,
[System.Runtime]System.Threading.Tasks.Sources.IValueTaskSource,
[System.Runtime]System.Runtime.CompilerServices.IAsyncStateMachine
So,
if you see this code still not possible to read. For better understanding how
it generates code, I have translated this IL code to C# and refactored existing
code for better understanding. We have
here two state machines: the first one was generated for GenerateSequence
method and the second one was generated for Main method (await foreach
loop). We will start from the first method GenerateSequence.
[CompilerGenerated]
public sealed class GenerateSequenceMethod :
IAsyncEnumerable<int>, IAsyncEnumerator<int>,
IAsyncDisposable, IValueTaskSource<bool>,
IValueTaskSource, IAsyncStateMachine
{
public int _state;
public
AsyncIteratorMethodBuilder _builder;
public
ManualResetValueTaskSourceCore<bool>
_promiseOfValueOrEnd;
private int _current;
private bool
_disposeMode;
private int
_initialThreadId;
private int _number;
private TaskAwaiter
_taskAwaiter;
[DebuggerHidden]
public GenerateSequenceMethod(int state)
{
_state = state;
_initialThreadId =
Environment.CurrentManagedThreadId;
_builder =
AsyncIteratorMethodBuilder.Create();
}
int IAsyncEnumerator<int>.Current
{
[DebuggerHidden]
get
{
return _current;
}
}
private void MoveNext()
{
int num =
_state;
try
{
TaskAwaiter awaiter;
switch (num)
{
default:
if
(!_disposeMode)
{
_state = -1;
_number = 0;
goto LOOP;
}
goto
ENDTRYBLOCK;
case 0:
awaiter = _taskAwaiter;
_taskAwaiter = default(TaskAwaiter);
_state = -1;
break;
case -4:
{
_state = -1;
if (!_disposeMode)
{
_number++;
goto LOOP;
}
goto
ENDTRYBLOCK;
}
LOOP:
if (_number <
20)
{
awaiter =
Task.Delay(100).GetAwaiter();
if
(!awaiter.IsCompleted)
{
_state = 0;
_taskAwaiter =
awaiter;
GenerateSequenceMethod stateMachine = this;
_builder.AwaitUnsafeOnCompleted(ref awaiter, ref
stateMachine);
return;
}
break;
}
goto
ENDTRYBLOCK;
}
awaiter.GetResult();
_current = _number;
_state = -4;
goto COMPLETED;
ENDTRYBLOCK:;
}
catch (Exception
exception)
{
_state = -2;
_promiseOfValueOrEnd.SetException(exception);
return;
}
_state = -2;
_promiseOfValueOrEnd.SetResult(false);
return;
COMPLETED:
_promiseOfValueOrEnd.SetResult(true);
}
void
IAsyncStateMachine.MoveNext()
{
//ILSpy generated this
explicit interface implementation from .override directive in MoveNext
this.MoveNext();
}
private void
SetStateMachine(IAsyncStateMachine stateMachine)
{
}
void
IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
{
//ILSpy generated this
explicit interface implementation from .override directive in SetStateMachine
this.SetStateMachine(stateMachine);
}
IAsyncEnumerator<int>
IAsyncEnumerable<int>.GetAsyncEnumerator(CancellationToken
cancellationToken)
{
GenerateSequenceMethod result;
if (_state ==
-2 && _initialThreadId == Environment.CurrentManagedThreadId)
{
_state = -3;
result = this;
_disposeMode = false;
}
else
{
result = new
GenerateSequenceMethod(-3);
}
return result;
}
ValueTask<bool>
IAsyncEnumerator<int>.MoveNextAsync()
{
if (_state ==
-2)
{
return default(ValueTask<bool>);
}
_promiseOfValueOrEnd.Reset();
GenerateSequenceMethod stateMachine = this;
_builder.MoveNext(ref
stateMachine);
short version =
_promiseOfValueOrEnd.Version;
if
(_promiseOfValueOrEnd.GetStatus(version) == ValueTaskSourceStatus.Succeeded)
{
return new
ValueTask<bool>(_promiseOfValueOrEnd.GetResult(version));
}
return new
ValueTask<bool>(this, version);
}
bool IValueTaskSource<bool>.GetResult(short token)
{
return
_promiseOfValueOrEnd.GetResult(token);
}
ValueTaskSourceStatus IValueTaskSource<bool>.GetStatus(short token)
{
return
_promiseOfValueOrEnd.GetStatus(token);
}
void IValueTaskSource<bool>.OnCompleted(Action<object>
continuation, object state, short token, ValueTaskSourceOnCompletedFlags
flags)
{
_promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags);
}
void
IValueTaskSource.GetResult(short token)
{
_promiseOfValueOrEnd.GetResult(token);
}
ValueTaskSourceStatus
IValueTaskSource.GetStatus(short token)
{
return
_promiseOfValueOrEnd.GetStatus(token);
}
void
IValueTaskSource.OnCompleted(Action<object>
continuation, object state, short token, ValueTaskSourceOnCompletedFlags
flags)
{
_promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags);
}
ValueTask IAsyncDisposable.DisposeAsync()
{
if (_state
>= -1)
{
throw new
NotSupportedException();
}
if (_state ==
-2)
{
return default(ValueTask);
}
_disposeMode = true;
_promiseOfValueOrEnd.Reset();
GenerateSequenceMethod stateMachine = this;
_builder.MoveNext(ref
stateMachine);
return new ValueTask(this,
_promiseOfValueOrEnd.Version);
}
}
This
class GenerateSequenceMethod is no so big and it is not hard to read, because
90% of this code was implemented in function MoveNext(). This MoveNext()
function implements for loop with jump table (state table). This code
also has some magic code with calling function AwaitUnsafeOnCompleted. I
will describe what this function do when I describe current state manager and
it works.
So,
the next logic was generated for Main function (await foreach)
and I’ve decided to rename this class to MainFunctionClass (sorry about
this terrible naming convention) and refactor existing class such as removing
redundant code and provide appropriate naming convention for better
understanding.
public sealed class MainFunctionClass :
IAsyncStateMachine
{
public int _state;
public
AsyncTaskMethodBuilder _builder;
private IAsyncEnumerator<int> _collectionAsyncEnumerator;
private Exception _exception;
private int _number;
private bool
_checkResult;
private ValueTaskAwaiter<bool>
_taskAwaiter;
private ValueTaskAwaiter
_firstCallAwaiter;
private static int i = 0;
private void MoveNext()
{
Console.WriteLine($"MoveNext
Calls {i++}");
int num =
_state;
try
{
ValueTaskAwaiter awaiter;
if (num != 0)
{
if (num == 1)
{
awaiter = _firstCallAwaiter;
_firstCallAwaiter = default(ValueTaskAwaiter);
_state = -1;
goto
GETFIRSTRUNRESULT;
}
_collectionAsyncEnumerator =
Program.GenerateSequence().GetAsyncEnumerator(default(CancellationToken));
_exception = null;
}
try
{
if (num != 0)
{
goto
GETNEXTAWAITER;
}
ValueTaskAwaiter<bool>
awaiter2 = _taskAwaiter;
_taskAwaiter = default(ValueTaskAwaiter<bool>);
_state = -1;
goto GETRESULT;
GETNEXTAWAITER:
awaiter2 =
_collectionAsyncEnumerator.MoveNextAsync().GetAwaiter();
if
(!awaiter2.IsCompleted)
{
_state = 0;
_taskAwaiter = awaiter2;
MainFunctionClass
stateMachine = this;
_builder.AwaitUnsafeOnCompleted(ref awaiter2, ref
stateMachine);
return;
}
GETRESULT:
_checkResult =
awaiter2.GetResult();
if
(_checkResult)
{
_number =
_collectionAsyncEnumerator.Current;
Console.WriteLine(_number);
goto
GETNEXTAWAITER;
}
}
catch (Exception
obj)
{
_exception = obj;
}
if
(_collectionAsyncEnumerator != null)
{
awaiter =
_collectionAsyncEnumerator.DisposeAsync().GetAwaiter();
if
(!awaiter.IsCompleted)
{
_state = 1;
_firstCallAwaiter =
awaiter;
MainFunctionClass stateMachine = this;
_builder.AwaitUnsafeOnCompleted(ref awaiter, ref
stateMachine);
return;
}
goto
GETFIRSTRUNRESULT;
}
goto EXCEPTIONCHECK;
GETFIRSTRUNRESULT:
awaiter.GetResult();
EXCEPTIONCHECK:
if
(_exception != null)
{
ExceptionDispatchInfo.Capture(_exception).Throw();
}
_exception = null;
_collectionAsyncEnumerator = null;
}
catch (Exception
exception)
{
_state = -2;
_builder.SetException(exception);
return;
}
_state = -2;
_builder.SetResult();
}
void
IAsyncStateMachine.MoveNext()
{
//ILSpy generated this
explicit interface implementation from .override directive in MoveNext
this.MoveNext();
}
[DebuggerHidden]
private void
SetStateMachine(IAsyncStateMachine stateMachine)
{
}
void
IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
{
//ILSpy generated this
explicit interface implementation from .override directive in SetStateMachine
this.SetStateMachine(stateMachine);
}
}
This
class MainFunctionClass also has all logic located in one method called MoveNext().
This method do 3 different things: the first one is initializing Program.GenerateSequence().GetAsyncEnumerator,
the second thing is calling DisposeAsync() for deleting resource and
MoveNextAsync() for calling next element. And the third thing I will call
“magic” for now and that just simple call AwaitUnsafeOnCompleted(ref awaiter,
ref stateMachine) method.
Also
I’ve updated existing Main method with supporting AsyncStateMachine classes and
their logic.
public class Program
{
static Task Main(string[] args)
{
MainFunctionClass stateMachine = new
MainFunctionClass();
stateMachine._builder =
AsyncTaskMethodBuilder.Create();
stateMachine._state = -1;
AsyncTaskMethodBuilder builder =
stateMachine._builder;
builder.Start(ref
stateMachine);
return
stateMachine._builder.Task;
}
[AsyncIteratorStateMachine(typeof(GenerateSequenceMethod))]
public static
IAsyncEnumerable<int> GenerateSequence()
{
return new
GenerateSequenceMethod(-2);
}
}
It’s
high time to deep dive into this code to understand what happens here under the
hood.
MainFunctionClass
stateMachine = new MainFunctionClass();
stateMachine._builder
= AsyncTaskMethodBuilder.Create();
stateMachine._state =
-1;
In
the first line we have created state machine for our await foreach
loop. If you does not have much experience with state machine pattern, probably
it would be better to you understand this part as design pattern “state”. All
this logic generated by compiler can be refactored to State pattern (GOF patterns). The second line of code we just
create struct AsyncTaskMethodBuilder.
/// <summary>Initializes a new <see cref="AsyncTaskMethodBuilder"/>.</summary>
/// <returns>The initialized <see cref="AsyncTaskMethodBuilder"/>.</returns>
public static
AsyncTaskMethodBuilder Create()
{
return default(AsyncTaskMethodBuilder);
// Note: If
ATMB<T>.Create is modified to do any initialization, this
// method needs to be updated to do
m_builder = ATMB<T>.Create().
}
I’ve
copied this source code from referencesource.microsoft.com
for show you that’s simple factory method for creating AsyncTaskMethodBuilder.
There are two similar types which helps you to work with asynchronous stream.
The first one AsyncTaskMethodBuilder you will see above (you can consider this
class as a manger and a wrapper under Task and Task<Result>) and AsyncVoidMethodBuilder
provides a builder for asynchronous methods that return void. The next line of
source code run our state machine. For better understanding these two classes
generated by compiler when you use structure such as async void (AsyncVoidMethodBuilder)
or async Task (AsyncTaskMethodBuilder) or async
Task<T> (AsyncTaskMethodBuilder<T>)
builder.Start(ref stateMachine);
Below you will find Start method from AsyncTaskMethodBuilder
/// <summary>Initiates the builder's execution with the associated state machine.</summary>
/// <typeparam name="TStateMachine">Specifies the type of the state machine.</typeparam>
/// <param name="stateMachine">The state machine instance, passed by reference.</param>
[SecuritySafeCritical]
[DebuggerStepThrough]
public void
Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine
{
// See comment
on AsyncMethodBuilderCore.Start
//
AsyncMethodBuilderCore.Start(ref stateMachine);
if (stateMachine == null) throw new ArgumentNullException("stateMachine");
Contract.EndContractBlock();
// Run the
MoveNext method within a copy-on-write ExecutionContext scope.
// This allows
us to undo any ExecutionContext changes made in MoveNext,
// so that they
won't "leak" out of the first await.
ExecutionContextSwitcher ecs = default(ExecutionContextSwitcher);
RuntimeHelpers.PrepareConstrainedRegions();
try
{
ExecutionContext.EstablishCopyOnWriteScope(ref ecs);
stateMachine.MoveNext();
}
finally
{
ecs.Undo();
}
}
Therefore,
the main idea of this method just runs the MoveNext() method within a
copy-on-write ExecutionContext scope. If we translate this code to our logic
described above, that would be something like stateMachine.MoveNext() call for MainFunctionClass
instance. Just pay attention that initial _state for out MainFunctionClass
is stetted to -1.
stateMachine._state
= -1;
For
better understanding what happens for the first call I will left only logic
which would be enough for understanding and do not distract your attention.
private void MoveNext()
{
int num = _state;
if (num != 0)
{
_collectionAsyncEnumerator =
Program.GenerateSequence().GetAsyncEnumerator(default(CancellationToken));
_exception = null;
}
ValueTaskAwaiter<bool> awaiter2 =
_collectionAsyncEnumerator.MoveNextAsync().GetAwaiter();
if
(!awaiter2.IsCompleted)
{
_state = 0;
_taskAwaiter = awaiter2;
MainFunctionClass stateMachine = this;
_builder.AwaitUnsafeOnCompleted(ref awaiter2, ref
stateMachine);
return;
}
}
I’ve
removed GetResult() call from the first run because in original code we
have Task.Delay(100).GetAwaiter() and it is not possible to get true after calls MoveNextAsync() for GenerateSequenceMethod
state machine
ValueTaskAwaiter<bool> awaiter2 = _collectionAsyncEnumerator.MoveNextAsync().GetAwaiter();
if
(!awaiter2.IsCompleted)
This
part has been removed because when we have the first run for our state machine,
this code does not execute
GETFIRSTRUNRESULT:
awaiter.GetResult();
So, I would suggest leaving as minimum code as possible for better understanding the whole picture. Because this is first run, we called GetAsyncEnumerator() (Return IAsyncEnumerator<int> interface) for GenerateSequenceMethod class.
IAsyncEnumerator<int>
IAsyncEnumerable<int>.GetAsyncEnumerator(CancellationToken
cancellationToken)
{
GenerateSequenceMethod result;
if (_state == -2
&& _initialThreadId == Environment.CurrentManagedThreadId)
{
_state = -3;
result = this;
_disposeMode = false;
}
else
{
result = new
GenerateSequenceMethod(-3);
}
return result;
}
Because
for the first run _state filed is assigned to -2 and _initialThreadId ==
Environment.CurrentManagedThreadId this code runs if block and return as
result this (binding for himself) and current _state was changed to -3.
We have such result because GenerateSequenceMethod has only one constructor
with parameter state
public GenerateSequenceMethod(int state)
{
_state = state;
_initialThreadId =
Environment.CurrentManagedThreadId;
_builder = AsyncIteratorMethodBuilder.Create();
}
And
the second reason why we have -2 as state parameter, because our original
call runs this function like Program.GenerateSequence(). Below you will find
how this function has been generated in our code.
[AsyncIteratorStateMachine(typeof(GenerateSequenceMethod))]
public static
IAsyncEnumerable<int> GenerateSequence()
{
return new
GenerateSequenceMethod(-2);
}
Let’s
continue with current logic and take a look more precise to the next code.
ValueTaskAwaiter<bool> awaiter2 =
_collectionAsyncEnumerator.MoveNextAsync().GetAwaiter();
if
(!awaiter2.IsCompleted)
{
_state = 0;
_taskAwaiter = awaiter2;
MainFunctionClass stateMachine = this;
_builder.AwaitUnsafeOnCompleted(ref awaiter2, ref
stateMachine);
return;
}
This
lines called MoveNextAsync() method from GenerateSequenceMethod class.
ValueTask<bool>
IAsyncEnumerator<int>.MoveNextAsync()
{
if (_state == -2)
{
return default(ValueTask<bool>);
}
_promiseOfValueOrEnd.Reset();
GenerateSequenceMethod stateMachine = this;
_builder.MoveNext(ref
stateMachine);
if
(_promiseOfValueOrEnd.GetStatus(version) == ValueTaskSourceStatus.Succeeded)
{
return new
ValueTask<bool>(_promiseOfValueOrEnd.GetResult(version));
}
return new
ValueTask<bool>(this, version);
}
Here
is the total new functionality, which has been added in .net framework 4.8 and
.net core 3.0. Partially you are already see some part of this logic like this in Main function.
GenerateSequenceMethod
stateMachine = this;
_builder.MoveNext(ref
stateMachine);
We
also use here two new structures. Structure ValueTask<bool> for
performance improvement and ManualResetValueTaskSourceCore<bool> (promiseOfValueOrEnd
variable) for improving works with Task<TResult>. Under the hood, this
class ManualResetValueTaskSourceCore<bool> hides logic related to Tasks
like
private void
InvokeContinuation()
{
switch (_capturedContext)
{
case null:
if (RunContinuationsAsynchronously)
{
Task.Factory.StartNew(_continuation, _continuationState,
CancellationToken.None, TaskCreationOptions.DenyChildAttach,
TaskScheduler.Default);
}
else
{
_continuation(_continuationState);
}
break;
case SynchronizationContext sc:
sc.Post(s =>
{
var state = (Tuple<Action<object>, object>)s;
state.Item1(state.Item2);
}, Tuple.Create(_continuation,
_continuationState));
break;
case TaskScheduler ts:
Task.Factory.StartNew(_continuation, _continuationState,
CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
break;
}
}
Also
this structure implement mechanism something like implemented in class TaskCompletionSource<TResult>.
public static
Task<Socket> AcceptAsync(this Socket socket)
{
if (socket == null)
throw new
ArgumentNullException("socket");
var tcs = new
TaskCompletionSource<Socket>();
socket.BeginAccept(asyncResult =>
{
try
{
var s =
asyncResult.AsyncState as Socket;
var client =
s.EndAccept(asyncResult);
tcs.SetResult(client);
}
catch (Exception
ex)
{
tcs.SetException(ex);
}
}, socket);
return tcs.Task;
}
The
different between ManualResetValueTaskSourceCore<bool> and TaskCompletionSource<TResult>
that ManualResetValueTaskSourceCore has
additional logic to validation current status of your task execution.
So,
we already described about ManualResetValueTaskSourceCore and slightly
explained why this need and what the reason of using this structure. I would
suggest proceeding to dig deeper
our code.
GenerateSequenceMethod
stateMachine = this;
builder.MoveNext(ref
stateMachine);
Here
the compiler runs another MoveNext() method from GenerateSequenceMethod
class. Below you will find how this code were generated by .net compiler. I’ve just refactored this code slightly for
better understanding how it works.
private void
MoveNext()
{
int num = _state;
try
{
TaskAwaiter awaiter;
switch (num)
{
default:
if (!_disposeMode)
{
_state = -1;
_number = 0;
goto LOOP;
}
goto ENDTRYBLOCK;
case 0:
awaiter = _taskAwaiter;
_taskAwaiter = default(TaskAwaiter);
_state = -1;
break;
case -4:
{
_state = -1;
if (!_disposeMode)
{
_number++;
goto LOOP;
}
goto ENDTRYBLOCK;
}
LOOP:
if (_number < 20)
{
awaiter =
Task.Delay(100).GetAwaiter();
if (!awaiter.IsCompleted)
{
_state = 0;
_taskAwaiter = awaiter;
GenerateSequenceMethod
stateMachine = this;
_builder.AwaitUnsafeOnCompleted(ref awaiter, ref
stateMachine);
return;
}
break;
}
goto ENDTRYBLOCK;
}
awaiter.GetResult();
_current = _number;
_state = -4;
goto COMPLETED;
ENDTRYBLOCK:;
}
catch (Exception exception)
{
_state = -2;
_promiseOfValueOrEnd.SetException(exception);
return;
}
_state = -2;
_promiseOfValueOrEnd.SetResult(false);
return;
COMPLETED:
_promiseOfValueOrEnd.SetResult(true);
}
I
will left for you only part which runs for the first call.
private void
MoveNext()
{
TaskAwaiter awaiter;
_state = -1;
_number = 0;
if (_number < 20)
{
awaiter = Task.Delay(100).GetAwaiter();
if (!awaiter.IsCompleted)
{
_state = 0;
_taskAwaiter = awaiter;
GenerateSequenceMethod stateMachine
= this;
_builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
return;
}
}
}
In
original MoveNext() the compiler runs switch(-1) block where -1 is our default _state type. We set our number equal to 0, get awaiter for Task.Delay(100) call and enter into if (!awaiter.IsCompleted) block. Here
we changed our _state to 0 and call this magic function AwaitUnsafeOnCompleted
/// <summary>
/// Schedules the specified state machine to be pushed forward when
the specified awaiter completes.
/// </summary>
/// <typeparam name="TAwaiter">Specifies the type of the awaiter.</typeparam>
/// <typeparam name="TStateMachine">Specifies the type of the state machine.</typeparam>
/// <param name="awaiter">The awaiter.</param>
/// <param name="stateMachine">The state machine.</param>
[SecuritySafeCritical]
public void
AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(
ref TAwaiter awaiter, ref TStateMachine stateMachine)
where TAwaiter : ICriticalNotifyCompletion
where TStateMachine : IAsyncStateMachine
{
try
{
AsyncMethodBuilderCore.MoveNextRunner
runnerToInitialize = null;
var continuation = m_coreState.GetCompletionAction(AsyncCausalityTracer.LoggingOn ? this.Task : null, ref runnerToInitialize);
Contract.Assert(continuation != null, "GetCompletionAction
should always return a valid action.");
// If this is
our first await, such that we've not yet boxed the state machine, do so now.
if (m_coreState.m_stateMachine
== null)
{
// Force the
Task to be initialized prior to the first suspending await so
// that the
original stack-based builder has a reference to the right Task.
var builtTask = this.Task;
// Box the
state machine, then tell the boxed instance to call back into its own builder,
// so we can
cache the boxed reference.
Contract.Assert(!Object.ReferenceEquals((object)stateMachine, (object)stateMachine), "Expected an unboxed
state machine reference");
m_coreState.PostBoxInitialization(stateMachine, runnerToInitialize, builtTask);
}
awaiter.UnsafeOnCompleted(continuation);
}
catch (Exception e)
{
AsyncMethodBuilderCore.ThrowAsync(e, targetContext: null);
}
}
Here
we can see that .net compiler relies indirectly on AsyncMethodBuilderCore, and
this does cause the execution context to flow, through its MoveNextRunner
helper class. This manages to do it, in this case, more efficiently, by only
storing a single reference to the execution context, rather than one reference
for each continuation.
/// <summary>Provides the ability to invoke a state machine's MoveNext method
under a supplied ExecutionContext.</summary>
internal sealed class MoveNextRunner
{
/// <summary>The context with which to run MoveNext.</summary>
private readonly ExecutionContext m_context;
/// <summary>The state machine whose MoveNext method should be invoked.</summary>
internal IAsyncStateMachine m_stateMachine;
/// <summary>Initializes the runner.</summary>
/// <param name="context">The context with which to run MoveNext.</param>
[SecurityCritical] // Run needs to be SSC to map to Action delegate, so to prevent
misuse, we only allow construction through SC
internal MoveNextRunner(ExecutionContext context, IAsyncStateMachine stateMachine)
{
m_context = context;
m_stateMachine = stateMachine;
}
/// <summary>Invokes MoveNext under the provided context.</summary>
[SecuritySafeCritical]
internal void Run()
{
Contract.Assert(m_stateMachine != null, "The
state machine must have been set before calling Run.");
if (m_context != null)
{
try
{
// Get
the callback, lazily initializing it as necessary
ContextCallback callback =
s_invokeMoveNext;
if (callback == null) { s_invokeMoveNext = callback = InvokeMoveNext; }
// Use
the context and callback to invoke m_stateMachine.MoveNext.
ExecutionContext.Run(m_context,
callback, m_stateMachine, preserveSyncCtx: true);
}
finally { m_context.Dispose(); }
}
else
{
m_stateMachine.MoveNext();
}
}
/// <summary>Cached delegate used with ExecutionContext.Run.</summary>
[SecurityCritical]
private static
ContextCallback s_invokeMoveNext; //
lazily-initialized due to SecurityCritical attribution
/// <summary>Invokes the MoveNext method on the supplied IAsyncStateMachine.</summary>
/// <param name="stateMachine">The IAsyncStateMachine machine instance.</param>
[SecurityCritical] // necessary for ContextCallback in CoreCLR
private static void InvokeMoveNext(object stateMachine)
{
((IAsyncStateMachine)stateMachine).MoveNext();
}
}
Just
look on this MoveNextRunner helper class. In constructor we save the context with
which to run MoveNext() method and we save stateMachine. (Related to our
example it is GenerateSequenceMethod or MainFunctionClass). This helper class
has only one method called Run(), and what this method does it just simple call
MoveNext() for our state machine depending on executing context.
/// <summary>Invokes MoveNext under the provided context.</summary>
[SecuritySafeCritical]
internal void Run()
{
Contract.Assert(m_stateMachine != null, "The
state machine must have been set before calling Run.");
if (m_context != null)
{
try
{
// Get the
callback, lazily initializing it as necessary
ContextCallback callback =
s_invokeMoveNext;
if (callback == null) { s_invokeMoveNext = callback = InvokeMoveNext; }
// Use the
context and callback to invoke m_stateMachine.MoveNext.
ExecutionContext.Run(m_context,
callback, m_stateMachine, preserveSyncCtx: true);
}
finally { m_context.Dispose(); }
}
else
{
m_stateMachine.MoveNext();
}
}
So,
let me explain what happens her in simple words. Depending on how we run our
code in sync or async mode, we will call directly m_stateMachine.MoveNext();
in sync mode, and in async mode we will bind our code to executing context and
call method InvokeMoveNext()
/// <summary>Invokes the MoveNext method on the supplied IAsyncStateMachine.</summary>
/// <param name="stateMachine">The IAsyncStateMachine machine instance.</param>
[SecurityCritical]
// necessary for ContextCallback in CoreCLR
private static void InvokeMoveNext(object stateMachine)
{
((IAsyncStateMachine)stateMachine).MoveNext();
}
I
suggest to take a look one more time to original AwaitUnsafeOnCompleted
call.
GenerateSequenceMethod
stateMachine = this;
builder.AwaitUnsafeOnCompleted(ref awaiter, ref
stateMachine);
So,
I think after such detailed digging into this method we have finally understand
how binds our state machine to executing context and how called MoveNext()
method in callback in async mode. Interesting magic I think and I like this
implementation. So, for our case let’s imaging that we have got this callback stateMachine.MoveNext()
let’s see what happened next in GenerateSequenceMethod instance.
private void MoveNext()
{
int num = _state;
try
{
TaskAwaiter awaiter;
switch (num)
{
case 0:
awaiter = _taskAwaiter;
_taskAwaiter = default(TaskAwaiter);
_state = -1;
break;
}
awaiter.GetResult();
_current = _number;
_state = -4;
goto COMPLETED;
}
catch (Exception exception)
{
_state = -2;
_promiseOfValueOrEnd.SetException(exception);
return;
}
COMPLETED:
_promiseOfValueOrEnd.SetResult(true);
}
Because
_state variable has been changed to
0 in scope of block if (!awaiter.IsCompleted) (previous step), we called
switch(0) statement. Here we assign for_taskAwaiter default
value and after that, we have break
logic and called statement after switch
block.
awaiter.GetResult();
_current = _number;
_state = -4;
goto COMPLETED;
So
here, we assign _number to _current field. In or case because we
are working with sequence of int, so we have number here (in our case 0). In
you example it can be anything, because IAsyncEnumerator<out T> is
generic interface
int
IAsyncEnumerator<int>.Current
{
[DebuggerHidden]
get
{
return _current;
}
}
Also
the compiler assign here _state to
-4 for navigation to LOOP label in our next MoveNext() action call. And
we notified our upper call (MainFunctionClass instance) that we are done with
result (see the code sample below to understand what compiler did).
COMPLETED:
_promiseOfValueOrEnd.SetResult(true);
I
would suggest to finish with this _state
= -4 and take a look what happens with next MoveNext() action call
before we switching back to MainFunctionClass.MoveNext() function.
private void MoveNext()
{
TaskAwaiter awaiter;
_state = -1;
if (!_disposeMode)
{
_number++; //increment our value
here
}
if (_number < 20)
{
awaiter = Task.Delay(100).GetAwaiter();
if
(!awaiter.IsCompleted)
{
_state = 0;
_taskAwaiter = awaiter;
GenerateSequenceMethod stateMachine
= this;
_builder.AwaitUnsafeOnCompleted(ref awaiter, ref
stateMachine);
return;
}
}
}
Here
in code I have removed switch (-4) block and just left which
construction will be called. So, our “magic”
function AwaitUnsafeOnComple
called the same block code
awaiter.GetResult();
_current = _number;
_state = -4;
goto COMPLETED;
because
when we get inside if (!awaiter.IsCompleted) logic, we always change our
_state variable to 0. The code above will run while this if (_number < 20) clock return
false. Then compiler calls this last part
_state = -2;
_promiseOfValueOrEnd.SetResult(false);
return;
This
code let the upper call MainFunctionClass.MoveNext() knows that the iteration
is over. So, let come back to MainFunctionClass.MoveNext() and take a look what
happens there. Because we already described how MoveNext() method works, I
think you will understand what happens here without any difficulties.
if (num != 0)
{
goto
GETNEXTAWAITER;
}
ValueTaskAwaiter<bool>
awaiter2 = _taskAwaiter;
_taskAwaiter = default(ValueTaskAwaiter<bool>);
_state = -1;
goto GETRESULT;
GETNEXTAWAITER:
awaiter2 =
_collectionAsyncEnumerator.MoveNextAsync().GetAwaiter();
if
(!awaiter2.IsCompleted)
{
_state = 0;
_taskAwaiter = awaiter2;
MainFunctionClass stateMachine = this;
_builder.AwaitUnsafeOnCompleted(ref awaiter2, ref
stateMachine);
return;
}
GETRESULT:
_checkResult = awaiter2.GetResult();
if (_checkResult)
{
_number =
_collectionAsyncEnumerator.Current;
Console.WriteLine(_number);
goto
GETNEXTAWAITER;
}
The
same trick uses here with changing _state
field to 0;
if
(!awaiter2.IsCompleted)
{
_state
= 0;
_taskAwaiter = awaiter2;
MainFunctionClass stateMachine = this;
_builder.AwaitUnsafeOnCompleted(ref awaiter2, ref
stateMachine);
return;
}
And
you already know about “magic” function AwaitUnsafeOnCompleted. Current state
machine life-circle looks like described below:
- Set up _state to 0
- Call AwaitUnsafeOnCompleted function
- Callback for MoveNext() function
- Change _state to -1
- Navigate to GETRESULT label
- Save result to _checkResult variable (this result compiler get from function _promiseOfValueOrEnd.GetResult() in class GenerateSequenceMethod)
- Call WriteLine function for Current element
- Navigate to next MoveNextAsync() calls
Is
it very complicate to understand what happens there under the hood? I do not
think so. In additional I want to show you final sequence diagram with our
state machines. I will left only important things for better understanding.
Conclusions:
This
is one of the greatest things which Microsoft has been implemented in C# 8. I
hope, you will use these asynchronous steams\iterators in your application. And
I expect more that my article helps you to understand how these async streams
works under the hood.
No comments:
Post a Comment