1. 程式人生 > >深入淺出leveldb之基礎知識

深入淺出leveldb之基礎知識

記得大學剛畢業那年看了侯俊傑的《深入淺出MFC》,就對深入淺出這四個字特別偏好,並且成為了自己對技術的要求標準——對於技術的理解要足夠的深刻以至於可以用很淺顯的道理給別人講明白。以下內容為個人見解,如有雷同,純屬巧合,如有錯誤,煩請指正。

因為leveldb很多型別的宣告和實現分別在.h和.cc兩個檔案中,為了程式碼註釋方便,我將二者合一(類似JAVA和GO類的定義方法),讀者在原始碼中找不到我引用的部分屬於正常現象。


目錄

Slice

SequenceNumber

Varint

ValueType

InternalKey

LookupKey

Comparator

BytewiseComparatorImpl

InternalKeyComparator

leveldb_comparator_t

Iterator

AtomicPointer


Slice

Slice作為leveldb最基礎的型別之一,應用非常廣泛,關鍵點在於leveldb的key和value兩個型別都是Slice型別。其實Slice型別和std::string(std::string也是可以用來儲存binary資料,protobuf就是用std::string實現的binary資料儲存)非常類似,但是要比std::string輕量很多。不用多說,都在註釋裡:

// 程式碼源自leveldb/include/leveldb/slice.h
class Slice {
public:
    // 沒有任何引數的建構函式,預設資料指向一個空字串,而不是NULL
    Slice() : data_(""), size_(0) { }
    // 通過字串指標和長度構造Slice,此方法可以用於擷取部分字串,也可以用於binary型別資料
    // 我好奇的是為什麼沒有用const void*作為d的引數型別,這樣會更通用一點
    Slice(const char* d, size_t n) : data_(d), size_(n) { }
    // 通過std::string構造Slice,因為沒有用explicit宣告,說明可以Slice s = std::string()方式賦值
    Slice(const std::string& s) : data_(s.data()), size_(s.size()) { }
    // 通過字串指標構造Slice,大小就是字串長度
    Slice(const char* s) : data_(s), size_(strlen(s)) { }
    // 返回資料指標,返回型別是const char*而不是void*需要注意一下
    const char* data() const { return data_; }
    // 返回資料長度
    size_t size() const { return size_; }
    // 判斷Slice是否為空,僅通過size_等於0,因為預設建構函式data="",不是NULL,所以不能用空指標判斷
    bool empty() const { return size_ == 0; }
    // 過載了[]運算子,那麼就可以像陣列一樣訪問Slice了,返回的型別是char型
    char operator[](size_t n) const {
        assert(n < size());
        return data_[n];
    }
    // 清空Slice
    void clear() { data_ = ""; size_ = 0; }
    // 刪除前面一些資料
    void remove_prefix(size_t n) {
        assert(n <= size());
        // 對於Slice來說就是指標偏移
        data_ += n;
        size_ -= n;
    }
    // 把Slice轉換為std::string
    std::string ToString() const { return std::string(data_, size_); }
    // 比較函式,基本上和std::string是相同的比較方法
    int compare(const Slice& b) const {
        // 取二者最小的長度,避免越界
        const size_t min_len = (size_ < b.size_) ? size_ : b.size_;
        // 直接使用memcmp()函式實現
        int r = memcmp(data_, b.data_, min_len);
        // 二者相等還有其他情況,那就是連個Slice的長度不同時,誰的更長誰就更大
        if (r == 0) {
            if (size_ < b.size_) r = -1;
            else if (size_ > b.size_) r = +1;
        }
        return r;     
    }
    // 判斷Slice是不是以某個Slice開頭的,這個普遍是拿Slice作為key使用的情況,因為leveldb的key是字串型的,而且經常以各種字首做分類
    bool starts_with(const Slice& x) const {
        return ((size_ >= x.size_) && (memcmp(data_, x.data_, x.size_) == 0));
    }

private:
    // 就兩個成員變數,一個指向資料,一個記錄大小
    const char* data_;
    size_t size_;
};
// 同時還過載了連個全域性的運算子==和!=,應該還是比較簡單的哈
inline bool operator==(const Slice& x, const Slice& y) {
    return ((x.size() == y.size()) && (memcmp(x.data(), y.data(), x.size()) == 0));
}
inline bool operator!=(const Slice& x, const Slice& y) {
    return !(x == y);
}

Slice型別不像std::string有記憶體管理能力,所有的記憶體由Slice外部管理,這一點需要注意一下。

SequenceNumber

SequenceNumber是一個無符號64位整型的值,我們這裡用“順序號”這個名字。leveldb每新增/修改一次記錄都會觸發順序號的+1。

// 程式碼源自leveldb/db/dbformat.h
typedef uint64_t SequenceNumber;

Varint

 Varint是一種比較特殊的整數型別,它包含有Varint32和Varint64兩種,它相比於int32和int64最大的特點是長度可變。我們都知道sizeof(int32)=4,sizeof(int64)=8,但是我們使用的整型資料真的都需要這麼長的位數麼?舉個例子,我們的使用leveldb儲存的鍵,很可能長度連一個位元組都用不上,但是leveldb又不能確定使用者鍵的大小範圍,所以Varint就應運而生了。

因為Varint沒法用具體的結構體或者標準型別表達,所以使用的時候需要編碼/解碼(亦或是序列化/反序列化)過程,我們通過程式碼就可以清晰的瞭解Varint的格式了。

// 程式碼源自leveldb/util/coding.cc
// 我們指的Varint32就是儲存在dst中,按照Varint32格式封裝的資料
char* EncodeVarint32(char* dst, uint32_t v) {
    unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
    static const int B = 128;
    // 小於128,儲存空間為一個位元組,[v]
    if (v < (1<<7)) {
        *(ptr++) = v;
    }
    // 大於等於128小於16K用兩個位元組儲存[v(7-13位), v(0-6位)|128] 
    else if (v < (1<<14)) {
        *(ptr++) = v | B;
        *(ptr++) = v>>7;
    }
    // 大於等於16K小於2M用3個位元組儲存[v(14-20位),v(7-13位)|128, v(0-6位)|128] 
    else if (v < (1<<21)) {
        *(ptr++) = v | B;
        *(ptr++) = (v>>7) | B;
        *(ptr++) = v>>14;
    }
    // 大於等於2M小於256M用4個位元組儲存[v(21-27位),v(14-20位)|128,v(7-13位)|128, v(0-6位)|128] 
    else if (v < (1<<28)) {
        *(ptr++) = v | B;
        *(ptr++) = (v>>7) | B;
        *(ptr++) = (v>>14) | B;
        *(ptr++) = v>>21;
    }
    // 大於等於256M用5個位元組儲存[v(28-32位),v(21-27位)|128,v(14-20位)|128,v(7-13位)|128, v(0-6位)|128] 
    else {
        *(ptr++) = v | B;
        *(ptr++) = (v>>7) | B;
        *(ptr++) = (v>>14) | B;
        *(ptr++) = (v>>21) | B;
        *(ptr++) = v>>28;
    }

    return reinterpret_cast<char*>(ptr);
}

上面是int32編碼成Varint32的原始碼,其實他的編碼風格有點類似utf-8,位元組從低到高儲存的是整型的從低到高指定位數,每個位元組的最高位為標誌位,為1代表後面(更高位)還有數,直到遇到一個位元組最高位為0。所以每個位元組的有效位數只有7位,這樣做雖然看似浪費了一些空間,如果我們使用的整型資料主要集中在2M以內的話,那麼我們反而節省了2個位元組的空間。

解碼以及Varint64相關的程式碼讀者自行看吧,原理已經瞭解了,這些已經沒有什麼技術含量了。 

ValueType

ValueType我們直譯成“值型別”,在leveldb中,值的型別只有兩種,一種是有效資料,一種是刪除資料。因為值型別主要和物件鍵配合使用,這樣就可以知道該物件是有值的還是被刪除的。在leveldb中更新和刪除都不會直接修改資料,而是新增一條記錄,後期合併會刪除老舊資料。

// 程式碼源自leveldb/db/dbformat.h
enum ValueType {
    kTypeDeletion = 0x0,                               // 刪除
    kTypeValue = 0x1                                   // 資料
};
static const ValueType kValueTypeForSeek = kTypeValue; // 用於查詢

 在查詢物件時,物件不能是被刪除的,所以kValueTypeForSeek等於kTypeValue。

InternalKey

型別名字已經非常能代表意思了,雖然使用者在使用leveldb的時候用Slice作為key,但是在leveldb內部是以InternalKey作為Key的。所以我們非常有必要了解一下這個型別到底和Slice有什麼不同,而且為什麼要這麼實現。

// 程式碼源自leveldb/db/dbformat.h
class InternalKey {
private:
    // 只有一個私有成員變數,也就是說把Slice變成了std::string型別
    std::string rep_;
public:
    // 建構函式,這個沒引數,但是可以通過DecodeFrom()再設定
    InternalKey() { }
    // 內部鍵包含:使用者指定的鍵Slice、順序號、以及值型別
    InternalKey(const Slice& user_key, SequenceNumber s, ValueType t) {
        // 最終的內部鍵的格式為:[Slice]+[littleendian(SequenceNumber<<8 + ValueType)],程式碼實現讀者自己看吧 
        AppendInternalKey(&rep_, ParsedInternalKey(user_key, s, t));
    }
    // 從一個Slice中解碼內部鍵,就是直接按照字串賦值,所以此處的Slice非user_key
    // 而是別的InternalKey.Encode()介面輸出的
    void DecodeFrom(const Slice& s) { rep_.assign(s.data(), s.size()); }
    // 把內部鍵編碼成Slice格式,其實就是用Slice封裝一下,注意呼叫這個函式後InternalKey物件不能析構
    // 因為Slice不負責記憶體管理
    Slice Encode() const {
        assert(!rep_.empty());
        // 直接返回std::string型別?上面我們介紹Slice時候說過,Slice有一個建構函式引數就是std::string型別
        // 並且沒有宣告為explicit,所以可以將std::string賦值給Slice
        return rep_;
    }
    // 返回使用者鍵值,所以需要按照上面建構函式的格式提取出來,ExtractUserKey()讀者自己看就行,很簡單
    Slice user_key() const { return ExtractUserKey(rep_); }
    // ParsedInternalKeyd的型別下面會有介紹,比較簡單
    void SetFrom(const ParsedInternalKey& p) {
        rep_.clear();
        // 這個函式呼叫和建構函式裡面呼叫的是一個函式,所以就不多說了
        AppendInternalKey(&rep_, p);
    }
    // 清空介面
    void Clear() { rep_.clear(); }
};

從程式碼我們可以得出一個結論,內部鍵格式是在使用者指定的鍵(Slice)基礎上追加了按照小端方式儲存的(順序號<<8+值型別),即便是用std::string型別儲存的,但是已經不再是純粹的字串了。

LookupKey

當需要在leveldb查詢物件的時候,查詢順序是從第0層到第n層遍歷查詢,找到為止(最新的修改或者刪除的資料會優先被找到,所以不會出現一個鍵有多個值的情況)。由於不同層的鍵值不同,所以LookupKey提供了不同層所需的鍵值。

// 程式碼源自leveldb/db/dbformat.h
// 因為查詢可能需要查詢memtable和sst,所以儲存的內容要包含多種儲存結構的鍵值
class LookupKey {
public:
    // 建構函式需要使用者指定的鍵以及leveldb內部
    LookupKey(const Slice& user_key, SequenceNumber sequence) {
        // 獲取使用者指定的鍵的長度
        size_t usize = user_key.size();
        // 為什麼擴充套件了13個位元組,其中8位元組(64位整型順序號<<8+值型別)+5位元組(內部鍵的長度Varint32)
        size_t needed = usize + 13;  // A conservative estimate
         // 內部有一個固定大小的空間,200個位元組,這樣可以避免頻繁的記憶體分配,因為200個位元組可以滿足絕大部分需求
        char* dst;
        if (needed <= sizeof(space_)) {
            dst = space_;
        } else {
            dst = new char[needed];
        }
        // 記錄一下起始地址,在物件析構的時候需要釋放空間(如果是從堆上申請的空間)
        start_ = dst;
        // 起始先儲存內部鍵的長度,這個長度是Varint32型別
        dst = EncodeVarint32(dst, usize + 8);
        // 接著就是使用者指定的鍵值
        kstart_ = dst;
        memcpy(dst, user_key.data(), usize);
        dst += usize;
        // 最後是64位的(順序號<<8|值型別),此處值型別是kValueTypeForSeek,和類名LookupKey照應上了
        EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
        dst += 8;
        // 記錄結束位置,可以用於計算各種型別鍵值長度
        end_ = dst;
        // 整個儲存空間的結構為[內部鍵大小(Varint32)][使用者指定鍵][順序號<<8|值型別]
    }
    // 解構函式,因為有申請記憶體的可能性,所以解構函式還是要有的
    ~LookupKey() {
        // 要判斷一下記憶體是否為堆上申請的,如果是就釋放記憶體
        if (start_ != space_) delete[] start_;
    }
    // 獲取memtable需要的鍵值,從這裡我們知道memtable需要的是記憶體中全部內容
    Slice memtable_key() const { return Slice(start_, end_ - start_); }
    // 獲取內部鍵
    Slice internal_key() const { return Slice(kstart_, end_ - kstart_); }
    // 獲取使用者指定鍵
    Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); }

private:
    const char* start_;  // 指向儲存空間的起始位置
    const char* kstart_; // 指向使用者指定鍵/內部鍵的起始位置
    const char* end_;    // 指向鍵值的結尾
    char space_[200];    // 這樣可以避免頻繁申請記憶體
};

Comparator

Comparator是個抽象類,主要用於資料鍵值比較的,畢竟leveldb是按照鍵有序儲存的,所以比較器算是一個比較通用的型別。並且,leveledb裡面定義的比較器有兩個特殊函式介面挺有意思的,具體我們從程式碼定義上來看看:

// 程式碼源自leveldb/include/leveldb/comparator.h
class Comparator {
public:
    // 虛解構函式,析構物件的時候會呼叫實現類的解構函式
    virtual ~Comparator();
    // 比較,這個和Slice的比較是一個意思,所以基本上就是用Slice.compare()實現的,直接用Slice.compare()不就完了麼
    // 為什麼還要定義這個類呢,後面會有各種實現類讀者就知道為什麼了
    virtual int Compare(const Slice& a, const Slice& b) const = 0;
    // 獲取比較器的名字,這個不是很重要,也就是每個比較器有一個名字,我看基本都是實現類的全名(含namespace)
    virtual const char* Name() const = 0;
    // 通過函式名有點看不出來什麼意思,這個函式需要實現的功能是找到一個最短的字串,要求在[*start,limit)區間
    // 這是很有意思的功能,當需要找到以某些字串為字首的所有物件時,就會用到這個介面,等我們看到的時候會重點解釋使用方式
    // 如果比較簡單的比較器實現可以實現為空函式
    virtual void FindShortestSeparator(std::string* start, const Slice& limit) const = 0;
    // 這個介面和上一個介面很像,只是輸出最短的比*key大的字串,沒有區間限制,如果比較簡單的比較器可以實現為空函式
    virtual void FindShortSuccessor(std::string* key) const = 0;
};

因為FindShortestSeparator()和FindShortSuccessor()這兩個介面看似與比較器沒啥關係,但是這兩個介面實現的功能嚴重依賴比較功能,所以leveldb做到了比較器裡面。我搜遍了leveldb的所有原始碼,繼承Comparator的類只有三個:leveldb_comparator_t,InternalKeyComparator和BytewiseComparatorImpl。其中最為重要的當屬BytewiseComparatorImpl,我們需要先從這個型別入手,然後再介紹其他的型別,等看完後大家就明白為什麼BytewiseComparatorImpl是最重要的了。

BytewiseComparatorImpl

因為Slice並沒有規定Key具體型別,所以leveldb是支援使用者自定義比較器的,在建立leveldb資料庫物件的時候通過Option指定。大家需要注意一點,Option的建構函式預設是把比較器設定為BytewiseComparatorImpl的,也就是說BytewiseComparatorImpl是leveldb預設的比較器,所以我說它比較重要。直接上程式碼吧:

// 程式碼源自leveldb/util/options.cc
// 下面是Options的建構函式,我們可以看出comparator賦值是BytewiseComparator()的返回值
// BytewiseComparator()返回的就是BytewiseComparatorImpl物件指標
Options::Options()
    : comparator(BytewiseComparator()),
    ...... {
}
// 程式碼源自leveldb/util/comparator.cc
// Bytewise顧名思義按照位元組主意比較,所以原理比較簡單,下面就是具體實現了
class BytewiseComparatorImpl : public Comparator {
public:
    // 空的建構函式,也是,這種物件也不需要啥成員變數
    BytewiseComparatorImpl() { }
    // 實現獲取名字的介面
    virtual const char* Name() const {
        return "leveldb.BytewiseComparator";
    }
    // 比較本身就依賴Slice.compare()函式就可以了,本身Slice.compare()就是用memcmp()實現的,符合按位元組比較的初心
    virtual int Compare(const Slice& a, const Slice& b) const {
        return a.compare(b);
    }
    // 在Comparator定義的時候就感覺就比較模糊,這回看看是怎麼實現的?
    virtual void FindShortestSeparator(std::string* start, const Slice& limit) const {
        // 二者取最小的長度,避免越界訪問
        size_t min_length = (std::min)(start->size(), limit.size());
        // 名字比較明確,就是第一個位元組值不同的索引值,初始為0,這一點應該比較好理解,如果和limit相同,+1肯定就比limit大了
        // 所以要找到第一個和limit不同的位元組,從理論上講,應該*start<=limit,第一個不同的字元就是第一個比limit位元組小的數
        // 當然也不排除有傳錯引數的情況
        size_t diff_index = 0;
        // 找到第一個不相同的位元組值的位置
        while ((diff_index < min_length) && ((*start)[diff_index] == limit[diff_index])) {
            diff_index++;
        }
        // 也就是說*start和limit是相同的或者說limit是以*start為字首的,所以肯定找不到一個字串比*start大還比limit小
        if (diff_index >= min_length) {
        } else {
            // 取出這個位元組值
            uint8_t diff_byte = static_cast<uint8_t>((*start)[diff_index]);
            // 這個位元組不能是0xff,同時+1後也也要比limit相應位元組小,至少能看出來0xff對於leveldb是有特殊意義的
            if (diff_byte < static_cast<uint8_t>(0xff) && diff_byte + 1 < static_cast<uint8_t>(limit[diff_index])) {
                // 把找到的那個字元+1,同時把字串縮短到那個不同字元的位置,這樣就是[*start,limit)區間最短的字串了
                (*start)[diff_index]++;
                start->resize(diff_index + 1);
                assert(Compare(*start, limit) < 0);
            }
            // 其他情況就不對*start做任何修改,認為*start本身就是這個字串了,那我們舉一個例子:
            // *start=['a', 'a', 'a', 'c', 'd', 'e']
            //  limit=['a', 'a', 'a', 'b', 'd', 'e']
            // 上面這種情況是我認為輸出*start=['a', 'a', 'a', 'c', 'e']可能會更好,為什麼此處不這麼做呢?
            // 除非就是用在列舉以某個字串為字首的物件時候使用,找到第一個比字首大的字串
        }
    }
    // 找到第一個比*key大的最短字串
    virtual void FindShortSuccessor(std::string* key) const {
        // 獲取鍵的長度
        size_t n = key->size();
        // 遍歷鍵的每個位元組
        for (size_t i = 0; i < n; i++) {
            // 找到第一個不是0xff的位元組
            const uint8_t byte = (*key)[i];
            if (byte != static_cast<uint8_t>(0xff)) {
                // 把這個位置的位元組+1,然後截斷就可以了,算是比較簡單。
                (*key)[i] = byte + 1;
                key->resize(i+1);
                return;
            }
        }
        // 能到這裡說明*key全部都是0xff,也就找不到相應的字串了
    }
};

BytewiseComparatorImpl實現比較簡單,沒有什麼太複雜的邏輯,所以就不再多說寫什麼了。但是讀者還是需要注意一下,上面很多地方提到了字串,而且是用std::string作為型別,其實資料中可能存在'\0'。在很多情況需要用一些特殊的字元做分割,比如std::string a=['a', 'b', 'c', '\0', 'd'],a的長度是5而不是3。只是強行轉換為const char*時表現為“abc”而已,這本身是char*的自身約束(0是字串的結尾)。所以以後大家要習慣這種所謂的“字串”,並且BytewiseComparatorImpl這種位元組比較器名字還是比較貼切的。

InternalKeyComparator

顧名思義,主要用在內部的Key比較器,範圍也確定了,功能也確定了。其實InternalKeyComparator的主要功能還是通過BytewiseComparatorImpl實現,只是在BytewiseComparatorImpl基礎上做了一點擴充套件。

// 程式碼源自leveldb/db/dbformat.h
class InternalKeyComparator : public Comparator {
private:
    // 這個是最關鍵的了,有一個使用者定義的比較器,不用多說了,肯定是BytewiseComparatorImpl的物件啦
    // 所以我前面說InternalKeyComparator絕大部分是通過BytewiseComparatorImpl實現的
    const Comparator* user_comparator_;
public:
    // 建構函式需要提供比較器物件
    explicit InternalKeyComparator(const Comparator* c) : user_comparator_(c) { }
    // 返回比較器的名字
    virtual const char* Name() const {
        return "leveldb.InternalKeyComparator";
    }
    // 比較兩個Slice
    virtual int Compare(const Slice& a, const Slice& b) const {
        // 採用BytewiseComparatorImpl.compare()進行比較,但是比較的是刨除順序ID的使用者提供的鍵
        // 為什麼要刨除順序ID呢?看看下面的註釋就知道了
        int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
        // 使用者提供的鍵相等,意味著遇到了物件刪除或者修改操作,需要再比較一下序列號。
        if (r == 0) {
            // 要返序列化(解碼)順序號
            const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
            const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
            // 順序ID越大,說明插入時間越晚,資料就越新,如果排序的話應該排在前面,這個在LSM演算法有說明
            // 這就是為什麼要針對順序ID要特殊處理一下
            if (anum > bnum) {
                r = -1;
            } else if (anum < bnum) {
                r = +1;
            }
        }
        return r;
    }
    // 獲取一個最短的字串在[*start,limit)範圍內
    virtual void FindShortestSeparator(std::string* start, const Slice& limit) const {
        // 先把順序號去了,把使用者傳入的key提取出來
        Slice user_start = ExtractUserKey(*start);
        Slice user_limit = ExtractUserKey(limit);
        // 呼叫比較器的FindShortestSeparator()會修改傳入引數,所以先存在臨時變數中
        std::string tmp(user_start.data(), user_start.size());
        // 呼叫BytewiseComparatorImpl.FindShortestSeparator()
        user_comparator_->FindShortestSeparator(&tmp, user_limit);
        // 看看是否找到了字串,因為只要找到了那個字串,長度肯定會被截斷的。
        if (tmp.size() < user_start.size() && user_comparator_->Compare(user_start, tmp) < 0) {
            // 追加順序ID,此時的順序ID沒什麼大作用,所以用最大順序ID就可以
            PutFixed64(&tmp, PackSequenceAndType(kMaxSequenceNumber,kValueTypeForSeek));
            assert(this->Compare(*start, tmp) < 0);
            assert(this->Compare(tmp, limit) < 0);
            start->swap(tmp);
        }
    }
    // 獲取一個比*key大的最短字串,原理和FindShortestSeparator很相似,我就不做重複註釋了
    virtual void FindShortSuccessor(std::string* key) const {
        Slice user_key = ExtractUserKey(*key);
        std::string tmp(user_key.data(), user_key.size());
        user_comparator_->FindShortSuccessor(&tmp);
        if (tmp.size() < user_key.size() && user_comparator_->Compare(user_key, tmp) < 0) {
            PutFixed64(&tmp, PackSequenceAndType(kMaxSequenceNumber,kValueTypeForSeek));
            assert(this->Compare(*key, tmp) < 0);
            key->swap(tmp);
        }
    }
    // 獲取使用者傳入的比較器指標
    const Comparator* user_comparator() const { return user_comparator_; }
    // 比較內部鍵,內部鍵儲存格式就是[使用者鍵][順序號<<8|值型別],所以重新封裝成Slice後複用上面提到的比較函式
    int Compare(const InternalKey& a, const InternalKey& b) const {
        return Compare(a.Encode(), b.Encode());
    }
};

看上面的程式碼就明白為什麼叫做InternalKeyComparator了,因為leveldb內部儲存鍵的時候在使用者提供的鍵之上追加了順序號,所以比較的時候略有不同。

leveldb_comparator_t

這個是轉為C介面設計的比較器,非常簡單,讀者自己看程式碼瞭解一下。

Iterator

用過標準庫(stl)容器的對迭代器不會陌生,用於遍歷(可以條件遍歷)容器中的元素使用。在leveldb中也有迭代器,畢竟leveldb可以想象成一個容量更大、功能更強的std::map嘛。在leveldb中,對Iterator進行了抽象,不同的儲存型別自行實現具體的迭代器,我們這裡只對Iterator的每個介面的功能做說明,具體實現會在其他文章中說明。

// 程式碼源自leveldb/include/leveldb/iterator.h
class Iterator {
    // 為了方便理解建構函式和解構函式,程式碼上把私有成員變數和型別定義放到前面
private:
    // 定義清理型別
    struct Cleanup {
        CleanupFunction function; // 清理函式,型別定義下面有定義
        void* arg1;               // 清理函式的引數1
        void* arg2;               // 清理函式的引數2
        Cleanup* next;            // 單向連結串列,指向下一個清理物件
    };
    // 所有需要清理的內容
    Cleanup cleanup_;

public:
    // 建構函式,初始是沒有任何需要清理的物件的
    Iterator() {
        cleanup_.function = NULL;
        cleanup_.next = NULL;
    }
    // 解構函式
    virtual ~Iterator() {
        // 看看是否有任何需要清理的物件
        if (cleanup_.function != NULL) {
            // 清理掉物件
            (*cleanup_.function)(cleanup_.arg1, cleanup_.arg2);
            // 遍歷連結串列上的其他清理物件,逐一清理
            for (Cleanup* c = cleanup_.next; c != NULL; ) {
                (*c->function)(c->arg1, c->arg2);
                Cleanup* next = c->next;
                delete c;
                c = next;
            }
        }
    }
    // 獲取迭代器當前是否正常,比如到了結束為止該函式就會返回false
    virtual bool Valid() const = 0;
    // 定位到第一個物件為止
    virtual void SeekToFirst() = 0;
    // 定位到最後一個物件位置
    virtual void SeekToLast() = 0;
    // 定位到Slice指定的物件位置,如果沒有物件,那麼Valid()返回false.
    virtual void Seek(const Slice& target) = 0;
    // 定位到下一個物件,等同於stl容器迭代器的++
    virtual void Next() = 0;
    // 定位到前一個物件,等同於stl容器迭代器的--
    virtual void Prev() = 0;
    // 獲取迭代器當前定位物件的鍵,前提是Valid()返回true
    virtual Slice key() const = 0;
    // 獲取迭代器當前定位物件的值,前提是Valid()返回true
    virtual Slice value() const = 0;
    // 返回當前的狀態
    virtual Status status() const = 0;
    // 定義清理函式型別
    typedef void (*CleanupFunction)(void* arg1, void* arg2);
    // 註冊清理物件
    void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2) {
        assert(func != NULL);
        Cleanup* c;
        // 如果當前清理物件連結串列是空的,那就把當前的清理物件記錄在表頭
        if (cleanup_.function == NULL) {
            c = &cleanup_;
        } else {
            // 建立新的清理物件放在表頭的下一個位置,因為沒有規定清理順序,所以這種做法效率最高
            c = new Cleanup;
            c->next = cleanup_.next;
            cleanup_.next = c;
        }
        // 記錄清理物件的清理函式和引數
        c->function = func;
        c->arg1 = arg1;
        c->arg2 = arg2;
    }
};

迭代器的定義還是比較簡單的,相比於stl增加了清理物件的內容,我會在其他章節說明都清理什麼內容,這主要看具體的儲存實現才行。

AtomicPointer

原子指標是通過原子操作訪問的指標(不是訪問指標指向的記憶體值,是指標本身的值),相比於普通指標(比如void*),對指標的獲取和設定都支援原子操作的介面,同時也支援普通的訪問方式。我們來看看AtomicPointer的定義:

// 程式碼源自leveldb/port/atomic_pointer.h
// leveldb/port目錄是根據系統、CPU的不同自行實現的適配目錄,所以我們只講介面,不講實現
class AtomicPointer {
private:
    // 指標
    void* rep_;
 public:
    // 預設建構函式
    AtomicPointer() { }
    // 提供指標的建構函式
    explicit AtomicPointer(void* p) : rep_(p) {}
    // 正常方式的獲取/設定指標,比較簡單,不用多說了
    inline void* NoBarrier_Load() const { return rep_; }
    inline void NoBarrier_Store(void* v) { rep_ = v; }
    // 原子獲取指標是通過記憶體屏障實現的,leveldb的記憶體屏障是阻止編譯器亂序,但不能阻止CPU亂序執行(在SMP體系下)
    // 此處使用記憶體屏障的最主要原因是因為該函式是inline,在不知道使用函式的上下程式碼的情況下,可能會被編譯器優化掉
    // 我們這裡舉一個比較簡單的例子:void* ptr = NULL;
    //                            ......(等待其他執行緒設定指標)
    //                            ptr = Acquire_Load();
    // 此時從編譯器的角度來看ptr和後面的程式碼沒有任何依賴,那麼編譯器可以先賦值ptr在等待其他執行緒設定
    // 這個本身就完全改變了我們想要的結果
    inline void* Acquire_Load() const {
        void* result = rep_;
        MemoryBarrier();
        return result;
    }
    // 原子設定指標是通過記憶體屏障實現的,同樣的道理,因為是inline函式,此處的v可能會在呼叫本函式前被其他執行緒修改
    // 此處的記憶體屏障就可以保證賦值語句不會被編譯器優化而被提前執行
    inline void Release_Store(void* v) {
        MemoryBarrier();
        rep_ = v;
    }
};

從程式碼上看,是否採用記憶體屏障訪問指標,感覺都沒什麼區別,因為二者都是inline,不存在呼叫函式帶來的開銷,二者的區別在於使用時的上下文。所以,我在其他介紹leveldb的文章中但凡用到源自指標的地方我都會說明為什麼使用帶有記憶體屏障的方式訪問指標。