圖解Janusgraph系列-併發安全:鎖機制(本地鎖+分散式鎖)分析
阿新 • • 發佈:2020-12-17
# 圖解Janusgraph系列-併發安全:鎖機制(本地鎖+分散式鎖)分析
大家好,我是`洋仔`,JanusGraph圖解系列文章,`實時更新`~
#### 圖資料庫文章總目錄:
* **整理所有圖相關文章,請移步(超鏈):**[圖資料庫系列-文章總目錄 ](https://liyangyang.blog.csdn.net/article/details/111031257)
* **地址:**[https://liyangyang.blog.csdn.net/article/details/111031257](https://liyangyang.blog.csdn.net/article/details/111031257)
> **`原始碼分析相關可檢視github(碼文不易,求個star~)`**: [https://github.com/YYDreamer/janusgraph](https://github.com/YYDreamer/janusgraph)
> 下述流程高清大圖地址:[https://www.processon.com/view/link/5f471b2e7d9c086b9903b629](https://www.processon.com/view/link/5f471b2e7d9c086b9903b629)
> 版本:JanusGraph-0.5.2
**轉載文章請保留以下宣告:**
>作者:洋仔聊程式設計
>微信公眾號:匠心Java
>原文地址:[https://liyangyang.blog.csdn.net/](https://liyangyang.blog.csdn.net/)
----
在分散式系統中,難免涉及到對同一資料的併發操作,如何保證分散式系統中資料的併發安全呢?**分散式鎖!**
## 一:分散式鎖
常用的分散式鎖實現方式:
1、基於資料庫實現分散式鎖
針對於資料庫實現的分散式鎖,如mysql使用使用`for update`共同競爭一個行鎖來實現; 在JanusGraph中,也是基於資料庫實現的分散式鎖,這裡的`資料庫`指的是我們當前使用的第三方`backend storage`,具體的實現方式也和mysql有所不同,具體我們會在下文分析
2、基於Redis實現的分散式鎖
基於`lua指令碼`+`setNx`實現
3、基於zk實現的分散式鎖
基於`znode`的有序性和`臨時節點`+zk的`watcher`機制實現
4、MVCC多版本併發控制樂觀鎖實現
> 本文主要介紹Janusgraph的鎖機制,其他的實現機制就不在此做詳解了
下面我們來分析一下`JanusGraph`的`鎖機制`實現~
## 二:JanusGraph鎖機制
在JanusGraph中使用的鎖機制是:`本地鎖` + `分散式鎖`來實現的;
### 2.1 一致性行為
在`JanusGraph`中主要有三種`一致性修飾詞(Consistency Modifier)`來表示3種不同的`一致性行為`,來控制相簿使用過程中的併發問題的控制程度;
```java
public enum ConsistencyModifier {
DEFAULT,
LOCK,
FORK
}
```
原始碼中`ConsistencyModifier`列舉類主要作用:用於控制JanusGraph在`最終一致或其他非事務性後端系統`上的一致性行為!其作用分別為:
* **DEFAULT**:預設的一致性行為,不使用分散式鎖進行控制,對配置的儲存後端使用由封閉事務保證的預設一致性模型,一致性行為主要取決於儲存後端的配置以及封閉事務的(可選)配置;無需顯示配置即可使用
* **LOCK**:在儲存後端支援鎖的前提下,顯示的獲取分散式鎖以保證一致性!確切的一致性保證取決於所配置的鎖實現;需`management.setConsistency(element, ConsistencyModifier.LOCK);`語句進行配置
* **FORK**:只適用於`multi-edges`和`list-properties`兩種情況下使用;使JanusGraph修改資料時,採用先刪除後新增新的邊/屬性的方式,而不是覆蓋現有的邊/屬性,從而避免潛在的併發寫入衝突;需`management.setConsistency(element, ConsistencyModifier.FORK);`進行配置
#### LOCK
在查詢或者插入資料時,是否使用`分散式鎖`進行併發控制,在圖`shcema`的建立過程中,如上述可以通過配置`schema元素`為`ConsistencyModifier.LOCK`方式控制併發,則在使用過程中就會用`分散式鎖`進行併發控制;
為了提高效率,JanusGraph預設不使用鎖定。 因此,使用者必須為定義`一致性約束`的每個架構元素決定是否使用鎖定。
使用`JanusGraphManagement.setConsistency(element,ConsistencyModifier.LOCK)`顯式啟用對架構元素的鎖定
程式碼如下所示:
```java
mgmt = graph.openManagement()
name = mgmt.makePropertyKey('consistentName').dataType(String.class).make()
index = mgmt.buildIndex('byConsistentName', Vertex.class).addKey(name).unique().buildCompositeIndex()
mgmt.setConsistency(name, ConsistencyModifier.LOCK) // Ensures only one name per vertex
mgmt.setConsistency(index, ConsistencyModifier.LOCK) // Ensures name uniqueness in the graph
mgmt.commit()
```
#### FORK
由於邊緣作為單個記錄儲存在基礎儲存後端中,因此同時修改單個邊緣將導致衝突。
`FORK`就是為了代替`LOCK`,可以將邊緣標籤配置為使用`ConsistencyModifier.FORK`。
下面的示例建立一個新的edge label,並將其設定為`ConsistencyModifier.FORK`
```java
mgmt = graph.openManagement()
related = mgmt.makeEdgeLabel('related').make()
mgmt.setConsistency(related, ConsistencyModifier.FORK)
mgmt.commit()
```
經過上述配置後,修改標籤配置為`FORK`的edge時,操作步驟為:
1. 首先,刪除該邊
2. 將修改後的邊作為新邊新增
因此,如果兩個併發事務修改了同一邊緣,則提交時將存在邊緣的兩個修改後的副本,可以在查詢遍歷期間根據需要解決這些副本。
注意edge fork僅適用於MULTI edge。 具有多重性約束的邊緣標籤不能使用此策略,因為非MULTI的邊緣標籤定義中內建了一個唯一性約束,該約束需要顯式鎖定或使用基礎儲存後端的衝突解決機制
下面我們具體來看一下`janusgrph`的`鎖機制`的實現:
### 2.2 LoackID
在介紹鎖機制之前,先看一下鎖應該鎖什麼東西呢?
我們都知道在`janusgraph`的底層儲存中,vertexId作為Rowkey,屬性和邊儲存在cell中,由column+value組成
當我們修改`節點的屬性和邊`+`邊的屬性時`,很明顯只要鎖住對應的`Rowkey + Column`即可;
在`Janusgraph`中,這個鎖的標識的基礎部分就是`LockID`:
> LockID = RowKey + Column
原始碼如下:
```java
KeyColumn lockID = new KeyColumn(key, column);
```
### 2.3 本地鎖
`本地鎖`是在任何情況下都需要獲取的一個鎖,只有獲取成功後,才會進行下述`分散式鎖`的獲取!
`本地鎖`是基於`圖例項`維度存在的;主要作用是保證當前圖例項下的操作中無衝突!
本地鎖的實現是通過`ConcurrentHashMap`資料結構來實現的,在圖例項維度下唯一;
基於當前`事務`+`lockId`來作為`鎖標識`;
**獲取的主要流程:**
![image-20200810170411991](https://img2020.cnblogs.com/other/1252473/202012/1252473-20201217124402643-1357597179.png)
**結合原始碼如下:**
上述圖建議依照原始碼一塊分析,原始碼在`LocalLockMediator`類中的下述方法,下面`原始碼分析模組`會詳細分析
```java
public boolean lock(KeyColumn kc, T requester, Instant expires) {
}
```
**引入本地鎖機制,主要目的:** 在圖例項維度來做一層鎖判斷,`減少分散式鎖的併發衝突`,減少分散式鎖帶來的效能消耗
### 2.4 分散式鎖
在`本地鎖`獲取成功之後才會去嘗試獲取`分散式鎖`;
分散式鎖的獲取整體分為兩部分流程:
1. `分散式鎖資訊插入`
2. `分散式鎖資訊狀態判斷`
#### 分散式鎖資訊插入
該部分主要是通過`lockID`來構造要插入的`Rowkey和column`並將資料插入到`hbase`中;插入成功即表示這部分處理成功!
具體流程如下:
![2](https://img2020.cnblogs.com/other/1252473/202012/1252473-20201217124402884-1636920262.png)
#### 分散式鎖資訊狀態判斷
該部分在上一部分完成之後才會進行,主要是判斷分散式鎖是否獲取成功!
查詢出當前hbase中對應`Rowkey的所有column`,過濾未過期的column集合,比對集合的第一個column是否等於當前事務插入的column;
等於則獲取成功!不等於則獲取失敗!
具體流程如下:
![3](https://img2020.cnblogs.com/other/1252473/202012/1252473-20201217124403067-1403246840.png)
## 三:原始碼分析 與 整體流程
原始碼分析已經push到github:https://github.com/YYDreamer/janusgraph
**1、獲取鎖的入口**
```java
public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException {
// locker是一個一致性key鎖物件
if (locker != null) {
// 獲取當前事務物件
ExpectedValueCheckingTransaction tx = (ExpectedValueCheckingTransaction) txh;
// 判斷:當前的獲取鎖操作是否當前事務的操作中存在增刪改的操作
if (tx.isMutationStarted())
throw new PermanentLockingException("Attempted to obtain a lock after mutations had been persisted");
// 使用key+column組裝為lockID,供下述加鎖使用!!!!!
KeyColumn lockID = new KeyColumn(key, column);
log.debug("Attempting to acquireLock on {} ev={}", lockID, expectedValue);
// 獲取本地當前jvm程序中的寫鎖(看下述的 1:寫鎖獲取分析)
// (此處的獲取鎖只是將對應的KLV儲存到Hbase中!儲存成功並不代表獲取鎖成功)
// 1. 獲取成功(等同於儲存成功)則繼續執行
// 2. 獲取失敗(等同於儲存失敗),會丟擲異常,丟擲到最上層,列印錯誤日誌“Could not commit transaction ["+transactionId+"] due to exception” 並丟擲對應的異常,本次插入資料結束
locker.writeLock(lockID, tx.getConsistentTx());
// 執行前提:上述獲取鎖成功!
// 儲存期望值,此處為了實現當相同的key + value + tx多個加鎖時,只處理第一個
// 儲存在事務物件中,標識在commit判斷鎖是否獲取成功時,當前事務插入的是哪個鎖資訊
tx.storeExpectedValue(this, lockID, expectedValue);
} else {
// locker為空情況下,直接丟擲一個執行時異常,終止程式
store.acquireLock(key, column, expectedValue, unwrapTx(txh));
}
}
```
**2、執行 locker.writeLock(lockID, tx.getConsistentTx()) 觸發鎖獲取**
```java
public void writeLock(KeyColumn lockID, StoreTransaction tx) throws TemporaryLockingException, PermanentLockingException {
if (null != tx.getConfiguration().getGroupName()) {
MetricManager.INSTANCE.getCounter(tx.getConfiguration().getGroupName(), M_LOCKS, M_WRITE, M_CALLS).inc();
}
// 判斷當前事務是否在圖例項的維度 已經佔據了lockID的鎖
// 此處的lockState在一個事務成功獲取本地鎖+分散式鎖後,以事務為key、value為map,其中key為lockID,value為加鎖狀態(開始時間、過期時間等)
if (lockState.has(tx, lockID)) {
log.debug("Transaction {} already wrote lock on {}", tx, lockID);
return;
}
// 當前事務沒有佔據lockID對應的鎖
// 進行(lockLocally(lockID, tx) 本地加鎖鎖定操作,
if (lockLocally(lockID, tx)) {
boolean ok = false;
try {
// 在本地鎖獲取成功的前提下:
// 嘗試獲取基於Hbase實現的分散式鎖;
// 注意!!!(此處的獲取鎖只是將對應的KLV儲存到Hbase中!儲存成功並不代表獲取鎖成功)
S stat = writeSingleLock(lockID, tx);
// 獲取鎖分散式鎖成功後(即寫入成功後),更新本地鎖的過期時間為分散式鎖的過期時間
lockLocally(lockID, stat.getExpirationTimestamp(), tx); // update local lock expiration time
// 將上述獲取的鎖,儲存在標識當前存在鎖的集