1. 程式人生 > >C#使用互斥量(Mutex)實現多進程並發操作時多進程間線程同步操作(進程同步)

C#使用互斥量(Mutex)實現多進程並發操作時多進程間線程同步操作(進程同步)

互斥 空字符 示例 logfile format ror var mutex類 www

本文主要是實現操作系統級別的多進程間線程同步(進程同步)的示例代碼及測試結果。代碼經過測試,可供參考,也可直接使用。

承接上一篇博客的業務場景[C#使用讀寫鎖三行代碼簡單解決多線程並發寫入文件時線程同步的問題]。

隨著服務進程的增多,光憑進程內的線程同步已經不能滿足現在的需求,導致多進程同時寫入同一個文件時,一樣提示文件被占用的問題。

在這種場景下,跨進程級的鎖是不可避免的。在.NET提供的參考中,進程鎖都繼承了System.Threading.WaitHandle類

而在本文中針對單個文件同一時間僅允許單個進程(線程)操作的場景,System.Threading.Mutex類無疑是最簡單也是最合適的選擇

該類型的對象可以使用命名(字符串)互斥量實現當前會話級或操作系統級的同步需求。我選擇了操作系統級別的同步編寫示例,因為覆蓋面更廣。

下面是實現代碼,註釋很詳細就不細說了:

namespace WaitHandleExample
{
    class Program
    {
        static void Main(string[] args)
        {
            #region 簡單使用
            //var mutexKey = MutexExample.GetFilePathMutexKey("文件路徑");
            
//MutexExample.MutexExec(mutexKey, () => //{ // Console.WriteLine("需要進程同步執行的代碼"); //}); #endregion #region 測試代碼 var filePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "test.log").ToUpper(); var mutexKey = MutexExample.GetFilePathMutexKey(filePath);
//同時開啟N個寫入線程 Parallel.For(0, LogCount, e => { //沒使用互斥鎖操作寫入,大量寫入錯誤;FileStream包含FileShare的構造函數也僅實現了進程內的線程同步,多進程同時寫入時也會出錯 //WriteLog(filePath); //使用互斥鎖操作寫入,由於同一時間僅有一個線程操作,所以不會出錯 MutexExample.MutexExec(mutexKey, () => { WriteLog(filePath); }); }); Console.WriteLine(string.Format("Log Count:{0}.\t\tWrited Count:{1}.\tFailed Count:{2}.", LogCount.ToString(), WritedCount.ToString(), FailedCount.ToString())); Console.Read(); #endregion } /// <summary> /// C#互斥量使用示例代碼 /// </summary> /// <remarks>已在經過測試並上線運行,可直接使用</remarks> public static class MutexExample { /// <summary> /// 進程間同步執行的簡單例子 /// </summary> /// <param name="action">同步處理代碼</param> /// <param name="mutexKey">操作系統級的同步鍵 /// (如果將 name 指定為 null 或空字符串,則創建一個局部互斥體。 /// 如果名稱以前綴“Global\”開頭,則 mutex 在所有終端服務器會話中均為可見。 /// 如果名稱以前綴“Local\”開頭,則 mutex 僅在創建它的終端服務器會話中可見。 /// 如果創建已命名 mutex 時不指定前綴,則它將采用前綴“Local\”。)</param> /// <remarks>不重試且不考慮異常情況處理的簡單例子</remarks> [Obsolete(error: false, message: "請使用MutexExec")] public static void MutexExecEasy(string mutexKey, Action action) { //聲明一個已命名的互斥體,實現進程間同步;該命名互斥體不存在則自動創建,已存在則直接獲取 using (Mutex mut = new Mutex(false, mutexKey)) { try { //上鎖,其他線程需等待釋放鎖之後才能執行處理;若其他線程已經上鎖或優先上鎖,則先等待其他線程執行完畢 mut.WaitOne(); //執行處理代碼(在調用WaitHandle.WaitOne至WaitHandle.ReleaseMutex的時間段裏,只有一個線程處理,其他線程都得等待釋放鎖後才能執行該代碼段) action(); } finally { //釋放鎖,讓其他進程(或線程)得以繼續執行 mut.ReleaseMutex(); } } } /// <summary> /// 獲取文件名對應的進程同步鍵 /// </summary> /// <param name="filePath">文件路徑(請註意大小寫及空格)</param> /// <returns>進程同步鍵(互斥體名稱)</returns> public static string GetFilePathMutexKey(string filePath) { //生成文件對應的同步鍵,可自定義格式(互斥體名稱對特殊字符支持不友好,遂轉換為BASE64格式字符串) var fileKey = Convert.ToBase64String(Encoding.Default.GetBytes(string.Format(@"FILE\{0}", filePath))); //轉換為操作系統級的同步鍵 var mutexKey = string.Format(@"Global\{0}", fileKey); return mutexKey; } /// <summary> /// 進程間同步執行 /// </summary> /// <param name="mutexKey">操作系統級的同步鍵 /// (如果將 name 指定為 null 或空字符串,則創建一個局部互斥體。 /// 如果名稱以前綴“Global\”開頭,則 mutex 在所有終端服務器會話中均為可見。 /// 如果名稱以前綴“Local\”開頭,則 mutex 僅在創建它的終端服務器會話中可見。 /// 如果創建已命名 mutex 時不指定前綴,則它將采用前綴“Local\”。)</param> /// <param name="action">同步處理操作</param> public static void MutexExec(string mutexKey, Action action) { MutexExec(mutexKey: mutexKey, action: action, recursive: false); } /// <summary> /// 進程間同步執行 /// </summary> /// <param name="mutexKey">操作系統級的同步鍵 /// (如果將 name 指定為 null 或空字符串,則創建一個局部互斥體。 /// 如果名稱以前綴“Global\”開頭,則 mutex 在所有終端服務器會話中均為可見。 /// 如果名稱以前綴“Local\”開頭,則 mutex 僅在創建它的終端服務器會話中可見。 /// 如果創建已命名 mutex 時不指定前綴,則它將采用前綴“Local\”。)</param> /// <param name="action">同步處理操作</param> /// <param name="recursive">指示當前調用是否為遞歸處理,遞歸處理時檢測到異常則拋出異常,避免進入無限遞歸</param> private static void MutexExec(string mutexKey, Action action, bool recursive) { //聲明一個已命名的互斥體,實現進程間同步;該命名互斥體不存在則自動創建,已存在則直接獲取 //initiallyOwned: false:默認當前線程並不擁有已存在互斥體的所屬權,即默認本線程並非為首次創建該命名互斥體的線程 //註意:並發聲明同名的命名互斥體時,若間隔時間過短,則可能同時聲明了多個名稱相同的互斥體,並且同名的多個互斥體之間並不同步,高並發用戶請另行處理 using (Mutex mut = new Mutex(initiallyOwned: false, name: mutexKey)) { try { //上鎖,其他線程需等待釋放鎖之後才能執行處理;若其他線程已經上鎖或優先上鎖,則先等待其他線程執行完畢 mut.WaitOne(); //執行處理代碼(在調用WaitHandle.WaitOne至WaitHandle.ReleaseMutex的時間段裏,只有一個線程處理,其他線程都得等待釋放鎖後才能執行該代碼段) action(); } //當其他進程已上鎖且沒有正常釋放互斥鎖時(譬如進程忽然關閉或退出),則會拋出AbandonedMutexException異常 catch (AbandonedMutexException ex) { //避免進入無限遞歸 if (recursive) throw ex; //非遞歸調用,由其他進程拋出互斥鎖解鎖異常時,重試執行 MutexExec(mutexKey: mutexKey, action: action, recursive: true); } finally { //釋放鎖,讓其他進程(或線程)得以繼續執行 mut.ReleaseMutex(); } } } } #region 測試寫文件的代碼 static int LogCount = 500; static int WritedCount = 0; static int FailedCount = 0; static void WriteLog(string logFilePath) { try { var now = DateTime.Now; var logContent = string.Format("Tid: {0}{1} {2}.{3}\r\n", Thread.CurrentThread.ManagedThreadId.ToString().PadRight(4), now.ToLongDateString(), now.ToLongTimeString(), now.Millisecond.ToString()); File.AppendAllText(logFilePath, logContent); WritedCount++; } catch (Exception ex) { Console.WriteLine(ex.Message); FailedCount++; } } #endregion } }

測試不使用進程同步,多進程多線程同時寫入文件:

技術分享

技術分享

測試結果:6個進程同時進行3000次寫入請求,僅成功寫入277次

測試使用互斥量進行進程同步,多進程多線程同時寫入文件:

技術分享

技術分享

測試結果:6個進程同時進行3000次寫入請求,全部成功寫入

補充:

進程同步的資源消耗及效率比線程同步要差得多,請根據實際場景合理使用。

本文雖然是用寫入文件作為示例,但進程同步的代碼使用場景與文件操作無關。

Semaphore類(信號燈)雖然可以限制同時操作的線程數,甚至把最大同時操作數設置為1時,行為與Mutex類(互斥量)類似;但是由於信號燈在其他進程中出現異常退出時並不能接收到異常通知,只能通過等待超時觸發異常,並不適合現在的場景,所以並沒講述。

關於進程同步的其他深入了解及應用,請參閱其他資料。

C#使用互斥量(Mutex)實現多進程並發操作時多進程間線程同步操作(進程同步)