LevelDB | 0x00 开篇

今天开始记录一下学习 LevelDB 的笔记。虽说网上已经有很多相关资料,但还是想自己动手整理一下,加深理解,同时方便查阅。所以有的地方可能不会说的特别细,读者请自行搜索。本系列目录如下:

本篇作为系列的第一篇,主要介绍 LevelDB 的基本概念和操作

首先来聊聊 LevelDB 是干什么的。与 MySQL、Redis 这些传统数据库不同,LevelDB 是一个键值存储数据库。它由 Google 开发,主要用于 存储大量数据,并且以高效的写性能著称。LevelDB 采用了 LSM Tree(Log-Structured Merge Tree)作为其底层数据结构,这使得它在处理大量写操作时表现出色。下表简要给出三者的对比:

名称存储形式场景
MySQL关系型数据存储复杂查询,比如电商系统
Redis内存数据存储高性能缓存,消息队列等场景
LevelDB键值存储大规模数据存储,如浏览器缓存、区块链数据存储等

基本概念和架构

先从整体入手,有个印象。

leveldb-arch

LevelDB 主要由以下部分组成:

  • MemTable:MemTable(内存表)是数据写入的第一站。所有新的写入操作,无论是插入、更新还是删除,都会首先被记录到这个驻留在内存的跳表中。它就像系统的高速缓存区,提供了极快的随机写入和读取能力,确保最新的数据能被瞬时访问。其内部按键有序排列的特性,也为后续的高效查找和持久化奠定了基础
  • Immutable MemTable:当 MemTable 增长到一定大小时,LevelDB 会将其转换为 Immutable MemTable(不可变内存表)。这是一个非常巧妙的设计,它意味着这个内存表变成了一个只读的、冻结的版本。此时,系统会立即创建一个新的空 MemTable 来接收后续的写入请求,而那个被冻结的 Immutable MemTable 则由后台线程负责将其内容安全地、异步地写入磁盘。这个转换过程几乎是瞬间完成的,它巧妙地实现了读写分离,前台写入不会被后台的持久化I/O操作阻塞
  • Log:为了保证数据的持久性,LevelDB 采用了 Write-Ahead Log(预写日志,通常简称为 Log 或 Journal)​ 机制。每一次数据写入,都会在真正修改内存中的 MemTable 之前,被追加写入到一个顺序的日志文件中。这个日志是系统崩溃恢复的“救命稻草”,即使系统意外宕机,MemTable 中的数据丢失,在重启时也能通过重放这个日志文件,完整地重建出崩溃前的内存表状态,从而确保已确认的写入不会丢失
  • SSTable:当 Immutable MemTable 被后台线程持久化到磁盘时,它便转换为了 SSTable(Sorted String Table,有序字符串表)。SSTable 是 LevelDB 在磁盘上存储数据的最终形态,它是一个不可变的、内部按键有序排列的数据文件。LevelDB 会通过“层级压缩”过程,将大量小型的 SSTable 文件逐步合并、整理为更大型且有序的文件,并放置在不同的层级中。这种设计极大地优化了范围查询的效率,并且通过后台的合并过程,持续回收过期或重复的数据,控制存储空间的膨胀
  • Manifest:随着数据文件的不断增删与合并,数据库的文件构成时刻在变化。Manifest(清单文件)​ 就像一本详尽的数据库“编年史”,它忠实记录了每一次SSTable文件的变更——哪些文件被新增,哪些文件被删除。它本质上是一个追加写入的日志,保存了数据库在任意时间点的完整“快照”版本信息,确保系统在任何时候都能知道应该从哪些文件中去查找数据,是保证数据视图一致性的基石
  • Current:最后,在众多不断生成的 Manifest 文件中,Current(当前文件)​ 扮演了一个轻量级“指针”的角色。它本身不存储任何元数据,其内容仅仅是一个简单的字符串,指明了当前应该使用哪一个 Manifest 文件。在数据库启动时,系统首先读取这个 Current 文件,就能立刻定位到最新的、正确的元数据版本,从而快速完成初始化。这个简单的设计,优雅地解决了元数据版本定位的问题,是整个恢复流程的起点

如何安装

既然是学习,那就从源码开始安装,LevelDB 的源码托管在 GitHub 上,代码克隆下来,跟着 README 的步骤即可安装。我是在 WSL2 的 Ubuntu 环境下安装的,工具链是 Clang 18.1.3,在构建时遇到了报错,通过对下面两处位置进行修改解决了问题,仅供参考:

DIFF
diff --git a/util/env_posix.cc b/util/env_posix.cc
index c249032..dbaaa51 100644
--- a/util/env_posix.cc
+++ b/util/env_posix.cc
@@ -874,7 +874,8 @@ class SingletonEnv {
 #endif  // !defined(NDEBUG)
     static_assert(sizeof(env_storage_) >= sizeof(EnvType),
                   "env_storage_ will not fit the Env");
-    static_assert(std::is_standard_layout_v<SingletonEnv<EnvType>>);
+    static_assert(std::is_standard_layout<SingletonEnv<EnvType>>::value,
+                  "SingletonEnv must be standard-layout");
     static_assert(
         offsetof(SingletonEnv<EnvType>, env_storage_) % alignof(EnvType) == 0,
         "env_storage_ does not meet the Env's alignment needs");
diff --git a/util/no_destructor.h b/util/no_destructor.h
index c28a107..147a54c 100644
--- a/util/no_destructor.h
+++ b/util/no_destructor.h
@@ -21,7 +21,8 @@ class NoDestructor {
   explicit NoDestructor(ConstructorArgTypes&&... constructor_args) {
     static_assert(sizeof(instance_storage_) >= sizeof(InstanceType),
                   "instance_storage_ is not large enough to hold the instance");
-    static_assert(std::is_standard_layout_v<NoDestructor<InstanceType>>);
+    static_assert(std::is_standard_layout<NoDestructor<InstanceType>>::value,
+                  "NoDestructor must be standard-layout");
     static_assert(
         offsetof(NoDestructor, instance_storage_) % alignof(InstanceType) == 0,
         "instance_storage_ does not meet the instance's alignment requirement");
点击展开查看更多

初步照面

学习一个新组件,从常用接口入手是一个不错的选择。该怎么学习接口呢?直接让 AI 写段代码看看吧,有个整体印象,还能顺便调试一下。下面是生成的代码:

CPP
#include <cassert>
#include <chrono>
#include <iostream>
#include <string>

#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/filter_policy.h"
#include "leveldb/options.h"
#include "leveldb/write_batch.h"

class LevelDBDemo {
 public:
  LevelDBDemo(const std::string& db_path) : db_path_(db_path), db_(nullptr) {}

  ~LevelDBDemo() { Close(); }

  // 1. 打开数据库
  bool Open() {
    leveldb::Options options;
    options.create_if_missing = true;                           // 不存在时创建
    options.error_if_exists = false;                            // 存在时不报错
    options.write_buffer_size = 4 * 1024 * 1024;                // 4MB写缓冲区
    options.max_file_size = 2 * 1024 * 1024;                    // 2MB SST文件
    options.block_size = 4 * 1024;                              // 4KB块大小
    options.compression = leveldb::kSnappyCompression;          // 压缩
    options.filter_policy = leveldb::NewBloomFilterPolicy(10);  // 布隆过滤器

    leveldb::Status status = leveldb::DB::Open(options, db_path_, &db_);
    if (!status.ok()) {
      std::cerr << "打开数据库失败: " << status.ToString() << std::endl;
      return false;
    }
    std::cout << "✓ 数据库打开成功: " << db_path_ << std::endl;
    return true;
  }

  // 2. 关闭数据库
  void Close() {
    if (db_) {
      delete db_;
      db_ = nullptr;
      std::cout << "✓ 数据库已关闭" << std::endl;
    }
  }

  // 3. 基本Put操作
  bool Put(const std::string& key, const std::string& value) {
    leveldb::WriteOptions write_options;
    write_options.sync = false;  // 异步写入,性能更好

    leveldb::Status status = db_->Put(write_options, key, value);
    if (!status.ok()) {
      std::cerr << "Put失败: " << status.ToString() << std::endl;
      return false;
    }
    return true;
  }

  // 4. 基本Get操作
  bool Get(const std::string& key, std::string* value) {
    leveldb::ReadOptions read_options;
    read_options.verify_checksums = false;  // 不验证校验和,更快
    read_options.fill_cache = true;         // 填充缓存

    leveldb::Status status = db_->Get(read_options, key, value);
    if (status.IsNotFound()) {
      std::cout << "键未找到: " << key << std::endl;
      return false;
    } else if (!status.ok()) {
      std::cerr << "Get失败: " << status.ToString() << std::endl;
      return false;
    }
    return true;
  }

  // 5. 基本Delete操作
  bool Delete(const std::string& key) {
    leveldb::WriteOptions write_options;
    leveldb::Status status = db_->Delete(write_options, key);
    if (!status.ok()) {
      std::cerr << "Delete失败: " << status.ToString() << std::endl;
      return false;
    }
    return true;
  }

  // 6. 批量写入操作(原子性)
  bool BatchWrite() {
    leveldb::WriteBatch batch;

    // 批量添加Put操作
    batch.Put("batch_key1", "batch_value1");
    batch.Put("batch_key2", "batch_value2");
    batch.Put("batch_key3", "batch_value3");

    // 批量添加Delete操作
    batch.Delete("batch_key1");  // 立即删除刚添加的

    // 执行批量写入
    leveldb::WriteOptions write_options;
    write_options.sync = true;  // 同步写入,保证持久化

    leveldb::Status status = db_->Write(write_options, &batch);
    if (!status.ok()) {
      std::cerr << "批量写入失败: " << status.ToString() << std::endl;
      return false;
    }
    std::cout << "✓ 批量写入完成" << std::endl;
    return true;
  }

  // 7. 迭代器遍历
  void IteratorDemo() {
    std::cout << "\n=== 迭代器遍历 ===" << std::endl;

    leveldb::ReadOptions read_options;
    read_options.snapshot = db_->GetSnapshot();  // 使用快照,保证一致性视图

    // 创建迭代器
    leveldb::Iterator* it = db_->NewIterator(read_options);

    // 正向遍历
    std::cout << "正向遍历:" << std::endl;
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
      std::cout << "  " << it->key().ToString() << " => " << it->value().ToString() << std::endl;
    }

    // 反向遍历
    std::cout << "\n反向遍历:" << std::endl;
    for (it->SeekToLast(); it->Valid(); it->Prev()) {
      std::cout << "  " << it->key().ToString() << " => " << it->value().ToString() << std::endl;
    }

    // 范围查询
    std::cout << "\n范围查询 [key1, key5]:" << std::endl;
    it->Seek("key1");
    while (it->Valid() && it->key().ToString() <= "key5") {
      std::cout << "  " << it->key().ToString() << " => " << it->value().ToString() << std::endl;
      it->Next();
    }

    // 清理
    db_->ReleaseSnapshot(read_options.snapshot);
    delete it;
  }

  // 8. 快照使用
  void SnapshotDemo() {
    std::cout << "\n=== 快照演示 ===" << std::endl;

    // 创建快照
    const leveldb::Snapshot* snapshot = db_->GetSnapshot();

    // 在快照之后写入新数据
    Put("snapshot_key", "value_after_snapshot");

    // 用快照读取(应该看不到新数据)
    leveldb::ReadOptions snapshot_options;
    snapshot_options.snapshot = snapshot;

    std::string value;
    leveldb::Status status = db_->Get(snapshot_options, "snapshot_key", &value);
    if (status.IsNotFound()) {
      std::cout << "快照中未找到新写入的键" << std::endl;
    }

    // 释放快照
    db_->ReleaseSnapshot(snapshot);
    std::cout << "✓ 快照已释放" << std::endl;
  }

  // 9. 获取属性
  void GetPropertyDemo() {
    std::cout << "\n=== 数据库属性 ===" << std::endl;

    std::string value;
    if (db_->GetProperty("leveldb.stats", &value)) {
      std::cout << "统计信息:\n" << value << std::endl;
    }

    if (db_->GetProperty("leveldb.sstables", &value)) {
      std::cout << "SSTables信息:\n" << value << std::endl;
    }

    if (db_->GetProperty("leveldb.approximate-memory-usage", &value)) {
      std::cout << "内存使用: " << value << std::endl;
    }
  }

  // 10. 手动压缩
  void CompactRangeDemo() {
    std::cout << "\n=== 手动压缩 ===" << std::endl;

    // 压缩指定键范围
    leveldb::Slice start("key1");
    leveldb::Slice end("key9");
    db_->CompactRange(&start, &end);

    std::cout << "✓ 键范围 [" << start.ToString() << ", " << end.ToString() << "] 已压缩" << std::endl;
  }

  // 11. 获取数据库大小估算
  void GetApproximateSizesDemo() {
    std::cout << "\n=== 数据库大小估算 ===" << std::endl;

    leveldb::Range ranges[2];
    ranges[0] = leveldb::Range("a", "m");
    ranges[1] = leveldb::Range("m", "z");

    uint64_t sizes[2];
    db_->GetApproximateSizes(ranges, 2, sizes);

    std::cout << "范围 [a, m) 大约: " << sizes[0] << " 字节" << std::endl;
    std::cout << "范围 [m, z) 大约: " << sizes[1] << " 字节" << std::endl;
  }

  // 12. 调试:检查键是否存在
  bool KeyExists(const std::string& key) {
    std::string value;
    leveldb::Status status = db_->Get(leveldb::ReadOptions(), key, &value);
    return status.ok();
  }

  // 13. 清空所有数据(危险操作)
  bool ClearAll() {
    leveldb::WriteBatch batch;

    // 遍历并删除所有键
    leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
      batch.Delete(it->key().ToString());
    }
    delete it;

    leveldb::WriteOptions write_options;
    write_options.sync = true;

    leveldb::Status status = db_->Write(write_options, &batch);
    if (!status.ok()) {
      std::cerr << "清空数据失败: " << status.ToString() << std::endl;
      return false;
    }

    std::cout << "✓ 所有数据已清空" << std::endl;
    return true;
  }

  // 运行完整演示
  void RunDemo() {
    if (!Open()) {
      return;
    }

    std::cout << "\n=== LevelDB 接口演示 ===" << std::endl;

    // 测试基本操作
    std::cout << "\n1. 基本操作测试:" << std::endl;

    // 写入一些测试数据
    Put("key1", "value1");
    Put("key2", "value2");
    Put("key3", "value3");
    Put("key4", "value4");
    Put("key5", "value5");

    // 读取测试
    std::string value;
    if (Get("key3", &value)) {
      std::cout << "读取 key3: " << value << std::endl;
    }

    // 更新操作
    Put("key3", "updated_value3");
    if (Get("key3", &value)) {
      std::cout << "更新后 key3: " << value << std::endl;
    }

    // 删除测试
    Delete("key4");
    if (!Get("key4", &value)) {
      std::cout << "key4 已删除" << std::endl;
    }

    // 运行各个功能演示
    BatchWrite();
    IteratorDemo();
    SnapshotDemo();
    GetPropertyDemo();
    GetApproximateSizesDemo();
    CompactRangeDemo();

    // 性能测试
    PerformanceTest();

    // 清理
    ClearAll();
  }

  // 性能测试
  void PerformanceTest() {
    std::cout << "\n=== 性能测试 ===" << std::endl;

    auto start = std::chrono::high_resolution_clock::now();

    // 写入性能测试
    leveldb::WriteBatch batch;
    const int NUM_WRITES = 1000;

    for (int i = 0; i < NUM_WRITES; ++i) {
      batch.Put("perf_key_" + std::to_string(i), "perf_value_" + std::to_string(i));
    }

    leveldb::WriteOptions write_options;
    write_options.sync = false;  // 异步以获得更好性能
    db_->Write(write_options, &batch);

    auto end = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

    std::cout << "写入 " << NUM_WRITES << " 条记录耗时: " << duration.count() << "ms" << std::endl;
    std::cout << "平均每条: " << (duration.count() * 1000.0 / NUM_WRITES) << "us" << std::endl;
  }

 private:
  std::string db_path_;
  leveldb::DB* db_;
};

int main(int argc, char* argv[]) {
  // 设置数据库路径
  std::string db_path = "./test_leveldb";

  // 移除旧的数据库文件(如果有)
  leveldb::DestroyDB(db_path, leveldb::Options());

  // 创建并运行演示
  LevelDBDemo demo(db_path);
  demo.RunDemo();

  std::cout << "\n=== 演示完成 ===" << std::endl;
  std::cout << "可以检查数据库文件: " << db_path << std::endl;

  return 0;
}
点击展开查看更多

上面出现的相关接口包括:

  1. 打开数据库(leveldb::DB::Open
  2. 关闭数据库(delete db
  3. 基本的 Put/Get/Delete 操作(db->Putdb->Getdb->Delete
  4. 批量写入(leveldb::WriteBatch
  5. 迭代器遍历(db->NewIterator
  6. 快照使用(db->GetSnapshot
  7. 获取属性(db->GetProperty
  8. 手动压缩(db->CompactRange
  9. 获取数据库大小估算(db->GetApproximateSizes

下面先看看 LevelDB 存了哪些文件,然后针对几个主要的操作先看看实现函数,从而对 LevelDB 中的相关类和函数有个初步印象

LevelDB 存了哪些文件

跑完上面的测试,会发现 LevelDB 生成了如下的 6 个文件,现在只需要了解一下大概的作用:

BASH
test_leveldb
├── 000004.log  // 预写日志文件,负责记录所有写入操作,用于崩溃恢复
├── 000005.ldb  // SSTable 数据文件
├── CURRENT     // 当前使用的 Manifest
├── LOCK        // 用于进程间互斥访问
├── LOG         // 内部运行日志,记录各种信息、警告等
└── MANIFEST-000002   // 数据文件的增删和层级结构等关键元数据
点击展开查看更多

打开数据库

打开数据库的接口长这样:

CPP
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr);
点击展开查看更多

这个函数接受一个 optionsdbname,将得到的数据库指针赋值到 dbptr,函数的返回值是 Status。先来看看这个 Options 有哪些内容吧(位于 include/leveldb/options.h):

成员变量含义
comparatorDB 中 key 的比较方式,默认为 BytewiseComparator
create_if_missing如果 DB 不存在就创建一个新的
error_if_exists如果 DB 已经存在了打开时会返回错误
paranoid_checks启用严格检查,如果处理数据时发生错误会立即停止
env抽象的环境接口,统一 Windows&Linux 上相关的操作
info_log如果非空的话,DB产生的中间日志信息会记录在这里
write_buffer_size内存中的写缓冲区大小,越大性能越好,但在下次打开 DB 时会增加 recovery 的时间
max_open_filesDB 可同时打开的最大文件数,默认为 1000
block_cache数据会以 block 的形式进行组织管理。如果为空,leveldb 会自动创建一个 8MB 大小的内部 cache
block_size块大小。这个参数可以动态调整
block_restart_interval在键的增量编码中,两个重启点之间包含的键的数量,默认为 16
max_file_size文件的最大尺寸(其实就是 SST 文件的大小)
compression对 block 采用什么压缩算法,可选的有不压缩,snappy 以及 zstd
zstd_compression_levelzstd 的压缩等级
reuse_logs(实验性)若设置为 true,在打开 DB 时复用已有 MANIFEST 和日志文件
filter_policy默认为空。可设置为布隆过滤器来减少磁盘的读次数

好的,如果是第一次看的话,相信你头都要大了。等到把我这个系列看完,你就全都理解了。现在只是留个印象,先不深入解释。下面看看 Open 这个接口的具体实现

CPP
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
  *dbptr = nullptr;

  DBImpl* impl = new DBImpl(options, dbname); // 1. 创建数据库内部实现实例
  impl->mutex_.Lock();                        // 2. 加锁,保护后续初始化过程的原子性
  VersionEdit edit;                           // 3. 创建版本编辑对象,用于记录本次启动的变更
  // Recover handles create_if_missing, error_if_exists
  bool save_manifest = false;
  // 4. 关键:执行数据库恢复。如果数据库存在则加载数据,如果不存在且设置create_if_missing则会创建新库
  Status s = impl->Recover(&edit, &save_manifest);

  // 5. 如果恢复成功,但当前内存表为空(例如是新数据库,或所有数据已持久化),则需创建新的日志文件和内存表
  if (s.ok() && impl->mem_ == nullptr) {
    // Create new log and a corresponding memtable.
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    WritableFile* lfile;
    s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                     &lfile);
    if (s.ok()) {
      edit.SetLogNumber(new_log_number); // 记录新日志文件号到版本编辑
      impl->logfile_ = lfile;           // 持久的日志文件
      impl->logfile_number_ = new_log_number;
      impl->log_ = new log::Writer(lfile); // 日志写入器
      impl->mem_ = new MemTable(impl->internal_comparator_); // 新的内存表
      impl->mem_->Ref();
    }
  }

  // 6. 如果需要保存清单(通常在恢复过程中有数据变更),则将版本编辑应用到当前版本
  if (s.ok() && save_manifest) {
    edit.SetPrevLogNumber(0);  // 恢复后,不再需要旧的日志文件
    edit.SetLogNumber(impl->logfile_number_);
    s = impl->versions_->LogAndApply(&edit, &impl->mutex_); // 持久化元数据变更
  }

  // 7. 清理无用文件,并可能触发后台压缩
  if (s.ok()) {
    impl->RemoveObsoleteFiles();
    impl->MaybeScheduleCompaction(); // 不立即执行,只是安排任务
  }
  impl->mutex_.Unlock();

  // 8. 最终处理:成功则返回实例指针,失败则清理资源
  if (s.ok()) {
    assert(impl->mem_ != nullptr);
    *dbptr = impl;
  } else {
    delete impl;
  }
  return s;
}
点击展开查看更多

对照注释可以先看个大概,这里面主要涉及了 DBImplVersionEdit 等类,以及 manifest、recover、log_number 等概念,其中的加锁解锁时机也可能是一个值得探究的点

关闭数据库

看看 DBImpl 的析构函数长什么样

CPP
DBImpl::~DBImpl() {
  // Wait for background work to finish.
  mutex_.Lock();
  shutting_down_.store(true, std::memory_order_release);
  while (background_compaction_scheduled_) {
    background_work_finished_signal_.Wait();
  }
  mutex_.Unlock();

  if (db_lock_ != nullptr) {
    env_->UnlockFile(db_lock_);
  }

  delete versions_;
  if (mem_ != nullptr) mem_->Unref();
  if (imm_ != nullptr) imm_->Unref();
  delete tmp_batch_;
  delete log_;
  delete logfile_;
  delete table_cache_;

  if (owns_info_log_) {
    delete options_.info_log;
  }
  if (owns_cache_) {
    delete options_.block_cache;
  }
}
点击展开查看更多

这里可以看出,LevelDB 应该有后台的 compation 任务,在析构时要提醒这些任务,然后才能继续析构其他成员。这里的 delete 顺序应该也是有讲究的。

基本的读写操作

LevelDB 是一个基于 LSM Tree 的数据库,PutDelete 都是先添加一条记录,真实的修改行为要到落盘时在处理。在实现上,这两个接口最终都通过组成 WriteBatch,然后用 Write 函数实现具体的细节

CPP
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  // 1. 创建一个写者对象,封装本次写入的请求
  Writer w(&mutex_);
  w.batch = updates;
  w.sync = options.sync;  // 是否要求同步刷盘
  w.done = false;         // 完成标志

  // 2. 加锁进入临界区
  MutexLock l(&mutex_);
  // 将当前写者加入等待队列尾部
  writers_.push_back(&w);
  
  // 3. 等待成为队首或被前一个写者完成
  //    如果不是队首且未完成,则等待条件变量唤醒
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  
  // 4. 如果已被前一个写者完成(批处理合并),直接返回结果
  if (w.done) {
    return w.status;
  }

  // 5. 至此,当前写者已是队首,开始处理写入
  // 可能需要临时解锁等待(如memtable已满需刷盘或压缩)
  Status status = MakeRoomForWrite(updates == nullptr);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;  // 记录本批次最后处理的写者
  
  // 6. 如果updates不为空(非压缩触发的写入),执行实际的写入逻辑
  if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    // 6.1 构建批处理组:合并队列中多个连续的写请求,提高吞吐
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);
    // 设置序列号并更新last_sequence
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(write_batch);

    // 6.2 写入日志和内存表(释放锁执行,减少锁占用时间)
    // 释放锁是安全的,因为当前写者是唯一的日志写入者和内存表写入者
    {
      mutex_.Unlock();
      // 先写WAL日志(Write-Ahead Logging),保证持久性
      status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
      bool sync_error = false;
      // 如果需要同步写入,则调用fsync确保数据落盘
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      // 写入内存表(MemTable)
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(write_batch, mem_);
      }
      mutex_.Lock();
      
      // 处理同步错误:如果日志同步失败,数据库进入不可写状态
      if (sync_error) {
        // 日志文件状态不确定:刚添加的日志记录在DB重新打开时可能不存在
        // 因此强制DB进入一种模式,使所有后续写入都失败
        RecordBackgroundError(status);
      }
    }
    // 清理临时批处理
    if (write_batch == tmp_batch_) tmp_batch_->Clear();

    // 6.3 更新全局序列号
    versions_->SetLastSequence(last_sequence);
  }

  // 7. 处理本批次中的所有写者(从队首到last_writer)
  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    // 将当前批处理的结果状态复制给所有被合并的写者
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();  // 唤醒等待的写者线程
    }
    // 到达本批次的最后一个写者,结束循环
    if (ready == last_writer) break;
  }

  // 8. 唤醒新队首(如果队列不为空),开始下一批处理
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}
点击展开查看更多

虽然具体每个类是干什么的不知道,至少从这段代码中可以知道 LevelDB 是怎么用锁来处理多线程的写操作的,因为某一时刻可能有多个线程调用 Write 函数

  1. DBImpl 中维护了一个 writers 的写者队列(实际是个 deque),调用 Write 时先加个锁,将当前 WriteBatch 封装成 Writer 丢到队列尾部
  2. 然后进入 while 循环进行等待,直到当前的 writer 已经被完成或变成队首
  3. 在经过一些基本准备后,将锁释放掉,然后先写 WAL 日志,再写入 MemTable
  4. 这两个写完后继续加锁,处理本批次中的所有写者
  5. Write 函数调用结束后释放锁

下面再看看读操作的实现

CPP
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
                   std::string* value) {
  Status s;
  // 1. 加锁保护共享数据
  MutexLock l(&mutex_);

  // 2. 确定读取的快照序列号
  SequenceNumber snapshot;
  if (options.snapshot != nullptr) {
    // 如果指定了快照,使用该快照的序列号
    snapshot =
        static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
  } else {
    // 否则使用当前最新的序列号
    snapshot = versions_->LastSequence();
  }

  // 3. 获取当前数据库状态的引用
  MemTable* mem = mem_;        // 可写的内存表
  MemTable* imm = imm_;        // 不可变的(immutable)内存表
  Version* current = versions_->current();  // 当前版本,包含所有SST文件
  mem->Ref();                   // 增加引用计数,防止被销毁
  if (imm != nullptr) imm->Ref();
  current->Ref();

  bool have_stat_update = false;
  Version::GetStats stats;     // 用于收集读取统计信息

  // 4. 释放锁进行实际读取(减少锁持有时间)
  {
    mutex_.Unlock();
    // 首先在可写内存表中查找
    LookupKey lkey(key, snapshot);
    if (mem->Get(lkey, value, &s)) {
      // 在mem中找到
    } else if (imm != nullptr && imm->Get(lkey, value, &s)) {
      // 在imm中找到
    } else {
      // 在磁盘文件(SSTable)中查找
      s = current->Get(options, lkey, value, &stats);
      have_stat_update = true;  // 标记有统计信息需要更新
    }
    mutex_.Lock();
  }

  // 5. 如果有读取统计信息,则更新并可能触发压缩
  if (have_stat_update && current->UpdateStats(stats)) {
    MaybeScheduleCompaction();  // 可能需要安排后台压缩
  }

  // 6. 释放对数据结构的引用
  mem->Unref();
  if (imm != nullptr) imm->Unref();
  current->Unref();

  return s;
}
点击展开查看更多

Get 中可以看到有 snapshot 这个变量,能够猜测在 LevelDB 中应该是有版本管理的,在后续章节中对这项内容都会逐一进行分析

参考资料

版权声明

作者: MiaoHN

链接: https://404notfixed.cn/posts/leveldb-0x00/

许可证: CC BY-NC-SA 4.0

This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License. Please attribute the source, use non-commercially, and maintain the same license.

开始搜索

输入关键词搜索文章内容

↑↓
ESC
⌘K 快捷键