levelDB里缓存实现

leveldb的缓存机制

leveldb采用LRU机制, 利用键的哈希值前n位作为索引, 将要插入的键值对分派到指定的缓存区, 当缓存区的使用率大于总容量后, 优先淘汰最近最少使用的缓存, 独立的缓存区总量为2^n .

初始化ShardedLRUCache

  • 设置初始缓存容量, 并设置16个子分区的容量.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    static const int kNumShardBits = 4;
    static const int kNumShards = 1 << kNumShardBits;
    explicit ShardedLRUCache(size_t capacity)
    : last_id_(0) {
    const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards;
    for (int s = 0; s < kNumShards; s++) {
    shard_[s].SetCapacity(per_shard);
    }
    }

新建缓存

  • 由key的hash值的前kNumShardBits位作为子缓存区索引, 默认为前4位的索引位, 索引到指定的区, 子缓存区LRUCache接管新建缓存的任务.
    1
    2
    3
    4
    5
    6
    7
    class ShardedLRUCache : public Cache {
    ...
    virtual Handle* Insert(const Slice& key, void* value, size_t charge,
    void (*deleter)(const Slice& key, void* value)) {
    const uint32_t hash = HashSlice(key);
    return shard_[Shard(hash)].Insert(key, hash, value, charge, deleter);
    }
  • LRUCache结构
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    // A single shard of sharded cache.
    class LRUCache {
    public:
    LRUCache();
    ~LRUCache();

    // Separate from constructor so caller can easily make an array of LRUCache
    void SetCapacity(size_t capacity) { capacity_ = capacity; }

    // Like Cache methods, but with an extra "hash" parameter.
    Cache::Handle* Insert(const Slice& key, uint32_t hash,
    void* value, size_t charge,
    void (*deleter)(const Slice& key, void* value));
    Cache::Handle* Lookup(const Slice& key, uint32_t hash);
    void Release(Cache::Handle* handle);
    void Erase(const Slice& key, uint32_t hash);

    private:
    void LRU_Remove(LRUHandle* e);
    void LRU_Append(LRUHandle* e);
    void Unref(LRUHandle* e);

    // Initialized before use.
    size_t capacity_;

    // mutex_ protects the following state.
    port::Mutex mutex_;
    size_t usage_;

    // Dummy head of LRU list.
    // lru.prev is newest entry, lru.next is oldest entry.
    LRUHandle lru_;

    HandleTable table_;
    };

LRUCache才是真正具有缓存功能的结构, capacity_表示它的最大容量, mutex_规范各线程互斥地访问, usage_标志缓存中已用容量, lru_作为哑节点, 它的前节点是最新的缓存对象, 它的后节点是最老的缓存对象, table_是用来统计对象是否被缓存的哈希表.

  • LRUCache缓存生成

    1. 缓存的基础节点是LRUHandle, 储存节点的(键, 值, 哈希值, next, prev节点, 销毁处理函数等)信息. 综合上面的LRUCache结构也看到, 实际上, 缓存节点是双向列表存储, LRUHandle lru_这个哑节点用来分隔最常更新的节点和最不常更新节点.
    2. 当要将缓存节点插入缓存区, 先由哈希表判断缓存是否已存在, 若存在, 将其更新至最常更新节点; 若不存在, 则插入为最常更新节点. 同时更新哈希表HandleTable table_.
    3. 这里的HandleTable实现并没特殊之处, 我看是采用键哈希策略进行哈希, 如果键冲突则以链表进行存储.
    4. 利用二级指针对链表进行插入.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    LRUHandle* Insert(LRUHandle* h) {
    LRUHandle** ptr = FindPointer(h->key(), h->hash);
    LRUHandle* old = *ptr;
    h->next_hash = (old == NULL ? NULL : old->next_hash);
    *ptr = h;
    if (old == NULL) {
    ++elems_;
    if (elems_ > length_) {
    // Since each cache entry is fairly large, we aim for a small
    // average linked list length (<= 1).
    Resize();
    }
    }
    return old;
    }
  • 二级指针
    在HandleTable这个哈希表实现里, 有个FindPointer方法用来查找此对象是否已存在, 并返回LRUHandle**, 代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // Return a pointer to slot that points to a cache entry that
    // matches key/hash. If there is no such cache entry, return a
    // pointer to the trailing slot in the corresponding linked list.
    LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
    LRUHandle** ptr = &list_[hash & (length_ - 1)];
    while (*ptr != NULL &&
    ((*ptr)->hash != hash || key != (*ptr)->key())) {
    ptr = &(*ptr)->next_hash;
    }
    return ptr;
    }

可见, 如果已存在节点, 则返回这个节点的LRUHandle**; 如果不存在, 返回的是可以保存这个LRUHandle*的地址.

  • LRUCache缓存查找
    如果缓存存在, 则将其放置到哑节点lru_的prev位置, 即最近使用节点, 相当于提升它的优先级, 便于下次快速查找; 如果不存在这个缓存, 返回NULL.

    小结

    看完leveldb的缓存实现, 在实现思路上, 是传统的lru算法, 使用哈希表判重, 根据缓存的请求时间提升缓存的优先级. 里面的细节例如使用哈希值的前n位进行路由, 路由到2^n个独立的缓存区, 各个缓存区维护自己的mutex进行并发控制; 哈希表在插入节点时判断空间使用率, 并进行自动扩容, 保证查找效率在O(1).

levelDB里integer编码

leveldb的数字编码方法

  • EncodeFixed32
    按照机器的字节存储顺序(大端, 小端), 原样存储到一块32位内存

  • EncodeFixed64
    按照机器的字节存储顺序(大端, 小端), 原样存储到一块64位内存

  • PutFixed32
    将EncodeFixed32处理过的内存块追加到目的地址

  • PutFixed64
    同PutFixed32

  • EncodeVarint32
    将32位的无符号整数以7个有效位, 1个标志位的形式编码.

对于Varint这种整数编码, 是为了减短数值较小的整数所占的存储空间. 实现方法是每8位, 低7位存储有效数值和高1位存储标记位(用来标记是否还有后续的数值), 以127和128这两个数为例:
127 => 0111 1111
第一个字节低7位是有效数值127, 最高位位0, 所以表示没有后续的数值, 于是算出它所代表的数值是127.

128 => 1111 1111 0000 0001
第一个字节低7位是有效数值127, 最高位位1, 所以表示有后续的数值, 相同计算方法算出后续值1, 最高位标志0表示结束, 127 + (1<<7) = 128, 因为每8位实际表示的是7位, 所以要将1左移7位, 若有更多位, 计算方式相同.

leveldb里的32位编码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
char* EncodeVarint32(char* dst, uint32_t v) {
// Operate on characters as unsigneds
unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
static const int B = 128;
if (v < (1<<7)) {
*(ptr++) = v;
} else if (v < (1<<14)) {
*(ptr++) = v | B;
*(ptr++) = v>>7;
} else if (v < (1<<21)) {
*(ptr++) = v | B;
*(ptr++) = (v>>7) | B;
*(ptr++) = v>>14;
} else if (v < (1<<28)) {
*(ptr++) = v | B;
*(ptr++) = (v>>7) | B;
*(ptr++) = (v>>14) | B;
*(ptr++) = v>>21;
} else {
*(ptr++) = v | B;
*(ptr++) = (v>>7) | B;
*(ptr++) = (v>>14) | B;
*(ptr++) = (v>>21) | B;
*(ptr++) = v>>28;
}
return reinterpret_cast<char*>(ptr);
}

用了5个if来判断这个数落在哪个数值区间里, 32位的数值使用变长编码能使用1-5位来表示

leveldb里的64位编码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// leveldb implementation
char* EncodeVarint64(char* dst, uint64_t v) {
static const int B = 128;
unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
while (v >= B) {
*(ptr++) = (v & (B-1)) | B;
v >>= 7;
}
*(ptr++) = static_cast<unsigned char>(v);
return reinterpret_cast<char*>(ptr);
}

// My own implementation
char* EncodeVarint64(char* dst, uint64_t v) {
unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
static const int B = 128;
int flag = 1;
for (int offset = 0; flag > 0 && offset < 8; offset++) {
*(ptr++) = ((v >> (offset * 7)) & 127) | B;
flag = v >> (offset * 7);
}
return reinterpret_cast<char*>(ptr);
}

比较了我和leveldb的实现之后, 发现leveldb更简洁高效, 是因为我的实现维护了一个offset来记录当前的偏移, flag用来记录是否继续计算, 每次都需要维护这个offset和flag, leveldb直接使用v作为变量, 通过每次对它的修改, 来记录是否继续, 同时它只需要用低7位, 省去了记录偏移的麻烦, 简洁有力!

levelDB里自定义的状态

LevelDB状态码

LevelDB里专门定义了Status类,自定义了常见的状态码,用于函数的返回。

状态码类型

在Status类中以枚举类型定义Code:

0 => 处理成功
1 => 未找到
2 => 崩溃
3 => 不支持
4 => 无效的参数
5 => IO错误

并且有6个静态方法用来生成对应状态码的Status对象, 和4个判断方法来方便确认状态。

存储方式

状态码包括状态信息都是利用私有字符串变量state_存储,存储方式如下:

0 ~ 4字节:消息长度(消息1的长度len1+分割符的长度+消息2的长度len2)
4 ~ 5字节:状态码
5 ~ 5+len1:消息1的长度
(若消息2不为空)
5+len1 ~ 7+len1:分隔符的长度(分隔符为2个字符,分别为冒号和空格)
7+len1 ~ 7+len1+len2:消息2的长度

这种存储方式,另得所有操作都是基于这个state_变量的下标索引操作。

小结

定义异常在其它代码遇到错误时可以抛出适当的异常, 这样的实现方式我认为好处是初始化,销毁和复制操作简单,并且这些内容在内存中的位置紧密,有利于缓存,不知道是不是这样考虑才这么做的,但分成3个变量也无伤大雅。

levelDB里skiplist的实现代码赏析

跳表的原理就是利用随机性建立索引,加速搜索,并且简化代码实现难度。具体的跳表原理不再赘述,主要是看了levelDB有一些实现细节的东西,凸显自己写的实现不足之处。

  • 去除冗余的key

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    template<typename Key, class Comparator>
    struct SkipList<Key,Comparator>::Node {
    explicit Node(const Key& k) : key(k) { }

    Key const key;

    // Accessors/mutators for links. Wrapped in methods so we can
    // add the appropriate barriers as necessary.
    Node* Next(int n) {
    assert(n >= 0);
    // Use an 'acquire load' so that we observe a fully initialized
    // version of the returned Node.
    return reinterpret_cast<Node*>(next_[n].Acquire_Load());
    }
    void SetNext(int n, Node* x) {
    assert(n >= 0);
    // Use a 'release store' so that anybody who reads through this
    // pointer observes a fully initialized version of the inserted node.
    next_[n].Release_Store(x);
    }

    // No-barrier variants that can be safely used in a few locations.
    Node* NoBarrier_Next(int n) {
    assert(n >= 0);
    return reinterpret_cast<Node*>(next_[n].NoBarrier_Load());
    }
    void NoBarrier_SetNext(int n, Node* x) {
    assert(n >= 0);
    next_[n].NoBarrier_Store(x);
    }

    private:
    // Array of length equal to the node height. next_[0] is lowest level link.
    port::AtomicPointer next_[1];
    };

    这里使用一个Node节点表示所有相同key,不同高度的节点集合,仅保留了key和不同高度的向右指针,并且使用NewNode来动态分配随即高度的向右指针集合,而next_就指向这指针集合。这也是c/c++ tricky的地方。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    #include <stdio.h>
    struct Node {
    char str[1];
    };
    int main() {
    char* mem = new char[4];
    for (int i = 0; i < 4; i++) {
    mem[i] = i + '0';
    }
    Node* node = (Node*)mem;
    char* const pstr = node->str;
    for (int i = 0; i < 4; i++) {
    printf("%c", pstr[i]);
    }
    return 0;
    }

    就像上面这个简单的sample,成员str可以作为指针指向从数组下标0开始的元素,并且不受申明时的限制,不局限于大小1,索引至分配的最大的内存地址。

  • 简易随机数生成

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    uint32_t Next() {
    static const uint32_t M = 2147483647L; // 2^31-1
    static const uint64_t A = 16807; // bits 14, 8, 7, 5, 2, 1, 0
    // We are computing
    // seed_ = (seed_ * A) % M, where M = 2^31-1
    //
    // seed_ must not be zero or M, or else all subsequent computed values
    // will be zero or M respectively. For all other values, seed_ will end
    // up cycling through every number in [1,M-1]
    uint64_t product = seed_ * A;

    // Compute (product % M) using the fact that ((x << 31) % M) == x.
    seed_ = static_cast<uint32_t>((product >> 31) + (product & M));
    // The first reduction may overflow by 1 bit, so we may need to
    // repeat. mod == M is not possible; using > allows the faster
    // sign-bit-based test.
    if (seed_ > M) {
    seed_ -= M;
    }
    return seed_;
    }

可以看到,他使用A和M对种子进行运算,达到一定数据范围内不会重复的数集,而里面对于(product % M),使用(product >> 31) + (product & M)进行运算优化,考虑右移和与操作的代价远小于取余操作。

  • 简洁清晰的私有帮助方法,帮助寻找小于指定key的节点
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    template<typename Key, class Comparator>
    typename SkipList<Key,Comparator>::Node*
    SkipList<Key,Comparator>::FindLessThan(const Key& key) const {
    Node* x = head_;
    int level = GetMaxHeight() - 1;
    while (true) {
    assert(x == head_ || compare_(x->key, key) < 0);
    Node* next = x->Next(level);
    if (next == NULL || compare_(next->key, key) >= 0) {
    if (level == 0) {
    return x;
    } else {
    // Switch to next list
    level--;
    }
    } else {
    x = next;
    }
    }
    }
Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×