小明思考

          Just a software engineer
          posts - 124, comments - 36, trackbacks - 0, articles - 0
            BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

          leveldb研究6- Level和Compaction

          Posted on 2012-03-15 17:28 小明 閱讀(7899) 評論(0)  編輯  收藏 所屬分類: 分布式計算
          leveldb之所以使用level作為數據庫名稱,精華就在于level的設計。



          本質是一種歸并排序算法。
          這樣設計的好處主要是可以減少compaction的次數和每次的文件個數。


          Compaction

          • 為什么要compaction?
           compaction可以提高數據的查詢效率,沒有經過compaction,需要從很多SST file去查找,而做過compaction后,只需要從有限的SST文件去查找,大大的提高了隨機查詢的效率,另外也可以刪除過期數據。

          • 什么時候可能進行compaction?
           1. database open的時候
           2. write的時候
           3. read的時候?

          <db/dbimpl.cc>

          //是否要進行compaction
          void DBImpl::MaybeScheduleCompaction() {
            mutex_.AssertHeld();
            
          if (bg_compaction_scheduled_) { //已經在進行
            } else if (shutting_down_.Acquire_Load()) {
            } 
          else if (imm_ == NULL &&
                       manual_compaction_ 
          == NULL &&
                       
          !versions_->NeedsCompaction()) {
              
          //imm_為NULL:沒有memtable需要flush
              
          //manual_compaction_:手動compaction
            } else {
              bg_compaction_scheduled_ 
          = true;
              env_
          ->Schedule(&DBImpl::BGWork, this);
            }
          }

          <db/version_set.h>
          bool NeedsCompaction() const {
              Version
          * v = current_;
              
          return (v->compaction_score_ >= 1|| (v->file_to_compact_ != NULL);
            }

          如何計算這個compaction_score呢?看下面的代碼:
          <db/version_set.cc>
          void VersionSet::Finalize(Version* v) {
            
          int best_level = -1;
            
          double best_score = -1;

          //遍歷所有的level  
          for (int level = 0; level < config::kNumLevels-1; level++) {
              
          double score;
              
          if (level == 0) {
                
          //對于level 0,計算當前文件個數和預定義的compaction trigger value(Default:4)之比
                score = v->files_[level].size() /
                    static_cast
          <double>(config::kL0_CompactionTrigger);
              } 
          else {
                
          //對于其他level,計算level文件大小和level應有的大小(10^N MB)
                const uint64_t level_bytes = TotalFileSize(v->files_[level]);
                score 
          = static_cast<double>(level_bytes) / MaxBytesForLevel(level);
              }
               
          //找出最需要compaction的level
              if (score > best_score) {
                best_level 
          = level;
                best_score 
          = score;
              }
            }

            v
          ->compaction_level_ = best_level;
            v
          ->compaction_score_ = best_score;
          }

          • 如何做compaction?
          leveldb 運行會啟動一個background thread,會執行一些background task,compaction就在這個線程中執行。


          首先來看看compaction對象如何定義的
          <db/version_set.h>
          //關于compaction的一些信息
          class Compaction {
           
          public:
            
          ~Compaction();

            
          //compaction Level:會將N層N+1層合并生成N+1文件
            int level() const { return level_; }

            
          //返回VersionEdit,用于記錄到manifest
            VersionEdit* edit() { return &edit_; }

            
          //返回N層或者N+1層的文件個數,which = 0,1
            int num_input_files(int which) const { return inputs_[which].size(); }

            
          //返回具體的文件信息,which:level
            FileMetaData* input(int which, int i) const { return inputs_[which][i]; }

            
          //本次compaction最大輸出字節
            uint64_t MaxOutputFileSize() const { return max_output_file_size_; }

            
          //是否只需要移動文件進行compaction,不需要merge和split
            bool IsTrivialMove() const;

            
          //把input都當成delete寫入edit
            void AddInputDeletions(VersionEdit* edit);

            
          // Returns true if the information we have available guarantees that
            
          // the compaction is producing data in "level+1" for which no data exists
            
          // in levels greater than "level+1".
            bool IsBaseLevelForKey(const Slice& user_key);

            
          // Returns true iff we should stop building the current output
            
          // before processing "internal_key".
            bool ShouldStopBefore(const Slice& internal_key);

            
          // Release the input version for the compaction, once the compaction
            
          // is successful.
            void ReleaseInputs();

           
          private:
            friend 
          class Version;
            friend 
          class VersionSet;

            
          explicit Compaction(int level);

            
          int level_;
            uint64_t max_output_file_size_;
            Version
          * input_version_;
            VersionEdit edit_;

            
          // Each compaction reads inputs from "level_" and "level_+1"
            std::vector<FileMetaData*> inputs_[2];      // The two sets of inputs

            
          // State used to check for number of of overlapping grandparent files
            
          // (parent == level_ + 1, grandparent == level_ + 2)
            std::vector<FileMetaData*> grandparents_;
            size_t grandparent_index_;  
          // Index in grandparent_starts_
            bool seen_key_;             // Some output key has been seen
            int64_t overlapped_bytes_;  // Bytes of overlap between current output
                                        
          // and grandparent files

            
          // State for implementing IsBaseLevelForKey

            
          // level_ptrs_ holds indices into input_version_->levels_: our state
            
          // is that we are positioned at one of the file ranges for each
            
          // higher level than the ones involved in this compaction (i.e. for
            
          // all L >= level_ + 2).
            size_t level_ptrs_[config::kNumLevels];
          };

          Compaction Thread
          <db/dbimpl.cc>
          void DBImpl::BackgroundCompaction() {
            mutex_.AssertHeld();

            
          //把memtable flush到sstable
            if (imm_ != NULL) {
              CompactMemTable();
              
          return;
            }

            Compaction
          * c;
            
          bool is_manual = (manual_compaction_ != NULL);
            InternalKey manual_end;
            
          if (is_manual) { //手動compaction
              ManualCompaction* m = manual_compaction_;
              
          //根據range來做compaction
              c = versions_->CompactRange(m->level, m->begin, m->end);
              m
          ->done = (c == NULL);
              
          if (c != NULL) {
                manual_end 
          = c->input(0, c->num_input_files(0- 1)->largest;
              }
              Log(options_.info_log,
                  
          "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
                  m
          ->level,
                  (m
          ->begin ? m->begin->DebugString().c_str() : "(begin)"),
                  (m
          ->end ? m->end->DebugString().c_str() : "(end)"),
                  (m
          ->done ? "(end)" : manual_end.DebugString().c_str()));
            } 
          else {
              
          //找到需要compaction的level&file
              c = versions_->PickCompaction();
            }

            Status status;
            
          if (c == NULL) {
              
          // Nothing to do
            } else if (!is_manual && c->IsTrivialMove()) { //只需要移動sst file
              
          // Move file to next level
              assert(c->num_input_files(0== 1);
              FileMetaData
          * f = c->input(00);
              c
          ->edit()->DeleteFile(c->level(), f->number);
              c
          ->edit()->AddFile(c->level() + 1, f->number, f->file_size,
                                 f
          ->smallest, f->largest);
              status 
          = versions_->LogAndApply(c->edit(), &mutex_);
              VersionSet::LevelSummaryStorage tmp;
              Log(options_.info_log, 
          "Moved #%lld to level-%d %lld bytes %s: %s\n",
                  static_cast
          <unsigned long long>(f->number),
                  c
          ->level() + 1,
                  static_cast
          <unsigned long long>(f->file_size),
                  status.ToString().c_str(),
                  versions_
          ->LevelSummary(&tmp));
            } 
          else {//完成compaction
              CompactionState* compact = new CompactionState(c);
              status 
          = DoCompactionWork(compact);
              CleanupCompaction(compact);
              c
          ->ReleaseInputs();
              DeleteObsoleteFiles();
            }
            delete c;

            
          if (status.ok()) {
              
          // Done
            } else if (shutting_down_.Acquire_Load()) {
              
          // Ignore compaction errors found during shutting down
            } else {
              Log(options_.info_log,
                  
          "Compaction error: %s", status.ToString().c_str());
              
          if (options_.paranoid_checks && bg_error_.ok()) {
                bg_error_ 
          = status;
              }
            }

            
          if (is_manual) {
              ManualCompaction
          * m = manual_compaction_;
              
          if (!status.ok()) {
                m
          ->done = true;
              }
              
          if (!m->done) {
                
          // We only compacted part of the requested range.  Update *m
                
          // to the range that is left to be compacted.
                m->tmp_storage = manual_end;
                m
          ->begin = &m->tmp_storage;
              }
              manual_compaction_ 
          = NULL;
            }
          }

          compaction memtable:寫一個level0文件,并寫入manifest log

          Status DBImpl::CompactMemTable() {
            mutex_.AssertHeld();
            assert(imm_ 
          != NULL);

            VersionEdit edit;
            Version
          * base = versions_->current();
            
          base->Ref();
            
          //寫入level0 sst table
            Status s = WriteLevel0Table(imm_, &edit, base);
            
          base->Unref();

            
          if (s.ok() && shutting_down_.Acquire_Load()) {
              s 
          = Status::IOError("Deleting DB during memtable compaction");
            }

            
          // Replace immutable memtable with the generated Table
            if (s.ok()) {
              edit.SetPrevLogNumber(
          0);
              edit.SetLogNumber(logfile_number_);  
          // Earlier logs no longer needed
              
          //生成edit并計入manifest log
              s = versions_->LogAndApply(&edit, &mutex_);
            }

            
          if (s.ok()) {
              
          // Commit to the new state
              imm_->Unref();
              imm_ 
          = NULL;
              has_imm_.Release_Store(NULL);
              DeleteObsoleteFiles();
            }

            
          return s;
          }

          下面來看看compaction已有文件:

          找出要compaction的文件:

          <db/version_set.cc>
          Compaction* VersionSet::PickCompaction() {
            Compaction
          * c;
            
          int level;

          //是否需要compaction,有兩種compaction,一種基于size大小,另外一種基于被seek的次數 
          const bool size_compaction = (current_->compaction_score_ >= 1);
            
          const bool seek_compaction = (current_->file_to_compact_ != NULL);
            
          if (size_compaction) {
              level 
          = current_->compaction_level_;
              assert(level 
          >= 0);
              assert(level
          +1 < config::kNumLevels);
              c 
          = new Compaction(level);

              
          //每一層有一個compact_pointer,用于記錄compaction key,這樣可以進行循環compaction
              for (size_t i = 0; i < current_->files_[level].size(); i++) {
                FileMetaData
          * f = current_->files_[level][i];
                
          if (compact_pointer_[level].empty() ||
                    icmp_.Compare(f
          ->largest.Encode(), compact_pointer_[level]) > 0) {
                  
          //找到一個文件就可以了
                  c->inputs_[0].push_back(f);
                  
          break;
                }
              }
              
          if (c->inputs_[0].empty()) {
                
          // Wrap-around to the beginning of the key space
                c->inputs_[0].push_back(current_->files_[level][0]);
              }
            } 
          else if (seek_compaction) {
              level 
          = current_->file_to_compact_level_;
              c 
          = new Compaction(level);
              c
          ->inputs_[0].push_back(current_->file_to_compact_);
            } 
          else {
              
          return NULL;
            }

            c
          ->input_version_ = current_;
            c
          ->input_version_->Ref();

            
          // level 0:特殊處理,因為可能有key 重疊,把所有重疊都找出來,一起做compaction
            if (level == 0) {
              InternalKey smallest, largest;
              GetRange(c
          ->inputs_[0], &smallest, &largest);
              
          // Note that the next call will discard the file we placed in
              
          // c->inputs_[0] earlier and replace it with an overlapping set
              
          // which will include the picked file.
              current_->GetOverlappingInputs(0&smallest, &largest, &c->inputs_[0]);
              assert(
          !c->inputs_[0].empty());
            }

            
          //找到level N+1需要compaction的文件
            SetupOtherInputs(c);

            
          return c;
          }

          <db/version_set.cc>
          void VersionSet::SetupOtherInputs(Compaction* c) {
            
          const int level = c->level();
            InternalKey smallest, largest;
            GetRange(c
          ->inputs_[0], &smallest, &largest);

            
          //找到所有在Level N+1層有重疊的文件
             current_->GetOverlappingInputs(level+1&smallest, &largest, &c->inputs_[1]);

            
          //取出key的范圍
            InternalKey all_start, all_limit;
            GetRange2(c
          ->inputs_[0], c->inputs_[1], &all_start, &all_limit);

            
          //檢查是否能從Level N找到更多的文件
            if (!c->inputs_[1].empty()) {
              std::vector
          <FileMetaData*> expanded0;
              current_
          ->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
              
          const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
              
          const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
              
          const int64_t expanded0_size = TotalFileSize(expanded0);
              
          if (expanded0.size() > c->inputs_[0].size() &&
                  inputs1_size 
          + expanded0_size < kExpandedCompactionByteSizeLimit) {
                InternalKey new_start, new_limit;
                GetRange(expanded0, 
          &new_start, &new_limit);
                std::vector
          <FileMetaData*> expanded1;
                current_
          ->GetOverlappingInputs(level+1&new_start, &new_limit,
                                               
          &expanded1);
                
          if (expanded1.size() == c->inputs_[1].size()) {
                  Log(options_
          ->info_log,
                      
          "Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",
                      level,
                      
          int(c->inputs_[0].size()),
                      
          int(c->inputs_[1].size()),
                      
          long(inputs0_size), long(inputs1_size),
                      
          int(expanded0.size()),
                      
          int(expanded1.size()),
                      
          long(expanded0_size), long(inputs1_size));
                  smallest 
          = new_start;
                  largest 
          = new_limit;
                  c
          ->inputs_[0= expanded0;
                  c
          ->inputs_[1= expanded1;
                  GetRange2(c
          ->inputs_[0], c->inputs_[1], &all_start, &all_limit);
                }
              }
            }

            
          // Compute the set of grandparent files that overlap this compaction
            
          // (parent == level+1; grandparent == level+2)
            if (level + 2 < config::kNumLevels) {
              current_
          ->GetOverlappingInputs(level + 2&all_start, &all_limit,
                                             
          &c->grandparents_);
            }

            
          if (false) {
              Log(options_
          ->info_log, "Compacting %d '%s' .. '%s'",
                  level,
                  smallest.DebugString().c_str(),
                  largest.DebugString().c_str());
            }

            
          //設置新的compact_pointer
            compact_pointer_[level] = largest.Encode().ToString();
            c
          ->edit_.SetCompactPointer(level, largest);
          }

          do compaction task:
          Status DBImpl::DoCompactionWork(CompactionState* compact) {
            
          const uint64_t start_micros = env_->NowMicros();
            int64_t imm_micros 
          = 0;  // Micros spent doing imm_ compactions

            Log(options_.info_log,  
          "Compacting %d@%d + %d@%d files",
                compact
          ->compaction->num_input_files(0),
                compact
          ->compaction->level(),
                compact
          ->compaction->num_input_files(1),
                compact
          ->compaction->level() + 1);

            assert(versions_
          ->NumLevelFiles(compact->compaction->level()) > 0);
            assert(compact
          ->builder == NULL);
            assert(compact
          ->outfile == NULL);
            
          if (snapshots_.empty()) {
              compact
          ->smallest_snapshot = versions_->LastSequence();
            } 
          else {
              compact
          ->smallest_snapshot = snapshots_.oldest()->number_;
            }

            
          // Release mutex while we're actually doing the compaction work
            mutex_.Unlock();

            
          //生成iterator:遍歷要compaction的數據
            Iterator* input = versions_->MakeInputIterator(compact->compaction);
            input
          ->SeekToFirst();
            Status status;
            ParsedInternalKey ikey;
            std::
          string current_user_key;
            
          bool has_current_user_key = false;
            SequenceNumber last_sequence_for_key 
          = kMaxSequenceNumber;
            
          for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
              
          // 如果有memtable要compaction:優先去做
              if (has_imm_.NoBarrier_Load() != NULL) {
                
          const uint64_t imm_start = env_->NowMicros();
                mutex_.Lock();
                
          if (imm_ != NULL) {
                  CompactMemTable();
                  bg_cv_.SignalAll();  
          // Wakeup MakeRoomForWrite() if necessary
                }
                mutex_.Unlock();
                imm_micros 
          += (env_->NowMicros() - imm_start);
              }

              Slice key 
          = input->key();
              
          //檢查是不是中途輸出compaction的結果,避免compaction結果和level N+2 files有過多的重疊
              if (compact->compaction->ShouldStopBefore(key) &&
                  compact
          ->builder != NULL) {
                status 
          = FinishCompactionOutputFile(compact, input);
                
          if (!status.ok()) {
                  
          break;
                }
              }

              
          // Handle key/value, add to state, etc.
              bool drop = false;
              
          if (!ParseInternalKey(key, &ikey)) {
                
          // Do not hide error keys
                current_user_key.clear();
                has_current_user_key 
          = false;
                last_sequence_for_key 
          = kMaxSequenceNumber;
              } 
          else {
                
          if (!has_current_user_key ||
                    user_comparator()
          ->Compare(ikey.user_key,
                                               Slice(current_user_key)) 
          != 0) {
                  
          // First occurrence of this user key
                  current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
                  has_current_user_key 
          = true;
                  last_sequence_for_key 
          = kMaxSequenceNumber;
                }

                
          if (last_sequence_for_key <= compact->smallest_snapshot) {
                  
          // Hidden by an newer entry for same user key
                  drop = true;    // (A)
                } else if (ikey.type == kTypeDeletion &&
                           ikey.sequence 
          <= compact->smallest_snapshot &&
                           compact
          ->compaction->IsBaseLevelForKey(ikey.user_key)) {
                  
          // For this user key:
                  
          // (1) there is no data in higher levels
                  
          // (2) data in lower levels will have larger sequence numbers
                  
          // (3) data in layers that are being compacted here and have
                  
          //     smaller sequence numbers will be dropped in the next
                  
          //     few iterations of this loop (by rule (A) above).
                  
          // Therefore this deletion marker is obsolete and can be dropped.
                  drop = true;
                }

                last_sequence_for_key 
          = ikey.sequence;
              }

              
          if (!drop) {
                
          // Open output file if necessary
                if (compact->builder == NULL) {
                  status 
          = OpenCompactionOutputFile(compact);
                  
          if (!status.ok()) {
                    
          break;
                  }
                }
                
          if (compact->builder->NumEntries() == 0) {
                  compact
          ->current_output()->smallest.DecodeFrom(key);
                }
                compact
          ->current_output()->largest.DecodeFrom(key);
                compact
          ->builder->Add(key, input->value());

                
          // 達到sst文件大小,重新寫文件
                if (compact->builder->FileSize() >=
                    compact
          ->compaction->MaxOutputFileSize()) {
                  status 
          = FinishCompactionOutputFile(compact, input);
                  
          if (!status.ok()) {
                    
          break;
                  }
                }
              }

              input
          ->Next();
            }

            
          if (status.ok() && shutting_down_.Acquire_Load()) {
              status 
          = Status::IOError("Deleting DB during compaction");
            }
            
          if (status.ok() && compact->builder != NULL) {
              status 
          = FinishCompactionOutputFile(compact, input);
            }
            
          if (status.ok()) {
              status 
          = input->status();
            }
            delete input;
            input 
          = NULL;

           
          //更新compaction的一些統計數據
            CompactionStats stats;
            stats.micros 
          = env_->NowMicros() - start_micros - imm_micros;
            
          for (int which = 0; which < 2; which++) {
              
          for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
                stats.bytes_read 
          += compact->compaction->input(which, i)->file_size;
              }
            }
            
          for (size_t i = 0; i < compact->outputs.size(); i++) {
              stats.bytes_written 
          += compact->outputs[i].file_size;
            }

            mutex_.Lock();
            stats_[compact
          ->compaction->level() + 1].Add(stats);

            
          if (status.ok()) {
              status 
          = InstallCompactionResults(compact);
            }
            VersionSet::LevelSummaryStorage tmp;
            Log(options_.info_log,
                
          "compacted to: %s", versions_->LevelSummary(&tmp));
            
          return status;
          }





          主站蜘蛛池模板: 延长县| 青州市| 嘉峪关市| 潞西市| 安宁市| 蓝田县| 德兴市| 屏南县| 绵阳市| 锡林浩特市| 图木舒克市| 城固县| 内江市| 北碚区| 开封市| 富顺县| 恩施市| 巩义市| 迁西县| 库车县| 墨竹工卡县| 清苑县| 凤冈县| 平舆县| 秦皇岛市| 彰化市| 凌源市| 民勤县| 汨罗市| 运城市| 万源市| 大连市| 郯城县| 瑞丽市| 荃湾区| 定州市| 浙江省| 明光市| 邹平县| 齐河县| 宜黄县|