hbase原始碼系列(十二)Get、Scan在服務端是如何處理?
繼上一篇講了Put和Delete之後,這一篇我們講Get和Scan, 因為我發現這兩個操作幾乎是一樣的過程,就像之前的Put和Delete一樣,上一篇我本來只打算寫Put的,結果發現Delete也可以走這個過程,所以就一起寫了。
Get
我們開啟HRegionServer找到get方法。Get的方法處理分兩種,設定了ClosestRowBefore和沒有設定的,一般來講,我們都是知道了明確的rowkey,不太會設定這個引數,它預設是false的。
if (get.hasClosestRowBefore() && get.getClosestRowBefore()) { byte[] row = get.getRow().toByteArray(); byte[] family = get.getColumn(0).getFamily().toByteArray(); r = region.getClosestRowBefore(row, family); } else { Get clientGet = ProtobufUtil.toGet(get); if (existence == null) { r = region.get(clientGet); } }
所以我們走的是HRegion的get方法,殺過去。
public Result get(final Get get) throws IOException {
checkRow(get.getRow(), "Get");
// 檢查列族,以下省略程式碼一百字
List<Cell> results = get(get, true);
return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null);
}
先檢查get的row是否在這個region裡面,然後檢查列族,如果沒有的話,它會根據表定義給補全的,然後它轉身又進入了另外一個get方法,真是狠心啊!
List<Cell> results = new ArrayList<Cell>();
Scan scan = new Scan(get);
RegionScanner scanner = null;
try {
scanner = getScanner(scan);
scanner.next(results);
} finally {
if (scanner != null)
scanner.close();
}
從上面可以看得出來,為什麼我要把get和Scanner一起講了吧,因為get也是一種特殊的Scan的方法,它只尋找一個row的資料。
Scan
下面開始講Scan,在《HTable探祕》裡面有個細節不知道注意到沒,在查詢之前,它要先OpenScanner獲得要給ScannerId,這個OpenScanner其實也呼叫了scan方法,但是它過去不是幹活的,而是先過去註冊一個Scanner,訂個租約,然後再把這個返回的ScannerId再次傳送一個scan請求,這次才開始呼叫開始掃描。
掃描的時候,走的是這一段
if (!done) {
long maxResultSize = scanner.getMaxResultSize();
if (maxResultSize <= 0) {
maxResultSize = maxScannerResultSize;
}
List<Cell> values = new ArrayList<Cell>();
MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
region.startRegionOperation(Operation.SCAN);
try {
int i = 0;
synchronized(scanner) {
for (; i < rows && currentScanResultSize < maxResultSize; i++) {
// 它用的是這個nextRaw方法
boolean moreRows = scanner.nextRaw(values);
if (!values.isEmpty()) {
results.add(Result.create(values));
}
if (!moreRows) {
break;
}
values.clear();
}
}
} finally {
region.closeRegionOperation();
}
}
// 沒找到設定moreResults為false,找到了把結果新增到builder裡面去
if (scanner.isFilterDone() && results.isEmpty()) {
moreResults = false;
results = null;
} else {
addResults(builder, results, controller);
}
}
}
這裡面有controller和result,這塊的話,我求證了一下RpcServer那塊,如果Rpc傳輸的時候使用了codec來壓縮的話,就用controller返回結果,否則用response返回。 這塊就不管了不是重點,下面我們看一下RegionScanner。
RegionScanner詳解與程式碼拆分
我們衝過去看RegionScannerImpl吧,它在HRegion裡面,我們直接去看nextRaw方法就可以了,get方法的那個next方法也是呼叫了nextRaw方法。
if (outResults.isEmpty()) {
// 把結果存到outResults當中
returnResult = nextInternal(outResults, limit);
} else {
List<Cell> tmpList = new ArrayList<Cell>();
returnResult = nextInternal(tmpList, limit);
outResults.addAll(tmpList);
}
去nextInternal方法吧,這方法真大,尼瑪,我要歇菜了,我們進入下一個階段吧。
/** 把查詢出來的結果儲存到results當中 */
private boolean nextInternal(List<Cell> results, int limit)
throws IOException {
while (true) {
//從storeHeap裡面取出一個來
KeyValue current = this.storeHeap.peek();
byte[] currentRow = null;
int offset = 0;
short length = 0;
if (current != null) {
currentRow = current.getBuffer();
offset = current.getRowOffset();
length = current.getRowLength();
}
//檢查一下到這個row是否應該停止了
boolean stopRow = isStopRow(currentRow, offset, length);
if (joinedContinuationRow == null) {
// 如果要停止了,就用filter的filterRowCells過濾一下results.
if (stopRow) {
if (filter != null && filter.hasFilterRow()) {
//使用filter過濾掉一些cells
filter.filterRowCells(results);
}
return false;
}
// 如果有filter的話,過濾通過
if (filterRowKey(currentRow, offset, length)) {
boolean moreRows = nextRow(currentRow, offset, length);
if (!moreRows) return false;
results.clear();
continue;
}
//把結果儲存到results當中
KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,
length);
// Ok, we are good, let's try to get some results from the main heap.
// 在populateResult找到了足夠limit數量的
if (nextKv == KV_LIMIT) {
if (this.filter != null && filter.hasFilterRow()) {
throw new IncompatibleFilterException(
"Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
}
return true; // We hit the limit.
}
stopRow = nextKv == null ||
isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());
// save that the row was empty before filters applied to it.
final boolean isEmptyRow = results.isEmpty();
// We have the part of the row necessary for filtering (all of it, usually).
// First filter with the filterRow(List). 過濾一下剛才找出來的
if (filter != null && filter.hasFilterRow()) {
filter.filterRowCells(results);
}
//如果result的空的,啥也沒找到,這是。。。悲劇啊
if (isEmptyRow) {
boolean moreRows = nextRow(currentRow, offset, length);
if (!moreRows) return false;
results.clear();
// This row was totally filtered out, if this is NOT the last row,
// we should continue on. Otherwise, nothing else to do.
if (!stopRow) continue;
return false;
}
// Ok, we are done with storeHeap for this row.
// Now we may need to fetch additional, non-essential data into row.
// These values are not needed for filter to work, so we postpone their
// fetch to (possibly) reduce amount of data loads from disk.
if (this.joinedHeap != null) {
KeyValue nextJoinedKv = joinedHeap.peek();
// If joinedHeap is pointing to some other row, try to seek to a correct one.
boolean mayHaveData =
(nextJoinedKv != null && nextJoinedKv.matchingRow(currentRow, offset, length))
|| (this.joinedHeap.requestSeek(KeyValue.createFirstOnRow(currentRow, offset, length),
true, true)
&& joinedHeap.peek() != null
&& joinedHeap.peek().matchingRow(currentRow, offset, length));
if (mayHaveData) {
joinedContinuationRow = current;
populateFromJoinedHeap(results, limit);
}
}
} else {
// Populating from the joined heap was stopped by limits, populate some more.
populateFromJoinedHeap(results, limit);
}
// We may have just called populateFromJoinedMap and hit the limits. If that is
// the case, we need to call it again on the next next() invocation.
if (joinedContinuationRow != null) {
return true;
}
// Finally, we are done with both joinedHeap and storeHeap.
// Double check to prevent empty rows from appearing in result. It could be
// the case when SingleColumnValueExcludeFilter is used.
if (results.isEmpty()) {
boolean moreRows = nextRow(currentRow, offset, length);
if (!moreRows) return false;
if (!stopRow) continue;
}
// We are done. Return the result.
return !stopRow;
}
}
上面那段程式碼真的很長很臭,尼瑪。。被我摺疊起來了,有興趣的看一眼就行,我們先分解開來看吧,這裡面有兩個Heap,一個是storeHeap,一個是JoinedHeap,他們啥時候用呢?看一下它的構造方法吧
for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
scan.getFamilyMap().entrySet()) {
//遍歷列族和列的對映關係,設定store相關的內容
Store store = stores.get(entry.getKey());
KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
|| this.filter.isFamilyEssential(entry.getKey())) {
scanners.add(scanner);
} else {
joinedScanners.add(scanner);
}
}
this.storeHeap = new KeyValueHeap(scanners, comparator);
if (!joinedScanners.isEmpty()) {
this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
}
}
如果joinedScanners不空的話,就new一個joinedHeap出來,但是我們看看它的成立條件,有點兒難吧。
1、filter不為null
2、scan設定了doLoadColumnFamiliesOnDemand為true
3、設定了的filter的isFamilyEssential方法返回false,這個估計得自己寫一個,因為我剛才去看了幾個filter的這個方法預設都是用的FilterBase的方法返回false。
好的,到這裡我們有可以把上面那段程式碼砍掉很大一部分了,它的成立條件比較困難,所以很難出現了,那我們就挑重點的storeHeap來講吧,我們先看著這三行。
Store store = stores.get(entry.getKey());
KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
this.storeHeap = new KeyValueHeap(scanners, comparator);
通過列族獲得相應的Store,然後通過getScanner返回scanner加到KeyValueHeap當中,我們應該去刺探一下HStore的getScanner方法,它new了一個StoreScanner返回,繼續看StoreScanner。
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns) throws IOException {
matcher = new ScanQueryMatcher(scan, scanInfo, columns,
ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
oldestUnexpiredTS);
// 返回MemStore、所有StoreFile的Scanner.
List<KeyValueScanner> scanners = getScannersNoCompaction();
//explicitColumnQuery:是否過濾列族 lazySeekEnabledGlobally預設是true 如果檔案數量超過1個,isParallelSeekEnabled就是true
if (explicitColumnQuery && lazySeekEnabledGlobally) {
for (KeyValueScanner scanner : scanners) {
scanner.requestSeek(matcher.getStartKey(), false, true);
}
} else {
if (!isParallelSeekEnabled) {
for (KeyValueScanner scanner : scanners) {
scanner.seek(matcher.getStartKey());
}
} else {
//一般走這裡,並行查
parallelSeek(scanners, matcher.getStartKey());
}
}
// 一個堆裡面包括了兩個scanner,MemStore、StoreFile的Scanner
heap = new KeyValueHeap(scanners, store.getComparator());
this.store.addChangedReaderObserver(this);
}
對上面的程式碼,我們再慢慢來分解。
1、先new了一個ScanQueryMatcher,它是一個用來過濾的類,傳引數的時候,需要傳遞scan和oldestUnexpiredTS進去,oldestUnexpiredTS是個引數,是(當前時間-列族的生存週期),小於這個時間戳的kv視為已經過期了,在它初始化的時候,我們注意一下它的startKey和stopRow,這個startKey要注意,它可不是我們設定的那個startRow,而是用這個startRow來new了一個DeleteFamily型別的KeyValue。
this.stopRow = scan.getStopRow();
this.startKey = KeyValue.createFirstDeleteFamilyOnRow(scan.getStartRow())
2、接著我們看getScannersNoCompaction這個方法,它這裡是返回了兩個Scanner,MemStoreScanner和所有StoreFile的Scanner,在從StoreHeap中peak出來一個kv的時候,是從他們當中交替取出kv來的,StoreHeap從它的名字上面來看像是用了堆排序的演算法,它的peek方法和next方法真有點兒複雜,下一章講MemStore的時候再講吧。
//獲取所有的storefile,預設的實現沒有用上startRow和stopRow
storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
memStoreScanners = this.memstore.getScanners();
預設的getStoreFileManager的getFilesForScanOrGet是返回了所有的StoreFile的Scanner,而不是通過startRow和stopRow做過濾,它的註釋裡面給出的解釋,裡面的files預設是按照seq id來排序的,而不是startKey,需要優化的可以從這裡下手。
3、然後就開始先seek一下,而不是全表掃啊!
//過濾列族的情況
scanner.requestSeek(matcher.getStartKey(), false, true);
//一般走這裡,並行查
parallelSeek(scanners, matcher.getStartKey());
scanner.requestSeek不是所有情況都要seek,是查詢Delete的時候,如果查詢的kv的時間戳比檔案的最大時間戳小,就seek到上次未查詢到的kv;它這裡可能會用上DeleteFamily刪除真個family這種情況。
parallelSeek就是開多執行緒去呼叫Scanner的seek方法, MemStore的seek很簡單,因為它的kv集合是一個排序好的集合,HFile的seek比較複雜,下面我用一個圖來表達吧。
在搜尋HFile的時候,key先從一級索引找,通過它定位到細的二級索引,然後再定位到具體的block上面,到了HFileBlock之後,就不是seek了,就是遍歷,遍歷沒什麼好說的,不熟悉的朋友建議先回去看看《StoreFile儲存格式》。注意哦,這個key就是我們的startKey哦,所以大家知道為什麼要在scan的時候要設定StartKey了嗎?
nextInternal的流程
通過前面的分析,我們可以把nextInternal分解與拆分、抹去一些不必要的程式碼,我發現程式碼還是很難懂,所以我畫了一個過程圖出來代替那段程式碼。
特別注意事項:
1、這個圖是被我處理過的簡化之後的圖,還有在放棄該row的kv們 之後並非都要進行是StopRow的判斷,只是為了合併這個流程,我加上去的isStopRow的判斷,但並不影響整個流程。
2、!isStopRow代表返回程式碼的(!isStopRow)的意思, 根據isStopRow的當前值來返回true或者false
3、true意味著退出,並且還有結果,false意味著退出,沒有結果
誒,看到這裡,還是沒看到它是怎麼用ScanQueryMatcher去過濾被刪除的kv們啊,好,接下來我們重點考察這個問題。
ScanQueryMatcher如何過濾已經被刪除的KeyValue
這個過程遮蔽在了filterRow之後通過的把該row的kv接到結果集的這一步裡面去了。它在裡面不停的呼叫KeyValueHeap的next方法,match的呼叫正好在這個方法。我們現在就去追蹤這遺失的部分。
我們直接去看它的match方法就好了,別的不用看了,它處理的情況好多好多,尼瑪,這是要死人的節奏啊。
ScanQueryMatcher是用來處理一行資料之間的版本問題的,在每遇到一個新的row的時候,它都會先被設定matcher.setRow(row, offset, length)。
if (limit < 0 || matcher.row == null || !Bytes.equals(row, offset, length, matcher.row,
matcher.rowOffset, matcher.rowLength)) {
this.countPerRow = 0;
matcher.setRow(row, offset, length);
}
上面這段程式碼在StoreScanner的next方法裡面,每當一行結束之後,都會呼叫這個方法。
在講match方法之前,我先講一下rowkey的排序規則,rowkey 正序->family 正序->qualifier 正序->ts 降序->type 降序,那麼對於同一個行、列族、列的資料,時間越近的排在前面,型別越大的排在前面,比如Delete就在Put前面,下面是它的型別表。
//search用
Minimum((byte)0),
Put((byte)4),
Delete((byte)8),
DeleteFamilyVersion((byte)10),
DeleteColumn((byte)12),
DeleteFamily((byte)14),
//search用
Maximum((byte)255);
為什麼這裡先KeyValue的排序規則呢,這當然有關係了,這關係著掃描的時候,誰先誰後的問題,如果時間戳小的在前面,下面這個過濾就不生效了。
下面我們看看它的match方法的檢查規則。
1、和當前行比較
//和當前的行進行比較,只有相等才繼續,大於當前的行就要跳到下一行,小於說明有問題,停止
int ret = this.rowComparator.compareRows(row, this.rowOffset, this.rowLength,
bytes, offset, rowLength);
if (ret <= -1) {
return MatchCode.DONE;
} else if (ret >= 1) {
return MatchCode.SEEK_NEXT_ROW;
}
2、檢查是否所有列都查過了
//所有的列都掃描過來
if (this.columns.done()) {
stickyNextRow = true;
return MatchCode.SEEK_NEXT_ROW;
}
3、檢查列的時間戳是否過期
long timestamp = kv.getTimestamp();
// 檢查列的時間是否過期
if (columns.isDone(timestamp)) {
return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
}
4a、如果是Delete的型別,加到ScanDeleteTraker。
if (kv.isDelete()) {
this.deletes.add(bytes, offset, qualLength, timestamp, type);
}
4b、如果不是,如果ScanDeleteTraker裡面有Delete,就要讓它經歷ScanDeleteTraker的檢驗了(進宮前先驗一下身)
DeleteResult deleteResult = deletes.isDeleted(bytes, offset, qualLength,
timestamp);
switch (deleteResult) {
case FAMILY_DELETED:
case COLUMN_DELETED:
return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
case VERSION_DELETED:
case FAMILY_VERSION_DELETED:
return MatchCode.SKIP;
case NOT_DELETED:
break;
default:
throw new RuntimeException("UNEXPECTED");
}
這裡就要說一下剛才那幾個Delete的了:
1)DeleteFamily是最凶狠的,生命週期也長,整個列族全刪,基本上會一直存在
2)DeleteColum只刪掉一個列,出現這個列的都會被幹掉
3)DeleteFamilyVersion沒遇到過
4)Delete最差勁兒了,只能刪除指定時間戳的,時間戳一定要對哦,否則一旦發現不對的,這個Delete就失效了,可以說,生命週期只有一次,下面是原始碼。
public DeleteResult isDeleted(byte [] buffer, int qualifierOffset,
int qualifierLength, long timestamp) {
//時間戳小於刪除列族的時間戳,說明這個列族被刪掉是後來的事情
if (hasFamilyStamp && timestamp <= familyStamp) {
return DeleteResult.FAMILY_DELETED;
}
//檢查時間戳
if (familyVersionStamps.contains(Long.valueOf(timestamp))) {
return DeleteResult.FAMILY_VERSION_DELETED;
}
if (deleteBuffer != null) {
int ret = Bytes.compareTo(deleteBuffer, deleteOffset, deleteLength,
buffer, qualifierOffset, qualifierLength);
if (ret == 0) {
if (deleteType == KeyValue.Type.DeleteColumn.getCode()) {
return DeleteResult.COLUMN_DELETED;
}
// 坑爹的Delete它只刪除相同時間戳的,遇到不想的它就pass了
if (timestamp == deleteTimestamp) {
return DeleteResult.VERSION_DELETED;
}
//時間戳不對,這個Delete失效了
deleteBuffer = null;
} else if(ret < 0){
// row比當前的大,這個Delete也失效了
deleteBuffer = null;
} else {
throw new IllegalStateException(...);
}
}
return DeleteResult.NOT_DELETED;
上一章說過,Delete new出來之後什麼都不設定,就是DeleteFamily級別的選手,所以在它之後的會全部被幹掉,所以你們懂的,我們也會用DeleteColum來刪除某一列資料,只要時間戳在它之前的kv就會被幹掉,刪某個指定版本的少,因為你得知道具體的時間戳,否則你刪不了。
例子詳解DeleteFamily
假設我們有這些資料
KeyValue [] kvs1 = new KeyValue[] {
KeyValueTestUtil.create("R1", "cf", "a", now, KeyValue.Type.Put, "dont-care"),
KeyValueTestUtil.create("R1", "cf", "a", now, KeyValue.Type.DeleteFamily, "dont-care"),
KeyValueTestUtil.create("R1", "cf", "a", now-500, KeyValue.Type.Put, "dont-care"),
KeyValueTestUtil.create("R1", "cf", "a", now+500, KeyValue.Type.Put, "dont-care"),
KeyValueTestUtil.create("R1", "cf", "a", now, KeyValue.Type.Put, "dont-care"),
KeyValueTestUtil.create("R2", "cf", "z", now, KeyValue.Type.Put, "dont-care")
};
Scan的引數是這些。
Scan scanSpec = new Scan(Bytes.toBytes("R1"));
scanSpec.setMaxVersions(3);
scanSpec.setBatch(10);
StoreScanner scan = new StoreScanner(scanSpec, scanInfo, scanType, getCols("a","z"), scanners);
然後,我們先將他們排好序,是這樣的。
R1/cf:a/1400602376242(now+500)/Put/vlen=9/mvcc=0,
R1/cf:a/1400602375742(now)/DeleteFamily/vlen=9/mvcc=0,
R1/cf:a/1400602375742(now)/Put/vlen=9/mvcc=0,
R1/cf:a/1400602375742(now)/Put/vlen=9/mvcc=0,
R1/cf:a/1400602375242(now-500)/Put/vlen=9/mvcc=0,
R2/cf:z/1400602375742(now)/Put/vlen=9/mvcc=0
所以到最後,黃色的三行會被刪除,只剩下第一行和最後一行,但是最後一行也會被排除掉,因為它已經換行了,不是同一個行的,不在這一輪進行比較,返回MatchCode.DONE。
---->回到前面是match過程
5、檢查時間戳,即設定給Scan的時間戳,這個估計一般很少設定,時間戳過期,就返回下一個MatchCode.SEEK_NEXT_ROW。
6、檢查列是否是Scan裡面設定的需要查詢的列。
7、檢查列的版本,Scan設定的MaxVersion,超過了這個version就要趕緊閃人了哈,返回MatchCode.SEEK_NEXT_COL。
對於match的結果,有幾個常見的:
1、MatchCode.INCLUDE_AND_SEEK_NEXT_COL 包括當前這個,跳到下一列,會引發StoreScanner的reseek方法。
2、MatchCode.SKIP 忽略掉,繼續呼叫next方法。
3、MatchCode.SEEK_NEXT_ROW 不包括當前這個,繼續呼叫next方法。
4、MatchCode.SEEK_NEXT_COL 不包括它,跳過下一列,會引發StoreScanner的reseek方法。
5、MatchCode.DONE rowkey變了,要留到下次進行比較了
講到這裡基本算結束了。
關於測試
呵呵,有興趣測試的童鞋可以開啟下hbase原始碼,找到TestStoreScanner這個類自己除錯看下結果。