異步編程(二)用戶模式線程同步
基元線程同步構造
多個線程同時訪問共享數據時,線程同步能防止數據損壞。不需要線程同步是最理想的情況,因為線程同步存在許多問題。
第一個問題就是它比較繁瑣,而且很容易寫錯。
第二個問題是,他們會損害性能。獲取和釋放鎖是需要時間的。
第三個問題是,他們一次只允許一個線程訪問資源,就可能導致其他線程被阻塞,使用多線程是為了提高效率,而阻塞無疑降低了你的效率。
綜上所述,線程同步是一件不好的事情,所以在設計自己的應用程序時,應盡可能避免進行線程同步。具體就是避免使用像靜態字段這樣的共享數據。線程用new操作符構造對象時,new操作符會返回新對象的引用,如果能避免將這個引用傳給可能同時使用對象的另一個線程,就不必同步對該對象進行訪問。可試著使用值類型,因為他們總是被賦值,每個我線程操作的都是它自己的副本。最後,多個線程同時對共享數據進行制度訪問是沒有任何問題的。
基元用戶模式和內核模式構造
基元(primitive)是指可以在代碼中使用的最簡單的構造。有兩種基元構造:用戶模式(user mode)和內核模式(kernel mode)。
用戶模式構造
應盡量使用基元用戶模式構造,他們的速度要顯著快於內核模式的構造。這是因為他們使用了特殊cpu指令來協調線程。這意味著協調實在硬件中發生的(所以才這麽塊)。但這也意味著windows操作系統永遠檢測不到一個線程在紀元用戶模式的構造上阻塞了。由於在用戶模式的基元構造上阻塞的線程池線程永遠不認為已阻塞,所以線程池不會創建新的線程來替換這種臨時阻塞的線程。此外,這些cpu指令只阻塞相當短的時間。
這也是我認為較好的構造方式,CLR Via C# 的作者Jeffrey Richter也建議盡量使用用戶模式。但用戶模式也有一個缺點:只有windows操作系統內核才能停止一個線程的運行(防止它浪費cpu時間)。在用戶模式中運行的線程可能被系統搶占(preempted),想要取得資源但暫時無法取到的線程會一直在用戶模式中“自旋”。這回浪費大量cpu時間。
內核模式構造
內核模式的構造是是由windows操作系統自身提供的。所以,他們要求在應用程序的線程中調用由操作系統內核實現的函數。將線程從用戶模式切換到內核模式(或相反)會導致巨大的性能損失,這正式為什麽要避免使用內核模式構造的原因。但它們有一個重要的有點:線程通過內核模式的構造獲取其他線程擁有的資源時,windows會阻塞線程以避免它浪費cpu時間。當資源變得可用時,windows會恢復線程,允許它訪問資源。
對於一個等待的線程,如果不釋放它,它就一直阻塞。如果是用戶模式,線程將一直在cpu上運行,我們稱為“活鎖”。如果是內核模式,線程將一直阻塞,我們稱為“死鎖”。兩種情況都不好,但在兩者之間,死鎖總是優於活鎖,因為活鎖既浪費cpu時間,又浪費內存(線程棧等),而死鎖只浪費內存。
理想中的模式
構造應該兼具上面兩種模式的長處。也就是說,在沒有競爭的情況下,應該快而不會阻塞(用戶模式)。但如果存在競爭,我希望它被操作系統內核阻塞。這種構造,我們稱為混合構造(hybrid construct)。應用程序使用混合構造是很常見的現象。
用戶模式構造
易變字段 VOLATILE
靜態system.threading.volatile類提供了兩個靜態方法
這個方法比較特殊,他們事實上會禁止c#編譯器、jit編譯器和cpu平常執行的一些優化。下面描述了這些方法是如何工作的。
1 Volatile.Write方法強迫location中的值在調用時寫入。此外,按照編碼順序,之前的加載和存儲操作必須在調用Volatile.Write之前發生
2 Volatile.Read方法強迫location中的值在調用時讀取。此外,按照編碼順序,之後的加載和存儲操作必須必須在調用Volatile.Read之後發生。
這樣會避免編譯器對你的代碼進行了過度的優化,提前賦值數據。當然,你也可以使用volatile關鍵字,不過我並不喜歡這麽做,因為大多時候,你的讀取或寫入順序都可以按照正常方式進行,這樣效率更高。你可以在用必要的時候顯示調用Volatile類的方法,這樣程序的性能更好。
互鎖構造 interlocked
volatile的read方法執行一次原子性的讀取操作,write方法執行一次原子性的寫入操作。本節我們討論靜態system.threading.interlocked類提供的方法。interlocked類中的每個方法都執行一次院子讀取以及寫入操作。此外,interlocked的所有方法都建立了完整的內存柵欄(memory fence)。換言之,調用某個interlocked方法之前的任何變量寫入都在這個interlocked方法調用之前執行;而這個調用之後的任何變量讀取都在這個調用之後讀取。
interlocked方法的運行速度相當快,而且能做不少事情。下面我們有一個簡單的例子,使用interlocked方法異步查詢幾個web服務器,並同時處理返回數據。代碼很短,而且不阻塞任何線程,而且使用線程池來實現自動伸縮。
internal enum CoordinationStatus { AllDone, Timeout, Cancel } internal sealed class MultiWebRequests { //這個輔助類用於協調所有異步操作 private AsyncCoordinator m_ac = new AsyncCoordinator(); //這是想要查詢的web服務器及其響應(異常或int32)的集合 //註意:多個線程訪問該字典不需要以同步方式進行 //因為構造後鍵就是只讀的 private Dictionary<String, Object> m_servers = new Dictionary<string, object> { {"https://www.baidu.com/" ,null}, {"https://www.microsoft.com/zh-cn/",null}, {"https://www.taobao.com/",null} }; public MultiWebRequests(Int32 timeout=Timeout.Infinite) { var httpClient = new HttpClient(); foreach (var server in m_servers.Keys) { m_ac.AboutToBegin(1); httpClient.GetByteArrayAsync(server).ContinueWith(task => ComputeResult(server, task));//task是Task<byte[]>類型 } //告訴AsyncCoordinator所有操作都已發起,並在所有操作完成 //調用cancel或者發生超時的時候調用AllDone m_ac.AllBegun(AllDone, timeout); } //將結果保存到集合中,然後將完成狀態進行通知 private void ComputeResult(string server, Task<Byte[]> task) { object result; if (task.Exception!=null) { result = task.Exception.InnerException; } else { //線程池線程處理I/O完成 //在此添加自己的計算密集型算法。。。。 result = task.Result.Length; } //保存結果(exception/sum),指出一個操作完成 m_servers[server] = result; m_ac.JustEnded(); } //調用這個方法指出結果已無關緊要 public void Cancel() { m_ac.Cancel(); } //所有web服務器都響應、調用了cancel或者發生超時,就調用該方法,顯示執行結果 private void AllDone(CoordinationStatus status) { switch (status) { case CoordinationStatus.Cancel: Console.WriteLine("operation canceled"); break; case CoordinationStatus.Timeout: Console.WriteLine("operation time-out"); break; case CoordinationStatus.AllDone: Console.WriteLine("operation completed;results below;"); foreach (var server in m_servers) { Console.WriteLine("{0}",server.Key); object result = server.Value; if (result is Exception) { Console.WriteLine("failed due to {0}",result.GetType().Name); } else { Console.WriteLine("returned {0} bytes",result); } } break; } } }
可以看出,上述代碼並沒有直接使用interlocked的任何方法,因為我將所有協調代碼都放到可重用的AsyncCoordinator類中。如下
internal sealed class AsyncCoordinator { //AllBegun內部調用justended來遞減它 private Int32 m_opCount = 1; //0 = false 1= true private Int32 m_statusReported = 0; private Action<CoordinationStatus> m_callback; private Timer m_timer; //該方法必須在發起一個操作之前調用 public void AboutToBegin(Int32 opsToAdd=1) { //返回的是計算之後的m_opCount的值 Interlocked.Add(ref m_opCount, opsToAdd); } //該方法必須在處理好一個操作的結果之後調用 public void JustEnded() { if (Interlocked.Decrement(ref m_opCount)==0) //返回的是計算之後的m_opCount的值 { ReportStatus(CoordinationStatus.AllDone); } } //該方法必須在發起所有操作之後調用 public void AllBegun(Action<CoordinationStatus> callback,Int32 timeout=Timeout.Infinite) { m_callback = callback; if (timeout!=Timeout.Infinite) { m_timer = new Timer(TimeExpired, null, timeout, Timeout.Infinite); } //相當於多減了一次,對沖初始化把m_opCount設置為1的多出來的1 JustEnded(); } private void TimeExpired(object o) { ReportStatus(CoordinationStatus.Timeout); } public void Cancel() { ReportStatus(CoordinationStatus.Cancel); } private void ReportStatus(CoordinationStatus status) { //這個用來判斷狀態是否是從未報告過;只有第一次調用這個方法的狀態才會被記錄 if (Interlocked.Exchange(ref m_statusReported,1)==0)//這個將m_statusReported的值變為1,並返回m_statusReported原有的值 { m_callback(status); } } }
執行結果
構造一個MultiWebRequests時,會先初始化一個AsyncCoordinator和包含了一組服務器uri的字典。然後,它以異步方式一個接一個地發出所有web請求。為此,他首先調用AsyncCoordinator的AboutToBegin方法,想他傳遞要發出的請求數量(這裏也可以一次把所有要執行請求的數量發給AboutToBegin)。然後,他調用httpClient.GetByteArrayAsync(server)初始化請求,這回返回一個task,ContinueWith執行computeResult方法,它可以並發處理結果。所有請求都發出後,將調用AsyncCoordinator的AllBegun方法,向他傳遞要在所有操作完成後執行的方法(AllDone)以及一個超時值。每收到一個響應,線程池都會調用computeResult進行後續處理任務,computeResult保存請求結果之後最後會調用JustEnded,使AsyncCoordinator知道一個對象已經執行完成。
JustEnded方法判斷出所有任務都已經執行完成後,會調用回調(AllDone)處理來自所有web服務器的結果。執行AllDone方法的線程就是獲取最後一個web服務器響應的那個線程池線程。但如果發生超時或者調用cancel方法的那個線程,調用AllDone的線程就是向asyncCoordinator通知超時的那個線程池線程,或者調用cancel方法的那個線程。
註意,這裏存在競態條件,因為以下事情可能恰好同時發生:所有web服務器請求完成、調用Allbegun、發生超時以及調用cancel。這時,AsyncCoordinator會選擇1個贏家和3個輸家,確保alldone不被多次調用。贏家是通過傳給AllDone的status實參來識別的。
AsyncCoordinator類封裝了所有線程協調邏輯。他用interlocked提供的方法來操作一切,確保代碼以極快速度運行,同時並沒有線程會被阻塞。
AsyncCoordinator類最重要的字段就是m_opCount,用於跟蹤仍在進行的一步操作的數量。每個異步操作開始前都會調用AboutToBegin。該方法調用interlocked.Add,以院子方式將傳給它的數字加到m_opCount字段上,m_opCount上的運算必須以原子方式進行。處理好web服務器的響應之後會調用justEnded,該方法調用interlocked.Decerment,以院子方式從m_opCount上減1.當opCount等於0時,由這個線程調用ReportStatus。
註意:m_opCount字段初始化為1(而非0),這一點很重要。執行構造器方法的線程在發出web服務器請求期間,由於m_opCount字段位1,所以能保證AllDone不會被調用。構造器調用AllBegun之前,m_opCount永遠不可能變為0。構造器調用allBegun時,會執行一次justEnded方法來遞減m_opCount,所以事實上撤掉了把它初始化為1的效果。
實現簡單的自旋鎖
Interlocked的方法很好用,但是主要用於操作Int值。如果需要原子性地操作類對象中的一組字段,又該怎麽辦呢?在這種情況,需要采取一個辦法阻止所有線程,只允許其中一個進入對字段進行操作的。可以使用Interlocked的方法構造一個線程同步塊。
internal struct SimpleSpinLock { private Int32 m_ResourceInUse;// 0=false(默認) 1 =true public void Enter() { while (true) { //總是將資源設為“正在使用”(1) //只有從“未使用”編程“正在使用”才會返回 if (Interlocked.Exchange(ref m_ResourceInUse,1)==0) { return; } //在這裏添加“黑科技” } } public void Leave() { //將資源標記為“未使用” Volatile.Write(ref m_ResourceInUse, 0); } }
下面這個類展示了如何使用SimpleSpinLock
public sealed class SomeResource { private SimpleSpinLock m_sl = new SimpleSpinLock(); public void AccessResource() { m_sl.Enter(); //一次只有一個線程才能進入這裏訪問資源 m_sl.Leave(); } }
這個鎖很簡單,他的最大問題是會造成線程“自旋”,自旋會浪費cpu時間。
SpinLock是.net已經實現的自旋鎖,他和我們前面舉例的SimpleSpinLock類似,只是使用了spinwait結構來增強性能(SpinWait在自旋中加入sleep方法,使他在一段時間內不占用cpu時間),還增加了超時支持。
這篇我們暫時介紹以上概念,下篇文字我們一起了解內核模式。
異步編程(二)用戶模式線程同步