LOGO OA教程 ERP教程 模切知識交流 PMS教程 CRM教程 開發(fā)文檔 其他文檔  
 
網(wǎng)站管理員

C#如何在long-running task中調(diào)用async方法

freeflydom
2025年8月8日 9:11 本文熱度 365

什么是 long-running thread

long-running task 是指那些長時(shí)間運(yùn)行的任務(wù),比如在一個(gè) while True 中執(zhí)行耗時(shí)較長的同步處理。

下面的例子中,我們不斷從隊(duì)列中嘗試取出數(shù)據(jù),并對這些數(shù)據(jù)進(jìn)行處理,這樣的任務(wù)就適合交給一個(gè) long-running task 來處理。

var queue = new BlockingCollection<string>();

Task.Factory.StartNew(() =>
{
    while (true)
    {
        // BlockingCollection<T>.Take() 方法會阻塞當(dāng)前線程,直到隊(duì)列中有數(shù)據(jù)可以取出。
        var input = queue.Take();
        Console.WriteLine($"You entered: {input}");
    }
}, TaskCreationOptions.LongRunning);


while (true)
{
    var input = Console.ReadLine();
    queue.Add(input);
}

在 .NET 中,我們可以使用 Task.Factory.StartNew 方法并傳入 TaskCreationOptions.LongRunning 來創(chuàng)建一個(gè) long-running task。

雖然這種方式創(chuàng)建的 long-running task 和默認(rèn)創(chuàng)建的 task 一樣,都是分配給 ThreadPoolTaskScheduler 來調(diào)度的, 但 long-running task 會被分配到一個(gè)新的 Background 線程上執(zhí)行,而不是交給 ThreadPool 中的線程來執(zhí)行。

class ThreadPoolTaskScheduler : TaskScheduler
{
    // ...
    protected internal override void QueueTask(Task task)
    {
        TaskCreationOptions options = task.Options;
        if (Thread.IsThreadStartSupported && (options & TaskCreationOptions.LongRunning) != 0)
        {
            // 在一個(gè)新的 Background 線程上執(zhí)行 long-running task。
            new Thread(s_longRunningThreadWork)
            {
                IsBackground = true,
                Name = ".NET Long Running Task"
            }.UnsafeStart(task);
        }
        else
        {
            // 非 long-running task 交給 ThreadPool 中的線程來執(zhí)行。
            ThreadPool.UnsafeQueueUserWorkItemInternal(task, (options & TaskCreationOptions.PreferFairness) == 0);
        }
    }
    // ...
}

為什么long-running task要和普通的task分開調(diào)度

如果一個(gè)task持續(xù)占用一個(gè)線程,那么這個(gè)線程就不能被其他的task使用,這和 ThreadPool 的設(shè)計(jì)初衷是相違背的。

如果在 ThreadPool 中創(chuàng)建了大量的 long-running task,那么就會導(dǎo)致
ThreadPool 中的線程不夠用,從而影響到其他的 task 的執(zhí)行。

在 long-running task await 一個(gè) async 方法后會發(fā)生什么

有時(shí)候,我們需要在 long-running task 中調(diào)用一個(gè) async 方法。比如下面的例子中,我們需要在 long-running task 中調(diào)用一個(gè) async
的方法來處理數(shù)據(jù)。

var queue = new BlockingCollection<string>();

Task.Factory.StartNew(async () =>
{
    while (true)
    {
        var input = queue.Take();
        Console.WriteLine($"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
        await ProcessAsync(input);
        Console.WriteLine($"After process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
    }
}, TaskCreationOptions.LongRunning);

async Task ProcessAsync(string input)
{
    // 模擬一個(gè)異步操作。
    await Task.Delay(100);
    Console.WriteLine($"You entered: {input}, thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
}

while (true)
{
    var input = Console.ReadLine();

    queue.Add(input);
}

TaskScheduler InternalCurrentTaskScheduler()
{
    var propertyInfo = typeof(TaskScheduler).GetProperty("InternalCurrent", BindingFlags.Static | BindingFlags.NonPublic);
    return (TaskScheduler)propertyInfo.GetValue(null);
}

連續(xù)輸入 1、2、3、4,輸出如下:

1
Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
You entered: 1, thread id: 4, task scheduler: , thread pool: True
After process: thread id: 4, task scheduler: , thread pool: True
2
Before process: thread id: 4, task scheduler: , thread pool: True
You entered: 2, thread id: 4, task scheduler: , thread pool: True
After process: thread id: 4, task scheduler: , thread pool: True
3
Before process: thread id: 4, task scheduler: , thread pool: True
You entered: 3, thread id: 4, task scheduler: , thread pool: True
After process: thread id: 4, task scheduler: , thread pool: True
4
Before process: thread id: 4, task scheduler: , thread pool: True
You entered: 4, thread id: 4, task scheduler: , thread pool: True
After process: thread id: 4, task scheduler: , thread pool: True

從執(zhí)行結(jié)果中可以看出,第一次 await 之前,當(dāng)前線程是 long-running task 所在的線程(thread id: 9),此后就變成了 ThreadPool
中的線程(thread id: 4)。

至于為什么之后一直是 ThreadPool 中的線程(thread id: 4),這邊做一下簡單的解釋。在我以前一篇介紹 await 的文章中介紹了 await 的執(zhí)行過程,以及 await 之后的代碼會在哪個(gè)線程上執(zhí)行。

?

https://www.cnblogs.com/eventhorizon/p/15912383.html

  1. 第一次 await 前,當(dāng)前線程是 long-running task 所在的線程(thread id: 9),綁定了 TaskScheduler(ThreadPoolTaskScheduler),也就是說 await 之后的代碼會被調(diào)度到 ThreadPool 中執(zhí)行。
  2. 第一次 await 之后的代碼被調(diào)度到 ThreadPool 中的線程(thread id: 4)上執(zhí)行。
  3. ThreadPool 中的線程不會綁定 TaskScheduler,也就意味著之后的代碼還是會在 ThreadPool 中的線程上執(zhí)行,并且是本地隊(duì)列優(yōu)先,所以一直是 thread id: 4 這個(gè)線程在從本地隊(duì)列中取出任務(wù)在執(zhí)行。

線程池的介紹請參考我另一篇博客
https://www.cnblogs.com/eventhorizon/p/15316955.html

回到本文的主題,如果在 long-running task 使用了 await 調(diào)用一個(gè) async 方法,就會導(dǎo)致為 long-running task 分配的獨(dú)立線程提前退出,和我們的預(yù)期不符。

long-running task 中 調(diào)用 一個(gè) async 方法的可能姿勢

使用 Task.Wait

在 long-running task 中調(diào)用一個(gè) async 方法,可以使用 Task.Wait 來阻塞當(dāng)前線程,直到 async 方法執(zhí)行完畢。
對于 Task.Factory.StartNew 創(chuàng)建出來的 long-running task 來說,因?yàn)槠浣壎?ThreadPoolTaskScheduler,就算是使用 Task.Wait
阻塞了當(dāng)前線程,也不會導(dǎo)致死鎖。
并且 Task.Wait 會把異常拋出來,所以我們可以在 catch 中處理異常。

// ...
Task.Factory.StartNew( () =>
{
    while (true)
    {
        var input = queue.Take();
        Console.WriteLine($"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
        ProcessAsync(input).Wait();
        Console.WriteLine($"After process: thread id: {Thread.CurrentThread.ManagedThreadId}");
    }
}, TaskCreationOptions.LongRunning);
// ...

輸出如下:

1
Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
You entered: 1, thread id: 5, task scheduler: , thread pool: True
After process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
2
Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
You entered: 2, thread id: 5, task scheduler: , thread pool: True
After process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
3
Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
You entered: 3, thread id: 5, task scheduler: , thread pool: True
After process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
4
Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
You entered: 4, thread id: 5, task scheduler: , thread pool: True
After process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False

Task.Wait 并不會對 async 方法內(nèi)部產(chǎn)生影響,所以 async 方法內(nèi)部的代碼還是按照正常的邏輯執(zhí)行。這邊 ProcessAsync 方法內(nèi)部打印的
thread id 沒變純粹是因?yàn)?ThreadPool 目前就只創(chuàng)建了一個(gè)線程,你可以瘋狂輸入看看結(jié)果。

關(guān)于 Task.Wait 的使用,可以參考我另一篇博客
https://www.cnblogs.com/eventhorizon/p/17481757.html

使用自定義的 TaskScheduler 來創(chuàng)建 long-running task

Task.Factory.StartNew(async () =>
{
    while (true)
    {
        var input = queue.Take();
        Console.WriteLine(
            $"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
        await ProcessAsync(input);
        Console.WriteLine(
            $"After process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
    }
}, CancellationToken.None, TaskCreationOptions.None, new CustomerTaskScheduler());

class CustomerTaskScheduler : TaskScheduler
{
    // 這邊的 BlockingCollection 只是舉個(gè)例子,如果是普通的隊(duì)列,配合鎖也是可以的。
    private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();

    public CustomerTaskScheduler()
    {
        var thread = new Thread(() =>
        {
            foreach (var task in _tasks.GetConsumingEnumerable())
            {
                TryExecuteTask(task);
            }
        })
        {
            IsBackground = true
        };
        thread.Start();
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _tasks;
    }

    protected override void QueueTask(Task task)
    {
        _tasks.Add(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }
}

輸出如下:

1
Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
You entered: 1, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
After process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
2
Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
You entered: 2, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
After process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
3
Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
You entered: 3, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
After process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
4
Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
You entered: 4, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
After process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False

因?yàn)樾薷牧松舷挛慕壎ǖ?TaskScheduler,會影響到 async 方法內(nèi)部 await 回調(diào)的執(zhí)行。

這種做法不推薦使用,因?yàn)榭赡軙?dǎo)致死鎖。

如果我將 await 改成 Task.Wait,就會導(dǎo)致死鎖。

Task.Factory.StartNew(() =>
{
    while (true)
    {
        var input = queue.Take();
        Console.WriteLine(
            $"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
        ProcessAsync(input).Wait();
        Console.WriteLine(
            $"After process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
    }
}, CancellationToken.None, TaskCreationOptions.None, new CustomerTaskScheduler());

輸出如下:

1
Before process: thread id: 7, task scheduler: CustomerTaskScheduler, thread pool: False

后面就沒有輸出了,因?yàn)樗梨i了,除非我們在 ProcessAsync 方法內(nèi)部每個(gè) await 的 Task 后加上ConfigureAwait(false)。

同理,同學(xué)們也可以嘗試用 SynchronizationContext 來實(shí)現(xiàn)類似的效果,同樣有死鎖的風(fēng)險(xiǎn)。

總結(jié)

如果你想要在一個(gè) long-running task 中執(zhí)行 async 方法,使用 await 關(guān)鍵字會導(dǎo)致 long-running task 的獨(dú)立線程提前退出。

比較推薦的做法是使用 Task.Wait。如果連續(xù)執(zhí)行多個(gè) async 方法,建議將這些 async 方法封裝成一個(gè)新方法,然后只 Wait 這個(gè)新方法的 Task。

轉(zhuǎn)自https://www.cnblogs.com/eventhorizon/p/17497359.html


該文章在 2025/8/8 9:12:36 編輯過
關(guān)鍵字查詢
相關(guān)文章
正在查詢...
點(diǎn)晴ERP是一款針對中小制造業(yè)的專業(yè)生產(chǎn)管理軟件系統(tǒng),系統(tǒng)成熟度和易用性得到了國內(nèi)大量中小企業(yè)的青睞。
點(diǎn)晴PMS碼頭管理系統(tǒng)主要針對港口碼頭集裝箱與散貨日常運(yùn)作、調(diào)度、堆場、車隊(duì)、財(cái)務(wù)費(fèi)用、相關(guān)報(bào)表等業(yè)務(wù)管理,結(jié)合碼頭的業(yè)務(wù)特點(diǎn),圍繞調(diào)度、堆場作業(yè)而開發(fā)的。集技術(shù)的先進(jìn)性、管理的有效性于一體,是物流碼頭及其他港口類企業(yè)的高效ERP管理信息系統(tǒng)。
點(diǎn)晴WMS倉儲管理系統(tǒng)提供了貨物產(chǎn)品管理,銷售管理,采購管理,倉儲管理,倉庫管理,保質(zhì)期管理,貨位管理,庫位管理,生產(chǎn)管理,WMS管理系統(tǒng),標(biāo)簽打印,條形碼,二維碼管理,批號管理軟件。
點(diǎn)晴免費(fèi)OA是一款軟件和通用服務(wù)都免費(fèi),不限功能、不限時(shí)間、不限用戶的免費(fèi)OA協(xié)同辦公管理系統(tǒng)。
Copyright 2010-2025 ClickSun All Rights Reserved

黄频国产免费高清视频,久久不卡精品中文字幕一区,激情五月天AV电影在线观看,欧美国产韩国日本一区二区
婷婷网亚洲色偷偷男人的天堂 | 一本久久久综合精品视频 | 亚洲少妇毛多水多视频 | 亚洲欧美日韩动漫一区二区 | 亚洲色偷国产一区二区三区 | 中日AV高清字幕版在线观看 |