小明思考

          Just a software engineer
          posts - 124, comments - 36, trackbacks - 0, articles - 0
            BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

          leveldb研究4- 數據文件的格式和生成

          Posted on 2012-03-12 18:21 小明 閱讀(3877) 評論(1)  編輯  收藏 所屬分類: 分布式計算
          leveldb使用SSTable格式來保存數據。

          格式為:(當前沒有META BLOCK)
          SSTABLE = |DATA BLOCK1|DATA BLOCK2|...|DATA BLOCK N|META BLOCK1|...|META BLOCK N|META INDEX BLOCK|DATA INDEX BLOCK|Footer|
          DATA BLOCK = |KeyValues|Restart arrays|array size|Compress Type|CRC
          Footer(定長) = META INDEX BLOCK offset | DATA Index Block offset| Magic Numbers

          比較細節的地方是數據塊的壓縮,針對key使用了前綴壓縮法。

          下面看看具體的實現。

          //builder.cc
          //dbname:數據庫名稱
          //env:OS接口
          //iter:指向MemTable的一個iterator
          Status BuildTable(const std::string& dbname,
                            Env
          * env,
                            
          const Options& options,
                            TableCache
          * table_cache,
                            Iterator
          * iter,
                            FileMetaData
          * meta) {
            Status s;
            meta
          ->file_size = 0;
            iter
          ->SeekToFirst();

            
          //生成文件名:格式 "0000x.sst"
            std::string fname = TableFileName(dbname, meta->number);
            
          if (iter->Valid()) {
              WritableFile
          * file;
              
          //創建一個可寫文件
              s = env->NewWritableFile(fname, &file);
              
          if (!s.ok()) {
                
          return s;
              }

              
          //TableBuilder負責table生成和寫入
              TableBuilder* builder = new TableBuilder(options, file);
              
          //META:最小key
              meta->smallest.DecodeFrom(iter->key());
              
          for (; iter->Valid(); iter->Next()) {
                Slice key 
          = iter->key();
                
          //META:最大key
                meta->largest.DecodeFrom(key);
                
          //增加數據到builder
                builder->Add(key, iter->value());
              }

              
          // Finish and check for builder errors
              if (s.ok()) {
                
          //完成寫入
                s = builder->Finish();
                
          if (s.ok()) {
                  meta
          ->file_size = builder->FileSize();
                  assert(meta
          ->file_size > 0);
                }
              } 
          else {
                builder
          ->Abandon();
              }
              delete builder;

              
          // Finish and check for file errors
              if (s.ok()) {
                s 
          = file->Sync();
              }
              
          if (s.ok()) {
                
          //sync & close,寫入磁盤
                s = file->Close();
              }
              delete file;
              file 
          = NULL;

              
          if (s.ok()) {
                
          // Verify that the table is usable
                Iterator* it = table_cache->NewIterator(ReadOptions(),
                                                        meta
          ->number,
                                                        meta
          ->file_size);
                s 
          = it->status();
                delete it;
              }
            }

            
          // Check for input iterator errors
            if (!iter->status().ok()) {
              s 
          = iter->status();
            }

            
          if (s.ok() && meta->file_size > 0) {
              
          // Keep it
            } else {
              env
          ->DeleteFile(fname);
            }
            
          return s;
          }

          }  
          // namespace leveldb

          我們來看看TableBuilder類,主要的細節都在這個類中實現了

          TableBuilder中含有一個Rep的數據結構的指針,主要是用于保存builder的一些狀態和數據。為什么不在TableBuilder頭文件中直接定義這些變量?主要是不想暴露過多的細節給使用者,真是一個很好的做法。

          struct TableBuilder::Rep {
            Options options;
            Options index_block_options;
            WritableFile
          * file; //sstable文件指針
            uint64_t offset;
            Status status;
            BlockBuilder data_block; 
          //數據塊
            BlockBuilder index_block; //索引塊
            std::string last_key;//上一次的key,用于比較和建立索引
            int64_t num_entries; //
            bool closed;          // 是否結束
            bool pending_index_entry; //是否要新增索引塊
            BlockHandle pending_handle;  // Handle to add to index block
            std::string compressed_output;

            Rep(
          const Options& opt, WritableFile* f)
                : options(opt),
                  index_block_options(opt),
                  file(f),
                  offset(
          0),
                  data_block(
          &options),
                  index_block(
          &index_block_options),
                  num_entries(
          0),
                  closed(
          false),
                  pending_index_entry(
          false) {
              index_block_options.block_restart_interval 
          = 1;
            }
          };

          新加一條記錄:
          //增加一條數據記錄
          void TableBuilder::Add(const Slice& key, const Slice& value) {
            Rep
          * r = rep_;
            assert(
          !r->closed);
            
          if (!ok()) return;
            
          if (r->num_entries > 0) {
              
          //檢查是不是順序添加
              assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
            }

            
          if (r->pending_index_entry) { //是否生成新的index block
              
          //檢查當前是否是一個新的BLOCK
              assert(r->data_block.empty());
              
          //根據當前的key和上一個DATA BLOCK的最后一個主鍵生成最短的索引
              r->options.comparator->FindShortestSeparator(&r->last_key, key);
              std::
          string handle_encoding;
              r
          ->pending_handle.EncodeTo(&handle_encoding);
              
          //增加新的INDEX BLOCK,但不立即寫入
              r->index_block.Add(r->last_key, Slice(handle_encoding));
              r
          ->pending_index_entry = false;
            }

            r
          ->last_key.assign(key.data(), key.size());
            r
          ->num_entries++;
            r
          ->data_block.Add(key, value);

            
          const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
            
          //檢查是否已經達到BLOCK SIZE,默認4K
            if (estimated_block_size >= r->options.block_size) {
              Flush();
            }
          }

          //寫一個DATA BLOCK
          void TableBuilder::Flush() {
            Rep
          * r = rep_;
            assert(
          !r->closed);
            
          if (!ok()) return;
            
          if (r->data_block.empty()) return;
            assert(
          !r->pending_index_entry);
            WriteBlock(
          &r->data_block, &r->pending_handle);
            
          if (ok()) {
              r
          ->pending_index_entry = true;
              r
          ->status = r->file->Flush();
            }
          }

          //寫BLOCK
          void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
            
          //文件格式: 數據+類型(1個字節)+ CRC(4個字節)
            assert(ok());
            Rep
          * r = rep_;
          //生成binary
            Slice raw = block->Finish();

          //壓縮數據
            Slice block_contents;
            CompressionType type 
          = r->options.compression;
            
          switch (type) {
              
          case kNoCompression:
                block_contents 
          = raw;
                
          break;

              
          case kSnappyCompression: {
                std::
          string* compressed = &r->compressed_output;
                
          if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
                    compressed
          ->size() < raw.size() - (raw.size() / 8u)) {
                  block_contents 
          = *compressed;
                } 
          else {
                  
          // Snappy not supported, or compressed less than 12.5%, so just
                  
          // store uncompressed form
                  block_contents = raw;
                  type 
          = kNoCompression;
                }
                
          break;
              }
            }
            handle
          ->set_offset(r->offset);
            handle
          ->set_size(block_contents.size());
            r
          ->status = r->file->Append(block_contents);
            
          if (r->status.ok()) {
              
          char trailer[kBlockTrailerSize];
              trailer[
          0= type;
              uint32_t crc 
          = crc32c::Value(block_contents.data(), block_contents.size());
              crc 
          = crc32c::Extend(crc, trailer, 1);  // Extend crc to cover block type
              EncodeFixed32(trailer+1, crc32c::Mask(crc));
              r
          ->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
              
          if (r->status.ok()) {
                r
          ->offset += block_contents.size() + kBlockTrailerSize;
              }
            }
            r
          ->compressed_output.clear();
            block
          ->Reset();
          }

          完成文件的寫入:
          Status TableBuilder::Finish() {
            Rep
          * r = rep_;
            Flush();
            assert(
          !r->closed);
            r
          ->closed = true;
            BlockHandle metaindex_block_handle;
            BlockHandle index_block_handle;
            
          if (ok()) {
              
          //寫入META INDEX BLOCK
              BlockBuilder meta_index_block(&r->options);
              
          // TODO(postrelease): Add stats and other meta blocks
              WriteBlock(&meta_index_block, &metaindex_block_handle);
            }
            
          if (ok()) {
              
          if (r->pending_index_entry) {
                r
          ->options.comparator->FindShortSuccessor(&r->last_key);
                std::
          string handle_encoding;
                r
          ->pending_handle.EncodeTo(&handle_encoding);
                r
          ->index_block.Add(r->last_key, Slice(handle_encoding));
                r
          ->pending_index_entry = false;
              }
              
          //寫入索引塊
              WriteBlock(&r->index_block, &index_block_handle);
            }
            
          if (ok()) {
              
          //寫入Footer,包含META INDEX BLOCK和INDEX HANDLE的offset
              Footer footer;
              footer.set_metaindex_handle(metaindex_block_handle);
              footer.set_index_handle(index_block_handle);
              std::
          string footer_encoding;
              footer.EncodeTo(
          &footer_encoding);
              r
          ->status = r->file->Append(footer_encoding);
              
          if (r->status.ok()) {
                r
          ->offset += footer_encoding.size();
              }
            }
            
          return r->status;
          }


          這里面有兩個類BlockBuilder和BlockHandle,BlockBuilder負責把數據按照一定格式進行序列化,而BlockHandle負責記錄offset,size等,可以理解為BLOCK的文件中指針。

          我們看看BlockBuilder的實現,這里leveldb實現了前綴壓縮法,因為一個BLOCK的key很接近,所以前后兩個key相差不會很大,所以采取了<shared_size><non_shared_size><value_size><non_shared_data><value_data>的格式,節省了空間。
          其中size采用了變長格式,很有意思的格式,主要是針對小整形做的一個優化,用最多8個字節來表示4個字節的整形,每個byte的最高一個bit用來指示還有沒有后續數據,如果最高位為0,則表示沒有后續的bytes.這樣小于7F的數據只需要一個字節來表示。
          可以參考這篇文章具體看實現variant32格式。

          //完成寫入
          Slice BlockBuilder::Finish() {
            
          // 寫入restart數組,每隔options_->block_restart_interval(default:16)生成一個restart offset
            for (size_t i = 0; i < restarts_.size(); i++) {
              PutFixed32(
          &buffer_, restarts_[i]);
            }
            
          //寫入restart的大小
            PutFixed32(&buffer_, restarts_.size());
            finished_ 
          = true;
            
          return Slice(buffer_);
          }

          void BlockBuilder::Add(const Slice& key, const Slice& value) {
            Slice last_key_piece(last_key_);
            assert(
          !finished_);
            assert(counter_ 
          <= options_->block_restart_interval);
            assert(buffer_.empty() 
          // No values yet?
                   || options_->comparator->Compare(key, last_key_piece) > 0);
            size_t shared 
          = 0;
            
          //counter_內部計數器,用于記錄當前restart后的個數
            if (counter_ < options_->block_restart_interval) {
              
          //看看當前的key和上一個有多少相同的bytes
              const size_t min_length = std::min(last_key_piece.size(), key.size());
              
          while ((shared < min_length) && (last_key_piece[shared] == key[shared])) {
                shared
          ++;
              }
            } 
          else {
              
          // Restart compression
              restarts_.push_back(buffer_.size());
              counter_ 
          = 0;
            }
            
          const size_t non_shared = key.size() - shared;

            
          // 寫入 "<shared><non_shared><value_size>" to 緩沖
            PutVarint32(&buffer_, shared);
            PutVarint32(
          &buffer_, non_shared);
            PutVarint32(
          &buffer_, value.size());

            
          // 寫入 non_shared data和value
            buffer_.append(key.data() + shared, non_shared);
            buffer_.append(value.data(), value.size());

            
          // 設置 last_key_ 等于 當前的key
            last_key_.resize(shared);
            last_key_.append(key.data() 
          + shared, non_shared);
            assert(Slice(last_key_) 
          == key);
            counter_
          ++;
          }






          評論

          # re: leveldb研究4- 數據文件的格式和生成[未登錄]  回復  更多評論   

          2012-03-13 13:38 by tbw
          研究得不出啊
          主站蜘蛛池模板: 自治县| 通渭县| 永城市| 虎林市| 巴林右旗| 灵台县| 句容市| 麻江县| 唐山市| 博野县| 九江县| 汶上县| 河源市| 湖北省| 驻马店市| 新营市| 广平县| 原平市| 同德县| 泸溪县| 瑞金市| 定陶县| 尚义县| 安仁县| 阳信县| 香港 | 淮阳县| 临泽县| 青神县| 桐庐县| 门头沟区| 恩施市| 南涧| 桦川县| 格尔木市| 灵石县| 南安市| 昌乐县| 莱阳市| 邵阳县| 汕头市|