Docs-.NET-.NET-指南-非同步程式設計模式:使用基於任務的非同步模式
ylbtech-Docs-.NET-.NET-指南-非同步程式設計模式:使用基於任務的非同步模式 |
1.返回頂部 |
使用基於任務的非同步模式
使用基於任務的非同步模式 (TAP) 處理非同步操作時,可以使用回叫實現等待,而不會阻塞。對於任務,這可通過Task.ContinueWith等方法實現。通過允許在正常控制流中等待非同步操縱,基於語言的非同步支援可隱藏回叫,並且編譯器生成的程式碼可提供此相同 API 級別的支援。
使用 Await 掛起執行
自 .NET Framework 4.5 起,可以使用 C# 中的await關鍵字和 Visual Basic 中的await
表示式的型別為void
。等待Task<TResult>時,await
表示式的型別為TResult
。await
表示式必須出現在非同步方法的正文內。若要詳細瞭解 .NET Framework 4.5 中的 C# 和 Visual Basic 語言支援,請參閱 C# 和 Visual Basic 語言規範。
實際上,await 功能通過使用延續任務在任務上安裝回叫。此回叫在掛起點恢復非同步方法。恢復非同步方法時,如果等待的操作已成功完成且為Task<TResult>,返回的是TResult
Task
可能由於多個異常而出錯,但只會傳播一個異常。不過,Task.Exception屬性會返回包含所有錯誤的AggregateException異常。
如果同步上下文(SynchronizationContext物件)與暫停時正在執行非同步方法的執行緒(例如,SynchronizationContext.Current屬性不是null
呼叫非同步方法時,將同步執行函式的正文,直到遇見尚未完成的可等待例項上的第一個 await 表示式,此時呼叫返回到呼叫方。如果非同步方法不返回void
,將會返回Task或Task<TResult>物件,以表示正在進行的計算。在非 void 非同步方法中,如果遇到 return 語句或到達方法正文末尾,任務就以RanToCompletion最終狀態完成。如果未經處理的異常導致無法控制非同步方法正文,任務就以Faulted狀態結束。如果異常為OperationCanceledException,任務改為以Canceled狀態結束。通過此方式,最終將釋出結果或異常。
此行為有幾種重要特殊情況。出於效能原因,如果任務在等待時已完成,則不會生成控制元件,並且該函式將繼續執行。返回到原始上下文並不總是所需的行為,可對其進行更改;將在下一節中更詳細地描述此內容。
使用 Yield 和 ConfigureAwait 配置掛起和恢復
有多種方法可更好地控制非同步方法的執行。例如,可以使用Task.Yield方法,將暫停點引入非同步方法:
C#public class Task : …
{
public static YieldAwaitable Yield();
…
}
這相當於以非同步方式釋出或計劃返回當前上下文。
C#Task.Run(async delegate
{
for(int i=0; i<1000000; i++)
{
await Task.Yield(); // fork the continuation into a separate work item
...
}
});
還可以使用Task.ConfigureAwait方法,更好地控制非同步方法中的暫停和恢復。如前所述,預設情況下,非同步方法掛起時會捕獲當前上下文,捕獲的上下文用於在恢復時呼叫非同步方法的延續。在多數情況下,這就是你所需的確切行為。在其他情況下,你可能不關心延續上下文,則可以通過避免此類釋出返回原始上下文來獲得更好的效能。若要啟用此功能,請使用Task.ConfigureAwait方法,指示等待操作不要捕獲和恢復上下文,而是繼續執行正在等待完成的所有非同步操作:
C#await someTask.ConfigureAwait(continueOnCapturedContext:false);
取消非同步操作
從 .NET Framework 4 開始,支援取消操作的 TAP 方法提供至少一個接受取消令牌(CancellationToken物件)的過載。
可通過取消令牌源(CancellationTokenSource物件)建立取消令牌。源的Token屬性返回取消令牌,它在源的Cancel方法獲得呼叫時收到訊號。例如,若要下載一個網頁,並且希望能夠取消此操作,請建立CancellationTokenSource物件,將它的令牌傳遞給 TAP 方法,再在準備好取消此操作時,呼叫源的Cancel方法:
C#var cts = new CancellationTokenSource();
string result = await DownloadStringAsync(url, cts.Token);
… // at some point later, potentially on another thread
cts.Cancel();
若要取消多個非同步呼叫,可以將相同令牌傳遞給所有呼叫:
C#var cts = new CancellationTokenSource();
IList<string> results = await Task.WhenAll(from url in urls select DownloadStringAsync(url, cts.Token));
// at some point later, potentially on another thread
…
cts.Cancel();
或者,將相同令牌傳遞給操作的選擇性子集:
C#var cts = new CancellationTokenSource();
byte [] data = await DownloadDataAsync(url, cts.Token);
await SaveToDiskAsync(outputPath, data, CancellationToken.None);
… // at some point later, potentially on another thread
cts.Cancel();
可以從任意執行緒啟動取消請求。
可以將CancellationToken.None值傳遞給接受取消令牌的任何方法,指明絕不會請求取消操作。這會導致CancellationToken.CanBeCanceled屬性返回false
,並且呼叫的方法可以相應地進行優化。出於測試目的,還可以通過傳入預取消標記(該標記使用接受布林值的建構函式進行例項化)來指示是否應以已取消或未取消狀態啟動標記。
使用此方法進行取消具有以下優點:
-
可以將相同的取消標記傳遞給任意數量的非同步和同步操作。
-
相同的取消請求可能會擴充套件到任意數量的偵聽器。
-
非同步 API 的開發人員可完全控制是否可以請求取消以及取消何時生效。
-
使用 API 的程式碼可以選擇性地確定將對其傳播取消請求的非同步呼叫。
監視進度
某些非同步方法通過傳入非同步方法的進度介面來公開進度。例如,設想某個函式以非同步方式下載文字字串,並在該過程中引發包括到目前為止下載完成百分比的進度更新。此類方法可在 Windows Presentation Foundation (WPF) 應用程式中使用,如下所示:
C#private async void btnDownload_Click(object sender, RoutedEventArgs e)
{
btnDownload.IsEnabled = false;
try
{
txtResult.Text = await DownloadStringAsync(txtUrl.Text,
new Progress<int>(p => pbDownloadProgress.Value = p));
}
finally { btnDownload.IsEnabled = true; }
}
使用內建的基於任務的連結符
System.Threading.Tasks名稱空間包含多個方法,可用於撰寫和處理任務。
Task.Run
Task類包含多個Run方法,以便於將工作作為Task或Task<TResult>輕鬆解除安裝到執行緒池,例如:
C#public async void button1_Click(object sender, EventArgs e)
{
textBox1.Text = await Task.Run(() =>
{
// … do compute-bound work here
return answer;
});
}
其中部分Run方法(如Task.Run(Func<Task>)過載)以TaskFactory.StartNew方法的簡約表示形式存在。藉助其他過載(如Task.Run(Func<Task>)),可以在解除安裝的工作內使用 await,例如:
C#public async void button1_Click(object sender, EventArgs e)
{
pictureBox1.Image = await Task.Run(async() =>
{
using(Bitmap bmp1 = await DownloadFirstImageAsync())
using(Bitmap bmp2 = await DownloadSecondImageAsync())
return Mashup(bmp1, bmp2);
});
}
此類過載在邏輯上相當於結合使用TaskFactory.StartNew方法和任務並行庫中的Unwrap擴充套件方法。
Task.FromResult
FromResult方法的適用情景為,資料可能已存在,且只需通過提升到Task<TResult>的任務返回方法返回:
C#public Task<int> GetValueAsync(string key)
{
int cachedValue;
return TryGetCachedValue(out cachedValue) ?
Task.FromResult(cachedValue) :
GetValueAsyncInternal();
}
private async Task<int> GetValueAsyncInternal(string key)
{
…
}
Task.WhenAll
WhenAll方法可用於非同步等待多個表示為任務的非同步操作。該方法所具有的多個過載支援一組非泛型任務或一組不統一的常規任務(如非同步等待多個返回 void 的操作,或非同步等待多個返回值的方法,其中每個值可能具有不同型別),並支援一組統一的常規任務(如非同步等待多個TResult
返回方法)。
假設你想要向多個客戶傳送電子郵件。你可以重疊傳送郵件,因此傳送郵件時無需等待上一封郵件完成傳送。還可以檢視傳送操作完成的時間,以及是否發生了錯誤:
C#IEnumerable<Task> asyncOps = from addr in addrs select SendMailAsync(addr);
await Task.WhenAll(asyncOps);
此程式碼不顯式處理可能發生的異常,而是通過對WhenAll生成的任務執行await
傳播異常。若要處理該異常,可以使用以下程式碼:
IEnumerable<Task> asyncOps = from addr in addrs select SendMailAsync(addr);
try
{
await Task.WhenAll(asyncOps);
}
catch(Exception exc)
{
...
}
在這種情況下,如果任意非同步操作失敗,所有異常都會合併到AggregateException異常中,此異常儲存在WhenAll方法返回的Task中。但是,僅通過await
關鍵字傳播其中一個異常。如果想要檢查所有異常,可以重寫前面的程式碼,如下所示:
Task [] asyncOps = (from addr in addrs select SendMailAsync(addr)).ToArray();
try
{
await Task.WhenAll(asyncOps);
}
catch(Exception exc)
{
foreach(Task faulted in asyncOps.Where(t => t.IsFaulted))
{
… // work with faulted and faulted.Exception
}
}
讓我們考慮一下以非同步方式從 Web 下載多個檔案的示例。在此示例中,所有非同步操作具有相同的結果型別,並很容易對結果進行訪問:
C#string [] pages = await Task.WhenAll(
from url in urls select DownloadStringAsync(url));
可以使用上一個返回 void 方案中所討論的異常處理技術:
C#Task<string> [] asyncOps =
(from url in urls select DownloadStringAsync(url)).ToArray();
try
{
string [] pages = await Task.WhenAll(asyncOps);
...
}
catch(Exception exc)
{
foreach(Task<string> faulted in asyncOps.Where(t => t.IsFaulted))
{
… // work with faulted and faulted.Exception
}
}
Task.WhenAny
WhenAny方法可用於非同步等待多個表示為要完成的任務的非同步操作之一。此方法適用於四個主要用例:
-
冗餘:多次執行一個操作並選擇最先完成的一次(例如,聯絡將生成一個結果的多個股市行情 Web 服務並選擇最快完成的一個)。
-
交錯:啟動多個操作並等待所有這些操作完成,但在這些操作完成時對其進行處理。
-
限制:允許在其他操作完成時開始附加操作。這是交錯方案的擴充套件。
-
早期釋放:例如,用任務 t1 表示的操作可以與任務 t2 組成WhenAny任務,並且可以等待WhenAny任務。任務 t2 可以表示超時、取消或其他一些導致WhenAny任務先於 t1 完成的訊號。
冗餘
假設你想要決定是否購買股票。你信任一些股票建議 Web 服務,但每個服務最終會在不同的時間段變得很慢,具體取決於每日負載。WhenAny方法可用於在任何操作完成時接收通知:
C#var recommendations = new List<Task<bool>>()
{
GetBuyRecommendation1Async(symbol),
GetBuyRecommendation2Async(symbol),
GetBuyRecommendation3Async(symbol)
};
Task<bool> recommendation = await Task.WhenAny(recommendations);
if (await recommendation) BuyStock(symbol);
WhenAll返回已成功完成的所有任務的取消包裝結果。與它不同,WhenAny返回已完成的任意任務。如果任務失敗,重要的是知道該任務失敗,如果任務成功,重要的是知道返回值與哪個任務相關聯。因此,你需要訪問返回任務的結果,或進一步等待,如本示例中所示。
與WhenAll一樣,必須能夠容納異常。因為接收到完成的任務後,可以等待返回的任務傳播錯誤,並適當地進行try/catch
,例如:
Task<bool> [] recommendations = …;
while(recommendations.Count > 0)
{
Task<bool> recommendation = await Task.WhenAny(recommendations);
try
{
if (await recommendation) BuyStock(symbol);
break;
}
catch(WebException exc)
{
recommendations.Remove(recommendation);
}
}
此外,即使第一個任務成功完成,後續任務也可能會失敗。此時,有多個用於處理異常的選項:可以等待所有啟動的任務完成,在這種情況下可以使用WhenAll方法;或者做出所有異常都重要且必須記錄的決定。為此,可以使用延續任務以在任務非同步完成時接收通知:
C#foreach(Task recommendation in recommendations)
{
var ignored = recommendation.ContinueWith(
t => { if (t.IsFaulted) Log(t.Exception); });
}
或:
C#foreach(Task recommendation in recommendations)
{
var ignored = recommendation.ContinueWith(
t => Log(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
}
或者甚至:
C#private static async void LogCompletionIfFailed(IEnumerable<Task> tasks)
{
foreach(var task in tasks)
{
try { await task; }
catch(Exception exc) { Log(exc); }
}
}
…
LogCompletionIfFailed(recommendations);
最後,若要取消所有剩餘操作:
C#var cts = new CancellationTokenSource();
var recommendations = new List<Task<bool>>()
{
GetBuyRecommendation1Async(symbol, cts.Token),
GetBuyRecommendation2Async(symbol, cts.Token),
GetBuyRecommendation3Async(symbol, cts.Token)
};
Task<bool> recommendation = await Task.WhenAny(recommendations);
cts.Cancel();
if (await recommendation) BuyStock(symbol);
交錯
假設你要從 Web 下載影象,並且處理每個影象(例如,將影象新增到 UI 控制元件)。可以在 UI 執行緒上按順序處理影象,但建議儘可能同時下載影象。此外,建議不要直到所有影象都下載完成才將影象新增到 UI。建議在完成下載時新增它們。
C#List<Task<Bitmap>> imageTasks =
(from imageUrl in urls select GetBitmapAsync(imageUrl)).ToList();
while(imageTasks.Count > 0)
{
try
{
Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
imageTasks.Remove(imageTask);
Bitmap image = await imageTask;
panel.AddImage(image);
}
catch{}
}
還可以將交錯應用於涉及下載影象ThreadPool的計算密集型處理的方案;例如:
C#List<Task<Bitmap>> imageTasks =
(from imageUrl in urls select GetBitmapAsync(imageUrl)
.ContinueWith(t => ConvertImage(t.Result)).ToList();
while(imageTasks.Count > 0)
{
try
{
Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
imageTasks.Remove(imageTask);
Bitmap image = await imageTask;
panel.AddImage(image);
}
catch{}
}
遏制
請考慮交錯示例,因使用者大量下載影象而導致下載必須受到遏制除外;例如,你希望僅能同時下載特定數目的內容。為此,可以啟動非同步操作的子集。操作完成後,你可以啟動其他操作對其進行替代:
C#const int CONCURRENCY_LEVEL = 15;
Uri [] urls = …;
int nextIndex = 0;
var imageTasks = new List<Task<Bitmap>>();
while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)
{
imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
nextIndex++;
}
while(imageTasks.Count > 0)
{
try
{
Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
imageTasks.Remove(imageTask);
Bitmap image = await imageTask;
panel.AddImage(image);
}
catch(Exception exc) { Log(exc); }
if (nextIndex < urls.Length)
{
imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
nextIndex++;
}
}
早期釋放
假設正在非同步等待某個操作完成的同時,對使用者的取消請求(例如,使用者單擊取消按鈕)進行響應。以下程式碼闡釋了此方案:
C#private CancellationTokenSource m_cts;
public void btnCancel_Click(object sender, EventArgs e)
{
if (m_cts != null) m_cts.Cancel();
}
public async void btnRun_Click(object sender, EventArgs e)
{
m_cts = new CancellationTokenSource();
btnRun.Enabled = false;
try
{
Task<Bitmap> imageDownload = GetBitmapAsync(txtUrl.Text);
await UntilCompletionOrCancellation(imageDownload, m_cts.Token);
if (imageDownload.IsCompleted)
{
Bitmap image = await imageDownload;
panel.AddImage(image);
}
else imageDownload.ContinueWith(t => Log(t));
}
finally { btnRun.Enabled = true; }
}
private static async Task UntilCompletionOrCancellation(
Task asyncOp, CancellationToken ct)
{
var tcs = new TaskCompletionSource<bool>();
using(ct.Register(() => tcs.TrySetResult(true)))
await Task.WhenAny(asyncOp, tcs.Task);
return asyncOp;
}
一旦決定退出,此實現將重新啟用使用者介面,但不會取消基礎非同步操作。另一種選擇是決定退出時,取消掛起的操作,但在操作完成之前不重新建立使用者介面,可能會由於取消請求而提前結束:
C#private CancellationTokenSource m_cts;
public async void btnRun_Click(object sender, EventArgs e)
{
m_cts = new CancellationTokenSource();
btnRun.Enabled = false;
try
{
Task<Bitmap> imageDownload = GetBitmapAsync(txtUrl.Text, m_cts.Token);
await UntilCompletionOrCancellation(imageDownload, m_cts.Token);
Bitmap image = await imageDownload;
panel.AddImage(image);
}
catch(OperationCanceledException) {}
finally { btnRun.Enabled = true; }
}
另一個早期釋放示例涉及結合使用WhenAny方法和Delay方法,下一部分將對此進行介紹。
Task.Delay
Task.Delay方法可用於將暫停引入非同步方法的執行中。這對於許多功能都非常有用,包括構建輪詢迴圈和延遲預定時間段的使用者輸入處理。Task.Delay方法還可以與Task.WhenAny結合使用,以對 await 實現超時。
如果某任務屬於較大型非同步操作(如 ASP.NET Web 服務)中的一部分,由於花費時間過長而不能完成,則整體操作可能會受到影響(尤其是此任務一直不能完成的情況下)。因此,等待非同步操作時可以超時非常重要。雖然同步Task.Wait、Task.WaitAll和Task.WaitAny方法接受超時值,但相應的TaskFactory.ContinueWhenAll/Task.WhenAny和前述Task.WhenAll/Task.WhenAny方法不接受。相反,可以將Task.Delay與Task.WhenAny結合使用,以實現超時。
例如,在 UI 應用程式中,假設你想要下載影象,並在影象下載期間禁用該 UI。但是,如果下載時間過長,你希望重新啟用 UI 並放棄下載:
C#public async void btnDownload_Click(object sender, EventArgs e)
{
btnDownload.Enabled = false;
try
{
Task<Bitmap> download = GetBitmapAsync(url);
if (download == await Task.WhenAny(download, Task.Delay(3000)))
{
Bitmap bmp = await download;
pictureBox.Image = bmp;
status.Text = "Downloaded";
}
else
{
pictureBox.Image = null;
status.Text = "Timed out";
var ignored = download.ContinueWith(
t => Trace("Task finally completed"));
}
}
finally { btnDownload.Enabled = true; }
}
這同樣適用於多個下載,因為WhenAll返回任務:
C#public async void btnDownload_Click(object sender, RoutedEventArgs e)
{
btnDownload.Enabled = false;
try
{
Task<Bitmap[]> downloads =
Task.WhenAll(from url in urls select GetBitmapAsync(url));
if (downloads == await Task.WhenAny(downloads, Task.Delay(3000)))
{
foreach(var bmp in downloads.Result) panel.AddImage(bmp);
status.Text = "Downloaded";
}
else
{
status.Text = "Timed out";
downloads.ContinueWith(t => Log(t));
}
}
finally { btnDownload.Enabled = true; }
}
構建基於任務的連結符
因為任務可以完全代表非同步操作、提供同步和非同步功能來加入操作、檢索其結果等,所以可以構建組成任務的連結符的庫以構建更大的模式。如前一部分所述,.NET Framework 包括一些內建連結符,但是,你也可以構建自己的連結符。以下各節提供了一些潛在的連結符方法和型別的示例。
RetryOnFault
在許多情況下,如果上次嘗試失敗,你可能想要重試操作。對於同步程式碼,你可能會構建一個幫助器方法來實現此目的,如下例中的RetryOnFault
:
public static T RetryOnFault<T>(
Func<T> function, int maxTries)
{
for(int i=0; i<maxTries; i++)
{
try { return function(); }
catch { if (i == maxTries-1) throw; }
}
return default(T);
}
你可以為非同步操作(使用 TAP 實現,因此返回任務)構建幾乎相同的幫助器方法:
C#public static async Task<T> RetryOnFault<T>(
Func<Task<T>> function, int maxTries)
{
for(int i=0; i<maxTries; i++)
{
try { return await function().ConfigureAwait(false); }
catch { if (i == maxTries-1) throw; }
}
return default(T);
}
然後,可以使用此連結符將重試編碼到應用程式的邏輯中,例如:
C#// Download the URL, trying up to three times in case of failure
string pageContents = await RetryOnFault(
() => DownloadStringAsync(url), 3);
可以進一步擴充套件RetryOnFault
函式。例如,該函式可以接受另一個Func<Task>
(在重試間隔期間呼叫以確定何時重試該操作):
public static async Task<T> RetryOnFault<T>(
Func<Task<T>> function, int maxTries, Func<Task> retryWhen)
{
for(int i=0; i<maxTries; i++)
{
try { return await function().ConfigureAwait(false); }
catch { if (i == maxTries-1) throw; }
await retryWhen().ConfigureAwait(false);
}
return default(T);
}
重試該操作前,可以使用以下函式等待片刻:
C#// Download the URL, trying up to three times in case of failure,
// and delaying for a second between retries
string pageContents = await RetryOnFault(
() => DownloadStringAsync(url), 3, () => Task.Delay(1000));
NeedOnlyOne
有時,你可以利用冗餘改進操作延遲和提高成功的可能性。假設有多個 Web 服務提供股票報價,但在一天中的不同時間,每個服務可能提供不同級別的質量和響應時間。為了應對這些波動,你可能會向所有 Web 服務發出請求,並且只要從其中一個獲得響應,立刻取消剩餘的請求。你可以通過 helper 函式更輕鬆地實現此啟動多個操作的通用模式:等待任何操作,然後取消其餘部分。以下示例中的NeedOnlyOne
函式闡釋了這種方案:
public static async Task<T> NeedOnlyOne(
params Func<CancellationToken,Task<T>> [] functions)
{
var cts = new CancellationTokenSource();
var tasks = (from function in functions
select function(cts.Token)).ToArray();
var completed = await Task.WhenAny(tasks).ConfigureAwait(false);
cts.Cancel();
foreach(var task in tasks)
{
var ignored = task.ContinueWith(
t => Log(t), TaskContinuationOptions.OnlyOnFaulted);
}
return completed;
}
然後,你可以使用此函式,如下所示:
C#double currentPrice = await NeedOnlyOne(
ct => GetCurrentPriceFromServer1Async("msft", ct),
ct => GetCurrentPriceFromServer2Async("msft", ct),
ct => GetCurrentPriceFromServer3Async("msft", ct));
交錯操作
處理大型任務集時,如果使用WhenAny方法支援交錯方案,可能存在潛在效能問題。每次呼叫WhenAny都會向每個任務註冊延續。對於 N 個任務,這將導致在交錯操作的操作期間建立 O(N2) 次延續。如果處理大型任務集,則可以使用連結符(以下示例中的Interleaved
)來解決效能問題:
static IEnumerable<Task<T>> Interleaved<T>(IEnumerable<Task<T>> tasks)
{
var inputTasks = tasks.ToList();
var sources = (from _ in Enumerable.Range(0, inputTasks.Count)
select new TaskCompletionSource<T>()).ToList();
int nextTaskIndex = -1;
foreach (var inputTask in inputTasks)
{
inputTask.ContinueWith(completed =>
{
var source = sources[Interlocked.Increment(ref nextTaskIndex)];
if (completed.IsFaulted)
source.TrySetException(completed.Exception.InnerExceptions);
else if (completed.IsCanceled)
source.TrySetCanceled();
else
source.TrySetResult(completed.Result);
}, CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
}
return from source in sources
select source.Task;
}
然後,可以在任務完成時,使用連結符來處理任務的結果,例如:
C#IEnumerable<Task<int>> tasks = ...;
foreach(var task in Interleaved(tasks))
{
int result = await task;
…
}
WhenAllOrFirstException
在特定的分散/集中情況下,你可能想要等待集中的所有任務,除非某個任務發生錯誤。在這種情況下,你希望在異常發生時停止等待。你可以通過使用連結符方法(如WhenAllOrFirstException
)實現該目的,如下所示:
public static Task<T[]> WhenAllOrFirstException<T>(IEnumerable<Task<T>> tasks)
{
var inputs = tasks.ToList();
var ce = new CountdownEvent(inputs.Count);
var tcs = new TaskCompletionSource<T[]>();
Action<Task> onCompleted = (Task completed) =>
{
if (completed.IsFaulted)
tcs.TrySetException(completed.Exception.InnerExceptions);
if (ce.Signal() && !tcs.Task.IsCompleted)
tcs.TrySetResult(inputs.Select(t => t.Result).ToArray());
};
foreach (var t in inputs) t.ContinueWith(onCompleted);
return tcs.Task;
}
構建基於任務的資料結構
除了能夠生成基於任務的自定義組合器,在Task和Task<TResult>(表示非同步操作結果和聯接所必需的同步操作結果)中包含資料結構,還可以使其成為功能非常強大的型別,基於該型別可生成在非同步方案中使用的自定義資料結構。
AsyncCache
任務的重要方面之一是,它可能會分發到多個使用者,所有使用者都可以等待任務、向任務註冊延續、獲取任務結果或異常(如果是Task<TResult>的話)等。這樣一來,Task和Task<TResult>就非常適用於非同步快取基礎結構。下面的示例演示了基於Task<TResult>生成的功能非常強大的小型非同步快取:
C#public class AsyncCache<TKey, TValue>
{
private readonly Func<TKey, Task<TValue>> _valueFactory;
private readonly ConcurrentDictionary<TKey, Lazy<Task<TValue>>> _map;
public AsyncCache(Func<TKey, Task<TValue>> valueFactory)
{
if (valueFactory == null) throw new ArgumentNullException("loader");
_valueFactory = valueFactory;
_map = new ConcurrentDictionary<TKey, Lazy<Task<TValue>>>();
}
public Task<TValue> this[TKey key]
{
get
{
if (key == null) throw new ArgumentNullException("key");
return _map.GetOrAdd(key, toAdd =>
new Lazy<Task<TValue>>(() => _valueFactory(toAdd))).Value;
}
}
}
AsyncCache<TKey,TValue>類接受需要使用TKey
且返回Task<TResult>的函式作為建構函式的委託。以前從快取訪問的所有值都儲存在內部字典中,AsyncCache
可以確保每個金鑰僅生成一個任務,即便同時訪問快取也是如此。
例如,你可以生成下載網頁的快取:
C#private AsyncCache<string,string> m_webPages =
new AsyncCache<string,string>(DownloadStringAsync);
然後可以在任何需要網頁內容的時候,以非同步方式使用此快取。AsyncCache
類可確保下載儘可能少的頁面,並快取結果。
private async void btnDownload_Click(object sender, RoutedEventArgs e)
{
btnDownload.IsEnabled = false;
try
{
txtContents.Text = await m_webPages["https://www.microsoft.com"];
}
finally { btnDownload.IsEnabled = true; }
}
AsyncProducerConsumerCollection
你還可以使用任務來構建協調非同步活動的資料結構。請考慮經典的並行設計模式之一:製造者/使用者。在此模式下,製造者生成資料,使用者使用資料,製造者和使用者可能會並行執行。例如,使用者處理之前由製造者生成的第 1 項,而製造者現在正在製造第 2 項。對於製造者/使用者模式,總是需要某種資料結構來儲存製造者建立的工作,以便使用者可以收到新資料的通知並及時發現新資料。
以下是基於任務構建的簡單資料結構,可以將非同步方法用作生成方和使用方:
C#public class AsyncProducerConsumerCollection<T>
{
private readonly Queue<T> m_collection = new Queue<T>();
private readonly Queue<TaskCompletionSource<T>> m_waiting =
new Queue<TaskCompletionSource<T>>();
public void Add(T item)
{
TaskCompletionSource<T> tcs = null;
lock (m_collection)
{
if (m_waiting.Count > 0) tcs = m_waiting.Dequeue();
else m_collection.Enqueue(item);
}
if (tcs != null) tcs.TrySetResult(item);
}
public Task<T> Take()
{
lock (m_collection)
{
if (m_collection.Count > 0)
{
return Task.FromResult(m_collection.Dequeue());
}
else
{
var tcs = new TaskCompletionSource<T>();
m_waiting.Enqueue(tcs);
return tcs.Task;
}
}
}
}
通過該資料結構,可以編寫如下所示的程式碼:
C#private static AsyncProducerConsumerCollection<int> m_data = …;
…
private static async Task ConsumerAsync()
{
while(true)
{
int nextItem = await m_data.Take();
ProcessNextItem(nextItem);
}
}
…
private static void Produce(int data)
{
m_data.Add(data);
}
System.Threading.Tasks.Dataflow名稱空間包括BufferBlock<T>型別,可以類似方式使用它,但無需生成自定義集合型別:
C#private static BufferBlock<int> m_data = …;
…
private static async Task ConsumerAsync()
{
while(true)
{
int nextItem = await m_data.ReceiveAsync();
ProcessNextItem(nextItem);
}
}
…
private static void Produce(int data)
{
m_data.Post(data);
}
備註
System.Threading.Tasks.Dataflow名稱空間通過 NuGet 可用於 .NET Framework 4.5。若要安裝包含System.Threading.Tasks.Dataflow名稱空間的程式集,請在 Visual Studio 中開啟專案,選擇“專案”選單中的“管理 NuGet 包”,再線上搜尋 Microsoft.Tpl.Dataflow 包。
請參閱
2、2.返回頂部 |
3.返回頂部 |
4.返回頂部 |
5.返回頂部 |
6.返回頂部 |
作者:ylbtech 出處:http://ylbtech.cnblogs.com/ 本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段宣告,且在文章頁面明顯位置給出原文連線,否則保留追究法律責任的權利。 |