Task 代表一個任務,其具體類型可能是多種多樣的,且有時候?qū)ξ覀儊碚f完全是個黑盒。這個任務可以有結(jié)果,可以沒有結(jié)果,我們能知道這個任務什么時候執(zhí)行完成,并進行相應的后續(xù)處理。
Task 生命周期可以分為任務執(zhí)行和回調(diào)執(zhí)行兩個主要的階段。上回講了 Task 的執(zhí)行階段,這次我們就接著來看下回調(diào)執(zhí)行階段。
Task 將回調(diào)函數(shù)維護在 m_continuationObject 字段上,并通過 TrySetResult 等方法對外(這個對外僅限r(nóng)untime里Task相關的其他代碼)暴露回調(diào)的觸發(fā)方式。
由于 Task 的設計過于復雜,我的理解可能有錯誤,以后的版本可能會和現(xiàn)在有所出入。本文僅供參考學習,希望大家不要太過于糾結(jié)細節(jié),了解設計思路比實現(xiàn)細節(jié)更重要。
class Task
{
// 保存一個或一組回調(diào)
private volatile object? m_continuationObject;
internal bool TrySetResult()
{
// ...
FinishContinuations();
// ...
}
internal void FinishContinuations()
{
// 處理回調(diào)的執(zhí)行
}
}
class Task<TResult> : Task
{
internal bool TrySetResult(TResult result)
{
// ...
this.m_result = result;
// 復用父類的邏輯
FinishContinuations();
// ...
}
}
本文要討論的其實就是對上述的補充:
- Task 在把回調(diào)函數(shù)保存到 m_continuationObject 之前,對回調(diào)函數(shù)進行了什么樣的包裝處理?
- Task 的 回調(diào)函數(shù)是在什么時候被觸發(fā)的,也就是 Task 的完成與回調(diào)的執(zhí)行是如何進行銜接的?
- Task 所保存的回調(diào)函數(shù)會在哪里執(zhí)行?
Task.ContinueWith
往一個 Task 注冊回調(diào),有兩種方式:直接調(diào)用 Task 實例的 ContinueWith 方法,或者使用 await 關鍵詞。我們先看一下前者,await 放在后面單獨講。
ContinueWith 的產(chǎn)物:ContinuationTask
調(diào)用 ContinueWith 本質(zhì)上是創(chuàng)建了一個新的 Task(后面簡稱為 ContinuationTask),而這個 ContinuationTask 的執(zhí)行時間就是 原Task(后面簡稱為 AntecedentTask) 完成之后。
作為 Task ContinueWith 的返回值的 Task 的子類有以下四個,分別對應四種用法:
- ContinuationTaskFromTask
向 Task 注冊一個回調(diào)
Task task = Task.Run(() => Console.WriteLine("Hello"))
.ContinueWith(t => Console.WriteLine("World"));
// System.Threading.Tasks.ContinuationTaskFromTask
Console.WriteLine(task.GetType());
- ContinuationResultTaskFromTask<TResult>
向 Task 注冊一個回調(diào),并在回調(diào)里返回一個新值作為 新Task 的返回值
Task task = Task.Run(() => Console.WriteLine("Hello"))
.ContinueWith(t => "World");
// System.Threading.Tasks.ContinuationResultTaskFromTask`1[System.String]
Console.WriteLine(task.GetType());
- ContinuationTaskFromResultTask<TAntecedentResult>
向 Task<TResult> 注冊一個回調(diào), 并且 Task 獲取返回值
Task task = Task.Run(() => "Hello")
.ContinueWith(t => Console.WriteLine($"{t.Result} World"));
// System.Threading.Tasks.ContinuationTaskFromResultTask`1[System.String]
Console.WriteLine(task.GetType());
- ContinuationResultTaskFromResultTask<TAntecedentResult, TResult>
向 Task<TResult> 注冊一個回調(diào),并在回調(diào)里返回一個新值作為 新Task 的返回值
Task task = Task.Run(() => "Hello")
.ContinueWith(t => $"{t.Result} World");
// System.Threading.Tasks.ContinuationResultTaskFromResultTask`2[System.String,System.String]
Console.WriteLine(task.GetType());
因為 Task.ContinueWith 的結(jié)果依舊是一個 Task,這個鏈式的回調(diào)注冊可以無限地進行。
Task.Run(() => Console.WriteLine(1))
.ContinueWith(t => Console.WriteLine(2))
.ContinueWith(t => Console.WriteLine(3))
.ContinueWith(t => Console.WriteLine(4));
額外的參數(shù)
class Task
{
public Task ContinueWith(
Action<Task> continuationAction,
CancellationToken cancellationToken,
TaskContinuationOptions continuationOptions,
TaskScheduler scheduler)
{
// ...
}
}
我們還可以通過 ContinueWith 的重載向其傳入回調(diào)函數(shù)外的三個參數(shù):
- CancellationToken:協(xié)作式取消 Task 的執(zhí)行,本文暫不展開。
- TaskContinuationOptions:
前一部分和 TaskCreationOptions 的值完全一致。
如果設置的是這一部分的值,就會直接轉(zhuǎn)換為 ContinuationTask 的 TaskCreationOptions。TaskScheduler 識別過后進行相應的處理。
如果設置的是后一部分的值,那么 runtime 在決定把 Task 交給 TaskScheduler 去調(diào)度執(zhí)行前,會根據(jù)設置的值做相應的預判邏輯。例如 OnlyOnFaulted 代表在 AntecedentTask 執(zhí)行過程拋出了異常,runtime 才會去執(zhí)行 ContinuationTask。
public enum TaskCreationOptions
{
None = 0,
PreferFairness = 1,
LongRunning = 2,
AttachedToParent = 4,
DenyChildAttach = 8,
HideScheduler = 16, // 0x00000010
RunContinuationsAsynchronously = 64, // 0x00000040
}
public enum TaskContinuationOptions
{
None = 0,
PreferFairness = 1,
LongRunning = 2,
AttachedToParent = 4,
DenyChildAttach = 8,
HideScheduler = 16, // 0x00000010
LazyCancellation = 32, // 0x00000020
RunContinuationsAsynchronously = 64, // 0x00000040
// ---------- 分界線 ----------
NotOnRanToCompletion = 65536, // 0x00010000
NotOnFaulted = 131072, // 0x00020000
NotOnCanceled = 262144, // 0x00040000
OnlyOnRanToCompletion = NotOnCanceled | NotOnFaulted, // 0x00060000
OnlyOnFaulted = NotOnCanceled | NotOnRanToCompletion, // 0x00050000
OnlyOnCanceled = NotOnFaulted | NotOnRanToCompletion, // 0x00030000
ExecuteSynchronously = 524288, // 0x00080000
}
- TaskScheduler:可以之指定 TaskScheduler 去調(diào)度 Task。
默認是 TaskScheduler.Current,而 TaskScheduler.Current 的默認值是 ThreadPoolTaskScheduler,可以修改成其他實現(xiàn)。
回調(diào)的容器:TaskContinuation
我們注意到 m_continuationObject 字段的類型是 object,而 object 類型在數(shù)據(jù)的存儲上有更多的靈活性。
class Task
{
// 保存一個或一組回調(diào)
private volatile object m_continuationObject;
}
我們看下下面的代碼
var antecedentTask = Task.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine("Antecedent Task Completed");
});
PrintContinuationObjectType(antecedentTask);
antecedentTask.ContinueWith(_ => Console.WriteLine("Continuation Task1 Completed"));
PrintContinuationObjectType(antecedentTask);
antecedentTask.ContinueWith(_ => Console.WriteLine("Continuation Task2 Completed"));
PrintContinuationObjectType(antecedentTask);
Console.ReadLine();
void PrintContinuationObjectType(Task task)
{
var continuationObject = typeof(Task)
.GetField("m_continuationObject",
BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(task);
var type = continuationObject?.GetType().FullName ?? "null";
if (continuationObject is IEnumerable enumerable)
{
type += $", Element type: {enumerable.Cast<object>().First().GetType().FullName}";
}
Console.WriteLine(type);
}
執(zhí)行結(jié)果如下
null
System.Threading.Tasks.ContinueWithTaskContinuation
System.Collections.Generic.List`1[[System.Object, System.Private.CoreLib, >Version=6.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]], Element type: >System.Threading.Tasks.ContinueWithTaskContinuation
Antecedent Task Completed
Continuation Task1 Completed
Continuation Task2 Completed
隨著回調(diào)函數(shù)注冊數(shù)量的增加,m_continuationObject 保存的數(shù)據(jù)類型也在變化
- 沒有注冊時:null
- 一個回調(diào)時:ContinueWithTaskContinuation 實例
- 超過一個回調(diào)時:元素類型是 ContinueWithTaskContinuation 的 List<object>
實際上 m_continuationObject 還有別的類型:
class Task
{
private void RunContinuations(object continuationObject) // separated out of FinishContinuations to enable it to be inlined
{
Debug.Assert(continuationObject != null);
TplEventSource log = TplEventSource.Log;
bool etwIsEnabled = log.IsEnabled();
if (etwIsEnabled)
log.TraceSynchronousWorkBegin(this.Id, CausalitySynchronousWork.CompletionNotification);
bool canInlineContinuations =
(m_stateFlags & (int)TaskCreationOptions.RunContinuationsAsynchronously) == 0 &&
RuntimeHelpers.TryEnsureSufficientExecutionStack();
switch (continuationObject)
{
// Handle the single IAsyncStateMachineBox case. This could be handled as part of the ITaskCompletionAction
// but we want to ensure that inlining is properly handled in the face of schedulers, so its behavior
// needs to be customized ala raw Actions. This is also the most important case, as it represents the
// most common form of continuation, so we check it first.
case IAsyncStateMachineBox stateMachineBox:
AwaitTaskContinuation.RunOrScheduleAction(stateMachineBox, canInlineContinuations);
LogFinishCompletionNotification();
return;
// Handle the single Action case.
case Action action:
AwaitTaskContinuation.RunOrScheduleAction(action, canInlineContinuations);
LogFinishCompletionNotification();
return;
// Handle the single TaskContinuation case.
case TaskContinuation tc:
tc.Run(this, canInlineContinuations);
LogFinishCompletionNotification();
return;
// Handle the single ITaskCompletionAction case.
case ITaskCompletionAction completionAction:
RunOrQueueCompletionAction(completionAction, canInlineContinuations);
LogFinishCompletionNotification();
return;
}
}
ContinueWithTaskContinuation 的父類 TaskContinuation 是一個抽象類。除了 ContinueWithTaskContinuation,還有別的實現(xiàn)。
internal abstract class TaskContinuation
{
internal abstract void Run(Task completedTask, bool canInlineContinuationTask);
}
ContinueWithTaskContinuation 維護著 Task 執(zhí)行相關的兩個核心對象,一個是 Task 本身,另一是 TaskScheduler。真正執(zhí)行回調(diào)之前,需要先調(diào)用 TaskContinuation.Run。
internal sealed class ContinueWithTaskContinuation : TaskContinuation
{
internal Task? m_task;
internal readonly TaskContinuationOptions m_options;
private readonly TaskScheduler m_taskScheduler;
internal ContinueWithTaskContinuation(Task task, TaskContinuationOptions options, TaskScheduler scheduler)
{
m_task = task;
m_options = options;
m_taskScheduler = scheduler;
}
internal override void Run(Task completedTask, bool canInlineContinuationTask)
{
// ...
}
}
Task.ContinueWith 回調(diào)的生命周期
階段一 將回調(diào)封裝進 ContinueWithTaskContinuation
我們向 Task 注冊的回調(diào)回調(diào)最終會以 ContinueWithTaskContinuation 的形式保存在 Task 之中,相關的代碼摘錄如下。其他 public 的 ContinueWith 可以看做是對這些 private 方法的封裝。
class Task
{
private Task ContinueWith(Action<Task> continuationAction, TaskScheduler scheduler,
CancellationToken cancellationToken, TaskContinuationOptions continuationOptions)
{
CreationOptionsFromContinuationOptions(continuationOptions, out TaskCreationOptions creationOptions, out InternalTaskOptions internalOptions);
Task continuationTask = new ContinuationTaskFromTask(
this, continuationAction, null,
creationOptions, internalOptions
);
ContinueWithCore(continuationTask, scheduler, cancellationToken, continuationOptions);
return continuationTask;
}
private Task<TResult> ContinueWith<TResult>(Func<Task, TResult> continuationFunction, TaskScheduler scheduler,
CancellationToken cancellationToken, TaskContinuationOptions continuationOptions)
{
CreationOptionsFromContinuationOptions(continuationOptions, out TaskCreationOptions creationOptions, out InternalTaskOptions internalOptions);
Task<TResult> continuationTask = new ContinuationResultTaskFromTask<TResult>(
this, continuationFunction, null,
creationOptions, internalOptions
);
ContinueWithCore(continuationTask, scheduler, cancellationToken, continuationOptions);
return continuationTask;
}
internal void ContinueWithCore(Task continuationTask,
TaskScheduler scheduler,
CancellationToken cancellationToken,
TaskContinuationOptions options)
{
// ...
AddTaskContinuation(continuation);
// ...
}
private bool AddTaskContinuation(object tc, bool addBeforeOthers)
{
// ...
AddTaskContinuationComplex(tc, addBeforeOthers);
// ...
}
private bool AddTaskContinuationComplex(object tc)
{
List<object?>? list = m_continuationObject as List<object?>;
// ...
list.Add(tc);
// ...
}
}
internal sealed class ContinuationTaskFromTask : Task
{
private Task? m_antecedent;
public ContinuationTaskFromTask(
Task antecedent, Delegate action, object? state, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions) :
base(action, state, Task.InternalCurrentIfAttached(creationOptions), default, creationOptions, internalOptions, null)
{
m_antecedent = antecedent;
}
internal override void InnerInvoke()
{
if (m_action is Action<Task> action)
{
action(antecedent);
return;
}
if (m_action is Action<Task, object?> actionWithState)
{
actionWithState(antecedent, m_stateObject);
return;
}
}
}
子流程整理如下:
- 將委托包裝到具體的 ContinuationTask 實例里(ContinuationTaskFromTask等 Task 的子類實例),
定義 Task 子類的目的是為了將 AntecedentTask 的引用保存起來,以便在執(zhí)行 ContinuationTask 將 AntecedentTask 作為委托的參數(shù)傳入。 - 將 ContinuationTask 包裝到 ContinueWithTaskContinuation 實例中
- 將 ContinueWithTaskContinuation 添加到 TaskContinuation 列表里(m_continuationObject)
階段二 回調(diào)的觸發(fā)
這一部分其實就是上回 Task 可以封裝任何類型的別的任務 這一節(jié)提到的的流程:
- 調(diào)度器在執(zhí)行完 AntecedentTask 之后,會去調(diào)用 AntecedentTask.TrySetResult()
- 在 TrySetResult 方法里,最終會去調(diào)用 TaskContinuation.Run()
- ContinueWithTaskContinuation 里會把 ContinuationTask 放入 ContinueWithTaskContinuation 里維護的 TaskScheduler 里調(diào)度執(zhí)行。

回調(diào)執(zhí)行真正的決定者:ContinueWithTaskContinuation
在 ContinueWithTaskContinuation 中維護著待執(zhí)行的 ContinuationTask 以及決定 ContinuationTask 最終執(zhí)行方式的 TaskContinuationOptions 和 TaskScheduler。
internal sealed class ContinueWithTaskContinuation : TaskContinuation
{
internal Task? m_task;
internal readonly TaskContinuationOptions m_options;
private readonly TaskScheduler m_taskScheduler;
internal ContinueWithTaskContinuation(Task task, TaskContinuationOptions options, TaskScheduler scheduler)
{
m_task = task;
m_options = options;
m_taskScheduler = scheduler;
}
internal override void Run(Task completedTask, bool canInlineContinuationTask)
{
Task? continuationTask = m_task;
m_task = null;
// 檢查任務的完成狀態(tài),如果不符合 TaskContinuationOptions 的設置,回調(diào)就不會被執(zhí)行
TaskContinuationOptions options = m_options;
bool isRightKind =
completedTask.IsCompletedSuccessfully ?
(options & TaskContinuationOptions.NotOnRanToCompletion) == 0 :
(completedTask.IsCanceled ?
(options & TaskContinuationOptions.NotOnCanceled) == 0 :
(options & TaskContinuationOptions.NotOnFaulted) == 0);
// 任務完成狀態(tài)符合要求,回調(diào)執(zhí)行。
if (isRightKind)
{
continuationTask.m_taskScheduler = m_taskScheduler;
// 直接執(zhí)行回調(diào)或?qū)⑵渑抨牭却龍?zhí)行,具體取決于是否需要同步或異步執(zhí)行。
// 默認執(zhí)行路徑,上層傳的是 true。
if (canInlineContinuationTask && // 調(diào)用Run方法的內(nèi)部方法傳了允許內(nèi)聯(lián)
(options & TaskContinuationOptions.ExecuteSynchronously) != 0) // 注冊回調(diào)的實際用戶設置了同步執(zhí)行
{
InlineIfPossibleOrElseQueue(continuationTask, needsProtection: true);
}
else
{
try { continuationTask.ScheduleAndStart(needsProtection: true); }
catch (TaskSchedulerException)
{
// 如果 Task 執(zhí)行失敗了,ScheduleAndStart 方法會將 Task 標記為失敗,
// 這里是runtime設計的時候保證不會有意外的錯誤發(fā)生,僅做catch,不做處理
}
}
}
else
{
Task.ContingentProperties? cp = continuationTask.m_contingentProperties;
if (cp is null || cp.m_cancellationToken == default)
{
continuationTask.InternalCancelContinueWithInitialState();
}
else
{
continuationTask.InternalCancel();
}
}
}
}
所謂的 Inline 是指在觸發(fā)回調(diào)的線程中直接執(zhí)行回調(diào)。
像 Task.Run 創(chuàng)建的 Task(由 ThreadPoolTaskScheduler 調(diào)度,也就是由線程池調(diào)度) 的回調(diào)如果是 Inline 執(zhí)行的話,那執(zhí)行回調(diào)的線程和執(zhí)行傳給 Task.Run 的委托的線程,就會是同一個線程池線程。因為線程池在執(zhí)行完委托之后,就會觸發(fā)回調(diào)執(zhí)行。
我們注冊的 TaskScheduler 可以選擇是否只是 Inline。
public abstract class TaskScheduler
{
// 如果不是 Inline 執(zhí)行,就是走這個方法執(zhí)行回調(diào)
// 如果沒有傳
protected internal abstract void QueueTask(Task task);
// 如果返回 false,就算參數(shù)要求 Inline ,也會走 QueueTask 執(zhí)行回調(diào)
protected abstract bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued);
// 獲取所有調(diào)度到該 TaskScheduler 的 Task
protected abstract IEnumerable<Task>? GetScheduledTasks();
}
執(zhí)行回調(diào)的線程
根據(jù)上文的吻戲 Task.ContinueWith 的回調(diào)最終在哪執(zhí)行取決于 TaskContinuationOptions 和 TaskScheduler。
下面是幾個典型的例子:
- Inline
Task.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine($"Task Run, ThreadId: {Environment.CurrentManagedThreadId}");
})
.ContinueWith(t => Console.WriteLine($"Task OnCompleted, ThreadId: {Environment.CurrentManagedThreadId}"),
TaskContinuationOptions.ExecuteSynchronously);
Console.ReadKey();
前后線程永遠不會發(fā)生變化
Task Run, ThreadId: 6
Task OnCompleted, ThreadId: 6
- 調(diào)度到 ThreadPool 本地隊列
下面的例子里,也就是調(diào)度到執(zhí)行前一個執(zhí)行前一個委托的線程池線程的本地隊列里
Task.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine($"Task Run, ThreadId: {Environment.CurrentManagedThreadId}");
})
.ContinueWith(t => Console.WriteLine($"Task OnCompleted, ThreadId: {Environment.CurrentManagedThreadId}")); // 默認是 TaskContinuationOptions.None
Console.ReadKey();
有可能前后是一個線程,也有可能不是,可以多執(zhí)行幾次看看。
更多說明請看 ThreadPool 的博客中偷竊機制。
- 調(diào)度到 ThreadPool 全局隊列
Task.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine($"Task Run, ThreadId: {Environment.CurrentManagedThreadId}");
})
.ContinueWith(t => Console.WriteLine($"Task OnCompleted, ThreadId: {Environment.CurrentManagedThreadId}"),
TaskContinuationOptions.PreferFairness);
將回調(diào)調(diào)度到全局隊列,等待線程池線程領取并執(zhí)行。
Task 與 await
與 ContinueWith 相比,await 給我們提供了更加簡單的 Task 的使用方式。
Task.Run(() => "Hello")
.ContinueWith(t => Console.WriteLine($"{t.Result} World"));
// 等效于
var result = await Task.Run(() => "Hello");
Console.WriteLine($"{result} World");
Awaiter
我們可以通過 Task.GetAwaiter 從 Task 實例上獲取到 Task 對應的 TaskAwaiter 對象。并且可以通過 TaskAwaiter.OnCompleted 方法注冊回調(diào),其執(zhí)行結(jié)果與 Task.ContinueWith 一致。
TaskAwaiter awaiter1 = Task.Run(()=> Console.WriteLine("Hello")).GetAwaiter();
awaiter1.OnCompleted(()=> Console.WriteLine("World"));
TaskAwaiter<string> awaiter2 = Task.Run(()=> "Hello").GetAwaiter();
awaiter2.OnCompleted(()=> Console.WriteLine($"{awaiter2.GetResult()} World"));
Console.ReadKey();
Hello World
Hello
World
注意:直接調(diào)用 TaskAwaiter.GetResult 會阻塞調(diào)用線程直至 Task 執(zhí)行完成。
TaskAwaiter 本質(zhì)上可以理解成在 await 語法糖編譯成的代碼中,為了解耦 Task 和狀態(tài)機,而創(chuàng)建的一個隔離層,內(nèi)部對 Task 進行了包裝。
public class Task<TResult>
{
public TaskAwaiter<TResult> GetAwaiter() => new TaskAwaiter<TResult>(this);
internal void SetContinuationForAwait(
Action continuationAction, bool continueOnCapturedContext, bool flowExecutionContext)
{
TaskContinuation? tc = null;
if (continueOnCapturedContext)
{
SynchronizationContext? syncCtx = SynchronizationContext.Current;
if (syncCtx != null && syncCtx.GetType() != typeof(SynchronizationContext))
{
tc = new SynchronizationContextAwaitTaskContinuation(syncCtx, continuationAction, flowExecutionContext);
}
else
{
TaskScheduler? scheduler = TaskScheduler.InternalCurrent;
if (scheduler != null && scheduler != TaskScheduler.Default)
{
tc = new TaskSchedulerAwaitTaskContinuation(scheduler, continuationAction, flowExecutionContext);
}
}
}
if (tc == null && flowExecutionContext)
{
tc = new AwaitTaskContinuation(continuationAction, flowExecutionContext: true);
}
if (tc != null)
{
if (!AddTaskContinuation(tc, addBeforeOthers: false))
tc.Run(this, canInlineContinuationTask: false);
}
else
{
if (!AddTaskContinuation(continuationAction, addBeforeOthers: false))
AwaitTaskContinuation.UnsafeScheduleAction(continuationAction, this);
}
}
}
public readonly struct TaskAwaiter<TResult> :
ICriticalNotifyCompletion,
INotifyCompletion,
ITaskAwaiter
{
private readonly Task<TResult> m_task;
internal TaskAwaiter(Task task)
{
m_task = task;
}
public bool IsCompleted => m_task.IsCompleted;
public void OnCompleted(Action continuation)
{
TaskAwaiter.OnCompletedInternal(m_task, continuation, continueOnCapturedContext: true,
flowExecutionContext: true);
}
public void UnsafeOnCompleted(Action continuation)
{
TaskAwaiter.OnCompletedInternal(m_task, continuation, continueOnCapturedContext: true,
flowExecutionContext: false);
}
[StackTraceHidden]
public TResult GetResult()
{
TaskAwaiter.ValidateEnd((Task)this.m_task);
return this.m_task.ResultOnSuccess;
}
internal static void OnCompletedInternal(
Task task,
Action continuation,
bool continueOnCapturedContext,
bool flowExecutionContext)
{
task.SetContinuationForAwait(continuation, continueOnCapturedContext, flowExecutionContext);
}
}
可以看到 TaskAwaiter.OnCompleted 就是往 Task 注冊回調(diào),而 await 關鍵詞的本質(zhì)就是把 await 后面的代碼變成了回調(diào)并注冊到了 Task 上。
Task.Run(() => "Hello")
.ContinueWith(t => Console.WriteLine($"{t.Result} World"));
// 等效于
var result = await Task.Run(() => "Hello");
Console.WriteLine($"{result} World");
// 等效于
Task.Run(()=> "Hello").GetAwaiter().OnCompleted(()=> Console.WriteLine("World"));
至于 TaskAwaiter.UnsafeOnCompleted 我們稍后解釋。
await Anything
C# 編譯器并沒有限制 await 關鍵詞只能用在 Task 上。例如 Task.Yield()
的返回值 YieldAwaitable,既不是 Task 也不是 Task 的子類。
public readonly struct YieldAwaitable
{
public YieldAwaitable.YieldAwaiter GetAwaiter() => new YieldAwaitable.YieldAwaiter();
public readonly struct YieldAwaiter :
ICriticalNotifyCompletion,
INotifyCompletion
{
public bool IsCompleted => false;
public void OnCompleted(Action continuation) => YieldAwaitable.YieldAwaiter.QueueContinuation(continuation, true);
public void UnsafeOnCompleted(Action continuation) => YieldAwaitable.YieldAwaiter.QueueContinuation(continuation, false);
public void GetResult()
{
}
}
}
Task 和 YieldAwaitable 都提供了一個 GetAwaiter 方法。
返回的 XXXAwaiter 需滿足以下兩個條件:
- ICriticalNotifyCompletion,INotifyCompletion 這兩個接口。而 ICriticalNotifyCompletion 是 INotifyCompletion 的子接口。
public interface INotifyCompletion
{
void OnCompleted(Action continuation);
}
public interface ICriticalNotifyCompletion : INotifyCompletion
{
void UnsafeOnCompleted(Action continuation);
}
- 提供 IsCompleted 屬性 和 void GetResult() / TResult GetResult() 方法。GetResult 方法是否有返回值取決于 await XXXAwaitable 是否想提供返回值。

實際上,我們自己想要實現(xiàn)一個 Awaitable 的話,Awaiter 只需要實現(xiàn) INotifyCompletion 接口或者 ICriticalNotifyCompletion 就可以了。
首先,我們需要準備好一個 Awaitable。
class FooAwaitable<TResult>
{
// 回調(diào),簡化起見,未將其包裹到 TaskContinuation 這樣的容器里
private Action _continuation;
private TResult _result;
private volatile bool _completed;
public bool IsCompleted => _completed;
// Awaitable 中的關鍵部分,提供 GetAwaiter 方法
public FooAwaiter<TResult> GetAwaiter() => new FooAwaiter<TResult>(this);
public void Run(Func<TResult> func)
{
new Thread(() =>
{
var result = func();
TrySetResult(result);
})
{
IsBackground = true
}.Start();
}
private bool AddFooContinuation(Action action)
{
if (_completed)
{
return false;
}
_continuation += action;
return true;
}
private void TrySetResult(TResult result)
{
_result = result;
_completed = true;
_continuation?.Invoke();
}
// TODO: 實現(xiàn)一個 FooAwaiter 作為 FooAwaitable 內(nèi)部類
// public struct FooAwaiter<TResult> : INotifyCompletion Or ICriticalNotifyCompletion
// {
// }
}
實現(xiàn) INotifyCompletion 接口的 Awaiter 示例
var fooAwaitable = new FooAwaitable<string>();
fooAwaitable.Run(() =>
{
// 可以把Sleep去掉看看
Thread.Sleep(100);
Console.WriteLine("Hello");
return "World";
});
var x = await fooAwaitable;
Console.WriteLine(x);
Console.ReadKey();
class FooAwaitable<TResult>
{
// ...
// 上面所展示的 FooAwaitable 里的代碼,此處省略
// ...
// 1. 實現(xiàn) INotifyCompletion
public struct FooAwaiter<TResult> : INotifyCompletion
{
private readonly FooAwaitable<TResult> _fooAwaitable;
// 2. 實現(xiàn) IsCompleted 屬性
public bool IsCompleted => _fooAwaitable.IsCompleted;
public FooAwaiter(FooAwaitable<TResult> fooAwaitable)
{
_fooAwaitable = fooAwaitable;
}
public void OnCompleted(Action continuation)
{
Console.WriteLine("FooAwaiter.OnCompleted");
if (_fooAwaitable.AddFooContinuation(continuation))
{
Console.WriteLine("FooAwaiter.OnCompleted: added continuation");
}
else
{
// 試著把上面的 Thread.Sleep(100) 刪掉看看,就有可能會執(zhí)行到這里
// 也就是回調(diào)的注冊時間有可能晚于任務完成的時間
Console.WriteLine("FooAwaiter.OnCompleted: already completed, invoking continuation");
continuation();
}
}
// 3. 實現(xiàn) GetResult 方法
public TResult GetResult()
{
Console.WriteLine("FooAwaiter.GetResult");
return _fooAwaitable._result;
}
}
}
執(zhí)行結(jié)果如下:
FooAwaiter.OnCompleted
FooAwaiter.OnCompleted: added continuation
Hello
FooAwaiter.GetResult
World
實現(xiàn) ICriticalNotifyCompletion 接口的 Awaiter 示例
var fooAwaitable = new FooAwaitable<string>();
fooAwaitable.Run(() =>
{
Thread.Sleep(100);
Console.WriteLine("Hello");
return "World";
});
var x = await fooAwaitable;
Console.WriteLine(x);
Console.ReadKey();
class FooAwaitable<TResult>
{
// ...
// 上面所展示的 FooAwaitable 里的代碼,此處省略
// ...
// 1 實現(xiàn) ICriticalNotifyCompletion
public struct FooAwaiter<TResult> : ICriticalNotifyCompletion
{
private readonly FooAwaitable<TResult> _fooAwaitable;
// 2 實現(xiàn) IsCompleted 屬性
public bool IsCompleted => _fooAwaitable.IsCompleted;
public FooAwaiter(FooAwaitable<TResult> fooAwaitable)
{
_fooAwaitable = fooAwaitable;
}
public void OnCompleted(Action continuation)
{
Console.WriteLine("FooAwaiter.OnCompleted");
if (_fooAwaitable.AddFooContinuation(continuation))
{
Console.WriteLine("FooAwaiter.OnCompleted: added continuation");
}
else
{
Console.WriteLine("FooAwaiter.OnCompleted: already completed, invoking continuation");
continuation();
}
}
public void UnsafeOnCompleted(Action continuation)
{
Console.WriteLine("FooAwaiter.UnsafeOnCompleted");
if (_fooAwaitable.AddFooContinuation(continuation))
{
Console.WriteLine("FooAwaiter.UnsafeOnCompleted: added continuation");
}
else
{
Console.WriteLine("FooAwaiter.UnsafeOnCompleted: already completed, invoking continuation");
continuation();
}
}
// 3. 實現(xiàn) GetResult 方法
public TResult GetResult()
{
Console.WriteLine("FooAwaiter.GetResult");
return _fooAwaitable._result;
}
}
}
執(zhí)行結(jié)果如下:
FooAwaiter.UnsafeOnCompleted
FooAwaiter.UnsafeOnCompleted: added continuation
Hello
FooAwaiter.GetResult
World
一旦實現(xiàn)了 ICriticalNotifyCompletion(INotifyCompletion 的子接口),注冊回調(diào)走的是 UnsafeOnCompleted 方法。如果同時實現(xiàn)兩個方法,也還是以ICriticalNotifyCompletion 的規(guī)則優(yōu)先。
INotifyCompletion VS ICriticalNotifyCompletion
既然實現(xiàn) Awaitable 只要實現(xiàn)兩個接口之一,那為什么要區(qū)分出這兩個接口呢。
我們來看看 TaskAwaiter 里的實現(xiàn)是什么樣。
public readonly struct TaskAwaiter<TResult> : ICriticalNotifyCompletion, INotifyCompletion
{
private readonly Task<TResult> m_task;
internal TaskAwaiter(Task<TResult> task)
{
m_task = task;
}
// ...
public void OnCompleted(Action continuation)
{
TaskAwaiter.OnCompletedInternal(m_task, continuation, continueOnCapturedContext: true, flowExecutionContext: true);
}
public void UnsafeOnCompleted(Action continuation)
{
TaskAwaiter.OnCompletedInternal(m_task, continuation, continueOnCapturedContext: true, flowExecutionContext: false);
}
internal static void OnCompletedInternal(
Task task,
Action continuation,
bool continueOnCapturedContext,
bool flowExecutionContext)
{
m_task.SetContinuationForAwait(continuation, continueOnCapturedContext, flowExecutionContext);
}
// ...
}
OnCompleted 和 UnsafeOnCompleted 的唯一區(qū)別是在調(diào)用 TaskAwaiter.OnCompletedInternal 時,flowExecutionContext 這個參數(shù)有所不同。
ExecutionContext 的本質(zhì)是一個線程私有變量,維護著我們常用 AsyncLocal 的數(shù)據(jù),例如 Thread.CurrentThread.CurrentCulture 其實就是一個 AsyncLocal 變量。
runtime 中會在發(fā)生線程切換的地方,將 ExecutionContext 從前一個線程拷貝到后一個線程。那么第二個線程里也就可以拿到在第一個線程里設置好的 AsyncLocal 變量。
就算線程沒有發(fā)生切換,runtime 里有的地方也會通過清空 ExecutionContext 來阻止其往后傳播。
更多 ExcutionContext 和 AsyncLocal 的解析,請參考我之前的一篇博客:
https://www.cnblogs.com/eventhorizon/p/12240767.html
也就是說 OnCompleted 會保證 ExecutionContext 往后傳播。而 UnsafeOnCompleted 則不會。我們來看下面這個示例。
class Program
{
private static readonly AsyncLocal<string> AsyncLocal = new AsyncLocal<string>();
static void Main(string[] args)
{
AsyncLocal.Value = "Hello World";
Task.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine(
$"Task1 Run, ThreadId: {Environment.CurrentManagedThreadId}, AsyncLocal: {AsyncLocal.Value}");
})
.GetAwaiter()
.OnCompleted(() =>
Console.WriteLine(
$"Task1 OnCompleted, ThreadId: {Environment.CurrentManagedThreadId}, AsyncLocal: {AsyncLocal.Value}"));
Task.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine(
$"Task2 Run, ThreadId: {Environment.CurrentManagedThreadId}, AsyncLocal: {AsyncLocal.Value}");
})
.GetAwaiter()
.UnsafeOnCompleted(() =>
Console.WriteLine(
$"Task2 UnsafeOnCompleted, ThreadId: {Environment.CurrentManagedThreadId}, AsyncLocal: {AsyncLocal.Value}"));
Console.ReadKey();
}
}
Task1 Run, ThreadId: 6, AsyncLocal: Hello World
Task2 Run, ThreadId: 7, AsyncLocal: Hello World
Task1 OnCompleted, ThreadId: 6, AsyncLocal: Hello World
Task2 UnsafeOnCompleted, ThreadId: 7, AsyncLocal:
如果使用了 UnsafeOnCompleted 注冊回調(diào),也就是 flowExecutionContext: false
,則 ExecutionContext 不會往后繼續(xù)傳播。
同一個 Task 回調(diào)執(zhí)行前后線程沒變是因為 TaskSchedulerAwaitTaskContinuation 里優(yōu)先 Inline 執(zhí)行回調(diào),暫不展開。
AsyncTaskMethodBuilder 是狀態(tài)機的一個重要組成部分,負責 狀態(tài)機與 awaiter 的銜接工作,更詳細的功能我們下篇博客再敘述,這邊只簡單提一下。
AsyncTaskMethodBuilder 主要負責以下功能:
- 將 async 方法內(nèi)部的返回值封裝到 async 方法的最終所返回的 Task 中,并作為這個 Task 的返回值。
- 將 async 方法內(nèi)部發(fā)生的異常 封裝到 async 方法的最終所返回的 Task 中。
- 將狀態(tài)機待執(zhí)行的動作作為回調(diào) 向 awaiter 注冊(awaiter 內(nèi)部再向 Task 注冊)。
我們可以給 async 方法內(nèi)部的狀態(tài)機自己綁定 AsyncMethodBuilder。在自定義的 AsyncTaskMethodBuilder 里可以決定要不要往后傳 ExecutionContext.UnsafeOnCompleted 這個方法的存在意義就是為了在我們不像往后傳 ExecutionContext 的時候使用。
async 方法 內(nèi)的 AsyncMethodBuilder 和 async 方法的返回值有關,AsyncMethodBuilder 綁定在作為返回值的 Awaitable 上,下篇再講。
就目前 .NET 6 的代碼來說, async Task FooAsync(){}
這樣的以 Task 作為返回值的 async 方法中的狀態(tài)機來說,Task 方法所綁定的 AsyncMethodBuilder 內(nèi)并沒有調(diào)用 TaskAwaiter.UnsafeOnCompleted 方法,而是通過其他方式注冊的回調(diào),大致的流程和使用 TaskAwaiter.UnsafeOnCompleted 進行注冊時類似的。
如果像上文那樣自己實現(xiàn) Awaitable,會調(diào)用 TaskAwaiter.OnCompleted 或者 TaskAwaiter.OnCompleted 方法。這個和 AsyncMethodBuilder 內(nèi)部的實現(xiàn)有關。(手動狗頭,設計的太復雜了)
有限元狀態(tài)機
下面是摘自百度百科的關于狀態(tài)機的說明:
狀態(tài)機可歸納為4個要素,即現(xiàn)態(tài)、條件、動作、次態(tài)。這樣的歸納,主要是出于對狀態(tài)機的內(nèi)在因果關系的考慮。“現(xiàn)態(tài)”和“條件”是因,“動作”和“次態(tài)”是果。詳解如下:
- 現(xiàn)態(tài):是指當前所處的狀態(tài)。
- 條件:又稱為“事件”,當一個條件被滿足,將會觸發(fā)一個動作,或者執(zhí)行一次狀態(tài)的遷移。
- 動作:條件滿足后執(zhí)行的動作。動作執(zhí)行完畢后,可以遷移到新的狀態(tài),也可以仍舊保持原狀態(tài)。動作不是必需的,當條件滿足后,也可以不執(zhí)行任何動作,直接遷移到新狀態(tài)。
- 次態(tài):條件滿足后要遷往的新狀態(tài)。“次態(tài)”是相對于“現(xiàn)態(tài)”而言的,“次態(tài)”一旦被激活,就轉(zhuǎn)變成新的“現(xiàn)態(tài)”了。
而有限元狀態(tài)機的有限是指狀態(tài)的有限。
觀察下面這么一個常見的 await 使用場景,可以將 FooAsync 方法內(nèi)部的邏輯分為三種狀態(tài)(即 三個階段):
- 初始化狀態(tài)
- 等待 BarAsync 執(zhí)行完成的狀態(tài)
- 執(zhí)行結(jié)束狀態(tài)
class Program
{
static async Task Main(string[] args)
{
var a = 1;
Console.WriteLine(await FooAsync(a));
}
static async Task<int> FooAsync(int a)
{
int b = 2;
int c = await BarAsync();
return a + b + c;
}
static async Task<int> BarAsync()
{
await Task.Delay(100);
return 3;
}
}
由 FooAsync 編譯成的 IL 代碼經(jīng)整理后的等效 C# 代碼如下:
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
var a = 1;
Console.WriteLine(await FooAsync(a));
}
static Task<int> FooAsync(int a)
{
var stateMachine = new FooStateMachine
{
_asyncTaskMethodBuilder = AsyncTaskMethodBuilder<int>.Create(),
_state = -1, // 初始化狀態(tài)
_a = a // 將實參拷貝到狀態(tài)機字段
};
// 開始執(zhí)行狀態(tài)機
stateMachine._asyncTaskMethodBuilder.Start(ref stateMachine);
return stateMachine._asyncTaskMethodBuilder.Task;
}
static async Task<int> BarAsync()
{
await Task.Delay(100);
return 3;
}
public class FooStateMachine : IAsyncStateMachine
{
// 方法的參數(shù)和局部變量被編譯會字段
public int _a;
public AsyncTaskMethodBuilder<int> _asyncTaskMethodBuilder;
private int _b;
private int _c;
// -1: 初始化狀態(tài)
// 0: 等到 Task 執(zhí)行完成
// -2: 狀態(tài)機執(zhí)行完成
public int _state;
private TaskAwaiter<int> _taskAwaiter;
public void MoveNext()
{
var result = 0;
TaskAwaiter<int> taskAwaiter;
try
{
// 狀態(tài)不是0,代表 Task 未完成
if (_state != 0)
{
// 初始化局部變量
_b = 2;
taskAwaiter = Program.BarAsync().GetAwaiter();
if (!taskAwaiter.IsCompleted)
{
// state: -1 => 0,異步等待 Task 完成
_state = 0;
_taskAwaiter = taskAwaiter;
var stateMachine = this;
// 內(nèi)部會調(diào)用 將 stateMachine.MoveNext 注冊為 Task 的回調(diào)
_asyncTaskMethodBuilder.AwaitUnsafeOnCompleted(ref taskAwaiter, ref stateMachine);
return;
}
}
else
{
taskAwaiter = _taskAwaiter;
// TaskAwaiter 是個結(jié)構體,這邊相當于是個清空 _taskAwaiter 字段的操作
_taskAwaiter = new TaskAwaiter<int>();
// state: 0 => -1,狀態(tài)機恢復到初始化狀態(tài)
_state = -1;
}
_c = taskAwaiter.GetResult();
result = _a + _b + _c;
}
catch (Exception e)
{
// state: any => -2,狀態(tài)機執(zhí)行完成
_state = -2;
_asyncTaskMethodBuilder.SetException(e);
return;
}
// state: -1 => -2,狀態(tài)機執(zhí)行完成
_state = -2;
// 將 result 設置為 FooAsync 方法的返回值
_asyncTaskMethodBuilder.SetResult(result);
}
public void SetStateMachine(IAsyncStateMachine stateMachine)
{
}
}
}
編譯器在 Program 中創(chuàng)建了一個內(nèi)部類,也就是 FooStateMachine 這個狀態(tài)機,而 FooAsync
方法則變成了對這個狀態(tài)機的使用。
AsyncTaskMethodBuilder 的作用解釋放到下一篇文章再解釋,這邊簡單理解成 AsyncTaskMethodBuilder.SetResult 就是 FooAsync
return 返回值,AsyncTaskMethodBuilder.SetException 就是 FooAsync
內(nèi)部往外扔異常。
完整的流程如下圖所示:

一個方法中就算有個 await,這個方法也只會有一個對應的狀態(tài)機。就.NET 6 SDK 的編譯結(jié)果來看,state 會出現(xiàn) -1 => 0(等待第一個Task異步執(zhí)行完成) => -1 => 0(等待第二個Task異步執(zhí)行完成)這樣的流程。
AsyncStateMachineBox
前文講過 awaiter 往 Task 注冊回調(diào)的邏輯里,可能不會直接傳遞 ExcutionContext。
而這個 AsyncStateMachineBox 是對 AsyncStateMachine 和 ExcutionContext 的包裝,這邊通過這樣的方式往后傳遞 ExcutionContext。
await Task 的回調(diào)在哪執(zhí)行
回憶一下上文 Task.ContinueWith 講回調(diào)最終封裝到了 ContinueWithTaskContinuation。
返回值是 Task 的情況下狀態(tài)機所綁定的 AsyncTaskMethodBuilder 的所會調(diào)用 Task.UnsafeSetContinuationForAwait 實例方法。里面會根據(jù)不同的條件創(chuàng)建不同的 TaskContinuation。
UnsafeSetContinuationForAwait 中的邏輯和后續(xù)回調(diào)執(zhí)行流程大致如下:

值得注意的是,當前上下文設置的 TaskScheduler 并不能通過 TaskScheduler.Current 獲取,而應該通過 TaskScheduler.InternalCurrent 獲取。
TaskScheduler InternalCurrentTaskScheduler()
{
var propertyInfo = typeof(TaskScheduler).GetProperty("InternalCurrent", BindingFlags.Static | BindingFlags.NonPublic);
return (TaskScheduler)propertyInfo.GetValue(null);
}
如果 TaskScheduler.InternalCurrent 為 null,TaskScheduler.Current 會返回 TaskScheduler.Default,也就是 ThreadPoolTaskScheduler。
public class TaskScheduler
{
public static TaskScheduler Current => InternalCurrent ?? Default;
}
同步上下文(SynchronizationContext)導致的死鎖問題與 Task.ConfigureAwait(continueOnCapturedContext:false)
如果存在 SynchronizationContext,回調(diào)會優(yōu)先在 SynchronizationContext 上執(zhí)行。而 SynchronizationContext 也是一種任務調(diào)度器,其存在時間應該是早于 Task 的。
在 .NET Framework 時代的 WPF、Windows Form、Asp.NET Web Form 這些框架里,都有 SynchronizationContext 的存在。
下面是一個 SynchronizationContext 的實現(xiàn)示例:
class SingleThreadedSynchronizationContext : SynchronizationContext
{
private readonly BlockingCollection<(SendOrPostCallback Callback, object State)> _queue = new BlockingCollection<(SendOrPostCallback Callback, object State)>();
public override void Send(SendOrPostCallback d, object state) // Sync operations
{
throw new NotSupportedException($"{nameof(SingleThreadedSynchronizationContext)} does not support synchronous operations.");
}
public override void Post(SendOrPostCallback d, object? state) // Async operations
{
_queue.Add((d, state));
}
public static void Run(Action action)
{
var previous = Current;
var context = new SingleThreadedSynchronizationContext();
SetSynchronizationContext(context);
try
{
Console.WriteLine("Executing first action, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
action();
while (context._queue.TryTake(out var item))
{
Console.WriteLine("Executing callback, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
item.Callback(item.State);
}
}
finally
{
context._queue.CompleteAdding();
SetSynchronizationContext(previous);
}
}
}
WPF 這些框架里,UI 只允許 UI 線程去更新。這些 SynchronizationContext 有個特點,就是一次只允許一個任務執(zhí)行。
class Program
{
private static void Main(string[] args)
{
new Thread(() =>
{
Console.WriteLine("Thread started, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
SingleThreadedSynchronizationContext.Run(Test);
})
{
IsBackground = true
}.Start();
Console.ReadKey();
}
private static void Test()
{
Console.WriteLine("Test: START, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
Console.WriteLine($"Test.SynchronizationContext1: {SynchronizationContext.Current}");
// 時間點一:這里把唯一的執(zhí)行線程給阻塞住了,會導致死鎖
DoSthAsync().GetAwaiter().GetResult();
Console.WriteLine($"Test.SynchronizationContext2: {SynchronizationContext.Current}");
Console.WriteLine("Test: END, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
}
private static async Task DoSthAsync()
{
Console.WriteLine("DoSthAsync: START, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
Console.WriteLine($"DoSthAsync.SynchronizationContext1: {SynchronizationContext.Current}");
// await 后面的代碼作為 Task.Delay 的回調(diào),
// 等待 Task.Delay 結(jié)束后會由 MaxConcurrencySynchronizationContext 進行調(diào)度執(zhí)行
await Task.Delay(100);
// 時間點二:MaxConcurrencySynchronizationContext 唯一的線程已經(jīng)被阻塞住了,死鎖開始
Console.WriteLine($"DoSthAsync.SynchronizationContext2: {SynchronizationContext.Current}");
Console.WriteLine("DoSthAsync: END, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
}
}
執(zhí)行結(jié)果如下:
Thread started, CurrentThreadId: 10
Executing first action, CurrentThreadId: 10
Test: START, CurrentThreadId: 10
Test.SynchronizationContext1: SingleThreadedSynchronizationContext
DoSthAsync: START, CurrentThreadId: 10
DoSthAsync.SynchronizationContext1: SingleThreadedSynchronizationContext
await Task.Delay(100)
的回調(diào)將無法被執(zhí)行。
那么如何在這些 UI 框架里避免死鎖呢?我們只需要將 await Task.Delay(100)
改為 await Task.Delay(100).ConfigureAwait(continueOnCapturedContext:false)
class Program
{
private static void Main(string[] args)
{
new Thread(() =>
{
Console.WriteLine("Thread started, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
SingleThreadedSynchronizationContext.Run(Test);
})
{
IsBackground = true
}.Start();
Console.ReadKey();
}
private static void Test()
{
Console.WriteLine("Test: START, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
Console.WriteLine($"Test.SynchronizationContext1: {SynchronizationContext.Current}");
// 時間點一:這里把唯一的執(zhí)行線程給阻塞住了,但不會導致死鎖
DoSthAsync().GetAwaiter().GetResult();
Console.WriteLine($"Test.SynchronizationContext2: {SynchronizationContext.Current}");
Console.WriteLine("Test: END, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
}
private static async Task DoSthAsync()
{
Console.WriteLine("DoSthAsync: START, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
Console.WriteLine($"DoSthAsync.SynchronizationContext1: {SynchronizationContext.Current}");
// await 后面的代碼作為 Task.Delay 的回調(diào),
// 等待 Task.Delay 結(jié)束后會由 線程池 進行調(diào)度執(zhí)行
await Task.Delay(100).ConfigureAwait(false);
// 時間點二:線程池執(zhí)行回調(diào),這邊已經(jīng)不存在 SynchronizationContext 了
Console.WriteLine($"DoSthAsync.SynchronizationContext2: {SynchronizationContext.Current}");
Console.WriteLine("DoSthAsync: END, CurrentThreadId: {0}", Environment.CurrentManagedThreadId);
}
}
執(zhí)行修改后的代碼:
Test: START, CurrentThreadId: 10
Test.SynchronizationContext1: SingleThreadedSynchronizationContext
DoSthAsync: START, CurrentThreadId: 10
DoSthAsync.SynchronizationContext1: SingleThreadedSynchronizationContext
DoSthAsync.SynchronizationContext2:
DoSthAsync: END, CurrentThreadId: 6
Test.SynchronizationContext2: SingleThreadedSynchronizationContext
Test: END, CurrentThreadId: 10
ConfigureAwait 方法返回了一個 ConfiguredTaskAwaitable 對象,對原有的 Task 進行了包裝,后續(xù)創(chuàng)建 TaskContinuation 的流程里會走 continueOnCapturedContext: false 的分支。
class Task
{
public ConfiguredTaskAwaitable ConfigureAwait(bool continueOnCapturedContext)
{
return new ConfiguredTaskAwaitable(this, continueOnCapturedContext);
}
}
為什么沒有同步上下文也會死鎖
我們的 Web Api 項目中,默認是不存在 SynchronizationContext 的。那為什么有的同學還會遇到死鎖問題呢,而且主要是高并發(fā)的情況下,本地可能沒辦法復現(xiàn)。
這個和 ThreadPool 中的 Starvation Avoidance 機制有關。
DoSthAsync().GetAwaiter().GetResult()
會阻塞線程池線。.NET 6之前極端情況導致線程池無可用線程,導致所謂的“死鎖”。
總結(jié)
- TaskContinuation:維護回調(diào)和調(diào)度回調(diào)。
- Awaiter:對 Awaitable 進行封裝,負責與狀態(tài)機進行交互。
- 狀態(tài)機:由編譯器生成,每個 async 方法 有且僅有一個,await 后面的代碼會被編譯到 狀態(tài)機 的 MoveNext 方法中,注冊為 Task 的回調(diào)。
- AsyncMethodBuilder:狀態(tài)機的重要組成部分,async 方法內(nèi)外溝通的橋梁,和 async 方法的返回值類型綁定。
- 無論何時,都謹慎使用
DoSthAsync().GetAwaiter().GetResult()
這樣的代碼。
轉(zhuǎn)自https://www.cnblogs.com/eventhorizon/p/15912383.html
該文章在 2025/8/8 9:32:58 編輯過