LevelDB的源碼閱讀(四) Compaction操作
leveldb的數據存儲采用LSM的思想,將隨機寫入變為順序寫入,記錄寫入操作日誌,一旦日誌被以追加寫的形式寫入硬盤,就返回寫入成功,由後臺線程將寫入日誌作用於原有的磁盤文件生成新的磁盤數據.Leveldb在內存中維護一個數據結構memtable,采用skiplist來實現,保存當前寫入的數據,當數據達到一定規模後變為不可寫的內存表immutable table.新的寫入操作會寫入新的memtable,而immutable table會被後臺線程寫入到數據文件.Leveldb的數據文件是按層存放的,默認配置的最高層級是7,即level0,level1,…,level7.內存中的immutable總是寫入level0,除level0之外的各個層leveli的所有數據文件的key範圍都是互相不相交的.
進行Compaction操作的條件如下:
1.產生了新的immutable table需要寫入數據文件
2.某個level的數據規模過大
3.某個文件被無效查詢的次數過多(在文件i中查詢key,沒有找到key,這次查詢稱為文件i的無效查詢)
4.手動compaction
滿足以上條件會啟動Compaction過程,接下來分析詳細的Compaction過程.
Leveldb進行Compaction的入口函數是db文件夾下db_impl.cc文件中的DBImpl::MaybeScheduleCompaction,該函數在每次leveldb進行讀寫操作時都有可能被調用.源碼內容如下:
void DBImpl::MaybeScheduleCompaction() { mutex_.AssertHeld(); if (bg_compaction_scheduled_) { // Already scheduled } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions} else if (!bg_error_.ok()) { // Already got an error; no more changes } else if (imm_ == NULL && manual_compaction_ == NULL && !versions_->NeedsCompaction()) { // No work to be done } else { bg_compaction_scheduled_ = true; env_->Schedule(&DBImpl::BGWork, this); //新建後臺任務並進行調度 } }
首先調用db文件夾下version_set.h中的NeedsCompaction()判斷是否需要啟動Compact任務.源碼內容如下:
// Returns true iff some level needs a compaction. bool NeedsCompaction() const { Version* v = current_; return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL); }
version_set.cc中compaction_score_ 的計算如下:
void VersionSet::Finalize(Version* v) { // Precomputed best level for next compaction int best_level = -1; double best_score = -1; for (int level = 0; level < config::kNumLevels-1; level++) { double score; if (level == 0) { // We treat level-0 specially by bounding the number of files // instead of number of bytes for two reasons: // // (1) With larger write-buffer sizes, it is nice not to do too // many level-0 compactions. // // (2) The files in level-0 are merged on every read and // therefore we wish to avoid too many files when the individual // file size is small (perhaps because of a small write-buffer // setting, or very high compression ratios, or lots of // overwrites/deletions). score = v->files_[level].size() / static_cast<double>(config::kL0_CompactionTrigger); } else { // Compute the ratio of current size to size limit. const uint64_t level_bytes = TotalFileSize(v->files_[level]); score = static_cast<double>(level_bytes) / MaxBytesForLevel(level); } if (score > best_score) { best_level = level; best_score = score; } } v->compaction_level_ = best_level; v->compaction_score_ = best_score; }
註意,這裏同時預計算了進行compaction的最佳level.
確認需要啟動compaction之後,調用util文件夾下env_posix.cc文件中的PosixEnv::Schedule函數啟動Compact過程.
void PosixEnv::Schedule(void (*function)(void*), void* arg) { PthreadCall("lock", pthread_mutex_lock(&mu_)); // Start background thread if necessary if (!started_bgthread_) { started_bgthread_ = true; PthreadCall( "create thread", pthread_create(&bgthread_, NULL, &PosixEnv::BGThreadWrapper, this)); } // If the queue is currently empty, the background thread may currently be // waiting. if (queue_.empty()) { PthreadCall("signal", pthread_cond_signal(&bgsignal_)); } // Add to priority queue queue_.push_back(BGItem()); queue_.back().function = function; queue_.back().arg = arg; PthreadCall("unlock", pthread_mutex_unlock(&mu_)); }
如果沒有後臺線程,則創建後臺線程,否則新建一個後臺執行任務BGItem壓入後臺線程任務隊列,然後調用PosixEnv::BGThreadWrapper喚醒後臺線程:
static void* BGThreadWrapper(void* arg) { reinterpret_cast<PosixEnv*>(arg)->BGThread(); return NULL; }
BGThreadWrapper調用PosixEnv::BGThread,不斷地從後臺任務隊列中拿到任務,然後執行任務
void PosixEnv::BGThread() { while (true) { // Wait until there is an item that is ready to run PthreadCall("lock", pthread_mutex_lock(&mu_)); while (queue_.empty()) { PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_)); } void (*function)(void*) = queue_.front().function; void* arg = queue_.front().arg; queue_.pop_front(); PthreadCall("unlock", pthread_mutex_unlock(&mu_)); (*function)(arg); } }
回到DBImpl::MaybeScheduleCompaction,方便理解起見這裏再重復一遍源碼
void DBImpl::MaybeScheduleCompaction() { mutex_.AssertHeld(); if (bg_compaction_scheduled_) { // Already scheduled } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions } else if (!bg_error_.ok()) { // Already got an error; no more changes } else if (imm_ == NULL && manual_compaction_ == NULL && !versions_->NeedsCompaction()) { // No work to be done } else { bg_compaction_scheduled_ = true; env_->Schedule(&DBImpl::BGWork, this); //新建後臺任務並進行調度 } }
之前分析了env_->Schedule進行的調度過程,現在來分析實際進行後臺任務的DBImpl::BGWork.DBImpl::BGWork在db文件夾下db_impl.cc文件中.
void DBImpl::BGWork(void* db) { reinterpret_cast<DBImpl*>(db)->BackgroundCall(); }
DBImpl::BGWork調用DBImpl::BackgroundCall(),合並完成後可能導致有的level的文件數過多,因此會再次調用MaybeScheduleCompaction()判斷是否需要繼續進行合並.
void DBImpl::BackgroundCall() { MutexLock l(&mutex_); assert(bg_compaction_scheduled_); if (shutting_down_.Acquire_Load()) { // No more background work when shutting down. } else if (!bg_error_.ok()) { // No more background work after a background error. } else { BackgroundCompaction(); } bg_compaction_scheduled_ = false; // Previous compaction may have produced too many files in a level, // so reschedule another compaction if needed. MaybeScheduleCompaction(); bg_cv_.SignalAll(); }
DBImpl::BackgroundCall()調用 BackgroundCompaction(),在BackgroundCompaction()中分別完成三種不同的Compaction:對Memtable進行合並、 trivial Compaction(直接將文件移動到下一層)以及一般的合並,調用DoCompactionWork()實現.
void DBImpl::BackgroundCompaction() { mutex_.AssertHeld(); if (imm_ != NULL) { CompactMemTable();//1、對Memtable進行合並 return; } Compaction* c; bool is_manual = (manual_compaction_ != NULL);//manual_compaction默認為NULL,則is_manual默認為false InternalKey manual_end; if (is_manual) { //取得手動compaction對象 ManualCompaction* m = manual_compaction_; c = versions_->CompactRange(m->level, m->begin, m->end); m->done = (c == NULL); if (c != NULL) { manual_end = c->input(0, c->num_input_files(0) - 1)->largest; } Log(options_.info_log, "Manual compaction at level-%d from %s .. %s; will stop at %s\n", m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"), (m->end ? m->end->DebugString().c_str() : "(end)"), (m->done ? "(end)" : manual_end.DebugString().c_str())); } else { //取得自動compaction對象 c = versions_->PickCompaction(); } Status status; if (c == NULL) { // Nothing to do } else if (!is_manual && c->IsTrivialMove()) {//2、IsTrivialMove 返回 True,trivial Compaction,則直接將文件移入 level + 1 層即可 // Move file to next level assert(c->num_input_files(0) == 1); FileMetaData* f = c->input(0, 0); c->edit()->DeleteFile(c->level(), f->number); c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, f->largest); status = versions_->LogAndApply(c->edit(), &mutex_); if (!status.ok()) { RecordBackgroundError(status); } VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", static_cast<unsigned long long>(f->number), c->level() + 1, static_cast<unsigned long long>(f->file_size), status.ToString().c_str(), versions_->LevelSummary(&tmp)); } else { //3、一般的合並 CompactionState* compact = new CompactionState(c); status = DoCompactionWork(compact); //進行compaction if (!status.ok()) { RecordBackgroundError(status); } CleanupCompaction(compact); c->ReleaseInputs(); // input的文件引用計數減少1 DeleteObsoleteFiles(); //刪除無用文件 } delete c; if (status.ok()) { // Done } else if (shutting_down_.Acquire_Load()) { // Ignore compaction errors found during shutting down } else { Log(options_.info_log, "Compaction error: %s", status.ToString().c_str()); } if (is_manual) { ManualCompaction* m = manual_compaction_; //標記手動compaction任務完成 if (!status.ok()) { m->done = true; } if (!m->done) { // We only compacted part of the requested range. Update *m // to the range that is left to be compacted. m->tmp_storage = manual_end; m->begin = &m->tmp_storage; } manual_compaction_ = NULL; } }
首行mutex_.AssertHeld(),Mutex的AssertHeld函數實現默認為空,在很多函數的實現內有調用,其作用如下:
As you have observed it does nothing in the default implementation. The function seems to be a placeholder for checking whether a particular thread holds a mutex and optionally abort if it doesn’t. This would be equivalent to the normal asserts we use for variables but applied on mutexes.
I think the reason it is not implemented yet is we don’t have an equivalent light weight function to assert whether a thread holds a lock in pthread_mutex_t used in the default implementation. Some platforms which has that capability could fill this implementation as part of porting process. Searching online I did find some implementation for this function in the windows port of leveldb. I can see one way to implement it using a wrapper class over pthread_mutex_t and setting some sort of a thread id variable to indicate which thread(s) currently holds the mutex, but it will have to be carefully implemented given the race conditions that can arise.
Memtable的合並
Compaction首先檢查imm_,及時將已寫滿的memtable寫入磁盤sstable文件,對Memtable的合並,調用DBImpl::CompactMemTable()完成:
void DBImpl::CompactMemTable() { mutex_.AssertHeld(); assert(imm_ != NULL);//imm_不能為空 VersionEdit edit; Version* base = versions_->current(); base->Ref(); Status s = WriteLevel0Table(imm_, &edit, base);//將Memtable轉化為.sst文件,寫入level0 sst table,並寫入到edit中 base->Unref(); if (s.ok()) { edit.SetPrevLogNumber(0); edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed s = versions_->LogAndApply(&edit, &mutex_);//應用edit中記錄的變化,來生成新的版本 } if (s.ok()) { // Commit to the new state imm_->Unref(); imm_ = NULL; has_imm_.Release_Store(NULL); DeleteObsoleteFiles(); } else { RecordBackgroundError(s); } }
其中CompactMemTable()主要調用了兩個函數:WriteLevel0Table()和versions_->LogAndApply()
CompactMemTable()首先調用WriteLevel0Table(),源碼內容如下:
Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base) { mutex_.AssertHeld(); FileMetaData meta; meta.number = versions_->NewFileNumber();//獲取新生成的.sst文件的編號 pending_outputs_.insert(meta.number); Iterator* iter = mem->NewIterator();//用於遍歷Memtable中的數據 Status s; { mutex_.Unlock(); s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);//創建.sst文件,並將其相關信息記錄在meta中 mutex_.Lock(); } delete iter; //iter用完之後一定要刪除 pending_outputs_.erase(meta.number); int level = 0; if (s.ok() && meta.file_size > 0) { const Slice min_user_key = meta.smallest.user_key(); const Slice max_user_key = meta.largest.user_key(); if (base != NULL) { level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);//為合並的輸出文件選擇合適的level } edit->AddFile(level, meta.number, meta.file_size,meta.smallest, meta.largest);//將生成的.sst文件加入到該level } return s; }
WriteLevel0Table()首先調用BuildTable()將Immutable Memtable中所有的數據寫入到一個.sst文件中,並將.sst文件的信息(文件編號,Key值範圍,文件大小)記錄到變量meta中.由於Memtable是基於Skiplist的,是一個有序表,因此在寫入.sst文件時,Key值也是從小到大來排列的.可以發現,將Memtable中的數據轉換為SSTable時,是將所有記錄都寫入SSTable的,要刪除的記錄也一樣.刪除操作會在更高level的Compaction中完成.因此level 0中可能會存在Key值相同的記錄.2. 然後調用PickLevelForMemTableOutput()為Memtable合並的輸出文件選擇合適的level,並調用edit->AddFile()將生成的.sst文件加入到該level中.
然後CompactMemTable()調用db文件夾下version_set.cc文件中的versions_->LogAndApply()基於當前版本和更改edit來得到一個新版本.
Trivial Compaction
由之前的分析可知,is_manual默認為false,會調用PickCompaction()來選出要進行合並的level和相應的輸入文件.當c->IsTrivialMove()滿足時,則直接將文件移動到下一level.
c = versions_->PickCompaction(); Status status; if (c == NULL) { // Nothing to do } else if (!is_manual && c->IsTrivialMove()) { // Move file to next level assert(c->num_input_files(0) == 1); FileMetaData* f = c->input(0, 0); c->edit()->DeleteFile(c->level(), f->number); //將文件從該層刪除 c->edit()->AddFile(c->level() + 1, f->number, f->file_size, //將該文件加入到下一level f->smallest, f->largest); status = versions_->LogAndApply(c->edit(), &mutex_); //應用更改,創建新的Version }
首先調用db文件夾下version_set.cc文件中的VersionSet::PickCompaction()為接下來的Compaction操作準備輸入數據,由之前對Compaction的數據結構分析可知,Compaction操作有兩種觸發方式:某一level的文件數太多和某一文件的查找次數超過允許值,在進行合並時,將優先考慮文件數過多的情況.
Compaction* VersionSet::PickCompaction() { Compaction* c; int level; const bool size_compaction = (current_->compaction_score_ >= 1);//文件數過多 const bool seek_compaction = (current_->file_to_compact_ != NULL);//某一文件的查找次數太多 if (size_compaction) {//文件數太多優先考慮 level = current_->compaction_level_; //要進行Compaction的level c = new Compaction(level); //每一層有一個compact_pointer,用於記錄compaction key,這樣可以進行循環compaction for (size_t i = 0; i < current_->files_[level].size(); i++) { //從待合並的level中選擇合適的文件完成合並操作 FileMetaData* f = current_->files_[level][i]; //level層中的第i個文件 if (compact_pointer_[level].empty() || //compact_pointer_中記錄的是下次合並的起始Key值,為空時都可以進行合並 icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) { //或者f的最大Key值大於起始值 c->inputs_[0].push_back(f);//則該文件可以參與合並,將其加入到level輸入文件中 break; } } if (c->inputs_[0].empty()) { //若level輸入為空,則將level的第一個文件加入到輸入中 c->inputs_[0].push_back(current_->files_[level][0]); } } else if (seek_compaction) {//然後考慮查找次數過多的情況 level = current_->file_to_compact_level_; c = new Compaction(level); c->inputs_[0].push_back(current_->file_to_compact_);//將待合並的文件作為level層的輸入 } else { return NULL; } c->input_version_ = current_; c->input_version_->Ref(); //level 0中的Key值是可以重復的,因此Key值範圍可能相互覆蓋,把所有重疊都找出來,一起做compaction if (level == 0) { InternalKey smallest, largest; GetRange(c->inputs_[0], &smallest, &largest);//待合並的level層的文件的Key值範圍 current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]); assert(!c->inputs_[0].empty()); } SetupOtherInputs(c);//獲取待合並的level+1層的輸入 return c; }
然後判斷是否為trivial Compaction,當為trivial Compaction時,只需要簡單的將level層的文件移動到level +1 層即可
bool Compaction::IsTrivialMove() const { return (num_input_files(0) == 1 && //level層只有1個文件 num_input_files(1) == 0 && //level+1層沒有文件 TotalFileSize(grandparents_) <= kMaxGrandParentOverlapBytes);//level+2層文件總大小不超過最大覆蓋範圍,否則會導致後面的merge需要很大的開銷 }
最終完成完成Compaction操作
c->edit()->DeleteFile(c->level(), f->number); c->edit()->AddFile(c->level() + 1, f->number, f->file_size,f->smallest, f->largest); status = versions_->LogAndApply(c->edit(), &mutex_);
一般的合並
一般的合並調用DBImpl::DoCompactionWork()完成,compact是調用VersionSet::PickCompacttion()得到的,與之前的trivial Compaction相同.不同level之間,可能存在Key值相同的記錄,但是記錄的seq不同.由之前的分析可知,最新的數據存放在較低的level中,其對應的seq也一定比level+1中的記錄的seq要大,因此當出現相同Key值的記錄時,只需要記錄第一條記錄,後面的都可以丟棄.level 0中也可能存在Key值相同的數據,其後面的seq也不同.數據越新,其對應的seq越大,且記錄在level 0中的記錄是按照user_key遞增,seq遞減的方式存儲的,則相同user_key對應的記錄是聚集在一起的,且按照seq遞減的方式存放的.在更高層的Compaction時,只需要處理第一條出現的user_key相同的記錄即可,後面的相同user_key的記錄都可以丟棄.因此合並後的level +1層的文件中不會存在Key值相同的記錄.刪除記錄的操作也會在此時完成,刪除數據的記錄會被丟棄,而不會被寫入到更高level的文件中.
Status DBImpl::DoCompactionWork(CompactionState* compact) { if (snapshots_.empty()) { compact->smallest_snapshot = versions_->LastSequence(); } else { compact->smallest_snapshot = snapshots_.oldest()->number_; } mutex_.Unlock(); //生成iterator:遍歷要compaction的數據 Iterator* input = versions_->MakeInputIterator(compact->compaction);//用於遍歷待合並的每一個文件 input->SeekToFirst(); Status status; ParsedInternalKey ikey; std::string current_user_key; bool has_current_user_key = false; SequenceNumber last_sequence_for_key = kMaxSequenceNumber; for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { if (has_imm_.NoBarrier_Load() != NULL) { //immutable memtable的優先級最高 mutex_.Lock(); if (imm_ != NULL) { //當imm_非空時,合並Memtable CompactMemTable(); bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary } mutex_.Unlock(); } Slice key = input->key(); if (compact->compaction->ShouldStopBefore(key) && //是否需要停止Compaction,中途輸出compaction的結果,避免compaction結果和level N+2 files有過多的重疊 compact->builder != NULL) { status = FinishCompactionOutputFile(compact, input); } bool drop = false; if (!ParseInternalKey(key, &ikey)) { current_user_key.clear(); has_current_user_key = false; last_sequence_for_key = kMaxSequenceNumber; } else { if (!has_current_user_key || //獲取當前的user_key和sequence user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) != 0) { //可能存在Key值相同但seq不同的記錄 // 此時是這個Key第一次出現 current_user_key.assign(ikey.user_key.data(), ikey.user_key.size()); has_current_user_key = true; last_sequence_for_key = kMaxSequenceNumber;//則將其seq設為最大值,表示第一次出現 } if (last_sequence_for_key <= compact->smallest_snapshot) {//表示key已經出現過,否則seq應為KMaxSequenceNumber drop = true; // (A) //之前已經存在Key值相同的記錄,丟棄 } else if (ikey.type == kTypeDeletion && //要刪除該記錄 ikey.sequence <= compact->smallest_snapshot && //記錄的序號比數據庫之前的最小序號還小 compact->compaction->IsBaseLevelForKey(ikey.user_key)) { //高的level中沒有數據 drop = true; //此時要丟棄該記錄 } last_sequence_for_key = ikey.sequence;//上次出現的記錄對應的sequence,用於判斷後面出現相同Key值的情況 } if (!drop) { //如果不需要丟棄該記錄 if (compact->builder == NULL) { status = OpenCompactionOutputFile(compact);//若需要,則創建一個.sst文件,用於存放合並後的數據 } if (compact->builder->NumEntries() == 0) { compact->current_output()->smallest.DecodeFrom(key); } compact->current_output()->largest.DecodeFrom(key); compact->builder->Add(key, input->value());//將記錄寫入.sst文件 if (compact->builder->FileSize() >= compact->compaction->MaxOutputFileSize()) { //當.sst文件超過最大值時 status = FinishCompactionOutputFile(compact, input);//完成Compaction輸出文件 } } input->Next(); //處理下一個文件 } if (status.ok() && compact->builder != NULL) { status = FinishCompactionOutputFile(compact, input); } if (status.ok()) { status = input->status(); } delete input; input = NULL; //更新compaction的一些統計數據 CompactionStats stats; stats.micros = env_->NowMicros() - start_micros - imm_micros; for (int which = 0; which < 2; which++) { for (int i = 0; i < compact->compaction->num_input_files(which); i++) { stats.bytes_read += compact->compaction->input(which, i)->file_size; } } for (size_t i = 0; i < compact->outputs.size(); i++) { stats.bytes_written += compact->outputs[i].file_size; } mutex_.Lock(); stats_[compact->compaction->level() + 1].Add(stats); if (status.ok()) { status = InstallCompactionResults(compact);//完成合並 } if (!status.ok()) { RecordBackgroundError(status); } VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp)); return status; }
首先將可以留下的記錄寫入到.sst文件中,並將相關信息保存在變量compact中,然後調用InstallCompactionResults()將所做的改動加入到VersionEdit中,再調用LogAndApply()來得到新的版本.
LogAndApply()
在上面三種不同的Compaction操作中,最終當對當前版本的更改VersionEdit全部完成後,都會調用VersionSet::LogAndApply()來應用更改,創建新版本.edit中保存了level和level+1層要刪除和增加的文件.
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { Version* v = new Version(this); //創建一個新Version { Builder builder(this, current_);//基於當前Version創建一個builder變量 builder.Apply(edit);//將edit中記錄的要增加、刪除的文件加入到builder類中 builder.SaveTo(v);//然後將edit中的記錄保存到新創建的Version中,這樣就得到了一個新的版本 } Finalize(v);//根據各層文件數來判斷是否還需要進行Compaction std::string new_manifest_file; Status s; if (descriptor_log_ == NULL) { //只會在第一次調用時進入 assert(descriptor_file_ == NULL); new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);//創建一個新的Manifest文件 edit->SetNextFile(next_file_number_); s = env_->NewWritableFile(new_manifest_file, &descriptor_file_); if (s.ok()) { descriptor_log_ = new log::Writer(descriptor_file_); s = WriteSnapshot(descriptor_log_);//快照,系統開始時完整記錄數據庫的所有信息 } } { mu->Unlock(); if (s.ok()) { std::string record; edit->EncodeTo(&record); s = descriptor_log_->AddRecord(record);//將數據庫的變化記錄到Manifest文件中 if (s.ok()) { s = descriptor_file_->Sync(); } } if (s.ok() && !new_manifest_file.empty()) { s = SetCurrentFile(env_, dbname_, manifest_file_number_); } mu->Lock(); } if (s.ok()) { AppendVersion(v); //將新得到的Version插入到所有Version形成的雙向鏈表的尾部 log_number_ = edit->log_number_; prev_log_number_ = edit->prev_log_number_; } } return s; }
為了重啟之後能恢復數據庫之前的狀態,就需要將數據庫的歷史變化信息記錄下來,這些信息都是記錄在Manifest文件中的.為了節省空間和時間,leveldb采用的是在系統開始完整的所有數據庫的信息(WriteSnapShot()),以後則只記錄數據庫的變化,即VersionEdit中的信息(descriptor_log_->AddRecord()).恢復時,只需要根據Manifest中的信息就可以一步步的恢復到上次的狀態.
VersionSet::LogAndApply首先創建一個新的Version,然後調用builder.Apply(edit)將edit中所有要刪除、增加的文件編號記錄下來,其源碼如下:
// Apply all of the edits in *edit to the current state. void Apply(VersionEdit* edit) { // 更新每一層下次合並的起始Key值 for (size_t i = 0; i < edit->compact_pointers_.size(); i++) { const int level = edit->compact_pointers_[i].first; vset_->compact_pointer_[level] = edit->compact_pointers_[i].second.Encode().ToString(); } //將所有要刪除的文件加入到levels_[level].deleted_files變量中 const VersionEdit::DeletedFileSet& del = edit->deleted_files_; for (VersionEdit::DeletedFileSet::const_iterator iter = del.begin(); iter != del.end();++iter) { const int level = iter->first; const uint64_t number = iter->second; levels_[level].deleted_files.insert(number); } // 將所有新增加的文件加入到levels_[level].added_files中 for (size_t i = 0; i < edit->new_files_.size(); i++) { const int level = edit->new_files_[i].first; FileMetaData* f = new FileMetaData(edit->new_files_[i].second); f->refs = 1; f->allowed_seeks = (f->file_size / 16384); if (f->allowed_seeks < 100) f->allowed_seeks = 100; levels_[level].deleted_files.erase(f->number); levels_[level].added_files->insert(f); } }
然後VersionSet::LogAndApply再調用builder.SaveTo(v)將更改保存到新的Version中,其源碼如下:
void SaveTo(Version* v) { BySmallestKey cmp; cmp.internal_comparator = &vset_->icmp_; for (int level = 0; level < config::kNumLevels; level++) { const std::vector<FileMetaData*>& base_files = base_->files_[level];//當前Version中原有的各個level的.sst文件 std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin(); std::vector<FileMetaData*>::const_iterator base_end = base_files.end(); const FileSet* added = levels_[level].added_files;//對應level新增加的文件 v->files_[level].reserve(base_files.size() + added->size()); for (FileSet::const_iterator added_iter = added->begin(); added_iter != added->end();++added_iter) { // 將原有文件中編號比added小的加入到新的Version for (std::vector<FileMetaData*>::const_iterator bpos = std::upper_bound(base_iter, base_end, *added_iter, cmp); base_iter != bpos;++base_iter) { MaybeAddFile(v, level, *base_iter); } MaybeAddFile(v, level, *added_iter);//再將新增的文件依次加入到新的Version } for (; base_iter != base_end; ++base_iter) { MaybeAddFile(v, level, *base_iter);//再將原有文件中剩余的部分加入到新的Version } } }
bpos = std::upper_bound(base_iter,base_end,*added_iter,cmp); // 返回base_iter到base_end之間,第一個大於*added_iter的iter.假設原有文件的編號為1、3、4、6、8,新增文件的編號為2、5、7,則第一次循環時,bpos為3對應的叠代器,因此base_iter只遍歷一個元素,即將編號1加入到新的Version中.總體對新增文件來說,就是首先加入base中編號比它小的,然後再將其加入,然後再繼續比那裏下一個新增文件,因此最終得到的文件編號順序是 1、2、3、4、5、6、7、8,即每一層的.sst文件都是按照編號從小到大排列的.這樣就得到了新的Version的每一層的所有文件.
參考文獻:
1.http://blog.csdn.net/u012658346/article/details/45787233
2.http://blog.csdn.net/u012658346/article/details/45788939
3.http://blog.csdn.net/joeyon1985/article/details/47154249
4.http://www.blogjava.net/sandy/archive/2012/03/15/leveldb6.html
5.http://www.pandademo.com/2016/04/compaction-of-sstable-leveldb-part-1-source-dissect-9/
LevelDB的源碼閱讀(四) Compaction操作