posts - 56,  comments - 12,  trackbacks - 0

          StorageWrapper 的作用:把文件片斷進一步切割為子片斷,并且為這些子片斷發送 request 消息。在獲得子片斷后,將數據寫入磁盤。

          請結合 Storage 類的分析來看。

           

          幾點說明:

          1、  為了獲取傳輸性能, BT 把文件片斷切割為多個子片斷。

          2、  BT 為獲取一個子片斷,需要向擁有該子片斷的 peer 發送 request 消息(關于 request 消息,參見《 BT 協議規范》)。

          3、 例如一個 256k 大小的片斷,索引號是 10 ,被劃分為 16 16k 大小的子片斷。那么需要為這 16 個子片斷分別產生一個 request 消息。這些 request 消息在發出之前,以 list 的形式保存在 inactive_requests 這個 list 中。例如對這個片斷,就保存在 inactive_requests 下標為 10 (片斷的索引號)的地方,值是如下的 list [(0,16k),(16k, 16k), (32k, 16k), (48k, 16k), (64k, 16k), (80k, 16k), (96k, 16k), (112k, 16k), (128k, 16k), (144k, 16k), (160k, 16k), (176k, 16k), (192k, 16k), (208k, 16k), (224k, 16k), (240k, 16k)] 。這個處理過程在 _make_inactive() 函數中。因為這些 request 還沒有發送出去,所以叫做 inactive request (未激活的請求)。如果一個 request 發送出去了,那么叫做 active request 。為每個片斷已經發送出去的 request 個數記錄在 numactive 中。如果收到一個子片斷,那么 active request 個數就要減 1 amount_inactive 記錄了尚沒有發出 request 的子片斷總的大小。

          4、 每當獲得一個子片段,都要寫入磁盤。如果子片斷所屬的片斷在磁盤上還沒有分配空間,那么首先需要為整個片斷分配空間。如何為片斷分配空間?這正是 StorageWrapper 類中最難理解的一部分代碼。這個“空間分配算法”說起來很簡單,但是在沒有任何注釋的情況下去看代碼,耗費了我好幾天的時間。具體的算法分析,請看 _piece_came_in() 的注釋。

           

           

          class StorageWrapper:

          def __init__(self, storage, request_size, hashes,

                      piece_size, finished, failed,

                      statusfunc = dummy_status, flag = Event(), check_hashes = True,

          data_flunked = dummy_data_flunked):

           

                  self.storage = storage              # Storage 對象

                  self.request_size = request_size # 子片斷大小

                  self.hashes = hashes        # 文件片斷摘要信息

                  self.piece_size = piece_size  # 片斷大小

                  self.data_flunked = data_flunked        # 一個函數,用來檢查片斷的完整性

                  self.total_length = storage.get_total_length()         # 文件總大小

                  self.amount_left = self.total_length        # 未下載完的文件大小

                 

                        # 文件總大小的有效性檢查

                        # 因為最后一個片斷長度可能小于 piece_size

                  if self.total_length <= piece_size * (len(hashes) - 1):

                      raise Error, 'bad data from tracker - total too small'

                  if self.total_length > piece_size * len(hashes):

                      raise Error, 'bad data from tracker - total too big'

           

                  # 兩個事件,分布在下載完成和下載失敗的時候設置

                  self.finished = finished

                  self.failed = failed

                  

                       

          這幾個變量的作用在前面已經介紹過了。

                  self.numactive = [0] * len(hashes)

                        inactive_request

          inactive_requests 的值全部被初始化為 1 ,這表示每個片斷都需要發送 request 。后面在對磁盤文件檢查之后,那些已經獲得的片斷,在 inactive_requests 中對應的是 None ,表示不需要再為這些片斷發送 request 了。

                  self.inactive_requests = [1] * len(hashes)

                  self.amount_inactive = self.total_length

           

          # 是否進入 EndGame 模式?關于 endgame 模式,在《 Incentives Build Robustness in BitTorrent 》的“片斷選擇算法”中有介紹。后面可以看到,在為最后一個“子片斷”產生請求后,進入 endgame 模式。

                  self.endgame = False 

                 

                  self.have = Bitfield(len(hashes))

                  # 該片是否檢查了完整性

                  self.waschecked = [check_hashes] * len(hashes)

                 

          這兩個變量用于“空間分配算法”

                  self.places = { }

                  self.holes = [ ]

           

                  if len(hashes) == 0:

                      finished()

                      return

           

                  targets = {}

                  total = len(hashes)

           

          # 檢查每一個片斷,,,

                  for i in xrange(len(hashes)):

           

          # 如果磁盤上,還沒有完全為這個片斷分配空間,那么這個片斷需要被下載,在 targets 字典中添加一項(如果已經存在,就不用添加了),它的關鍵字( key )是該片斷的摘要值,它的值( )是一個列表, 這個片斷的索引號被添加到這個列表中。

          這里一度讓我非常迷惑,因為一直以為不同的文件片斷肯定具有不同的摘要值。后來才想明白了,那就是:兩個不同的文件片斷,可能擁有相同的摘要值。不是么?只要這兩個片斷的內容是一樣的。

          這一點,對后面的分析非常重要。

           

          if not self._waspre(i):

                               targets.setdefault(hashes[i], []).append(i)

                          total -= 1

           

                  numchecked = 0.0

                  if total and check_hashes:

                      statusfunc({"activity" : 'checking existing file', "fractionDone" : 0})

           

           

           

          # 這是一個內嵌在函數中的函數。在 c++ 中,可以有內部類,不過好像沒有內部函數的說法。這個函數只能在 __init__() 內部使用。

          這個函數在一個片段被確認獲得后調用

          # piece: 片斷的索引號

          # pos: 這個片斷在磁盤上存儲的位置

          例如,片斷 5 可能存儲在片斷 2 的位置上。請參看后面的“空間分配算法”

           

                  def markgot(piece, pos, self = self, check_hashes = check_hashes):

                      self.places[piece] = pos

                      self.have[piece] = True

                      self.amount_left -= self._piecelen(piece)

          self.amount_inactive -= self._piecelen(piece)

          不用再為這個片斷發送 request 消息了

                      self.inactive_requests[piece] = None

                      self.waschecked[piece] = check_hashes

                 

                  lastlen = self._piecelen(len(hashes) - 1)        # 最后一個片斷的長度

           

          # 對每一個片斷

                  for i in xrange(len(hashes)):

          # 如果磁盤上,還沒有完全為這個片斷分配空間,那么在 holes 中添加該片斷的索引號。

          if not self._waspre(i):

                          self.holes.append(i)

                              

          # 否則,也就是空間已經分配。但是還是不能保證這個片斷已經完全獲得了,正如分析 Storage 時提到的那樣,可能存在“空洞”

                              

          # 如果不需要進行有效性檢查,那么簡單調用 markgot() 表示已經獲得了該片斷。這顯然是一種不負責任的做法。

                      elif not check_hashes:

                          markgot(i, i)

           

          # 如果需要進行有效性檢查

          else:

          sha python 內置的模塊,它封裝了 SHA-1 摘要算法。 SHA-1 摘要算法對一段任意長的數據進行計算,得出一個 160bit (也就是 20 個字節)長的消息摘要。在 torrent 文件中,保存了每個片斷的消息摘要。接收方在收到一個文件片斷之后,再計算一次消息摘要,然后跟 torrent 文件中對應的值進行比較,如果結果不一致,那么說明數據在傳輸過程中發生了變化,這樣的數據應該被丟棄。

           

          這里,首先,根據片斷 i 的起始位置開始, lastlen 長的一段數據構造一個 sha 對象。

                          sh = sha(self.storage.read(piece_size * i, lastlen))

          計算這段數據的消息摘要

                          sp = sh.digest()

          然后,更新 sh 這個 sha 對象,注意,是根據片斷 i 剩下的數據來更新的。關于 sha::update() 的功能,請看 python 的幫助。如果有兩段數據 a b ,那么

          sh = sha(a)

          sh.update(b) ,等效于

          sh = sha(a+b)

          所以,下面這個表達式等于

          sh.update(self.storage.read(piece_size*i, self._piecelen(i)))

           

                          sh.update(self.storage.read(piece_size * i + lastlen, self._piecelen(i) - lastlen))

          所以,這次計算出來的就是片斷 i 的摘要

          (原來的困惑:為什么不直接計算 i 的摘要,要這么繞一下了?后來分析清楚“空間分配算法”之后,這后面一段代碼也就沒有什么問題了。)

                          s = sh.digest()

          如果計算出來的摘要和 hashes[i] 一致(后者是從 torrent 文件中獲得的),那么,這個片斷有效且已經存在于磁盤上。

                          if s == hashes[i]:

          markgot(i, i)

                                     

                          elif targets.get(s)

          and self._piecelen(i) == self._piecelen(targets[s][-1]):

                              markgot(targets[s].pop(), i)

                          elif not self.have[len(hashes) - 1]

          and sp == hashes[-1]

          and (i == len(hashes) - 1 or not self._waspre(len(hashes) - 1)):

          markgot(len(hashes) - 1, i)

           

                                      else:

                              self.places[i] = i

                           if flag.isSet():

                              return

                          numchecked += 1

                          statusfunc({'fractionDone': 1 - float(self.amount_left) / self.total_length})

           

          # 如果所有片斷都下載完了,那么結束。

                  if self.amount_left == 0:

                      finished()

           

           

          # 檢查某個片斷,是否已經在磁盤上分配了空間,調用的是 Storage:: was_preallocated()

          def _waspre(self, piece):

                  return self.storage.was_preallocated(piece * self.piece_size,

           self._piecelen(piece))

           

              # 獲取指定片斷的長度,只有最后一個片斷大小可能小于 piece_size

              def _piecelen(self, piece):

                  if piece < len(self.hashes) - 1:

                      return self.piece_size

                  else:

          return self.total_length - piece * self.piece_size

           

           

                 # 返回剩余文件的大小

              def get_amount_left(self):

                  return self.amount_left

           

                 # 判斷是否已經獲得了一些文件片斷

              def do_I_have_anything(self):

                  return self.amount_left < self.total_length

           

                 # 將指定片斷切割為“子片斷”

          def _make_inactive(self, index):

           

                 # 先獲取該片斷的長度

                  length = min(self.piece_size, self.total_length - self.piece_size * index)

                  l = []

                  x = 0

           

          # 為了獲得更好的傳輸性能, BT 把每個文件片斷又分為更小的“子片斷”,我們可以在 download.py 文件中 default 變量中,找到“子片斷”大小的定義:

          'download_slice_size', 2 ** 14,      "How many bytes to query for per request."

          這里定義的“子片斷”大小是 16k 。

          下面這個循環,就是將一個片斷進一步切割為“子片斷”的過程。

           

                  while x + self.request_size < length:

                      l.append((x, self.request_size))

          x += self.request_size

                  l.append((x, length - x))

           

          # l 保存到 inactive_requests 這個列表中

                  self.inactive_requests[index] = l

           

          # 是否處于 endgame 模式,關于 endgame 模式,參加《 Incentives Build Robustness in BitTorrent

              def is_endgame(self):

                  return self.endgame

           

              def get_have_list(self):

                  return self.have.tostring()

           

              def do_I_have(self, index):

                  return self.have[index]

           

          # 判斷指定的片斷,是否還有 request 沒有發出?如果有,那么返回 true ,否則返回 false 。

              def do_I_have_requests(self, index):

                  return not not self.inactive_requests[index]

           

          為指定片斷創建一個 request 消息,返回的是一個二元組,例如( 32k, 16k ),表示“子片斷”的起始位置是 32k ,大小是 16k

              def new_request(self, index):

                  # returns (begin, length)

          # 如果還沒有為該片斷創建 request 。,那么調用 _make_inactive() 創建 request 列表。( inactive_requests[index] 初始化的值是 1

                  if self.inactive_requests[index] == 1:

          self._make_inactive(index)

                       

          # numactive[index] 記錄了已經為該片斷發出了多少個 request 。

                  self.numactive[index] += 1

                  rs = self.inactive_requests[index]

           

                 # inactive_request 中移出最小的那個 request (也就是起始位置最?。?。

                  r = min(rs)

                  rs.remove(r)

           

                 # amount_inactive 記錄了尚沒有發出 request 的子片斷總的大小。

                  self.amount_inactive -= r[1]

           

                 # 如果這是最后一個“子片斷”,那么進入 endgame 模式

                  if self.amount_inactive == 0:

          self.endgame = T.rue

                 # 返回這個 request

                  return r

           

              def piece_came_in(self, index, begin, piece):

                  try:

                      return self._piece_came_in(index, begin, piece)

                  except IOError, e:

                      self.failed('IO Error ' + str(e))

                      return True

           

          如果獲得了某個“子片斷”,那么調用這個函數。

          index :“子片斷”所在片斷的索引號,

          begin :“子片斷”在片斷中的起始位置,

          piece :實際數據

          def _piece_came_in(self, index, begin, piece):

                

          # 如果之前沒有獲得過該片斷中任何“子片斷”,那么首先需要在磁盤上為整個片斷分配空間。

          空間分配的算法如下:

          假設一共是 6 個片斷,現在已經為 0 、 1 4 三個片斷分配了空間,那么

          holes [2, 3, 5]

          places {0:0, 1:1, 4:4}

          現在要為片斷 5 分配空間,思路是把片斷 5 的空間暫時先分配在片斷 2 應該在的空間上。這樣分配以后,

          holes [3, 5]

          places: {0:0, 1:1, 4:4, 5:2}

          假設下一步為片斷 2 分配空間,因為 2 的空間已經被 5 占用,所以把 5 的數據轉移到 3 上, 2 才可以使用自己的空間。這樣分配之后,

          holes [5]

          places {0:0, 1:1, 2:2, 4:4, 5:3}

          最后,為 3 分配空間,因為 3 的空間被 5 占用,所以把 5 的數據轉移到 5 自己的空間上, 3 就可以使用自己的空間了。這樣分配之后,

          holes []

          places {0:0, 1:1, 2:2, 3:3, 4:4, 5:5}

           

          下面這段比較晦澀的代碼,實現的就是這種空間分配算法。

           

                  if not self.places.has_key(index):

          n = self.holes.pop(0)

           

                      if self.places.has_key(n):

                          oldpos = self.places[n]

                          old = self.storage.read(self.piece_size * oldpos, self._piecelen(n))

                          if self.have[n] and sha(old).digest() != self.hashes[n]:

                              self.failed('data corrupted on disk - maybe you have two copies running?')

                              return True

                          self.storage.write(self.piece_size * n, old)

                          self.places[n] = n

                          if index == oldpos or index in self.holes:

                              self.places[index] = oldpos

                          else:

                              for p, v in self.places.items():

                                  if v == index:

                                      break

                              self.places[index] = index

                              self.places[p] = oldpos

                              old = self.storage.read(self.piece_size * index, self.piece_size)

          self.storage.write(self.piece_size * oldpos, old)

                              

          elif index in self.holes or index == n:

                          if not self._waspre(n):

          self.storage.write(self.piece_size * n,

          self._piecelen(n) * chr(0xFF))

                          self.places[index] = n

                      else:

                          for p, v in self.places.items():

                              if v == index:

                                  break

                          self.places[index] = index

                          self.places[p] = n

                          old = self.storage.read(self.piece_size * index, self._piecelen(n))

                          self.storage.write(self.piece_size * n, old)

                 

                 # 調用 Stoarge::write() 將這個子片斷寫入磁盤,注意是寫到 places[index] 所在的空間上。

          self.storage.write(self.places[index] * self.piece_size + begin, piece)

           

          # 既然獲得了一個子片斷,那么發出的 request 個數顯然要減少一個。

                  self.numactive[index] -= 1

                 

          # 如果既沒有尚未發出的 request ,而且也沒有已發出的 request (每當獲得一個子片斷, numactive[index] 減少 1 numactive[index] 0 ,說明所有發出的 request 都已經接收到了響應的數據),那么顯然整個片斷已經全部獲得了。

          if not self.inactive_requests[index] and not self.numactive[index]:

                 檢查整個片斷的有效性,如果通過檢查

          if sha(self.storage.read(self.piece_size * self.places[index],

           self._piecelen(index))).digest() == self.hashes[index]:

                              

          # “我”已經擁有了這個片斷

                          self.have[index] = True

                          self.inactive_requests[index] = None

                               # 也檢查過了有效性

                          self.waschecked[index] = True

                                     

                          self.amount_left -= self._piecelen(index)

                          if self.amount_left == 0:

          self.finished()

                        如果沒有通過有效性檢查

                      else:

                          self.data_flunked(self._piecelen(index))

                               得丟棄這個片斷

                          self.inactive_requests[index] = 1

                          self.amount_inactive += self._piecelen(index)

                          return False

                  return True

           

          # 如果向某個 peer 發送的獲取“子片斷”的請求丟失了,那么調用此函數

              def request_lost(self, index, begin, length):

                  self.inactive_requests[index].append((begin, length))

                  self.amount_inactive += length

                  self.numactive[index] -= 1

           

              def get_piece(self, index, begin, length):

                  try:

                      return self._get_piece(index, begin, length)

                  except IOError, e:

                      self.failed('IO Error ' + str(e))

                      return None

           

              def _get_piece(self, index, begin, length):

                  if not self.have[index]:

                      return None

           

                  if not self.waschecked[index]:

                        # 檢查片斷的 hash 值,如果錯誤,返回 None

          if sha(self.storage.read(self.piece_size * self.places[index],

          self._piecelen(index))).digest() != self.hashes[index]:

                          self.failed('told file complete on start-up, but piece failed hash check')

                          return None

                        # 通過 hash 檢查

          self.waschecked[index] = True

           

                 # 檢查一下“子片斷”長度是否越界

                  if begin + length > self._piecelen(index):

          return None

                       

          # 調用 Storage::read() ,將該“子片斷”數據從磁盤上讀出來,返回值就是這段數據。

                  return self.storage.read(self.piece_size * self.places[index] + begin, length)
          posted on 2007-01-19 00:21 苦笑枯 閱讀(327) 評論(0)  編輯  收藏 所屬分類: P2P
          收藏來自互聯網,僅供學習。若有侵權,請與我聯系!

          <2007年1月>
          31123456
          78910111213
          14151617181920
          21222324252627
          28293031123
          45678910

          常用鏈接

          留言簿(2)

          隨筆分類(56)

          隨筆檔案(56)

          搜索

          •  

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 荆州市| 桐乡市| 景宁| 新宁县| 江门市| 太仆寺旗| 视频| 武定县| 梨树县| 津市市| 天等县| 安康市| 南木林县| 平和县| 揭东县| 内江市| 沅陵县| 洪雅县| 永新县| 海城市| 尉犁县| 阿坝县| 新蔡县| 新绛县| 定边县| 德州市| 德钦县| 枞阳县| 芒康县| 嫩江县| 元阳县| 阳泉市| 彝良县| 平泉县| 兰考县| 深圳市| 苍梧县| 龙里县| 什邡市| 贵德县| 云龙县|