1. 程式人生 > >圖解Janusgraph系列-併發安全:鎖機制(本地鎖+分散式鎖)分析

圖解Janusgraph系列-併發安全:鎖機制(本地鎖+分散式鎖)分析

# 圖解Janusgraph系列-併發安全:鎖機制(本地鎖+分散式鎖)分析 大家好,我是`洋仔`,JanusGraph圖解系列文章,`實時更新`~ #### 圖資料庫文章總目錄: * **整理所有圖相關文章,請移步(超鏈):**[圖資料庫系列-文章總目錄 ](https://liyangyang.blog.csdn.net/article/details/111031257) * **地址:**[https://liyangyang.blog.csdn.net/article/details/111031257](https://liyangyang.blog.csdn.net/article/details/111031257) > **`原始碼分析相關可檢視github(碼文不易,求個star~)`**: [https://github.com/YYDreamer/janusgraph](https://github.com/YYDreamer/janusgraph) > 下述流程高清大圖地址:[https://www.processon.com/view/link/5f471b2e7d9c086b9903b629](https://www.processon.com/view/link/5f471b2e7d9c086b9903b629) > 版本:JanusGraph-0.5.2 **轉載文章請保留以下宣告:** >作者:洋仔聊程式設計 >微信公眾號:匠心Java >原文地址:[https://liyangyang.blog.csdn.net/](https://liyangyang.blog.csdn.net/) ---- 在分散式系統中,難免涉及到對同一資料的併發操作,如何保證分散式系統中資料的併發安全呢?**分散式鎖!** ## 一:分散式鎖 常用的分散式鎖實現方式: 1、基於資料庫實現分散式鎖 ​ 針對於資料庫實現的分散式鎖,如mysql使用使用`for update`共同競爭一個行鎖來實現; 在JanusGraph中,也是基於資料庫實現的分散式鎖,這裡的`資料庫`指的是我們當前使用的第三方`backend storage`,具體的實現方式也和mysql有所不同,具體我們會在下文分析 2、基於Redis實現的分散式鎖 ​ 基於`lua指令碼`+`setNx`實現 3、基於zk實現的分散式鎖 ​ 基於`znode`的有序性和`臨時節點`+zk的`watcher`機制實現 4、MVCC多版本併發控制樂觀鎖實現 > 本文主要介紹Janusgraph的鎖機制,其他的實現機制就不在此做詳解了 下面我們來分析一下`JanusGraph`的`鎖機制`實現~ ## 二:JanusGraph鎖機制 在JanusGraph中使用的鎖機制是:`本地鎖` + `分散式鎖`來實現的; ### 2.1 一致性行為 在`JanusGraph`中主要有三種`一致性修飾詞(Consistency Modifier)`來表示3種不同的`一致性行為`,來控制相簿使用過程中的併發問題的控制程度; ```java public enum ConsistencyModifier { DEFAULT, LOCK, FORK } ``` 原始碼中`ConsistencyModifier`列舉類主要作用:用於控制JanusGraph在`最終一致或其他非事務性後端系統`上的一致性行為!其作用分別為: * **DEFAULT**:預設的一致性行為,不使用分散式鎖進行控制,對配置的儲存後端使用由封閉事務保證的預設一致性模型,一致性行為主要取決於儲存後端的配置以及封閉事務的(可選)配置;無需顯示配置即可使用 * **LOCK**:在儲存後端支援鎖的前提下,顯示的獲取分散式鎖以保證一致性!確切的一致性保證取決於所配置的鎖實現;需`management.setConsistency(element, ConsistencyModifier.LOCK);`語句進行配置 * **FORK**:只適用於`multi-edges`和`list-properties`兩種情況下使用;使JanusGraph修改資料時,採用先刪除後新增新的邊/屬性的方式,而不是覆蓋現有的邊/屬性,從而避免潛在的併發寫入衝突;需`management.setConsistency(element, ConsistencyModifier.FORK);`進行配置 #### LOCK 在查詢或者插入資料時,是否使用`分散式鎖`進行併發控制,在圖`shcema`的建立過程中,如上述可以通過配置`schema元素`為`ConsistencyModifier.LOCK`方式控制併發,則在使用過程中就會用`分散式鎖`進行併發控制; 為了提高效率,JanusGraph預設不使用鎖定。 因此,使用者必須為定義`一致性約束`的每個架構元素決定是否使用鎖定。 使用`JanusGraphManagement.setConsistency(element,ConsistencyModifier.LOCK)`顯式啟用對架構元素的鎖定 程式碼如下所示: ```java mgmt = graph.openManagement() name = mgmt.makePropertyKey('consistentName').dataType(String.class).make() index = mgmt.buildIndex('byConsistentName', Vertex.class).addKey(name).unique().buildCompositeIndex() mgmt.setConsistency(name, ConsistencyModifier.LOCK) // Ensures only one name per vertex mgmt.setConsistency(index, ConsistencyModifier.LOCK) // Ensures name uniqueness in the graph mgmt.commit() ``` #### FORK 由於邊緣作為單個記錄儲存在基礎儲存後端中,因此同時修改單個邊緣將導致衝突。 `FORK`就是為了代替`LOCK`,可以將邊緣標籤配置為使用`ConsistencyModifier.FORK`。 下面的示例建立一個新的edge label,並將其設定為`ConsistencyModifier.FORK` ```java mgmt = graph.openManagement() related = mgmt.makeEdgeLabel('related').make() mgmt.setConsistency(related, ConsistencyModifier.FORK) mgmt.commit() ``` 經過上述配置後,修改標籤配置為`FORK`的edge時,操作步驟為: 1. 首先,刪除該邊 2. 將修改後的邊作為新邊新增 因此,如果兩個併發事務修改了同一邊緣,則提交時將存在邊緣的兩個修改後的副本,可以在查詢遍歷期間根據需要解決這些副本。 注意edge fork僅適用於MULTI edge。 具有多重性約束的邊緣標籤不能使用此策略,因為非MULTI的邊緣標籤定義中內建了一個唯一性約束,該約束需要顯式鎖定或使用基礎儲存後端的衝突解決機制 下面我們具體來看一下`janusgrph`的`鎖機制`的實現: ### 2.2 LoackID 在介紹鎖機制之前,先看一下鎖應該鎖什麼東西呢? 我們都知道在`janusgraph`的底層儲存中,vertexId作為Rowkey,屬性和邊儲存在cell中,由column+value組成 當我們修改`節點的屬性和邊`+`邊的屬性時`,很明顯只要鎖住對應的`Rowkey + Column`即可; 在`Janusgraph`中,這個鎖的標識的基礎部分就是`LockID`: > LockID = RowKey + Column 原始碼如下: ```java KeyColumn lockID = new KeyColumn(key, column); ``` ### 2.3 本地鎖 `本地鎖`是在任何情況下都需要獲取的一個鎖,只有獲取成功後,才會進行下述`分散式鎖`的獲取! `本地鎖`是基於`圖例項`維度存在的;主要作用是保證當前圖例項下的操作中無衝突! 本地鎖的實現是通過`ConcurrentHashMap`資料結構來實現的,在圖例項維度下唯一; 基於當前`事務`+`lockId`來作為`鎖標識`; **獲取的主要流程:** ![image-20200810170411991](https://img2020.cnblogs.com/other/1252473/202012/1252473-20201217124402643-1357597179.png) **結合原始碼如下:** 上述圖建議依照原始碼一塊分析,原始碼在`LocalLockMediator`類中的下述方法,下面`原始碼分析模組`會詳細分析 ```java public boolean lock(KeyColumn kc, T requester, Instant expires) { } ``` **引入本地鎖機制,主要目的:** 在圖例項維度來做一層鎖判斷,`減少分散式鎖的併發衝突`,減少分散式鎖帶來的效能消耗 ### 2.4 分散式鎖 在`本地鎖`獲取成功之後才會去嘗試獲取`分散式鎖`; 分散式鎖的獲取整體分為兩部分流程: 1. `分散式鎖資訊插入` 2. `分散式鎖資訊狀態判斷` #### 分散式鎖資訊插入 該部分主要是通過`lockID`來構造要插入的`Rowkey和column`並將資料插入到`hbase`中;插入成功即表示這部分處理成功! 具體流程如下: ![2](https://img2020.cnblogs.com/other/1252473/202012/1252473-20201217124402884-1636920262.png) #### 分散式鎖資訊狀態判斷 該部分在上一部分完成之後才會進行,主要是判斷分散式鎖是否獲取成功! 查詢出當前hbase中對應`Rowkey的所有column`,過濾未過期的column集合,比對集合的第一個column是否等於當前事務插入的column; 等於則獲取成功!不等於則獲取失敗! 具體流程如下: ![3](https://img2020.cnblogs.com/other/1252473/202012/1252473-20201217124403067-1403246840.png) ## 三:原始碼分析 與 整體流程 原始碼分析已經push到github:https://github.com/YYDreamer/janusgraph **1、獲取鎖的入口** ```java public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException { // locker是一個一致性key鎖物件 if (locker != null) { // 獲取當前事務物件 ExpectedValueCheckingTransaction tx = (ExpectedValueCheckingTransaction) txh; // 判斷:當前的獲取鎖操作是否當前事務的操作中存在增刪改的操作 if (tx.isMutationStarted()) throw new PermanentLockingException("Attempted to obtain a lock after mutations had been persisted"); // 使用key+column組裝為lockID,供下述加鎖使用!!!!! KeyColumn lockID = new KeyColumn(key, column); log.debug("Attempting to acquireLock on {} ev={}", lockID, expectedValue); // 獲取本地當前jvm程序中的寫鎖(看下述的 1:寫鎖獲取分析) // (此處的獲取鎖只是將對應的KLV儲存到Hbase中!儲存成功並不代表獲取鎖成功) // 1. 獲取成功(等同於儲存成功)則繼續執行 // 2. 獲取失敗(等同於儲存失敗),會丟擲異常,丟擲到最上層,列印錯誤日誌“Could not commit transaction ["+transactionId+"] due to exception” 並丟擲對應的異常,本次插入資料結束 locker.writeLock(lockID, tx.getConsistentTx()); // 執行前提:上述獲取鎖成功! // 儲存期望值,此處為了實現當相同的key + value + tx多個加鎖時,只處理第一個 // 儲存在事務物件中,標識在commit判斷鎖是否獲取成功時,當前事務插入的是哪個鎖資訊 tx.storeExpectedValue(this, lockID, expectedValue); } else { // locker為空情況下,直接丟擲一個執行時異常,終止程式 store.acquireLock(key, column, expectedValue, unwrapTx(txh)); } } ``` **2、執行 locker.writeLock(lockID, tx.getConsistentTx()) 觸發鎖獲取** ```java public void writeLock(KeyColumn lockID, StoreTransaction tx) throws TemporaryLockingException, PermanentLockingException { if (null != tx.getConfiguration().getGroupName()) { MetricManager.INSTANCE.getCounter(tx.getConfiguration().getGroupName(), M_LOCKS, M_WRITE, M_CALLS).inc(); } // 判斷當前事務是否在圖例項的維度 已經佔據了lockID的鎖 // 此處的lockState在一個事務成功獲取本地鎖+分散式鎖後,以事務為key、value為map,其中key為lockID,value為加鎖狀態(開始時間、過期時間等) if (lockState.has(tx, lockID)) { log.debug("Transaction {} already wrote lock on {}", tx, lockID); return; } // 當前事務沒有佔據lockID對應的鎖 // 進行(lockLocally(lockID, tx) 本地加鎖鎖定操作, if (lockLocally(lockID, tx)) { boolean ok = false; try { // 在本地鎖獲取成功的前提下: // 嘗試獲取基於Hbase實現的分散式鎖; // 注意!!!(此處的獲取鎖只是將對應的KLV儲存到Hbase中!儲存成功並不代表獲取鎖成功) S stat = writeSingleLock(lockID, tx); // 獲取鎖分散式鎖成功後(即寫入成功後),更新本地鎖的過期時間為分散式鎖的過期時間 lockLocally(lockID, stat.getExpirationTimestamp(), tx); // update local lock expiration time // 將上述獲取的鎖,儲存在標識當前存在鎖的集