小明思考

          Just a software engineer
          posts - 124, comments - 36, trackbacks - 0, articles - 0
            BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

          leveldb研究10- 流程分析:寫數據

          Posted on 2012-03-21 14:41 小明 閱讀(3458) 評論(0)  編輯  收藏 所屬分類: 分布式計算
          總體來說,leveldb的寫操作有兩個步驟,首先是針對log的append操作,然后是對memtable的插入操作。

          影響寫性能的因素有:
          1. write_buffer_size
          2. kL0_SlowdownWritesTrigger and kL0_StopWritesTrigger.提高這兩個值,能夠增加寫的性能,但是降低讀的性能

          看看WriteOptions有哪些參數可以指定
          struct WriteOptions {
            
          //設置sync=true,leveldb會調用fsync(),這會降低插入性能
            
          //同時會增加數據的安全性 
            
          //Default: false
            bool sync;

            WriteOptions()
                : sync(
          false) {
            }
          };


          首先把Key,value轉成WriteBatch
          Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
            WriteBatch batch;
            batch.Put(key, value);
            
          return Write(opt, &batch);
          }

          接下來就是真正的插入了
          這里使用了兩把鎖,主要是想提高并發能力,減少上鎖的時間。
          首先是檢查是否可寫,然后append log,最后是插入memtable
          <db/dbimpl.cc>

          Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
            Status status;
            
          //加鎖
            MutexLock l(&mutex_);
            LoggerId self;
            
          //拿到寫log的權利
            AcquireLoggingResponsibility(&self);
            
          //檢查是否可寫
            status = MakeRoomForWrite(false);  // May temporarily release lock and wait
            uint64_t last_sequence = versions_->LastSequence();
            
          if (status.ok()) {
              WriteBatchInternal::SetSequence(updates, last_sequence 
          + 1);
              last_sequence 
          += WriteBatchInternal::Count(updates);

              
          // Add to log and apply to memtable.  We can release the lock during
              
          // this phase since the "logger_" flag protects against concurrent
              
          // loggers and concurrent writes into mem_.
              {
                assert(logger_ 
          == &self);
                mutex_.Unlock();
                
          //IO操作:寫入LOG
                status = log_->AddRecord(WriteBatchInternal::Contents(updates));
                
          if (status.ok() && options.sync) {
                  status 
          = logfile_->Sync();
                }
                
          //插入memtable
                if (status.ok()) {
                  status 
          = WriteBatchInternal::InsertInto(updates, mem_);
                }
                mutex_.Lock();
                assert(logger_ 
          == &self);
              }
              
          //設置新的seqence number
              versions_->SetLastSequence(last_sequence);
            }
            
          //釋放寫LOG鎖
            ReleaseLoggingResponsibility(&self);
            
          return status;
          }

          寫流量控制:
          <db/dbimpl.cc>
          Status DBImpl::MakeRoomForWrite(bool force) {
            mutex_.AssertHeld();
            assert(logger_ 
          != NULL);
            
          bool allow_delay = !force;
            Status s;
            
          while (true) {
              
          if (!bg_error_.ok()) {
                
          // Yield previous error
                s = bg_error_;
                
          break;
              } 
          else if ( 
                  allow_delay 
          &&
                  versions_
          ->NumLevelFiles(0>= config::kL0_SlowdownWritesTrigger) {
                mutex_.Unlock();
                
          //如果level0的文件大于kL0_SlowdownWritesTrigger閾值,則sleep 1s,這樣給compaction更多的CPU
                env_->SleepForMicroseconds(1000);
                allow_delay 
          = false;  // Do not delay a single write more than once
                mutex_.Lock();
              } 
          else if (!force &&
                         (mem_
          ->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
                
          //可寫
                break;
              } 
          else if (imm_ != NULL) {
                
          // imm_:之前的memtable 沒有被compaction,需要等待
                bg_cv_.Wait();
              } 
          else if (versions_->NumLevelFiles(0>= config::kL0_StopWritesTrigger) {
                
          // level0文件個數大于kL0_StopWritesTrigger,需要等待
                Log(options_.info_log, "waiting\n");
                bg_cv_.Wait();
              } 
          else {
                
          //生成新的額memtable和logfile,把當前memtable傳給imm_
                assert(versions_->PrevLogNumber() == 0);
                uint64_t new_log_number 
          = versions_->NewFileNumber();
                WritableFile
          * lfile = NULL;
                s 
          = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
                
          if (!s.ok()) {
                  
          break;
                }
                delete log_;
                delete logfile_;
                logfile_ 
          = lfile;
                logfile_number_ 
          = new_log_number;
                log_ 
          = new log::Writer(lfile);
                imm_ 
          = mem_;
                has_imm_.Release_Store(imm_);
                mem_ 
          = new MemTable(internal_comparator_);
                mem_
          ->Ref();
                force 
          = false;   // Do not force another compaction if have room
                // 發起compaction,dump imm_
                MaybeScheduleCompaction();
              }
            }
            
          return s;
          }

          主站蜘蛛池模板: 乡宁县| 萝北县| 安康市| 瑞安市| 汝南县| 寿光市| 丹凤县| 霍邱县| 金沙县| 金平| 湛江市| 万源市| 阿勒泰市| 长兴县| 潞西市| 延庆县| 琼中| 西宁市| 拉孜县| 蓬溪县| 中江县| 大埔县| 高密市| 凌海市| 延边| 梁平县| 横峰县| 广宗县| 塘沽区| 荔浦县| 万安县| 孟村| 罗山县| 吉木萨尔县| 青河县| 金溪县| 仙桃市| 邯郸县| 林口县| 余干县| 光泽县|