hbase原始碼系列(十)HLog與日誌恢復
HLog概述
hbase在寫入資料之前會先寫入MemStore,成功了再寫入HLog,當MemStore的資料丟失的時候,還可以用HLog的資料來進行恢復,下面先看看HLog的圖。
舊版的HLog是實際上是一個SequceneFile,0.96的已經使用Protobuf來進行序列化了。從Writer和Reader上來看HLog的都是Entry的,換句話說就是,它的每一條記錄就是一個Entry。
class Entry implements Writable {
private WALEdit edit;
private HLogKey key;
}
所以上面那個圖已經不準確了,HLogKey沒變,但是Value缺不是KeyValue,而是WALEdit。
下面我們看看HLogKey的五要素,region、tableName、log的順序、寫入時間戳、叢集id。
public HLogKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum, final long now, List<UUID> clusterIds){ init(encodedRegionName, tablename, logSeqNum, now, clusterIds); } protected void init(final byte [] encodedRegionName, final TableName tablename, long logSeqNum, final long now, List<UUID> clusterIds) { this.logSeqNum = logSeqNum; this.writeTime = now; this.clusterIds = clusterIds; this.encodedRegionName = encodedRegionName; this.tablename = tablename; }
下面看看WALEdit的屬性, 這裡只列出來一個重要的,它是內部持有的一群KeyValue。。
public class WALEdit implements Writable, HeapSize {
......private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
HLog的具體實現類是FSHLog,一個Region Server有兩個FSHLog,一個負責RS上面所有的使用者region的日誌,一個負責RS上面的META表的region的日誌。
對於日誌來說,我們關心的是它如何保證一致性和準確性,在需要它的時候可以發揮救命作用。
HLog同步
對於meta region的HLog寫入之後,它會立即同步到硬碟,非meta表的region,它會先把Entry新增到一個佇列裡面等待同步。
while(!this.isInterrupted() && !closeLogSyncer.get()) {
try {
if (unflushedEntries.get() <= syncedTillHere) {
synchronized (closeLogSyncer) {
closeLogSyncer.wait(this.optionalFlushInterval);
}
}// 同步已經新增的entry
sync();
} catch (IOException e) {
LOG.error("Error while syncing, requesting close of hlog ", e);
requestLogRoll();
Threads.sleep(this.optionalFlushInterval);
}
}
它這裡是有一個判斷條件的,如果判斷條件不成立就立即同步,等待this.optionalFlushInterval時間,預設的同步間隔是1000,它是通過引數hbase.regionserver.optionallogflushinterval設定。unflushedEntries是一個AtomicLong在寫入entry的時候遞增,syncedTillHere是一個volatile long,同步完成之後也是變大,因為可能被多個執行緒呼叫同步操作,所以它是volatile的,從條件上來看,如果沒有日誌需要同步就等待一秒再進行判斷,如果有日誌需要同步,也是立馬就寫入硬碟的,如果發生錯誤,就是呼叫requestLogRoll方法,進行回滾,這個回滾比較有意思,它是跑過去flush掉MemStore中的資料,把他們寫入硬碟。
下面是回滾的方法。中間我忽略了幾步,然後找到LogRoller中的這段程式碼。
byte [][] regionsToFlush = getWAL().rollWriter(rollLog.get());
if (regionsToFlush != null) {
for (byte [] r: regionsToFlush) scheduleFlush(r);
}
找出來需要flush的region,然後計劃flush。
regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
this.oldestUnflushedSeqNums);
static byte[][] findMemstoresWithEditsEqualOrOlderThan(
final long walSeqNum, final Map<byte[], Long> regionsToSeqNums) {
List<byte[]> regions = null;
for (Map.Entry<byte[], Long> e : regionsToSeqNums.entrySet()) {
//逐個對比,找出小於已輸出為檔案的最小的seq id的region
if (e.getValue().longValue() <= walSeqNum) {
if (regions == null) regions = new ArrayList<byte[]>();
regions.add(e.getKey());
}
}
return regions == null ? null : regions
.toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
}
逐個對比,找出來未flush MemStore的比輸出的檔案的HLog流水號還小的region,當它準備flush MemStore之前會呼叫startCacheFlush方法來把region從oldestUnflushedSeqNums這個map當中去除,新增到已經flush的map當中。
從日誌恢復
看過《HMaster啟動過程》的童鞋都知道,如果之前有region失敗的話,在啟動之前會把之前的HLog進行split,把屬於該region的為flush過的日誌提取出來,然後生成一個新的HLog到recovered.edits目錄下,中間的過程控制那塊有點兒類似於snapshot的那種,在zk裡面建立一個splitWAL節點,在這個節點下面建立任務,不一樣的是,snapshot那塊是自己處理自己的,這裡是別人的閒事它也管,處理完了之後就更新這個任務的狀態了,沒有snapshot那麼複雜的互動過程。
那啥時候會用到這個呢,在region開啟的時候,我們從HRegionServer的openRegion方法一路跟蹤,中間歷經OpenMetaHandler,再到HRegion.openHRegion方法,終於在initializeRegionStores方法裡面找到了那麼一句話。
// 如果recovered.edits有日誌的話,就恢復日誌
maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
高潮來了!!!
HLog.Reader reader = null;
try {
//建立reader讀取hlog
reader = HLogFactory.createReader(fs, edits, conf);
long currentEditSeqId = -1;
long firstSeqIdInLog = -1;
long skippedEdits = 0;
long editsCount = 0;
long intervalEdits = 0;
HLog.Entry entry;
Store store = null;
boolean reported_once = false;
try {//逐個讀取
while ((entry = reader.next()) != null) {
HLogKey key = entry.getKey();
WALEdit val = entry.getEdit();
//例項化firstSeqIdInLog
if (firstSeqIdInLog == -1) {
firstSeqIdInLog = key.getLogSeqNum();
}
boolean flush = false;
for (KeyValue kv: val.getKeyValues()) {
// 從WALEdits裡面取出kvs
if (kv.matchingFamily(WALEdit.METAFAMILY) ||
!Bytes.equals(key.getEncodedRegionName(),
this.getRegionInfo().getEncodedNameAsBytes())) {//是meta表的kv就有compaction
CompactionDescriptor compaction = WALEdit.getCompaction(kv);
if (compaction != null) {
//完成compaction未完成的事情,校驗輸入輸出檔案,完成檔案替換等操作
completeCompactionMarker(compaction);
}
skippedEdits++;
continue;
}
// 獲得kv對應的store
if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
store = this.stores.get(kv.getFamily());
}
if (store == null) {
// 應該不會發生,缺少它對應的列族
skippedEdits++;
continue;
}
// seq id小,呵呵,說明已經被處理過了這個日誌
if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily().getName())) {
skippedEdits++;
continue;
}
currentEditSeqId = key.getLogSeqNum();
// 這個就是我們要處理的日誌,新增到MemStore裡面就ok了
flush = restoreEdit(store, kv);
editsCount++;
}
//MemStore太大了,需要flush掉
if (flush) internalFlushcache(null, currentEditSeqId, status);
}
} catch (IOException ioe) {
// 就是把名字改了,然後在後面加上".時間戳",這個有毛意思?
if (ioe.getCause() instanceof ParseException) {
Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
msg = "File corruption encountered! " +
"Continuing, but renaming " + edits + " as " + p;
} else {// 不知道是啥錯誤,拋錯誤吧,處理不了
throw ioe;
}
}
status.markComplete(msg);
return currentEditSeqId;
} finally {
status.cleanup();
if (reader != null) {
reader.close();
}
}
呵呵,讀取recovered.edits下面的日誌,符合條件的就加到MemStore裡面去,完成之後,就把這些檔案刪掉。大家也看到了,這裡通篇講到一個logSeqNum,哪裡都有它的身影,它實際上是FSHLog當中的一個遞增的AtomicLong,每當往FSLog裡面寫入一條日誌的時候,它都會加一,然後MemStore請求flush的時候,會呼叫FSLog的startCacheFlush方法,獲取(logSeqNum+1)回來,然後寫入到StoreFile的sequenceid欄位,再次拿出來的時候,就遍歷這個HStore下面的StoreFile的logSeqNum,取出來最大的跟它比較,小於它的都已經寫過了,沒必要再寫了。
好了,HLog結束了,累死我了,要睡了。