小明思考

          Just a software engineer
          posts - 124, comments - 36, trackbacks - 0, articles - 0
            BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

          leveldb研究3-數據庫日志文件格式

          Posted on 2012-03-09 16:00 小明 閱讀(3645) 評論(1)  編輯  收藏 所屬分類: 分布式計算
          leveldb在每次數據庫操作之前都會把操作記錄下來。
          主要實現在db\log_format.h,db\log_reader.h,db\log_reader.cc,db\log_write.h,db\log_write.cc中。我們來具體看看實現。

          日志格式
          db\log_format.h
          log是分塊的,每塊為32K,每條記錄的記錄頭為7個字節,前四個為CRC,然后是長度(2個字節),最后是記錄類型(1個字節)
          ---------------------------------------
          BLOCK1|BLOCK2|BLOCK3|...|BLOCKN
          ---------------------------------------

          enum RecordType {
           
          // Zero is reserved for preallocated files
            kZeroType = 0,

            kFullType 
          = 1,

            
          // For fragments
            kFirstType = 2,
            kMiddleType 
          = 3,
            kLastType 
          = 4
          };
          static const int kMaxRecordType = kLastType;

          static const int kBlockSize = 32768;

          // Header is checksum (4 bytes), type (1 byte), length (2 bytes).
          static const int kHeaderSize = 4 + 1 + 2;

          }  
          // namespace log
          }  // namespace leveldb

          寫日志操作
          db\log_writer.cc
          請注意這里的處理,由于1條記錄可能超過一個BLOCK的大小,所以需要分成多個片段寫入。
          //增加一條記錄
          Status Writer::AddRecord(const Slice& slice) {
            
          const char* ptr = slice.data();
            size_t left 
          = slice.size();

            
          // Fragment the record if necessary and emit it.  Note that if slice
            
          // is empty, we still want to iterate once to emit a single
            
          // zero-length record
            Status s;
            
          bool begin = true;
            
          do {
              
          const int leftover = kBlockSize - block_offset_; //當前剩余多少字節
              assert(leftover >= 0);
              
          if (leftover < kHeaderSize) { //不夠文件頭大小7bytes
                
          // 轉入新的block
                if (leftover > 0) {
                  
          //用0來填充空白
                  assert(kHeaderSize == 7);
                  dest_
          ->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
                }
                block_offset_ 
          = 0;
              }

              
          // Invariant: we never leave < kHeaderSize bytes in a block.
              assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

              
          //avail:除掉頭還算多少字節
              const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
              
          //實際寫入大小
              const size_t fragment_length = (left < avail) ? left : avail;

              RecordType type;
              
          const bool end = (left == fragment_length); //記錄是否結束
              if (begin && end) {
                type 
          = kFullType; //完整記錄
              } else if (begin) {
                type 
          = kFirstType; //開頭
              } else if (end) {
                type 
          = kLastType; //結尾
              } else {
                type 
          = kMiddleType; //中間
              }
              
          //寫入
              s = EmitPhysicalRecord(type, ptr, fragment_length);
              ptr 
          += fragment_length;
              left 
          -= fragment_length;
              begin 
          = false;
            } 
          while (s.ok() && left > 0);
            
          return s;
          }

          //實際寫入日志文件
          Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
            assert(n 
          <= 0xffff);  // Must fit in two bytes
            assert(block_offset_ + kHeaderSize + n <= kBlockSize);

            
          // 記錄頭
            char buf[kHeaderSize];
            buf[
          4= static_cast<char>(n & 0xff);
            buf[
          5= static_cast<char>(n >> 8);
            buf[
          6= static_cast<char>(t);

            
          // 計算CRC
            uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);
            crc 
          = crc32c::Mask(crc);                 // Adjust for storage
            EncodeFixed32(buf, crc);

            
          // 寫入頭部
            Status s = dest_->Append(Slice(buf, kHeaderSize));
            
          if (s.ok()) {
              
          //寫入記錄片段
              s = dest_->Append(Slice(ptr, n));
              
          if (s.ok()) {
                s 
          = dest_->Flush();
              }
            }
            block_offset_ 
          += kHeaderSize + n;
            
          return s;
          }

          讀日志操作
          這里可以看出使用BLOCK的好處,能夠減少文件IO次數,讀日志基本上就是寫日志反向過程。

          //讀取記錄,scratch為緩沖,record是結果
          bool Reader::ReadRecord(Slice* record, std::string* scratch) {
            
          if (last_record_offset_ < initial_offset_) { //需要跳過文件頭部信息,目前未實現
              if (!SkipToInitialBlock()) {
                
          return false;
              }
            }

            scratch
          ->clear();
            record
          ->clear();
            
          bool in_fragmented_record = false//是否是碎片記錄
          // Record offset of the logical record that we're reading
            
          // 0 is a dummy value to make compilers happy
            uint64_t prospective_record_offset = 0;
            Slice fragment;
            
          while (true) {
              uint64_t physical_record_offset 
          = end_of_buffer_offset_ - buffer_.size();
              
          //從文件中讀取一個BLOCK
              const unsigned int record_type = ReadPhysicalRecord(&fragment);
              
          switch (record_type) {
                
          case kFullType: //完整Record
                  if (in_fragmented_record) {
                    
          // Handle bug in earlier versions of log::Writer where
                    
          // it could emit an empty kFirstType record at the tail end
                    
          // of a block followed by a kFullType or kFirstType record
                    
          // at the beginning of the next block.
                    if (scratch->empty()) {
                      in_fragmented_record 
          = false;
                    } 
          else {
                      ReportCorruption(scratch
          ->size(), "partial record without end(1)");
                    }
                  }
                  prospective_record_offset 
          = physical_record_offset;
                  scratch
          ->clear();
                  
          *record = fragment;
                  last_record_offset_ 
          = prospective_record_offset;
                  
          return true;

                
          case kFirstType: //Record開始
                  if (in_fragmented_record) {
                    
          // Handle bug in earlier versions of log::Writer where
                    
          // it could emit an empty kFirstType record at the tail end
                    
          // of a block followed by a kFullType or kFirstType record
                    
          // at the beginning of the next block.
                    if (scratch->empty()) {
                      in_fragmented_record 
          = false;
                    } 
          else {
                      ReportCorruption(scratch
          ->size(), "partial record without end(2)");
                    }
                  }
                  prospective_record_offset 
          = physical_record_offset;
                  scratch
          ->assign(fragment.data(), fragment.size());
                  in_fragmented_record 
          = true;
                  
          break;

                
          case kMiddleType://Record中間
                  if (!in_fragmented_record) {
                    ReportCorruption(fragment.size(),
                                     
          "missing start of fragmented record(1)");
                  } 
          else {
                    scratch
          ->append(fragment.data(), fragment.size());
                  }
                  
          break;

                
          case kLastType://Record結尾
                  if (!in_fragmented_record) {
                    ReportCorruption(fragment.size(),
                                     
          "missing start of fragmented record(2)");
                  } 
          else {
                    scratch
          ->append(fragment.data(), fragment.size());
                    
          *record = Slice(*scratch);
                    last_record_offset_ 
          = prospective_record_offset;
                    
          return true;
                  }
                  
          break;

                
          case kEof://文件結束
                  if (in_fragmented_record) {
                    ReportCorruption(scratch
          ->size(), "partial record without end(3)");
                    scratch
          ->clear();
                  }
                  
          return false;

                
          case kBadRecord://壞記錄
                  if (in_fragmented_record) {
                    ReportCorruption(scratch
          ->size(), "error in middle of record");
                    in_fragmented_record 
          = false;
                    scratch
          ->clear();
                  }
                  
          break;

                
          default: {//無法識別
                  char buf[40];
                  snprintf(buf, 
          sizeof(buf), "unknown record type %u", record_type);
                  ReportCorruption(
                      (fragment.size() 
          + (in_fragmented_record ? scratch->size() : 0)),
                      buf);
                  in_fragmented_record 
          = false;
                  scratch
          ->clear();
                  
          break;
                }
              }
            }
            
          return false;
          }

          //從文件中讀取
          unsigned int Reader::ReadPhysicalRecord(Slice* result) {
            
          while (true) {
              
          if (buffer_.size() < kHeaderSize) {
                
          if (!eof_) {
                  
          // Last read was a full read, so this is a trailer to skip
                  buffer_.clear();
                  
          //讀入一個BLOCK
                  Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
                  end_of_buffer_offset_ 
          += buffer_.size();
                  
          if (!status.ok()) {
                    buffer_.clear();
                    ReportDrop(kBlockSize, status);
                    eof_ 
          = true;
                    
          return kEof;
                  } 
          else if (buffer_.size() < kBlockSize) {
                    eof_ 
          = true;
                  }
                  
          continue;
                } 
          else if (buffer_.size() == 0) {
                  
          // End of file
                  return kEof;
                } 
          else {
                  size_t drop_size 
          = buffer_.size();
                  buffer_.clear();
                  ReportCorruption(drop_size, 
          "truncated record at end of file");
                  
          return kEof;
                }
              }

              
          // 解析record頭
              const char* header = buffer_.data();
              
          const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
              
          const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
              
          const unsigned int type = header[6];
              
          const uint32_t length = a | (b << 8);
              
          if (kHeaderSize + length > buffer_.size()) {
                size_t drop_size 
          = buffer_.size();
                buffer_.clear();
                ReportCorruption(drop_size, 
          "bad record length");
                
          return kBadRecord;
              }

              
          if (type == kZeroType && length == 0) {
                
          // Skip zero length record without reporting any drops since
                
          // such records are produced by the mmap based writing code in
                
          // env_posix.cc that preallocates file regions.
                buffer_.clear();
                
          return kBadRecord;
              }

              
          // 檢查CRC
              if (checksum_) {
                uint32_t expected_crc 
          = crc32c::Unmask(DecodeFixed32(header));
                uint32_t actual_crc 
          = crc32c::Value(header + 61 + length);
                
          if (actual_crc != expected_crc) {
                  
          // Drop the rest of the buffer since "length" itself may have
                  
          // been corrupted and if we trust it, we could find some
                  
          // fragment of a real log record that just happens to look
                  
          // like a valid log record.
                  size_t drop_size = buffer_.size();
                  buffer_.clear();
                  ReportCorruption(drop_size, 
          "checksum mismatch");
                  
          return kBadRecord;
                }
              }

              buffer_.remove_prefix(kHeaderSize 
          + length);

              
          // Skip physical record that started before initial_offset_
              if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
                  initial_offset_) {
                result
          ->clear();
                
          return kBadRecord;
              }

              
          *result = Slice(header + kHeaderSize, length);
              
          return type;
            }
          }



          評論

          # re: leveldb研究3-數據庫日志文件格式  回復  更多評論   

          2012-03-12 09:23 by tb
          呵呵 不錯啊
          主站蜘蛛池模板: 合作市| 明溪县| 盖州市| 永登县| 龙门县| 宜宾市| 林口县| 余干县| 汉沽区| 怀宁县| 武宣县| 彰化市| 金坛市| 杨浦区| 肥东县| 荣成市| 金乡县| 襄樊市| 茂名市| 班玛县| 改则县| 开江县| 全椒县| 阿勒泰市| 湘西| 宁津县| 马公市| 青川县| 通辽市| 合川市| 灯塔市| 平江县| 顺平县| 湖南省| 陇南市| 南开区| 塔河县| 金坛市| 黄山市| 克山县| 崇左市|