下一步用武的目标

用武的出发点

产生于我自己比较失败的工作经历的一个发泄, 之所以说是失败, 有以下一些原因:

  1. 日益增加的技术渴求和没有匹配的日常工作之间的矛盾
  2. 工作口味一直没有被满足, 对工作的几个要求也是如此, 造成不能沉下心来做事
  3. 恶性循环, 导致离自己心里的追求越来越远

可能有这样那样的原因导致你的工作并不如意, 这可能包括你的技术、业务、薪资、人际等各种期待,与真实的工作并不合拍.

所以, 对我而言, 无论英雄, 都需要有用武之地.

我们更需要的是**过来人对工作选择的经验, 以及获得更多更详细的职位内核, 包括每日所干的事, 和其它类似职位有何不同, 做一个程序员能参与的评价和比较,
更有利于有诉求的程序员
根据自己的需求**, 以及对职位的更深入了解, 而不是通过千篇一律的职位描述, 或者其它官方的回答去揣测一份工作, 尽量减少错误挑选工作的概率, 减少走弯路的概率.

而我希望用武能提供这样的一个平台, 给程序员更多的机会去用自己经历和心得给这些来自互联网的职位进行评论以及评分, 或者打小标签等, 当然是基于实名的负责任地评价优劣, 降低程序员踩坑的次数.

用武的下一步方向

由于我是一个it从业人员, 可能对于这行的朋友的诉求更了解些, 因为自己就是这样很”作”, 所以优先收录这些互联网的职位, 比如国内的大型互联网职位发布网站, 如拉勾网, 内推网…

初步功能

  1. 收录更多的职位信息
    现在版本只收录了450个拉勾网上的职位信息, 是非常少的部分, 其次需要对于其它网站进行收录, 归纳各个网站的不同点, 规约一份公共的职位信息部分, 以及各自的特色.
  2. 尽量实时收录职位信息
    可能设置一个合适的频率去检测新增的职位.
  3. 更准确的关键词搜索
    现在的搜索引擎使用的是solr, 面对不完整的关键字PH, 搜索出的结果为空, 显然并不合理, 需要进行更精确的配置.
  4. github第三方登陆及用户注册
    既然是为了程序员的职业, 那么首先选择github作为第三方登陆模块, 为了以后大家可以使用程序员的名义负责任地进行职位的评分和评论.
  5. 更nice的前端, 职位块的显示更清晰, 显示出重点
    能够已更清晰简洁的排版展示出职位关键信息.

欲查看用武情况, 可访问用武

PHP7数组实现(画图版)

主要介绍一下PHP7对于数组的实现。


预备知识

PHP的数据类型

php_types

zend_long

php中的long类型, 在64位机上是64位有符号整数, 不然是32位符号整数。

1
2
3
4
5
6
7
8
9
10
11
// zend_long.h
/* This is the heart of the whole int64 enablement in zval. */
#if defined(__x86_64__) || defined(__LP64__) || defined(_LP64) || defined(_WIN64)
# define ZEND_ENABLE_ZVAL_LONG64 1
#endif

#ifdef ZEND_ENABLE_ZVAL_LONG64
typedef int64_t zend_long;
#else
typedef int32_t zend_long;
#endif

double

双精度浮点数

zend_refcounted

引用计数类型, 记录引用次数, 以及u用于存储元素类型信息type字段(如is_string, is_array, is_object等), 标明对象是否调用过free, destructor函数的flags, 记录GC root number (or 0) and color的gc_info字段(详情见php的zend_gc.h及zend_gc.c).

1
2
3
4
5
6
7
8
9
10
11
12
13
// zend_types.h
struct _zend_refcounted {
uint32_t refcount; /* reference counter 32-bit */
union {
struct {
ZEND_ENDIAN_LOHI_3(
zend_uchar type,
zend_uchar flags, /* used for strings & objects */
uint16_t gc_info) /* keeps GC root number (or 0) and color */
} v;
uint32_t type_info;
} u;
};

zend_string

字符串类型, 带有引用计数, 和string的hash值h, 避免每次需要计算, 字段长度len, 以及val用来索引字符串, 这里tricky地避免了2次malloc内存.

1
2
3
4
5
6
7
8
// zend_types.h
typedef struct _zend_string zend_string;
struct _zend_string {
zend_refcounted gc;
zend_ulong h; /* hash value */
size_t len;
char val[1];
};

zend_object

1
2
3
4
5
6
7
8
9
typedef struct _zend_object     zend_object;
struct _zend_object {
zend_refcounted gc;
uint32_t handle; // TODO: may be removed ???
zend_class_entry *ce;
const zend_object_handlers *handlers;
HashTable *properties;
zval properties_table[1];
};

zend_resource

1
2
3
4
5
6
struct _zend_resource {
zend_refcounted gc;
int handle; // TODO: may be removed ???
int type;
void *ptr;
};

zend_reference

1
2
3
4
struct _zend_reference {
zend_refcounted gc;
zval val;
};

zend_array

详细见下文具体实现.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// zend_types.h
typedef struct _zend_array zend_array;

typedef struct _zend_array HashTable;

struct _zend_array {
zend_refcounted gc;
union {
struct {
ZEND_ENDIAN_LOHI_4(
zend_uchar flags,
zend_uchar nApplyCount,
zend_uchar nIteratorsCount,
zend_uchar reserve)
} v;
uint32_t flags;
} u;
uint32_t nTableMask;
Bucket *arData;
uint32_t nNumUsed;
uint32_t nNumOfElements;
uint32_t nTableSize;
uint32_t nInternalPointer;
zend_long nNextFreeElement;
dtor_func_t pDestructor;
};

位运算

给出一个上限Max, 利用位运算求出指定N的表达式-(Max-(N % Max))的值.

1
2
3
4
uint32_t max = 2;
uint32_t mask = (uint32_t)-max; // 二进制表示: 11111111 11111111 11111111 11111110
uint32_t n = 12345678;
int32_t answser = (int32_t)(n | mask); // -2

散列表

散列表原理维基百科

局部性原理

空间局部性, 时间局部性

具体实现

初始化

1
2
<?php
$arr = array();

php_init

arData指针指向的内存, 是真实数据的起始地址, 在邻近的低址内存是存储hash值到真实数据地址的映射表, 这个映射表是uint32_t类型的数组, 大小相同于nTableSize, 这样最好情况下, 每个hash值都能映射一个真实数据.

插入

插入key为$key1, $key1.hash为0, 值为10的元素

1
2
<?php
$arr[$key1] = 10;

php_insert

上文提到的位运算就是应用在插入元素场景, 由于arData指向的是真实数据的起始地址, 而索引信息(即存储hash值到真实数据的映射)处于arData更低地址, 那么要更新索引信息, 就需要计算出-(nTableSize-(nHash % nTableSize)), nHash就是键的hash值, 例如向大小为2的数组, 插入hash值为0的元素, 那么索引到hash值为0的区域就是((uint32_t*)arData)-(2-(0%2)), 如图, 将hash值为0的数据偏移0*sizeof(Bucket)存储到了((uint32_t*)arData)-2.

哈希冲突

插入key为$key2($key2 != $key1), $key2.hash为0, 值为20的元素, 造成哈希冲突

1
2
<?php
$arr[$key2] = 20;

php_collide

数组拓容

插入key为$key3, $key3.hash为1, 值为30的元素, 造成数组的load factor过高, 触发拓容

1
2
<?php
$arr[$key3] = 30;

php_extend

删除

删除key为$key2的元素

1
2
<?php
delete $arr[$key2];

php_del

遍历

1
2
3
4
5
6
7
8
9
<?php
foreach ($arr as $v) {
print $v . "\n";
}

// output
// 10
// 30

直接遍历arData, 最大边界为arData+nNumUsed, 跳过被UNDEF的元素.

与历史版本比较

改进

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// zend_hash.h
typedef struct _hashtable {
uint nTableSize;
uint nTableMask;
uint nNumOfElements;
ulong nNextFreeElement;
Bucket *pInternalPointer; /* Used for element traversal */
Bucket *pListHead;
Bucket *pListTail;
Bucket **arBuckets;
dtor_func_t pDestructor;
zend_bool persistent;
unsigned char nApplyCount;
zend_bool bApplyProtection;
#if ZEND_DEBUG
int inconsistent;
#endif
} HashTable;
  1. 连续的内存, 更高的效率
    通过简单地观察数据结构, 可以发现5.3.23版本是使用Bucket **arBuckets分配一块二维Bucket数组存放键值, 与7.0的预分配连续内存的做法不同, 每次需要插入元素都要申请一块sizeof(Bucket)的内存, 更容易造成内存碎片, 以及低效率;
  2. 更小的数据结构, 更优美的实现
    此外, 废弃了Bucket *pListHead以及Bucket *pListTail这两个头尾指针, 本来是为了实现数组特性, 实现正反序遍历等功能, 而7.0既然已经是连续的一块内存, 那么直接从Bucket *arData下标0处, 到达边界arData+nNumUsed就可以实现这个功能.
  3. 良好的局部性
    遍历数组有更好的局部性, 相较于5.3.23的链表遍历, 使得遍历时cache能更准确地加载数据, 拥有更好的时间空间局部性.

可以说这个7.0版本对PHP数组的优化是非常成功的, 除此之外, 对于其他的数据结构7.0也是有”瘦身”优化, 对于整个效率和内存占用有比较明显的改善.

PHP数组的内存布局

PHP数组的内存布局

内存布局

它的内存结构,目前的实现方案是分配一块连续的内存区间,用来存储hash元数据和具体数据。
连续的内存,上面部分是存储hash值到值地址的映射,而下面部分就是值的存储区域,如下图(摘自php-src/Zend/zend_types.h):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/*
* HashTable Data Layout
* =====================
*
* +=============================+
* | HT_HASH(ht, ht->nTableMask) |
* | ... |
* | HT_HASH(ht, -1) |
* +-----------------------------+
* ht->arData ---> | Bucket[0] |
* | ... |
* | Bucket[ht->nTableSize-1] |
* +=============================+
*/

新建数组

初始化一块内存,大小为HT_SIZE(ht) = HT_HASH_SIZE(ht) + HT_DATA_SIZE(ht),如果你查看zend_types.h,能看到这三个宏定义:

1
2
3
4
5
6
7
8
#define HT_SIZE(ht) \
(HT_HASH_SIZE(ht) + HT_DATA_SIZE(ht))

#define HT_DATA_SIZE(ht) \
((size_t)(ht)->nTableSize * sizeof(Bucket))

#define HT_HASH_SIZE(ht) \
(((size_t)(uint32_t)-(int32_t)(ht)->nTableMask) * sizeof(uint32_t))

这里要知道nTableMask是什么,从字面意思是掩码,它的类型是uint32_t,值是(uint32_t)(-nTableSize),nTableSize最小为2,那么此时nTableMask为4294967294,表示成二进制,11111111 11111111 11111111 11111110,这个值在之后计算对应hash所在的下标值会用到。
下面就是这块内存最开始的状态,假设数组大小为2,nNumUsed即已有元素为0,则下面是初始状态:

1
2
3
4
5
6
7
8
9
10
11
12
/*
* HashTable Data Layout
* =====================
* ht->nNumUsed = 0
* +=============================+
* 0 | |
* 1 | |
* +-----------------------------+
* ht->arData ---> | |
* | |
* +=============================+
*/

插入key的hash值为12345678的元素E

事实上对于这块内存,我们仅仅是根据arData这个指针去使用,而它是Bucket* 类型,上半部分的类型是uint32_t* ,怎么在不超出上半部分边界的情况下进行索引,这里用到了刚才提到的nTableMask,下标为(int32_t)(HASH | nTableMask),计算出为-2,那么((uint32_t*)arData)[-2]所指向的就是下标为0处的内存地址,通过nTableMask保证不会越界。
E元素首先根据nNumUsed找到存储自身的位置,即ht->arData[ht->nNumUsed],并且修改hash元数据部分,0位置的值指向存储区域,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
/*
* HashTable Data Layout
* =====================
* ht->nNumUsed = 1
* +=============================+
* 0 | 0 |
* 1 | |
* +-----------------------------+
* ht->arData ---> | E | next->HT_INVALID_IDX
* | |
* +=============================+
*/

插入key的hash值为12345678的元素F

使用链表法解决冲突。

1
2
3
4
5
6
7
8
9
10
11
12
/*
* HashTable Data Layout
* =====================
* ht->nNumUsed = 2
* +=============================+
* 0 | 1*sizeof(Bucket) |
* 1 | |
* +-----------------------------+
* ht->arData ---> | E | next->HT_INVALID_IDX
* | F | next->0
* +=============================+
*/

哈希拓容,rehash

扩大数组大小,将数据转存到新数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* HashTable Data Layout
* =====================
* ht->nNumUsed = 2
* +=============================+
* 0 | 1*sizeof(Bucket) |
* 1 | |
* 2 | |
* 3 | |
* +-----------------------------+
* ht->arData ---> | E | next->HT_INVALID_IDX
* | F | next->0
* | |
* | |
* +=============================+
*/

Mit分布式系统课程-Lab2-PartB

前两天删了在github上的6.824, 因为课程里说了不要把源码放在公共的地方, 之前没删一直心有歉意, 还好进度没这么快, 应该对别人也不会造成影响.

言归正传, 这星期空余时间一直在看Lab2的Part B, 今天因为身体不适, 请假一天在家, 开始实现思路, 并记录一下自己遇到的一些大小问题.

目标

实现基于viewservice(Lab1实现的视图服务器, 用来记录主副服务器的地址)的键值服务器, 分别为primary(主服务器),Backup(备份服务器), 客户端通过viewservice获得primary的地址, 发送键值的存取请求, primary在响应请求的同时, 将数据备份到backup服务器. 实现完成后, 通过所有的unit test.

架构

2个kv服务器, viewservice以及客户端

  1. viewservice
    • 存储并提供primary以及backup的信息
    • 接受primary, backup的定时ping请求, 更新或保持两个服务器的位置
    • 提供查询当前服务器状态
  2. primary
    • 接受client的请求
    • 定期向viewservice报备自己的状态
    • 在backup服务器更换时负责将自身全部数据同步过去
  3. backup
    • 作为primary的备份存储
    • 定期向viewservice报备自己的状态
    • 在primary挂了之后, 提升为primary, 负责client的请求
  4. client
    • 向primary发送数据存储,查询的请求
    • 若primary返回”错误的服务器”时, 询问viewservice, 更新primary

遇到的挑战

  1. 实现时有些golang的特性不是特别了解, 一边打开golang的官方文档, 一边poc.
  2. 决定在什么时候需要容错? 是否可恢复? 在反复重试时, 是否要定最大重试次数, 以及失败的处理.

单元测试

当我运行go test来验证逻辑的时候, 实在不得不赞叹老师们详细的测试用例, 有几个case让我挠头.

  • TestAtMostOnce
    模拟服务器返回报文丢失的情况, 利用puthash-get验证, puthash是put操作的一种变体, 它要存储的值是先前的值和当前值的hash结果, 所以这个地方引入了存储状态, 我通过每次请求要求客户端发送一个unique id来记录此次操作, 尽管报文丢失, 客户端重试时还是能用这个id找回上次通信的内容.
  • TestConcurrentSame
    验证primary服务器只有在backup也处理成功的情况才算成功.
  • TestRepeatedCrash
    模拟存储服务器间断崩溃的情况, 就是要保证客户端的每个请求都有重试的可能, primary和backup的通信过程中, backup随时可能崩溃, 这个测试使用put-get验证.
  • TestRepeatedCrashUnreliable
    模拟存储服务器间断崩溃的同时, 网络也不稳定, 服务器的返回报文会丢失. 这个case的难点在它是使用puthash-get来验证的, 就是说, 你要在服务器不时崩溃的情况下保证以前的值不仅在primary服务器上存在, 也要在backup服务器上保存, 以免primary下一秒挂了.
  • TestPartition1
    模拟primary已经过期的情况下, 向他发送的请求, 能被它拒绝.

POC

backup服务器更换时, primary需要同步当前数据库所有数据到backup, 我不想用foreach来每条数据进行同步, 所以想既然rpc能传递这么多类型的参数, 那直接将map传递过去不就不用那么麻烦了?所以进行了一次poc.

server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import "net"
import "net/rpc"
import "os"
import "fmt"

type Server struct {
db map[string]string
}

func (server *Server) Sync(database map[string]string, reply *int) error {
fmt.Println("Start syncing...")
server.db = database
for k, v := range server.db {
fmt.Println(k, " => ", v)
}
*reply = 1
fmt.Println("Finish syncing...")
return nil
}

func main() {
server := new(Server)
server.db = make(map[string]string)
rpcs := rpc.NewServer()
rpcs.Register(server)
me := "/var/tmp/test1122"
os.Remove(me)
l, e := net.Listen("unix", me)
if e != nil {
fmt.Println("listen error")
}

for true {
conn, e := l.Accept()
if e != nil {
fmt.Println("accept error")
conn.Close()
return
}
go func() {
rpcs.ServeConn(conn)
}()
}
}
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main
import "net/rpc"
import "fmt"

func main() {
me := "/var/tmp/test1122"
c, e := rpc.Dial("unix", me)
if e != nil {
fmt.Println("dial error")
return
}
defer c.Close()

db := make(map[string]string)
db["name"] = "srg"
db["age"] = "25"
reply := 1
e = c.Call("Server.Sync", db, &reply)
if e != nil {
fmt.Println("sync error")
return
}
}

开两个terminal, 在$GOPATH目录先运行server.go, 再运行client.go, 最后server.go运行后的输出, 看来是可以的

1
2
3
4
5
$ go run server.go                                                             
Start syncing...
name => srg
age => 25
Finish syncing...

在我把这页的hint也看完的时候, 发现里面提到这个问题, 建议直接把这个map作为参数传递.

最后贴上过关纪念

微信本地测试

微信本地测试

项目里要测试微信公众号, 但又不愿意忍受每次改下代码就提交到版本库, 再发布到生产这个琐碎的流程, 所以必须有一个本地能测试的方法, 到知乎上找了一个方法(引用知乎操晓峰的回答).

原理是HTTP请求重定向, 将微信中要访问的url转发到你设置的代理服务器, 代理服务器转发到你的开发服务器.

有趣的是, 在公司一直没有完全成功, 总有一个页面是刷不出来, 查看Charles发现传输速度极慢, 真想吐槽公司的网络. 回家后重试了一次, 完全成功.

levelDB里缓存实现

leveldb的缓存机制

leveldb采用LRU机制, 利用键的哈希值前n位作为索引, 将要插入的键值对分派到指定的缓存区, 当缓存区的使用率大于总容量后, 优先淘汰最近最少使用的缓存, 独立的缓存区总量为2^n .

初始化ShardedLRUCache

  • 设置初始缓存容量, 并设置16个子分区的容量.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    static const int kNumShardBits = 4;
    static const int kNumShards = 1 << kNumShardBits;
    explicit ShardedLRUCache(size_t capacity)
    : last_id_(0) {
    const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards;
    for (int s = 0; s < kNumShards; s++) {
    shard_[s].SetCapacity(per_shard);
    }
    }

新建缓存

  • 由key的hash值的前kNumShardBits位作为子缓存区索引, 默认为前4位的索引位, 索引到指定的区, 子缓存区LRUCache接管新建缓存的任务.
    1
    2
    3
    4
    5
    6
    7
    class ShardedLRUCache : public Cache {
    ...
    virtual Handle* Insert(const Slice& key, void* value, size_t charge,
    void (*deleter)(const Slice& key, void* value)) {
    const uint32_t hash = HashSlice(key);
    return shard_[Shard(hash)].Insert(key, hash, value, charge, deleter);
    }
  • LRUCache结构
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    // A single shard of sharded cache.
    class LRUCache {
    public:
    LRUCache();
    ~LRUCache();

    // Separate from constructor so caller can easily make an array of LRUCache
    void SetCapacity(size_t capacity) { capacity_ = capacity; }

    // Like Cache methods, but with an extra "hash" parameter.
    Cache::Handle* Insert(const Slice& key, uint32_t hash,
    void* value, size_t charge,
    void (*deleter)(const Slice& key, void* value));
    Cache::Handle* Lookup(const Slice& key, uint32_t hash);
    void Release(Cache::Handle* handle);
    void Erase(const Slice& key, uint32_t hash);

    private:
    void LRU_Remove(LRUHandle* e);
    void LRU_Append(LRUHandle* e);
    void Unref(LRUHandle* e);

    // Initialized before use.
    size_t capacity_;

    // mutex_ protects the following state.
    port::Mutex mutex_;
    size_t usage_;

    // Dummy head of LRU list.
    // lru.prev is newest entry, lru.next is oldest entry.
    LRUHandle lru_;

    HandleTable table_;
    };

LRUCache才是真正具有缓存功能的结构, capacity_表示它的最大容量, mutex_规范各线程互斥地访问, usage_标志缓存中已用容量, lru_作为哑节点, 它的前节点是最新的缓存对象, 它的后节点是最老的缓存对象, table_是用来统计对象是否被缓存的哈希表.

  • LRUCache缓存生成

    1. 缓存的基础节点是LRUHandle, 储存节点的(键, 值, 哈希值, next, prev节点, 销毁处理函数等)信息. 综合上面的LRUCache结构也看到, 实际上, 缓存节点是双向列表存储, LRUHandle lru_这个哑节点用来分隔最常更新的节点和最不常更新节点.
    2. 当要将缓存节点插入缓存区, 先由哈希表判断缓存是否已存在, 若存在, 将其更新至最常更新节点; 若不存在, 则插入为最常更新节点. 同时更新哈希表HandleTable table_.
    3. 这里的HandleTable实现并没特殊之处, 我看是采用键哈希策略进行哈希, 如果键冲突则以链表进行存储.
    4. 利用二级指针对链表进行插入.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    LRUHandle* Insert(LRUHandle* h) {
    LRUHandle** ptr = FindPointer(h->key(), h->hash);
    LRUHandle* old = *ptr;
    h->next_hash = (old == NULL ? NULL : old->next_hash);
    *ptr = h;
    if (old == NULL) {
    ++elems_;
    if (elems_ > length_) {
    // Since each cache entry is fairly large, we aim for a small
    // average linked list length (<= 1).
    Resize();
    }
    }
    return old;
    }
  • 二级指针
    在HandleTable这个哈希表实现里, 有个FindPointer方法用来查找此对象是否已存在, 并返回LRUHandle**, 代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // Return a pointer to slot that points to a cache entry that
    // matches key/hash. If there is no such cache entry, return a
    // pointer to the trailing slot in the corresponding linked list.
    LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
    LRUHandle** ptr = &list_[hash & (length_ - 1)];
    while (*ptr != NULL &&
    ((*ptr)->hash != hash || key != (*ptr)->key())) {
    ptr = &(*ptr)->next_hash;
    }
    return ptr;
    }

可见, 如果已存在节点, 则返回这个节点的LRUHandle**; 如果不存在, 返回的是可以保存这个LRUHandle*的地址.

  • LRUCache缓存查找
    如果缓存存在, 则将其放置到哑节点lru_的prev位置, 即最近使用节点, 相当于提升它的优先级, 便于下次快速查找; 如果不存在这个缓存, 返回NULL.

    小结

    看完leveldb的缓存实现, 在实现思路上, 是传统的lru算法, 使用哈希表判重, 根据缓存的请求时间提升缓存的优先级. 里面的细节例如使用哈希值的前n位进行路由, 路由到2^n个独立的缓存区, 各个缓存区维护自己的mutex进行并发控制; 哈希表在插入节点时判断空间使用率, 并进行自动扩容, 保证查找效率在O(1).

levelDB里integer编码

leveldb的数字编码方法

  • EncodeFixed32
    按照机器的字节存储顺序(大端, 小端), 原样存储到一块32位内存

  • EncodeFixed64
    按照机器的字节存储顺序(大端, 小端), 原样存储到一块64位内存

  • PutFixed32
    将EncodeFixed32处理过的内存块追加到目的地址

  • PutFixed64
    同PutFixed32

  • EncodeVarint32
    将32位的无符号整数以7个有效位, 1个标志位的形式编码.

对于Varint这种整数编码, 是为了减短数值较小的整数所占的存储空间. 实现方法是每8位, 低7位存储有效数值和高1位存储标记位(用来标记是否还有后续的数值), 以127和128这两个数为例:
127 => 0111 1111
第一个字节低7位是有效数值127, 最高位位0, 所以表示没有后续的数值, 于是算出它所代表的数值是127.

128 => 1111 1111 0000 0001
第一个字节低7位是有效数值127, 最高位位1, 所以表示有后续的数值, 相同计算方法算出后续值1, 最高位标志0表示结束, 127 + (1<<7) = 128, 因为每8位实际表示的是7位, 所以要将1左移7位, 若有更多位, 计算方式相同.

leveldb里的32位编码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
char* EncodeVarint32(char* dst, uint32_t v) {
// Operate on characters as unsigneds
unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
static const int B = 128;
if (v < (1<<7)) {
*(ptr++) = v;
} else if (v < (1<<14)) {
*(ptr++) = v | B;
*(ptr++) = v>>7;
} else if (v < (1<<21)) {
*(ptr++) = v | B;
*(ptr++) = (v>>7) | B;
*(ptr++) = v>>14;
} else if (v < (1<<28)) {
*(ptr++) = v | B;
*(ptr++) = (v>>7) | B;
*(ptr++) = (v>>14) | B;
*(ptr++) = v>>21;
} else {
*(ptr++) = v | B;
*(ptr++) = (v>>7) | B;
*(ptr++) = (v>>14) | B;
*(ptr++) = (v>>21) | B;
*(ptr++) = v>>28;
}
return reinterpret_cast<char*>(ptr);
}

用了5个if来判断这个数落在哪个数值区间里, 32位的数值使用变长编码能使用1-5位来表示

leveldb里的64位编码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// leveldb implementation
char* EncodeVarint64(char* dst, uint64_t v) {
static const int B = 128;
unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
while (v >= B) {
*(ptr++) = (v & (B-1)) | B;
v >>= 7;
}
*(ptr++) = static_cast<unsigned char>(v);
return reinterpret_cast<char*>(ptr);
}

// My own implementation
char* EncodeVarint64(char* dst, uint64_t v) {
unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
static const int B = 128;
int flag = 1;
for (int offset = 0; flag > 0 && offset < 8; offset++) {
*(ptr++) = ((v >> (offset * 7)) & 127) | B;
flag = v >> (offset * 7);
}
return reinterpret_cast<char*>(ptr);
}

比较了我和leveldb的实现之后, 发现leveldb更简洁高效, 是因为我的实现维护了一个offset来记录当前的偏移, flag用来记录是否继续计算, 每次都需要维护这个offset和flag, leveldb直接使用v作为变量, 通过每次对它的修改, 来记录是否继续, 同时它只需要用低7位, 省去了记录偏移的麻烦, 简洁有力!

levelDB里自定义的状态

LevelDB状态码

LevelDB里专门定义了Status类,自定义了常见的状态码,用于函数的返回。

状态码类型

在Status类中以枚举类型定义Code:

0 => 处理成功
1 => 未找到
2 => 崩溃
3 => 不支持
4 => 无效的参数
5 => IO错误

并且有6个静态方法用来生成对应状态码的Status对象, 和4个判断方法来方便确认状态。

存储方式

状态码包括状态信息都是利用私有字符串变量state_存储,存储方式如下:

0 ~ 4字节:消息长度(消息1的长度len1+分割符的长度+消息2的长度len2)
4 ~ 5字节:状态码
5 ~ 5+len1:消息1的长度
(若消息2不为空)
5+len1 ~ 7+len1:分隔符的长度(分隔符为2个字符,分别为冒号和空格)
7+len1 ~ 7+len1+len2:消息2的长度

这种存储方式,另得所有操作都是基于这个state_变量的下标索引操作。

小结

定义异常在其它代码遇到错误时可以抛出适当的异常, 这样的实现方式我认为好处是初始化,销毁和复制操作简单,并且这些内容在内存中的位置紧密,有利于缓存,不知道是不是这样考虑才这么做的,但分成3个变量也无伤大雅。

MIT 6.824 Lab2, part1小结

关于MIT 6.824课程的Lab 2, part 1

从昨晚在宾馆到今天飞机上,都在过这几个test case,终于最后有点讨巧的过了。

View service是MIT介绍paxos之前用来入门的,它的原理是以时间先后的顺序确定主备服务器,由定期的心跳更新此关系。之所以放在单独的service里获取主备关系,是为了将主备关系从K-V系统中解耦,从而K-V系统不用关心键值操作之外的事情,让更加上层的系统进行处理。

遇到的问题:

  • view service当前的view,和下一个未确认的view在哪些时间点更新?
    • 怎么认为这个服务器作为主,又怎么确认其为Backup,Idle?
    • 什么时候能将这些认识更新到服务器当前view上?
  • 服务器看到的view service的当前view是怎样的?
    • 不同服务器查询当前view状态,获得的结果是一样的吗?

我用复杂难看的if-else语句最后实现这些逻辑,但不可否认,实现的漏洞很多,我理想的解决方案还是使用状态机,可以更加清晰容易更新(用图表展现)。

levelDB里skiplist的实现代码赏析

跳表的原理就是利用随机性建立索引,加速搜索,并且简化代码实现难度。具体的跳表原理不再赘述,主要是看了levelDB有一些实现细节的东西,凸显自己写的实现不足之处。

  • 去除冗余的key

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    template<typename Key, class Comparator>
    struct SkipList<Key,Comparator>::Node {
    explicit Node(const Key& k) : key(k) { }

    Key const key;

    // Accessors/mutators for links. Wrapped in methods so we can
    // add the appropriate barriers as necessary.
    Node* Next(int n) {
    assert(n >= 0);
    // Use an 'acquire load' so that we observe a fully initialized
    // version of the returned Node.
    return reinterpret_cast<Node*>(next_[n].Acquire_Load());
    }
    void SetNext(int n, Node* x) {
    assert(n >= 0);
    // Use a 'release store' so that anybody who reads through this
    // pointer observes a fully initialized version of the inserted node.
    next_[n].Release_Store(x);
    }

    // No-barrier variants that can be safely used in a few locations.
    Node* NoBarrier_Next(int n) {
    assert(n >= 0);
    return reinterpret_cast<Node*>(next_[n].NoBarrier_Load());
    }
    void NoBarrier_SetNext(int n, Node* x) {
    assert(n >= 0);
    next_[n].NoBarrier_Store(x);
    }

    private:
    // Array of length equal to the node height. next_[0] is lowest level link.
    port::AtomicPointer next_[1];
    };

    这里使用一个Node节点表示所有相同key,不同高度的节点集合,仅保留了key和不同高度的向右指针,并且使用NewNode来动态分配随即高度的向右指针集合,而next_就指向这指针集合。这也是c/c++ tricky的地方。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    #include <stdio.h>
    struct Node {
    char str[1];
    };
    int main() {
    char* mem = new char[4];
    for (int i = 0; i < 4; i++) {
    mem[i] = i + '0';
    }
    Node* node = (Node*)mem;
    char* const pstr = node->str;
    for (int i = 0; i < 4; i++) {
    printf("%c", pstr[i]);
    }
    return 0;
    }

    就像上面这个简单的sample,成员str可以作为指针指向从数组下标0开始的元素,并且不受申明时的限制,不局限于大小1,索引至分配的最大的内存地址。

  • 简易随机数生成

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    uint32_t Next() {
    static const uint32_t M = 2147483647L; // 2^31-1
    static const uint64_t A = 16807; // bits 14, 8, 7, 5, 2, 1, 0
    // We are computing
    // seed_ = (seed_ * A) % M, where M = 2^31-1
    //
    // seed_ must not be zero or M, or else all subsequent computed values
    // will be zero or M respectively. For all other values, seed_ will end
    // up cycling through every number in [1,M-1]
    uint64_t product = seed_ * A;

    // Compute (product % M) using the fact that ((x << 31) % M) == x.
    seed_ = static_cast<uint32_t>((product >> 31) + (product & M));
    // The first reduction may overflow by 1 bit, so we may need to
    // repeat. mod == M is not possible; using > allows the faster
    // sign-bit-based test.
    if (seed_ > M) {
    seed_ -= M;
    }
    return seed_;
    }

可以看到,他使用A和M对种子进行运算,达到一定数据范围内不会重复的数集,而里面对于(product % M),使用(product >> 31) + (product & M)进行运算优化,考虑右移和与操作的代价远小于取余操作。

  • 简洁清晰的私有帮助方法,帮助寻找小于指定key的节点
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    template<typename Key, class Comparator>
    typename SkipList<Key,Comparator>::Node*
    SkipList<Key,Comparator>::FindLessThan(const Key& key) const {
    Node* x = head_;
    int level = GetMaxHeight() - 1;
    while (true) {
    assert(x == head_ || compare_(x->key, key) < 0);
    Node* next = x->Next(level);
    if (next == NULL || compare_(next->key, key) >= 0) {
    if (level == 0) {
    return x;
    } else {
    // Switch to next list
    level--;
    }
    } else {
    x = next;
    }
    }
    }
Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×