1. 程式人生 > >異步編程(二)用戶模式線程同步

異步編程(二)用戶模式線程同步

new 如何 int32 style type mem windows 模式 內存

基元線程同步構造

  多個線程同時訪問共享數據時,線程同步能防止數據損壞。不需要線程同步最理想的情況,因為線程同步存在許多問題。

第一個問題就是它比較繁瑣,而且很容易寫錯。

第二個問題是,他們會損害性能。獲取和釋放鎖是需要時間的。

第三個問題是,他們一次只允許一個線程訪問資源,就可能導致其他線程被阻塞,使用多線程是為了提高效率,而阻塞無疑降低了你的效率。

綜上所述,線程同步是一件不好的事情,所以在設計自己的應用程序時,應盡可能避免進行線程同步。具體就是避免使用像靜態字段這樣的共享數據。線程用new操作符構造對象時,new操作符會返回新對象的引用,如果能避免將這個引用傳給可能同時使用對象的另一個線程,就不必同步對該對象進行訪問。可試著使用值類型,因為他們總是被賦值,每個我線程操作的都是它自己的副本。最後,多個線程同時對共享數據進行制度訪問是沒有任何問題的。

基元用戶模式和內核模式構造

基元(primitive)是指可以在代碼中使用的最簡單的構造。有兩種基元構造:用戶模式(user mode)和內核模式(kernel mode)。

  用戶模式構造

  應盡量使用基元用戶模式構造,他們的速度要顯著快於內核模式的構造。這是因為他們使用了特殊cpu指令來協調線程。這意味著協調實在硬件中發生的(所以才這麽塊)。但這也意味著windows操作系統永遠檢測不到一個線程在紀元用戶模式的構造上阻塞了。由於在用戶模式的基元構造上阻塞的線程池線程永遠不認為已阻塞,所以線程池不會創建新的線程來替換這種臨時阻塞的線程。此外,這些cpu指令只阻塞相當短的時間。

這也是我認為較好的構造方式,CLR Via C# 的作者Jeffrey Richter也建議盡量使用用戶模式。但用戶模式也有一個缺點:只有windows操作系統內核才能停止一個線程的運行(防止它浪費cpu時間)。在用戶模式中運行的線程可能被系統搶占(preempted),想要取得資源但暫時無法取到的線程會一直在用戶模式中“自旋”。這回浪費大量cpu時間。

  內核模式構造

內核模式的構造是是由windows操作系統自身提供的。所以,他們要求在應用程序的線程中調用由操作系統內核實現的函數。將線程從用戶模式切換到內核模式(或相反)會導致巨大的性能損失,這正式為什麽要避免使用內核模式構造的原因。但它們有一個重要的有點:線程通過內核模式的構造獲取其他線程擁有的資源時,windows會阻塞線程以避免它浪費cpu時間。當資源變得可用時,windows會恢復線程,允許它訪問資源。

對於一個等待的線程,如果不釋放它,它就一直阻塞。如果是用戶模式,線程將一直在cpu上運行,我們稱為“活鎖”。如果是內核模式,線程將一直阻塞,我們稱為“死鎖”。兩種情況都不好,但在兩者之間,死鎖總是優於活鎖,因為活鎖既浪費cpu時間,又浪費內存(線程棧等),而死鎖只浪費內存。

  理想中的模式

構造應該兼具上面兩種模式的長處。也就是說,在沒有競爭的情況下,應該快而不會阻塞(用戶模式)。但如果存在競爭,我希望它被操作系統內核阻塞。這種構造,我們稱為混合構造(hybrid construct)。應用程序使用混合構造是很常見的現象。

用戶模式構造

  易變字段 VOLATILE

靜態system.threading.volatile類提供了兩個靜態方法

技術分享圖片

這個方法比較特殊,他們事實上會禁止c#編譯器、jit編譯器和cpu平常執行的一些優化。下面描述了這些方法是如何工作的。

1 Volatile.Write方法強迫location中的值在調用時寫入。此外,按照編碼順序,之前的加載和存儲操作必須在調用Volatile.Write之前發生

2 Volatile.Read方法強迫location中的值在調用時讀取。此外,按照編碼順序,之後的加載和存儲操作必須必須在調用Volatile.Read之後發生。

這樣會避免編譯器對你的代碼進行了過度的優化,提前賦值數據。當然,你也可以使用volatile關鍵字,不過我並不喜歡這麽做,因為大多時候,你的讀取或寫入順序都可以按照正常方式進行,這樣效率更高。你可以在用必要的時候顯示調用Volatile類的方法,這樣程序的性能更好。

  互鎖構造 interlocked

  volatile的read方法執行一次原子性的讀取操作,write方法執行一次原子性的寫入操作。本節我們討論靜態system.threading.interlocked類提供的方法。interlocked類中的每個方法都執行一次院子讀取以及寫入操作。此外,interlocked的所有方法都建立了完整的內存柵欄(memory fence)。換言之,調用某個interlocked方法之前的任何變量寫入都在這個interlocked方法調用之前執行;而這個調用之後的任何變量讀取都在這個調用之後讀取。

技術分享圖片

interlocked方法的運行速度相當快,而且能做不少事情。下面我們有一個簡單的例子,使用interlocked方法異步查詢幾個web服務器,並同時處理返回數據。代碼很短,而且不阻塞任何線程,而且使用線程池來實現自動伸縮。

internal enum CoordinationStatus
{
    AllDone,
    Timeout,
    Cancel
}
internal sealed class MultiWebRequests
{
    //這個輔助類用於協調所有異步操作
    private AsyncCoordinator m_ac = new AsyncCoordinator();

    //這是想要查詢的web服務器及其響應(異常或int32)的集合
    //註意:多個線程訪問該字典不需要以同步方式進行
    //因為構造後鍵就是只讀的
    private Dictionary<String, Object> m_servers = new Dictionary<string, object>
    {
        {"https://www.baidu.com/" ,null},
        {"https://www.microsoft.com/zh-cn/",null},
        {"https://www.taobao.com/",null}
    };
    public MultiWebRequests(Int32 timeout=Timeout.Infinite)
    {
        var httpClient = new HttpClient();
        foreach (var server in m_servers.Keys)
        {
            m_ac.AboutToBegin(1);
            httpClient.GetByteArrayAsync(server).ContinueWith(task => ComputeResult(server, task));//task是Task<byte[]>類型  
        }

        //告訴AsyncCoordinator所有操作都已發起,並在所有操作完成
        //調用cancel或者發生超時的時候調用AllDone
        m_ac.AllBegun(AllDone, timeout);
    }
    //將結果保存到集合中,然後將完成狀態進行通知
    private void ComputeResult(string server, Task<Byte[]> task)
    {
        object result;
        if (task.Exception!=null)
        {
            result = task.Exception.InnerException;
        }
        else
        {
            //線程池線程處理I/O完成
            //在此添加自己的計算密集型算法。。。。
            result = task.Result.Length;
        }
        //保存結果(exception/sum),指出一個操作完成
        m_servers[server] = result;
        m_ac.JustEnded();
    }

    //調用這個方法指出結果已無關緊要
    public void Cancel()
    {
        m_ac.Cancel();
    }

    //所有web服務器都響應、調用了cancel或者發生超時,就調用該方法,顯示執行結果
    private void AllDone(CoordinationStatus status)
    {
        switch (status)
        {
            case CoordinationStatus.Cancel:
                Console.WriteLine("operation canceled");
                break;
            case CoordinationStatus.Timeout:
                Console.WriteLine("operation time-out");
                break;
            case CoordinationStatus.AllDone:
                Console.WriteLine("operation completed;results below;");
                foreach (var server in m_servers)
                {
                    Console.WriteLine("{0}",server.Key);
                    object result = server.Value;
                    if (result is Exception)
                    {
                        Console.WriteLine("failed due to {0}",result.GetType().Name);
                    }
                    else
                    {
                        Console.WriteLine("returned {0} bytes",result);
                    }
                }
                break;
        }
    }
}

可以看出,上述代碼並沒有直接使用interlocked的任何方法,因為我將所有協調代碼都放到可重用的AsyncCoordinator類中。如下

internal sealed class AsyncCoordinator
{
    //AllBegun內部調用justended來遞減它
    private Int32 m_opCount = 1;
    //0 = false  1= true
    private Int32 m_statusReported = 0;
    private Action<CoordinationStatus> m_callback;
    private Timer m_timer;
    //該方法必須在發起一個操作之前調用
    public void AboutToBegin(Int32 opsToAdd=1)
    {
        //返回的是計算之後的m_opCount的值
        Interlocked.Add(ref m_opCount, opsToAdd);
    }

    //該方法必須在處理好一個操作的結果之後調用
    public void JustEnded()
    {
        if (Interlocked.Decrement(ref m_opCount)==0)    //返回的是計算之後的m_opCount的值
        {
            ReportStatus(CoordinationStatus.AllDone);
        }
    }

    //該方法必須在發起所有操作之後調用
    public void AllBegun(Action<CoordinationStatus> callback,Int32 timeout=Timeout.Infinite)
    {
        m_callback = callback;
        if (timeout!=Timeout.Infinite)
        {
            m_timer = new Timer(TimeExpired, null, timeout, Timeout.Infinite);
        }
    //相當於多減了一次,對沖初始化把m_opCount設置為1的多出來的1
        JustEnded();
    }

    private void TimeExpired(object o)
    {
        ReportStatus(CoordinationStatus.Timeout);
    }

    public void Cancel()
    {
        ReportStatus(CoordinationStatus.Cancel);
    }

    private void ReportStatus(CoordinationStatus status)
    {
        //這個用來判斷狀態是否是從未報告過;只有第一次調用這個方法的狀態才會被記錄
        if (Interlocked.Exchange(ref m_statusReported,1)==0)//這個將m_statusReported的值變為1,並返回m_statusReported原有的值
        {
            m_callback(status);
        }
    }
}

執行結果

技術分享圖片

  構造一個MultiWebRequests時,會先初始化一個AsyncCoordinator和包含了一組服務器uri的字典。然後,它以異步方式一個接一個地發出所有web請求。為此,他首先調用AsyncCoordinator的AboutToBegin方法,想他傳遞要發出的請求數量(這裏也可以一次把所有要執行請求的數量發給AboutToBegin)。然後,他調用httpClient.GetByteArrayAsync(server)初始化請求,這回返回一個task,ContinueWith執行computeResult方法,它可以並發處理結果。所有請求都發出後,將調用AsyncCoordinator的AllBegun方法,向他傳遞要在所有操作完成後執行的方法(AllDone)以及一個超時值。每收到一個響應,線程池都會調用computeResult進行後續處理任務,computeResult保存請求結果之後最後會調用JustEnded,使AsyncCoordinator知道一個對象已經執行完成。

JustEnded方法判斷出所有任務都已經執行完成後,會調用回調(AllDone)處理來自所有web服務器的結果。執行AllDone方法的線程就是獲取最後一個web服務器響應的那個線程池線程。但如果發生超時或者調用cancel方法的那個線程,調用AllDone的線程就是向asyncCoordinator通知超時的那個線程池線程,或者調用cancel方法的那個線程。

註意,這裏存在競態條件,因為以下事情可能恰好同時發生:所有web服務器請求完成、調用Allbegun、發生超時以及調用cancel。這時,AsyncCoordinator會選擇1個贏家和3個輸家,確保alldone不被多次調用。贏家是通過傳給AllDone的status實參來識別的。

AsyncCoordinator類封裝了所有線程協調邏輯。他用interlocked提供的方法來操作一切,確保代碼以極快速度運行,同時並沒有線程會被阻塞。

技術分享圖片

AsyncCoordinator類最重要的字段就是m_opCount,用於跟蹤仍在進行的一步操作的數量。每個異步操作開始前都會調用AboutToBegin。該方法調用interlocked.Add,以院子方式將傳給它的數字加到m_opCount字段上,m_opCount上的運算必須以原子方式進行。處理好web服務器的響應之後會調用justEnded,該方法調用interlocked.Decerment,以院子方式從m_opCount上減1.當opCount等於0時,由這個線程調用ReportStatus。

註意:m_opCount字段初始化為1(而非0),這一點很重要。執行構造器方法的線程在發出web服務器請求期間,由於m_opCount字段位1,所以能保證AllDone不會被調用。構造器調用AllBegun之前,m_opCount永遠不可能變為0。構造器調用allBegun時,會執行一次justEnded方法來遞減m_opCount,所以事實上撤掉了把它初始化為1的效果。

  實現簡單的自旋鎖

Interlocked的方法很好用,但是主要用於操作Int值。如果需要原子性地操作類對象中的一組字段,又該怎麽辦呢?在這種情況,需要采取一個辦法阻止所有線程,只允許其中一個進入對字段進行操作的。可以使用Interlocked的方法構造一個線程同步塊。

internal struct SimpleSpinLock
{
    private Int32 m_ResourceInUse;// 0=false(默認)  1 =true
    public void Enter()
    {
        while (true)
        {
            //總是將資源設為“正在使用”(1)
            //只有從“未使用”編程“正在使用”才會返回
            if (Interlocked.Exchange(ref m_ResourceInUse,1)==0)
            {
                return;
            }
            //在這裏添加“黑科技”
        }
    }
    public void Leave()
    {
        //將資源標記為“未使用”
        Volatile.Write(ref m_ResourceInUse, 0);
    }
}

下面這個類展示了如何使用SimpleSpinLock

public sealed class SomeResource
{
    private SimpleSpinLock m_sl = new SimpleSpinLock();
    public void AccessResource()
    {
        m_sl.Enter();
        //一次只有一個線程才能進入這裏訪問資源
        m_sl.Leave();
    }
}

這個鎖很簡單,他的最大問題是會造成線程“自旋”,自旋會浪費cpu時間。

  SpinLock是.net已經實現的自旋鎖,他和我們前面舉例的SimpleSpinLock類似,只是使用了spinwait結構來增強性能(SpinWait在自旋中加入sleep方法,使他在一段時間內不占用cpu時間),還增加了超時支持。

這篇我們暫時介紹以上概念,下篇文字我們一起了解內核模式。

異步編程(二)用戶模式線程同步