小明思考

          Just a software engineer
          posts - 124, comments - 36, trackbacks - 0, articles - 0
            BlogJava :: 首頁 :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理

          日歷

          <2025年7月>
          293012345
          6789101112
          13141516171819
          20212223242526
          272829303112
          3456789

          相冊

          My blogs

          搜索

          •  

          最新評論

          leveldb 是通過Open函數(shù)來打開/新建數(shù)據(jù)庫。
          static Status Open(const Options& options,
                               
          const std::string& name,
                               DB
          ** dbptr);

          其中options指定一些選項。
          struct Options {
            
          // -------------------
            
          // 影響行為的參數(shù)

            
          //comparator用于指定key的排列方式,默認(rèn)按照字節(jié)排序
            const Comparator* comparator;

            
          //如果不存在則創(chuàng)建
            
          // Default: false
            bool create_if_missing;

            
          // 如果存在則失敗
            
          // Default: false
            bool error_if_exists;

            
          // 是否做嚴(yán)格的檢查
            
          // Default: false
            bool paranoid_checks;

            
          // env: os 封裝
            
          // Default: Env::Default()
            Env* env;

            
          // log file,默認(rèn)和database相同路徑
            
          // Default: NULL
            Logger* info_log;

            
          // -------------------
            
          // 影響性能的參數(shù)

            
          // 寫緩沖大小,增加會提高寫的性能,但是會增加啟動的時間,因為有更多的數(shù)據(jù)需要恢復(fù)
            
          //
            
          // Default: 4MB
            size_t write_buffer_size;

            
          // 最大打開的文件個數(shù),用于TableCache
            
          //
            
          // Default: 1000
            int max_open_files;

            
          // Control over blocks (user data is stored in a set of blocks, and
            
          // a block is the unit of reading from disk).

            
          // 指定Block cache,默認(rèn)leveldb會自動創(chuàng)建8MB的internal cache
            
          // Default: NULL
            Cache* block_cache;

            
          //SST file中的Block size,為壓縮之前的數(shù)據(jù)
            
          //
            
          // Default: 4K
            size_t block_size;

            
          // SST file 中的restart pointer的間隔,參見SST的文件格式
            
          //
            
          // Default: 16
            int block_restart_interval;

            
          // 壓縮類型,默認(rèn)為google的snappy壓縮
            CompressionType compression;

            
          // Create an Options object with default values for all fields.
            Options();
          };

          具體看看Open的實現(xiàn):
          <db/dbimpl.cc>
          Status DB::Open(const Options& options, const std::string& dbname,
                          DB
          ** dbptr) {
            
          *dbptr = NULL;

            
          //實例化對象:DBImpl
            DBImpl* impl = new DBImpl(options, dbname);
            
          //加鎖
            impl->mutex_.Lock();
            VersionEdit edit;
            
          //從log中恢復(fù)數(shù)據(jù),生成新的SST file
            Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
            if (s.ok()) { 
              
          //創(chuàng)建新的log file
              uint64_t new_log_number = impl->versions_->NewFileNumber();
              WritableFile
          * lfile;
              s 
          = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                               
          &lfile);
              
          if (s.ok()) {
                edit.SetLogNumber(new_log_number);
                impl
          ->logfile_ = lfile;
                impl
          ->logfile_number_ = new_log_number;
                impl
          ->log_ = new log::Writer(lfile);
                
          //生成新的manifest文件
                s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
              }
              
          if (s.ok()) {
                
          //刪除失效文件
                impl->DeleteObsoleteFiles();
                
          //進行compaction
                impl->MaybeScheduleCompaction();
              }
            }
            impl
          ->mutex_.Unlock();
            
          if (s.ok()) {
              
          *dbptr = impl;
            } 
          else {
              delete impl;
            }
            
          return s;
          }

          因為上次關(guān)閉數(shù)據(jù)庫的時候,內(nèi)存的數(shù)據(jù)可能并沒有寫入SST文件,所以要從*.log中讀取記錄,并寫入新的SST文件。
          <db/dbimpl.cc>
          Status DBImpl::Recover(VersionEdit* edit) {
            mutex_.AssertHeld();

            
          //創(chuàng)建folder
            env_->CreateDir(dbname_);
            assert(db_lock_ 
          == NULL);
            
          //生成LOCK文件并鎖定
            Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
            
          if (!s.ok()) {
              
          return s;
            }

            
          if (!env_->FileExists(CurrentFileName(dbname_))) {
              
          if (options_.create_if_missing) {
                
          //新建database
                s = NewDB();
                
          if (!s.ok()) {
                  
          return s;
                }
              } 
          else {
                
          return Status::InvalidArgument(
                    dbname_, 
          "does not exist (create_if_missing is false)");
              }
            } 
          else {
              
          if (options_.error_if_exists) {
                
          return Status::InvalidArgument(
                    dbname_, 
          "exists (error_if_exists is true)");
              }
            }

            
          //重建manifest信息
            s = versions_->Recover();
            
          if (s.ok()) {
              SequenceNumber max_sequence(
          0);

              
          //得到上次的log file
              const uint64_t min_log = versions_->LogNumber();
              
          const uint64_t prev_log = versions_->PrevLogNumber();
              std::vector
          <std::string> filenames;
              s 
          = env_->GetChildren(dbname_, &filenames);
              
          if (!s.ok()) {
                
          return s;
              }
              uint64_t number;
              FileType type;
              std::vector
          <uint64_t> logs;
              
          for (size_t i = 0; i < filenames.size(); i++) {
                
          if (ParseFileName(filenames[i], &number, &type)
                    
          && type == kLogFile
                    
          && ((number >= min_log) || (number == prev_log))) {
                  logs.push_back(number);
                }
              }

              
          // Recover in the order in which the logs were generated
              std::sort(logs.begin(), logs.end());
              
          for (size_t i = 0; i < logs.size(); i++) {
                
          //從*.log中恢復(fù)數(shù)據(jù)
                s = RecoverLogFile(logs[i], edit, &max_sequence);

                
          // The previous incarnation may not have written any MANIFEST
                
          // records after allocating this log number.  So we manually
                
          // update the file number allocation counter in VersionSet.
                versions_->MarkFileNumberUsed(logs[i]);
              }

              
          if (s.ok()) {
                
          if (versions_->LastSequence() < max_sequence) {
                  versions_
          ->SetLastSequence(max_sequence);
                }
              }
            }

            
          return s;
          }

          繼續(xù)看RecoverLogFile的實現(xiàn):
          <db/dbimpl.cc>
          Status DBImpl::RecoverLogFile(uint64_t log_number,
                                        VersionEdit
          * edit,
                                        SequenceNumber
          * max_sequence) {
            
          //LogReporter:出現(xiàn)壞數(shù)據(jù)的時候報告
             struct LogReporter : public log::Reader::Reporter {
              Env
          * env;
              Logger
          * info_log;
              
          const char* fname;
              Status
          * status;  // NULL if options_.paranoid_checks==false
              virtual void Corruption(size_t bytes, const Status& s) {
                Log(info_log, 
          "%s%s: dropping %d bytes; %s",
                    (
          this->status == NULL ? "(ignoring error) " : ""),
                    fname, static_cast
          <int>(bytes), s.ToString().c_str());
                
          if (this->status != NULL && this->status->ok()) *this->status = s;
              }
            };

            mutex_.AssertHeld();

            
          //打開Log file用于順序讀取
            std::string fname = LogFileName(dbname_, log_number);
            SequentialFile
          * file;
            Status status 
          = env_->NewSequentialFile(fname, &file);
            
          if (!status.ok()) {
              MaybeIgnoreError(
          &status);
              
          return status;
            }

            LogReporter reporter;
            reporter.env 
          = env_;
            reporter.info_log 
          = options_.info_log;
            reporter.fname 
          = fname.c_str();
            reporter.status 
          = (options_.paranoid_checks ? &status : NULL);
            
          // log::Reader讀取數(shù)據(jù)
            log::Reader reader(file, &reporter, true/*checksum*/,
                               
          0/*initial_offset*/);
            Log(options_.info_log, 
          "Recovering log #%llu",
                (unsigned 
          long long) log_number);

            std::
          string scratch;
            Slice record;
            WriteBatch batch;
            MemTable
          * mem = NULL;
            
          //遍歷log file,讀取記錄
            while (reader.ReadRecord(&record, &scratch) &&
                   status.ok()) {
              
          if (record.size() < 12) {
                reporter.Corruption(
                    record.size(), Status::Corruption(
          "log record too small"));
                
          continue;
              }
              WriteBatchInternal::SetContents(
          &batch, record);

              
          if (mem == NULL) {
                
          //新建MemTable用于保存數(shù)據(jù)
                mem = new MemTable(internal_comparator_);
                mem
          ->Ref();
              }
              
          //插入memtable
              status = WriteBatchInternal::InsertInto(&batch, mem);
              MaybeIgnoreError(
          &status);
              
          if (!status.ok()) {
                
          break;
              }
              
          const SequenceNumber last_seq =
                  WriteBatchInternal::Sequence(
          &batch) +
                  WriteBatchInternal::Count(
          &batch) - 1;
              
          if (last_seq > *max_sequence) {
                
          *max_sequence = last_seq;
              }

              
          if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
                
          //寫入SST file:level 0
                status = WriteLevel0Table(mem, edit, NULL);
                
          if (!status.ok()) {
                  
          break;
                }
                
          //釋放并刪除memtable
                mem->Unref();
                mem 
          = NULL;
              }
            }

            
          if (status.ok() && mem != NULL) {
              status 
          = WriteLevel0Table(mem, edit, NULL);
              
          // Reflect errors immediately so that conditions like full
              
          // file-systems cause the DB::Open() to fail.
            }

            
          if (mem != NULL) mem->Unref();
            delete file;
            
          return status;
          }

          至此完成SST file的寫入。

          接下來看看manifest文件的重建
          mainfest的重建有兩步,第一步是調(diào)用VersionSet::Recover函數(shù)恢復(fù)到上次的manifest,然后使用VersionSet::LogAndApply把新增的SST文件記錄也寫入manifest文件中。
          <db/version_set.cc>

          Status VersionSet::Recover() {
            
          struct LogReporter : public log::Reader::Reporter {
              Status
          * status;
              
          virtual void Corruption(size_t bytes, const Status& s) {
                
          if (this->status->ok()) *this->status = s;
              }
            };

            
          // 讀取CURRENT文件,獲取最新的MANIFEST文件
            std::string current;
            Status s 
          = ReadFileToString(env_, CurrentFileName(dbname_), &current);
            
          if (!s.ok()) {
              
          return s;
            }
            
          if (current.empty() || current[current.size()-1!= '\n') {
              
          return Status::Corruption("CURRENT file does not end with newline");
            }
            current.resize(current.size() 
          - 1);

            std::
          string dscname = dbname_ + "/" + current;
            SequentialFile
          * file;
            
          //打開當(dāng)前MANIFEST文件
            s = env_->NewSequentialFile(dscname, &file);
            
          if (!s.ok()) {
              
          return s;
            }

            
          bool have_log_number = false;
            
          bool have_prev_log_number = false;
            
          bool have_next_file = false;
            
          bool have_last_sequence = false;
            uint64_t next_file 
          = 0;
            uint64_t last_sequence 
          = 0;
            uint64_t log_number 
          = 0;
            uint64_t prev_log_number 
          = 0;
            Builder builder(
          this, current_);

            {
              LogReporter reporter;
              reporter.status 
          = &s;
              
          //使用log::Reader讀取log記錄:VersionEdit
              log::Reader reader(file, &reporter, true/*checksum*/0/*initial_offset*/);
              Slice record;
              std::
          string scratch;
              
          while (reader.ReadRecord(&record, &scratch) && s.ok()) {
                VersionEdit edit;
                s 
          = edit.DecodeFrom(record);
                
          if (s.ok()) {
                  
          if (edit.has_comparator_ &&
                      edit.comparator_ 
          != icmp_.user_comparator()->Name()) {
                    s 
          = Status::InvalidArgument(
                        edit.comparator_ 
          + "does not match existing comparator ",
                        icmp_.user_comparator()
          ->Name());
                  }
                }

                
          if (s.ok()) {
                  
          //應(yīng)用Edit到VersionSet
                  builder.Apply(&edit);
                }

                
          if (edit.has_log_number_) {
                  log_number 
          = edit.log_number_;
                  have_log_number 
          = true;
                }

                
          if (edit.has_prev_log_number_) {
                  prev_log_number 
          = edit.prev_log_number_;
                  have_prev_log_number 
          = true;
                }

                
          if (edit.has_next_file_number_) {
                  next_file 
          = edit.next_file_number_;
                  have_next_file 
          = true;
                }

                
          if (edit.has_last_sequence_) {
                  last_sequence 
          = edit.last_sequence_;
                  have_last_sequence 
          = true;
                }
              }
            }
            delete file;
            file 
          = NULL;

            
          if (s.ok()) {
              
          if (!have_next_file) {
                s 
          = Status::Corruption("no meta-nextfile entry in descriptor");
              } 
          else if (!have_log_number) {
                s 
          = Status::Corruption("no meta-lognumber entry in descriptor");
              } 
          else if (!have_last_sequence) {
                s 
          = Status::Corruption("no last-sequence-number entry in descriptor");
              }

              
          if (!have_prev_log_number) {
                prev_log_number 
          = 0;
              }

              MarkFileNumberUsed(prev_log_number);
              MarkFileNumberUsed(log_number);
            }

            
          if (s.ok()) { //生成新的version,并設(shè)為current version
              Version* v = new Version(this);
              builder.SaveTo(v);
              
          // Install recovered version
              Finalize(v);
              AppendVersion(v);
              manifest_file_number_ 
          = next_file;
              next_file_number_ 
          = next_file + 1;
              last_sequence_ 
          = last_sequence;
              log_number_ 
          = log_number;
              prev_log_number_ 
          = prev_log_number;
            }

            
          return s;
          }

          Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
            
          if (edit->has_log_number_) {
              assert(edit
          ->log_number_ >= log_number_);
              assert(edit
          ->log_number_ < next_file_number_);
            } 
          else {
              edit
          ->SetLogNumber(log_number_);
            }

            
          if (!edit->has_prev_log_number_) {
              edit
          ->SetPrevLogNumber(prev_log_number_);
            }

            edit
          ->SetNextFile(next_file_number_);
            edit
          ->SetLastSequence(last_sequence_);

            
          //使用VersionEdit創(chuàng)建新的Version
            Version* v = new Version(this);
            {
              Builder builder(
          this, current_);
              builder.Apply(edit);
              builder.SaveTo(v);
            }
            Finalize(v);

            
          // Initialize new descriptor log file if necessary by creating
            
          // a temporary file that contains a snapshot of the current version.
            std::string new_manifest_file;
            Status s;
            
          //創(chuàng)建新的manifest文件
            if (descriptor_log_ == NULL) {
              
          // No reason to unlock *mu here since we only hit this path in the
              
          // first call to LogAndApply (when opening the database).
              assert(descriptor_file_ == NULL);
              new_manifest_file 
          = DescriptorFileName(dbname_, manifest_file_number_);
              edit
          ->SetNextFile(next_file_number_);
              s 
          = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
              
          if (s.ok()) {
                descriptor_log_ 
          = new log::Writer(descriptor_file_);
                s 
          = WriteSnapshot(descriptor_log_);
              }
            }

            
          // Unlock during expensive MANIFEST log write
            {
              mu
          ->Unlock();

              
          // 寫入manifest log
              if (s.ok()) {
                std::
          string record;
                edit
          ->EncodeTo(&record);
                s 
          = descriptor_log_->AddRecord(record);
                
          if (s.ok()) {
                  s 
          = descriptor_file_->Sync();
                }
              }

              
          // If we just created a new descriptor file, install it by writing a
              
          // new CURRENT file that points to it.
              if (s.ok() && !new_manifest_file.empty()) {
                s 
          = SetCurrentFile(env_, dbname_, manifest_file_number_);
              }

              mu
          ->Lock();
            }

            
          // 設(shè)置新的version
            if (s.ok()) {
              AppendVersion(v);
              log_number_ 
          = edit->log_number_;
              prev_log_number_ 
          = edit->prev_log_number_;
            } 
          else {
              delete v;
              
          if (!new_manifest_file.empty()) {
                delete descriptor_log_;
                delete descriptor_file_;
                descriptor_log_ 
          = NULL;
                descriptor_file_ 
          = NULL;
                env_
          ->DeleteFile(new_manifest_file);
              }
            }

            
          return s;
          }






          主站蜘蛛池模板: 仁布县| 扎囊县| 忻城县| 通渭县| 永修县| 正宁县| 虹口区| 浦北县| 宁远县| 阿坝| 呼玛县| 安图县| 资溪县| 江山市| 扎兰屯市| 冀州市| 湾仔区| 徐闻县| 铅山县| 建德市| 兰州市| 怀远县| 寿阳县| 北流市| 舒兰市| 紫金县| 五峰| 周宁县| 芒康县| 阜南县| 五华县| 图木舒克市| 谢通门县| 伊金霍洛旗| 区。| 孟村| 偏关县| 民和| 汉沽区| 社会| 高台县|