3.文件结构
3.1 文件组成
Level DB包含一下几种文件:
文件类型 | 说明 |
dbname/MANIFEST-[0-9]+ | 清单文件 |
dbname/[0-9]+.log | db日志文件 |
dbname/[0-9]+.sst | dbtable文件 |
dbname/[0-9]+.dbtmp | db临时文件 |
dbname/CURRENT | 记录当前使用的清单文件名 |
dbname/LOCK | DB锁文件 |
dbname/LOG | info log日志文件 |
dbname/LOG.old | 旧的info log日志文件 |
上面的log文件,sst文件,临时文件,清单文件末尾都带着序列号,序号是单调递增的(随着next_file_number从1开始递增),以保证不会和之前的文件名重复。另外,注意区分db log与info log:前者是为了防止保障数据安全而实现的二进制Log,后者是打印引擎中间运行状态及警告等信息的文本log。
随着更新与Compaction的进行,LevelDB会不断生成新文件,有时还会删除老文件,所以需要一个文件来记录文件列表,这个列表就是清单文件的作用,清单会不断变化,DB需要知道最新的清单文件,必须将清单准备好后原子切换,这就是CURRENT文件的作用,Level DB的清单过程更新如下:
- Status SetCurrentFile(Env* env,const std::string& dbname,uint64_t descriptor_number) {
- // 创建一个新的清单文件名
- std::string manifest = DescriptorFileName(dbname,descriptor_number);
- Slice contents = manifest;
- // 移除"dbname/"前缀
- assert(contents.starts_with(dbname + "/"));
- contents.remove_prefix(dbname.size() + 1);
- // 创建一个临时文件
- std::string tmp = TempFileName(dbname,descriptor_number);
- // 写入清单文件名
- Status s = WriteStringToFile(env,contents.ToString() + "\n",tmp);
- if (s.ok()) {
- // 将临时文件改名为CURRENT
- s = env->RenameFile(tmp,CurrentFileName(dbname));
- }
- if (!s.ok()) {
- env->DeleteFile(tmp);
- }
- return s;
- }
3.2 Manifest
2. 其的格式如下:
我们可以看看其序列化的代码:
- void VersionEdit::EncodeTo(std::string* dst) const {
- if (has_comparator_) { // 记录Comparator名称
- PutVarint32(dst,kComdparator);
- PutLengthPrefixedSlice(dst,comparator_);
- }
- if (has_log_number_) { // 记录Log Numer
- PutVarint32(dst,kLogNumber);
- PutVarint64(dst,log_number_);
- }
- if (has_prev_log_number_) { // 记录Prev Log Number,现在已废弃,一般为0
- PutVarint32(dst,kPrevLogNumber);
- PutVarint64(dst,prev_log_number_);
- }
- if (has_next_file_number_) { // 记录下一个文件序号
- PutVarint32(dst,kNextFileNumber);
- PutVarint64(dst,next_file_number_);
- }
- if (has_last_sequence_) { // 记录最大的sequence num
- PutVarint32(dst,kLastSequence);
- PutVarint64(dst,last_sequence_);
- }
- // 记录每一级Level下次compaction的起始Key
- for (size_t i = 0; i < compact_pointers_.size(); i++) {
- PutVarint32(dst,kCompactPointer);
- PutVarint32(dst,compact_pointers_[i].first); // level
- PutLengthPrefixedSlice(dst,compact_pointers_[i].second.Encode());
- }
- // 记录每一级需要删除的文件
- for (DeletedFileSet::const_iterator iter = deleted_files_.begin();
- iter != deleted_files_.end();
- ++iter) {
- PutVarint32(dst,kDeletedFile);
- PutVarint32(dst,iter->first); // level
- PutVarint64(dst,iter->second); // file number
- }
- // 记录每一级需要有效的sst以及其smallest与largest的key
- for (size_t i = 0; i < new_files_.size(); i++) {
- const FileMetaData& f = new_files_[i].second;
- PutVarint32(dst,kNewFile);
- PutVarint32(dst,new_files_[i].first); // level
- PutVarint64(dst,f.number);
- PutVarint64(dst,f.file_size);
- PutLengthPrefixedSlice(dst,f.smallest.Encode());
- PutLengthPrefixedSlice(dst,f.largest.Encode());
- }
- }
3.3 Sortedtable
按照SSTable的结构,可以正向遍历,也可以逆向遍历,但是逆向遍历的代价要远远高于正向遍历的代价,因为每条record都是变长的,且其没有记录前一条记录的偏移,因此逆向Group遍历时,只能先回到group(代码中称为一个restart,为了便于理解,下面都称为group)开头(一个Data Block的group一般为16条记录,每个Data Block的尾部有group起始位置偏移索引),然后从头开始正向遍历,直至找到其前一条记录,如果当前位置为group的第一条记录,则需要回到上一个group的开头,遍历到其最后一条记录。另外,内存中跳表反向的遍历效率也远远不如正向遍历。
3.4 Sparse Index
一个sst文件内部除了Data Block,还有Index Block,Index Block的结构与Data Block一样,只不过每个group只包含一条记录,即Data Block的最大Key与偏移。其实这里说最大Key并不是很准确,理论上,只要保存最大Key就可以实现二分查找,但是Level DB在这里做了个优化,它并保存最大key,而是保存一个能分隔两个Data Block的最短Key,如:假定Data Block1的最后一个Key为“abcdefg”,Data Block2的第一个Key为“abzxcv”,则index可以记录Data Block1的索引key为“abd”;这样的分割串可以有很多,只要保证Data Block1中的所有Key都小于等于此索引,Data Block2中的所有Key都大于此索引即可。这种优化缩减了索引长度,查询时可以有效减小比较次数。我们可以看看默认comparator如何实现这种分割的:
- void BytewiseComparatorImpl::FindShortestSeparator(
- std::string* start,const Slice& limit) const {
- // 先比较获得最大公共前缀
- size_t min_length = std::min(start->size(),limit.size());
- size_t diff_index = 0;
- while ((diff_index < min_length) &&
- ((*start)[diff_index] == limit[diff_index])) {
- diff_index++;
- }
- if (diff_index >= min_length) {
- // 如果start就是limit的前缀,则只能使用start本身作为分割
- } else {
- uint8_t diff_byte = static_cast<uint8_t>((*start)[diff_index]);
- // 将第一个不同字符+1,并确保其不会溢出,同时比limit小
- if (diff_byte < static_cast<uint8_t>(0xff) &&
- diff_byte + 1 < static_cast<uint8_t>(limit[diff_index])) {
- (*start)[diff_index]++;
- start->resize(diff_index + 1);
- assert(Compare(*start,limit) < 0);
- }
- }
- }
从上面可以看出,FindShortestSeparator方法并不严格,有些时候没有找出最短分割的key(比如第一个不等的字符已经为0xFF时),它只是一种优化,我们自定义Comparator时,既可以实现,也可以不实现,如果不实现,将始终使用Data Block的最大Key作为索引,并不影响功能正确性。
4. Operations
在介绍了数据结构后,我们看看LevelDB一些基本操作的实现:
4.1 创建一个新表
创建一个新的表大概分为几步,包括建立各类文件以及内存中的数据结构,线程同步对象等,关键代码如下:
- // DBImpl在构造时会初始化互斥体与信号量,创建一个空的memtable,并根据配置设置Comparator及LRU缓冲
- DBImpl::DBImpl(const Options& options,const std::string& dbname)
- : env_(options.env),internal_comparator_(options.comparator),// 初始化Comparator
- options_(SanitizeOptions(dbname,&internal_comparator_,options)),// 检查参数是否合法
- owns_info_log_(options_.info_log != options.info_log),// 是拥有自己info log,还是使用用户提供的
- owns_cache_(options_.block_cache != options.block_cache),// 是否拥有自己的LRU缓冲,或者使用用户提供的
- dbname_(dbname),// 数据表名称
- db_lock_(NULL),// 不创建也不锁定文件锁
- shutting_down_(NULL),bg_cv_(&mutex_),// 用于与后台线程交互的条件信号
- mem_(new MemTable(internal_comparator_)),// 创建一个新的跳表
- imm_(NULL),// 用于双缓冲的缓冲跳表开始时为NULL
- logfile_(NULL),// log文件
- logfile_number_(0),// log文件的序号
- log_(NULL),// log writer
- logger_(NULL),// 用于在多线程环境中记录Owner logger的一个指针
- logger_cv_(&mutex_),// 用于与Logger交互的条件信号
- bg_compaction_scheduled_(false),// 没打开表时不起动后台的compaction线程
- manual_compaction_(NULL) {
- // 增加memtable的引用计数
- mem_->Ref();
- has_imm_.Release_Store(NULL);
- // 根据Option创建一个LRU的缓冲对象,如果options中指定了Cache空间,则使用用户
- // 提供的Cache空间,否则会在内部确实创建8MB的Cache,另外,LRU的Entry数目不能超过max_open_files-10
- const int table_cache_size = options.max_open_files - 10;
- table_cache_ = new TableCache(dbname_,&options_,table_cache_size);
- // 创建一个Version管理器
- versions_ = new VersionSet(dbname_,table_cache_,&internal_comparator_);
- }
- Options SanitizeOptions(const std::string& dbname,const InternalKeyComparator* icmp,const Options& src) {
- Options result = src;
- result.comparator = icmp;
- ClipToRange(&result.max_open_files,20,50000);
- ClipToRange(&result.write_buffer_size,64<<10,1<<30);
- ClipToRange(&result.block_size,1<<10,4<<20);
- // 如果用户未指定info log文件(用于打印状态等文本信息的日志文件),则由引擎自己创建一个info log文件。
- if (result.info_log == NULL) {
- // Open a log file in the same directory as the db
- src.env->CreateDir(dbname); // 如果目录不存在则创建
- // 如果已存在以前的info log文件,则将其改名为LOG.old,然后创建新的log文件与日志的writer
- src.env->RenameFile(InfoLogFileName(dbname),OldInfoLogFileName(dbname));
- Status s = src.env->NewLogger(InfoLogFileName(dbname),&result.info_log);
- if (!s.ok()) {
- result.info_log = NULL;
- }
- }
- // 如果用户没指定LRU缓冲,则创建8MB的LRU缓冲
- if (result.block_cache == NULL) {
- result.block_cache = NewLRUCache(8 << 20);
- }
- return result;
- }
- Status DBImpl::NewDB() {
- // 创建version管理器
- VersionEdit new_db;
- // 设置Comparator
- new_db.SetComparatorName(user_comparator()->Name());
- new_db.SetLogNumber(0);
- // 下一个序号从2开始,1留给清单文件
- new_db.SetNextFile(2);
- new_db.SetLastSequence(0);
- // 创建一个清单文件,MANIFEST-1
- const std::string manifest = DescriptorFileName(dbname_,1);
- WritableFile* file;
- Status s = env_->NewWritableFile(manifest,&file);
- if (!s.ok()) {
- return s;
- }
- {
- // 写入清单文件头
- log::Writer log(file);
- std::string record;
- new_db.EncodeTo(&record);
- s = log.AddRecord(record);
- if (s.ok()) {
- s = file->Close();
- }
- }
- delete file;
- if (s.ok()) {
- // 设置CURRENT文件,使其指向清单文件
- s = SetCurrentFile(env_,dbname_,1);
- } else {
- env_->DeleteFile(manifest);
- }
- return s;
- }
4.2 打开一个已存在的表
- Status DB::Open(const Options& options,DB** dbptr) {
- *dbptr = NULL;
- DBImpl* impl = new DBImpl(options,dbname);
- impl->mutex_.Lock();
- VersionEdit edit;
- // 如果存在表数据,则Load表数据,并对日志进行恢复,否则,创建新表
- Status s = impl->Recover(&edit);
- if (s.ok()) {
- // 从VersionEdit获取一个新的文件序号,所以如果是新建数据表,则第一个LOG的序号为2(1已经被MANIFEST占用)
- uint64_t new_log_number = impl->versions_->NewFileNumber();
- // 记录日志文件号,创建新的log文件及Writer对象
- WritableFile* lfile;
- s = options.env->NewWritableFile(LogFileName(dbname,new_log_number),&lfile);
- if (s.ok()) {
- edit.SetLogNumber(new_log_number);
- impl->logfile_ = lfile;
- impl->logfile_number_ = new_log_number;
- impl->log_ = new log::Writer(lfile);
- // 如果存在原来的log,则回放log
- s = impl->versions_->LogAndApply(&edit,&impl->mutex_);
- }
- if (s.ok()) {
- // 删除废弃的文件(如果存在)
- impl->DeleteObsoleteFiles();
- // 检查是否需要Compaction,如果需要,则让后台启动Compaction线程
- impl->MaybeScheduleCompaction();
- }
- }
- impl->mutex_.Unlock();
- if (s.ok()) {
- *dbptr = impl;
- } else {
- delete impl;
- }
- return s;
- }
从上面可以看出,其实到底是新建表还是打开表都是取决与DBImpl::Recover()这个方法的行为,它的流程如下:
- Status DBImpl::Recover(VersionEdit* edit) {
- mutex_.AssertHeld();
- // 创建DB目录,不关注错误
- env_->CreateDir(dbname_);
- // 在DB目录下打开或创建(如果不存在)LOCK文件并锁定它,防止其他进程打开此表
- Status s = env_->LockFile(LockFileName(dbname_),&db_lock_);
- if (!s.ok()) {
- return s;
- }
- if (!env_->FileExists(CurrentFileName(dbname_))) {
- // 如果DB目录下不存在CURRENT文件且允许在表不存在时创建表,则新建一个表返回
- if (options_.create_if_missing) {
- s = NewDB();
- if (!s.ok()) {
- return s;
- }
- } else {
- return Status::InvalidArgument(
- dbname_,"does not exist (create_if_missing is false)");
- }
- } else {
- if (options_.error_if_exists) {
- return Status::InvalidArgument(
- dbname_,"exists (error_if_exists is true)");
- }
- }
- // 如果运行到此,表明表已经存在,需要load,第一步是从MANIFEST文件中恢复VersionSet
- s = versions_->Recover();
- if (s.ok()) {
- SequenceNumber max_sequence(0);
- // 获取MANIFEST中获取最后一次持久化清单时在使用LOG文件序号,注意:这个LOG当时正在使用,
- // 表明数据还在memtable中,没有形成sst文件,所以数据恢复需要从这个LOG文件开始(包含这个LOG)。
- // 另外,prev_log是早前版本level_db使用的机制,现在以及不再使用,这里只是为了兼容
- const uint64_t min_log = versions_->LogNumber();
- const uint64_t prev_log = versions_->PrevLogNumber();
- // 扫描DB目录,记录下所有比MANIFEST中记录的LOG更加新的LOG文件
- std::vector<std::string> filenames;
- s = env_->GetChildren(dbname_,&filenames);
- if (!s.ok()) {
- return s;
- }
- uint64_t number;
- FileType type;
- std::vector<uint64_t> logs;
- for (size_t i = 0; i < filenames.size(); i++) {
- if (ParseFileName(filenames[i],&number,&type)
- && type == kLogFile
- && ((number >= min_log) || (number == prev_log))) {
- logs.push_back(number);
- }
- }
- // 将LOG文件安装从小到大排序
- std::sort(logs.begin(),logs.end());
- // 逐个LOG文件回放 for (size_t i = 0; i < logs.size(); i++) {
- // 回放LOG时,记录被插入到memtable,如果超过write buffer,则还会dump出level 0的sst文件,
- // 此方法会将日志种每条记录的sequence num与max_sequence进行比较,以记录下最大的sequence num。
- s = RecoverLogFile(logs[i],edit,&max_sequence);
- // 更新最大的文件序号,因为MANIFEST文件中没有记录这些LOG文件占用的序号;
- // 当然,也可能LOG的序号小于MANIFEST中记录的最大文件序号,这时不需要更新。
- versions_->MarkFileNumberUsed(logs[i]);
- }
- if (s.ok()) {
- // 比较日志回放前后的最大sequence num,如果回放记录中有超过LastSequence()的记录,则替换
- if (versions_->LastSequence() < max_sequence) {
- versions_->SetLastSequence(max_sequence);
- }
- }
- }
- return s;
- }
4.3 关闭一个已打开的表
- DBImpl::~DBImpl() {
- // 通知后台线程,DB即将关闭
- mutex_.Lock();
- // 后台线程会间歇性地检查shutting_down_对象的指针,一旦不为NULL就会退出
- shutting_down_.Release_Store(this);
- // 注意:这里必须循环通知,直至compaction线程获得信号并设置了bg_compaction_scheduled_为false
- while (bg_compaction_scheduled_) {
- bg_cv_.Wait();
- }
- mutex_.Unlock();
- // 如果锁定了文件锁,则释放文件锁
- if (db_lock_ != NULL) {
- env_->UnlockFile(db_lock_);
- }
- delete versions_;
- // 减去memtable的引用计数
- if (mem_ != NULL) mem_->Unref();
- if (imm_ != NULL) imm_->Unref();
- // 销毁db log相关对象以及表缓冲对象
- delete log_;
- delete logfile_;
- delete table_cache_;
- // 如果info log与cache是引擎自己构建,则需要销毁它们
- if (owns_info_log_) {
- delete options_.info_log;
- }
- if (owns_cache_) {
- delete options_.block_cache;
- }
- }
由上可见,delete一个db对象可能会阻塞调用线程一段时间,必须让其完成一些必须完成的工作,才能进一步保障数据的安全。