在正式分析核心代码之前,我们还是先从一些相对独立、容易理解的模块入手,这篇文章主要讲讲 util 目录下面的代码。LevelDB 包含了如下几个工具模块:
Arena
Arena 是一个简单的内存分配器,为 MemTable 提供内存分配的功能。它通过大块分配内存,减少了频繁调用系统函数的开销。而且其生命周期和 MemTable 一致,MemTable 销毁时,Arena 分配的内存也会一并释放。比较简单,代码就不贴了,直接看流程图吧:


Arena 每次向系统申请一个较大的内存块(默认 4MB),然后会记录这个内存块空闲位置指针和剩余空间大小。剩余空间不足时,如果申请的空间大于 kBlockSize/4 也就是 1KB 时,会直接申请对应长度的内存块返回,不更新当前剩余内存块的起始位置和大小,这样下次申请小空间时依然可以使用当前余下的空间;否则将放弃当前剩余空间,重新申请一块 4KB 的内存块再分配
BloomFilterPolicy
LevelDB 使用了布隆过滤器(Bloom Filter)来减少不必要的磁盘访问,从而提高读取性能。布隆过滤器是一种空间效率很高的概率型数据结构,可以用来测试一个元素是否在一个集合中。LevelDB 用一个 FilterPolicy 抽象类来定义布隆过滤器的接口,我们来看看它是如何定义和使用的。BloomFilterPolicy 有两个成员变量,一个是 bits_per_key_,表示每个键分配的位数,另一个是 k_,表示哈希函数的数量,一般来说,k_ 取 bits_per_key_ * ln2,这样可以在给定的空间下最小化误判率1
void CreateFilter(const Slice* keys, int n, std::string* dst) const override {
// Compute bloom filter size (in both bits and bytes)
size_t bits = n * bits_per_key_;
// For small n, we can see a very high false positive rate. Fix it
// by enforcing a minimum bloom filter length.
if (bits < 64) bits = 64;
size_t bytes = (bits + 7) / 8;
bits = bytes * 8;
const size_t init_size = dst->size();
dst->resize(init_size + bytes, 0);
dst->push_back(static_cast<char>(k_)); // Remember # of probes in filter
char* array = &(*dst)[init_size];
for (int i = 0; i < n; i++) {
// Use double-hashing to generate a sequence of hash values.
// See analysis in [Kirsch,Mitzenmacher 2006].
uint32_t h = BloomHash(keys[i]);
const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits
for (size_t j = 0; j < k_; j++) {
const uint32_t bitpos = h % bits;
array[bitpos / 8] |= (1 << (bitpos % 8));
h += delta;
}
}
}CreateFilter 方法接受一个键数组 keys 和它的大小 n,然后计算出布隆过滤器所需的位数和字节数。接着,它会为每个键计算多个哈希值,并将对应的位设置为 1。这里使用了双重哈希技术来生成多个哈希值,以减少哈希冲突的概率。
再来看看布隆过滤器是怎么使用的吧
bool KeyMayMatch(const Slice& key, const Slice& bloom_filter) const override {
const size_t len = bloom_filter.size();
if (len < 2) return false;
const char* array = bloom_filter.data();
const size_t bits = (len - 1) * 8;
// Use the encoded k so that we can read filters generated by
// bloom filters created using different parameters.
const size_t k = array[len - 1];
if (k > 30) {
// Reserved for potentially new encodings for short bloom filters.
// Consider it a match.
return true;
}
uint32_t h = BloomHash(key);
const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits
for (size_t j = 0; j < k; j++) {
const uint32_t bitpos = h % bits;
if ((array[bitpos / 8] & (1 << (bitpos % 8))) == 0) return false;
h += delta;
}
return true;
}KeyMayMatch 方法接受一个键 key 和一个布隆过滤器 bloom_filter,它会计算出该键的多个哈希值,并检查对应的位是否都为 1。如果有任何一位为 0,则说明该键一定不在集合中,返回 false;否则返回 true,表示该键可能在集合中
LRUCache
LRU 算法是一种很常见的缓存淘汰策略,感觉到处都能看到它的身影。Least Recently Used,思想很简单,就是把最近最少使用的数据淘汰掉以腾出空间给新数据,保证缓存中保留的是最近最常用的数据,从而提高命中率。实现思路也基本都是类似的,用一个哈希表来存储缓存数据,使用一个双向链表来维护数据的使用顺序。每次访问数据时,将对应的节点移动到链表头部,表示该数据最近被使用过;当缓存满了需要淘汰数据时,从链表尾部删除节点,表示该数据最久未被使用
LevelDB 也实现了自己的一套 LRUCache
,这里重点来看看它是如何封装和设计的,共有如下几个主要类:
LRUHandle:双向链表的结点,同时包含引用数和哈希表的下一个结点指针HandleTable:哈希表,通过链式地址法解决冲突,存储LRUHandle结点。通过该类可以快速查找缓存数据,然后用LRUHandle的接口来更新使用顺序LRUCache:LRU 缓存的主体类,维护哈希表和双向链表,同时为正在使用的LRUHandle维护了一个in_use_链表,表示当前正在被使用的缓存数据,防止被误删ShardedLRUCache:分片的 LRU 缓存,将缓存划分为多个独立的分片,每个分片维护自己的 LRU 缓存实例,从而减少锁竞争,提高并发性能
首先来说 LRUHandle,它就是一个双向链表的节点,有一个 next 和 prev 指针指向前后节点。这里需要注意的就是 in_cache 和 refs 的联动,这两个变量共同维护了节点的生命周期。当 in_cache 为 true 时,表示该节点在缓存中,此时 refs 至少为 2(一个来自缓存,一个来自用户);当 in_cache 为 false 时,表示该节点不在缓存中,此时 refs 至少为 1(来自用户)。当 refs 减少到 0 时,说明没有任何引用指向该节点,可以安全地删除它
接下来是 HandleTable,它是一个简单的哈希表,使用链式地址法解决冲突。它提供了 Lookup、Insert 和 Remove 方法,分别用于查找、插入和删除缓存数据。通过哈希函数计算键的哈希值,然后在对应的桶中查找或操作 LRUHandle 结点
class HandleTable {
public:
HandleTable() : length_(0), elems_(0), list_(nullptr) { Resize(); }
~HandleTable() { delete[] list_; }
LRUHandle* Lookup(const Slice& key, uint32_t hash) {
return *FindPointer(key, hash);
}
LRUHandle* Insert(LRUHandle* h) {
LRUHandle** ptr = FindPointer(h->key(), h->hash);
LRUHandle* old = *ptr;
h->next_hash = (old == nullptr ? nullptr : old->next_hash);
*ptr = h;
if (old == nullptr) {
++elems_;
if (elems_ > length_) {
// Since each cache entry is fairly large, we aim for a small
// average linked list length (<= 1).
Resize();
}
}
return old;
}
LRUHandle* Remove(const Slice& key, uint32_t hash) {
LRUHandle** ptr = FindPointer(key, hash);
LRUHandle* result = *ptr;
if (result != nullptr) {
*ptr = result->next_hash;
--elems_;
}
return result;
}
private:
// The table consists of an array of buckets where each bucket is
// a linked list of cache entries that hash into the bucket.
uint32_t length_;
uint32_t elems_;
LRUHandle** list_;
// Return a pointer to slot that points to a cache entry that
// matches key/hash. If there is no such cache entry, return a
// pointer to the trailing slot in the corresponding linked list.
LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
LRUHandle** ptr = &list_[hash & (length_ - 1)];
while (*ptr != nullptr && ((*ptr)->hash != hash || key != (*ptr)->key())) {
ptr = &(*ptr)->next_hash;
}
return ptr;
}
void Resize() {
uint32_t new_length = 4;
while (new_length < elems_) {
new_length *= 2;
}
LRUHandle** new_list = new LRUHandle*[new_length];
memset(new_list, 0, sizeof(new_list[0]) * new_length);
uint32_t count = 0;
for (uint32_t i = 0; i < length_; i++) {
LRUHandle* h = list_[i];
while (h != nullptr) {
LRUHandle* next = h->next_hash;
uint32_t hash = h->hash;
LRUHandle** ptr = &new_list[hash & (new_length - 1)];
h->next_hash = *ptr;
*ptr = h;
h = next;
count++;
}
}
assert(elems_ == count);
delete[] list_;
list_ = new_list;
length_ = new_length;
}
};这里可以重点理解下 FindPointer 方法,其实就是在哈希表中查找对应键的节点,如果找到了就返回指向该节点的指针,否则返回指向该桶链表末尾的指针,方便后续插入操作使用
为啥要自己实现一套 Hash Table?
首先自己实现的比较简单,相对可控,去掉了很多不必要的功能。而且经过测试,HandleTable 的性能甚至还略优于一些编译器自带的实现
除了基本的 LRUCache 的实现,LevelDB 还提供了 ShardedLRUCache
,它将缓存划分为多个独立的分片,每个分片维护自己的 LRU 缓存实例,从而减少锁竞争,提高并发性能。具体实现也比较简单,就是通过哈希函数将键映射到不同的分片上,然后在对应的分片中进行缓存操作
SkipList
跳表也是一个相当经典的数据结构了,比如 Redis 的有序集合底层就是用跳表实现的。跳表通过多级索引来加速查找操作,基本思想是将链表划分为多个层级,每一层都是下一层的子集。最高层包含最少的节点,最低层包含所有节点。查找时,从最高层开始,逐层向下查找,直到找到目标节点:

高层级的节点比低节点少,为了方便增删操作,通常会使用随机化的方法来决定一个节点应该出现在多少层中。这样可以保证跳表的平均时间复杂度为 $O(\log n)$
跳表的原理其实不算复杂,但我猜没多少人自己实现过 我也是。正好可以学学 LevelDB 是如何实现一个支持并发读取的跳表的,代码在 db/skiplist.h
下,总共只有 300 多行,我们来仔细研究下。LevelDB 中对 SkipList 的实现增加了多线程并发访问方面的优化的代码,提供以下保证:
- 写:在修改跳表时,需要在用户代码侧加锁
- 读:在访问跳表(查找、遍历)时,只需保证跳表不被其他线程销毁即可,不必额外加锁
也就是说,用户侧只需要处理写写冲突,LevelDB 跳表保证没有读写冲突。这是因为在实现时,LevelDB 做了以下假设:
- 除非跳表被销毁,跳表节点只会增加而不会被删除,因为跳表对外根本不提供删除接口(会随着 MemTable 的销毁一起释放)
- 被插入到跳表中的节点,除了 next 指针其他域都是不可变的,并且只有插入操作会改变跳表
看看 SkipList 这个类有哪些成员变量:
class SkipList {
...
private:
...
// Immutable after construction
Comparator const compare_;
Arena* const arena_; // Arena used for allocations of nodes
Node* const head_;
// Modified only by Insert(). Read racily by readers, but stale
// values are ok.
std::atomic<int> max_height_; // Height of the entire list
// Read/written only by Insert().
Random rnd_;只看成员变量,就已经能大概猜到它们是做什么的了。compare_ 是一个比较器,用于比较跳表中的键;arena_ 是一个内存分配器,用于分配跳表节点的内存;head_ 是跳表的头节点,所有节点都从这里开始;max_height_ 是跳表的当前最大高度,使用原子变量以支持并发读取;rnd_ 是一个随机数生成器,用于决定新节点的高度。不同层级的节点应该是在 Node 类中定义的,我们来看下它有哪些成员:
template <typename Key, class Comparator>
struct SkipList<Key, Comparator>::Node {
...
Key const key;
...
private:
// Array of length equal to the node height. next_[0] is lowest level link.
std::atomic<Node*> next_[1];
};这里刚看其实蛮困惑的,为啥 next_ 数组长度只有 1?Node 有可能是个多层节点啊。其实 LevelDB 为了尽可能节省内存,使用了变长数组的技巧。看看如何创建 Node 的代码就知道了:
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::NewNode(
const Key& key, int height) {
char* const node_memory = arena_->AllocateAligned(
sizeof(Node) + sizeof(std::atomic<Node*>) * (height - 1));
return new (node_memory) Node(key);
}在实际创建节点时,会根据节点高度从 Arena 分配一块大小刚好的空间,然后再用 placement new 直接在这块内存上构造 Node 对象。这样 next_ 数组就可以根据节点高度动态变化了
placement new
placement new 是 C++ 中的一种特殊的 new 操作符,用于在预先分配好的内存上构造对象。它的语法是 new (pointer) Type(arguments),其中 pointer 是指向预分配内存的指针,Type 是要构造的对象类型,arguments 是传递给构造函数的参数。使用 placement new 可以避免额外的内存分配开销,适用于需要高性能或内存受限的场景
下面看看 Insert 方法是如何插入新节点的:
template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized.
Node* prev[kMaxHeight];
Node* x = FindGreaterOrEqual(key, prev);
// Our data structure does not allow duplicate insertion
assert(x == nullptr || !Equal(key, x->key));
int height = RandomHeight();
if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
// 在没有任何同步的情况下修改 max_height_ 是可以的。一个并发的读取者如果观察到 max_height_ 的新值,
// 要么会看到 head_ 的旧指针值(nullptr),要么会看到下面循环设置的新值。在前一种情况下,读取者会立即
// 降级到下一个层级,因为 nullptr 排在所有键之后。在后一种情况下,读取者会使用新节点。
max_height_.store(height, std::memory_order_relaxed);
}
x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
}Insert 方法首先调用 FindGreaterOrEqual 找到新节点应该插入的位置,并记录每一层的前驱节点。然后生成一个随机高度,如果新节点的高度超过当前最大高度,就更新 max_height_。最后创建新节点,并在每一层将其插入到正确的位置。下面是 FindGreaterOrEqual 方法的实现:
template <typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key,
Node** prev) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
Node* next = x->Next(level);
if (KeyIsAfterNode(key, next)) {
// Keep searching in this list
x = next;
} else {
if (prev != nullptr) prev[level] = x;
if (level == 0) {
return next;
} else {
// Switch to next list
level--;
}
}
}
}从头节点开始,逐层向下查找,直到找到第一个大于等于目标键的节点。如果传入了 prev 数组,就会在每一层记录前驱节点,方便后续插入操作使用
SkipList 中内存序的使用
SkipList 中使用了不同的内存序来保证并发读取的正确性。内存序决定了不同线程对共享变量的可见性和操作顺序。SkipList 中主要使用了以下几种内存序:
memory_order_relaxed:不提供任何同步保证,适用于不需要跨线程同步的场景memory_order_acquire:保证在读取操作之后的操作不会被重排序到读取操作之前memory_order_release:保证在写入操作之前的操作不会被重排序到写入操作之后
在 SkipList 中,max_height_ 使用 memory_order_relaxed 进行更新,因为并发读取者可以容忍观察到旧值或新值。节点的 next_ 指针使用 memory_order_acquire 和 memory_order_release 来确保读取和写入操作的正确顺序,从而保证跳表结构的一致性
LevelDB 中还有设置为 NoBarrier 的方法,即 NoBarrier_SetNext 和 NoBarrier_Next,这两个方法只在 Insert 方法中使用
// 线程 A(写者)
x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// ① 无屏障设置(在本线程内初始化)
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
// ② Release 屏障发布(让其他线程能看到 ① 的效果)
prev[i]->SetNext(i, x); // memory_order_release
}
// 线程 B(读者)
Node* next = x->Next(0); // memory_order_acquire
// 现在能确保看到 SetNext 前的所有写操作
在 Node 进行初始化时不进行任何内存屏障,仅在发布节点时使用 memory_order_release,这样可以减少不必要的内存屏障开销,提高性能。而读取节点时使用 memory_order_acquire,确保读取操作之后的操作不会被重排序到读取操作之前,从而保证数据的一致性
Slice
Slice 本身不存储任何数据,仅记录 char 数组头部指针和长度,本身并不包含内存的管理与分配。这样做的好处是避免了不必要的内存拷贝和分配开销,提高了性能。Slice 主要用于表示字符串或二进制数据的片段,提供了一些常用的操作方法,比如比较、截取、查找等。下面展示它的一些常用方法:
class LEVELDB_EXPORT Slice {
public:
...
// Return a pointer to the beginning of the referenced data
const char* data() const { return data_; }
// Return the length (in bytes) of the referenced data
size_t size() const { return size_; }
// Return true iff the length of the referenced data is zero
bool empty() const { return size_ == 0; }
const char* begin() const { return data(); }
const char* end() const { return data() + size(); }
// Return the ith byte in the referenced data.
// REQUIRES: n < size()
char operator[](size_t n) const;
// Change this slice to refer to an empty array
void clear();
// Drop the first "n" bytes from this slice.
void remove_prefix(size_t n);
// Return a string that contains the copy of the referenced data.
std::string ToString() const;
// Three-way comparison. Returns value:
// < 0 iff "*this" < "b",
// == 0 iff "*this" == "b",
// > 0 iff "*this" > "b"
int compare(const Slice& b) const;
// Return true iff "x" is a prefix of "*this"
bool starts_with(const Slice& x) const;
private:
const char* data_;
size_t size_;
};
inline bool operator==(const Slice& x, const Slice& y);
inline bool operator!=(const Slice& x, const Slice& y);
inline int Slice::compare(const Slice& b) const ;Comparator
在 LevelDB 中,Comparator 相关的操作随处可见,毕竟很多键值都需要排序和比较。Comparator 是一个抽象类,定义了一些通用的接口,下面看看它的定义:
class LEVELDB_EXPORT Comparator {
public:
virtual ~Comparator();
// Three-way comparison. Returns value:
// < 0 iff "a" < "b",
// == 0 iff "a" == "b",
// > 0 iff "a" > "b"
virtual int Compare(const Slice& a, const Slice& b) const = 0;
// The name of the comparator. Used to check for comparator
// mismatches (i.e., a DB created with one comparator is
// accessed using a different comparator.
//
// The client of this package should switch to a new name whenever
// the comparator implementation changes in a way that will cause
// the relative ordering of any two keys to change.
//
// Names starting with "leveldb." are reserved and should not be used
// by any clients of this package.
virtual const char* Name() const = 0;
// Advanced functions: these are used to reduce the space requirements
// for internal data structures like index blocks.
// If *start < limit, changes *start to a short string in [start,limit).
// Simple comparator implementations may return with *start unchanged,
// i.e., an implementation of this method that does nothing is correct.
virtual void FindShortestSeparator(std::string* start,
const Slice& limit) const = 0;
// Changes *key to a short string >= *key.
// Simple comparator implementations may return with *key unchanged,
// i.e., an implementation of this method that does nothing is correct.
virtual void FindShortSuccessor(std::string* key) const = 0;
};这里最后两个方法 FindShortestSeparator 和 FindShortSuccessor 比较有意思,它们用于压缩键字符串,从而减少索引块等数据结构的空间占用:
FindShortestSeparator:接受两个参数start和limit,如果*start < limit,则将*start修改为一个介于start和limit之间的较短字符串。当一个SSTable块结束时,我们需要一个分隔符来区分当前块的最后一个键和下一个块的第一个键,这个方法可以在不改变键的相对顺序的前提下,减少索引块中存储的键的长度FindShortSuccessor:接受一个参数key,将*key修改为一个大于等于*key的较短字符串。这个方法用于索引块的边界,简化索引键表示
举个例子
假设有两个键 start = "abcde" 和 limit = "abfgh",调用 FindShortestSeparator(&start, limit) 后,start 可能会被修改为 "abf",因为它介于 abcde 和 abfgh 之间,并且比原始的 start 更短。类似地,调用 FindShortSuccessor(&key) 可能会将 key = "xyz" 修改为 "y",因为它是一个大于等于 xyz 的较短字符串
LevelDB 提供了一个默认的比较器 BytewiseComparator,它按字节顺序对键进行比较。同时还有一个 InternalKeyComparator,用于比较内部键(InternalKey),它会先比较用户键,然后比较序列号和类型,以确保正确的排序顺序
Iterator
Iterator 是 LevelDB 中用于遍历数据结构(如跳表、SSTable 等)的抽象接口。除了 Next,Prev 等基本方法外,Iterator 还定了义资源清理的回调函数 RegisterCleanup,允许用户在迭代器销毁时执行自定义的清理操作:
class LEVELDB_EXPORT Iterator {
public:
...
// Clients are allowed to register function/arg1/arg2 triples that
// will be invoked when this iterator is destroyed.
//
// Note that unlike all of the preceding methods, this method is
// not abstract and therefore clients should not override it.
using CleanupFunction = void (*)(void* arg1, void* arg2);
void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2);
private:
// Cleanup functions are stored in a single-linked list.
// The list's head node is inlined in the iterator.
struct CleanupNode {
// True if the node is not used. Only head nodes might be unused.
bool IsEmpty() const { return function == nullptr; }
// Invokes the cleanup function.
void Run() {
assert(function != nullptr);
(*function)(arg1, arg2);
}
// The head node is used if the function pointer is not null.
CleanupFunction function;
void* arg1;
void* arg2;
CleanupNode* next;
};
CleanupNode cleanup_head_;
};这里将清理函数存储在一个单链表中,迭代器销毁时会依次调用这些函数。用户可以通过 RegisterCleanup 方法注册自己的清理操作,比如释放资源、关闭文件等。整个项目中有三处地方使用了该方法:
- table_cache.cc#L93
:
result->RegisterCleanup(&UnrefEntry, cache_, handle)- 释放缓存句柄 - db_impl.cc#L1102
:
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr)- 释放内存表和版本引用 - table.cc#L198
:
iter->RegisterCleanup(&DeleteBlock, block, nullptr)- 删除数据块
这样设计的好处是:Iterator 不需要知道具体持有哪些资源,调用者可以灵活注册清理逻辑,确保资源在 Iterator 销毁时自动释放,避免内存泄漏。

