posts - 56,  comments - 12,  trackbacks - 0

          概要
           上一節(jié)我們分析了BT客戶端主程序的框架:一個循環(huán)的服務(wù)器程序。在這個循環(huán)過程中,客戶端要完成如下任務(wù):
          ? 與 tracker 服? 務(wù)器通信;匯報自己的狀態(tài),同? 時獲取其它 peers 的信息;
          ? 接受其它 peers 的連接請求;
          ? 主動向某些 peers 發(fā)起連接請求;
          ? 對BT對等協(xié)議的分析處理;
          ? 啟用片斷選擇算法,? 通過這些主動或被動的連接去下載所需要的片斷;
          ? 為其它 peers 提供片斷上傳;啟用阻塞算法,? 阻塞某些 peers 的上傳請求;
          ? 將下載的片斷寫入磁盤;
          ? 其它任務(wù);

            這一節(jié)我們分析BT客戶端與 tracker 服務(wù)器的通信過程。在整個下載過程中,客戶端必須不斷的與 tracker 通信,報告自己的狀態(tài),同時也從 tracker 那里獲取當(dāng)前參與下載的其它 peers 的信息(ip地址、端口等)。一旦不能與 tracker 通信,則下載過程無法繼續(xù)下去。
           客戶端與 tracker 之間采用 HTTP 協(xié)議通信,客戶端把狀態(tài)信息放在 HTTP協(xié)議的GET命令的參數(shù)中傳遞給 tracker,tracker 則把 peers列表等信息放在 中傳遞給 客戶端。這種設(shè)計非常簡捷巧妙。采用HTTP協(xié)議也更使得穿越防火墻的阻攔更容易(不過實(shí)際應(yīng)用中,tracker通常不會使用 80 端口)。
           請務(wù)必參考服務(wù)器端源碼分析的幾篇相關(guān)文章(論壇中有)。
           Rerequester 類全權(quán)負(fù)責(zé)與 tracker 的通信任務(wù),我們首先看客戶端主程序 download.py中相應(yīng)的代碼:

          【download.py】
          ①rerequest = Rerequester(response['announce'], config['rerequest_interval'],
                  rawserver.add_task, connecter.how_many_connections,
                  config['min_peers'], encoder.start_connection,
                  rawserver.add_task, storagewrapper.get_amount_left,
                  upmeasure.get_total, downmeasure.get_total, listen_port,
                  config['ip'], myid, infohash, config['http_timeout'], errorfunc,
                  config['max_initiate'], doneflag, upmeasure.get_rate,
          downmeasure.get_rate,
                  encoder.ever_got_incoming
          ②rerequest.begin()

          Rerequester 類的構(gòu)造函數(shù)中傳遞了一大堆參數(shù),重點(diǎn)是 rawserver.add_task和 encoder.start_connection(rawserver和encoder 這兩個對象,在上一節(jié)中,我們已經(jīng)看到了它們的構(gòu)造過程),分別對應(yīng)RawServer::add_task() 和 Encoder::start_connection(),稍后會看到它們的作用。
          一切從 Rerequest::begin() 開始。

          【rerequester.py】

          class Rerequester:
           # 這個構(gòu)造函數(shù)傳遞了一大堆參數(shù),現(xiàn)在沒必要搞的那么清楚,關(guān)鍵是 sched 和 connect 這兩個參數(shù),對照前面的代碼,很容易知道它們是什么。
              def __init__(self, url, interval, sched, howmany, minpeers,
                      connect, externalsched, amount_left, up, down,
                      port, ip, myid, infohash, timeout, errorfunc, maxpeers, doneflag,
                      upratefunc, downratefunc, ever_got_incoming):
                  self.url = ('%s?info_hash=%s&peer_id=%s&port=%s&key=%s' %
                      (url, quote(infohash), quote(myid), str(port),
                      b2a_hex(''.join([chr(randrange(256)) for i in xrange(4)]))))
                  if ip != '':
                      self.url += '&ip=' + quote(ip)
                  self.interval = interval
                  self.last = None
                  self.trackerid = None
                  self.announce_interval = 30 * 60
                  self.sched = sched # -》RawServer::add_task()
                  self.howmany = howmany # 當(dāng)前有多少個連接 peer
                  self.minpeers = minpeers
                  self.connect = connect #-》 Encoder::start_connection()
                  self.externalsched = externalsched
                  self.amount_left = amount_left
                  self.up = up
                  self.down = down
                  self.timeout = timeout
                  self.errorfunc = errorfunc
                  self.maxpeers = maxpeers
                  self.doneflag = doneflag
                  self.upratefunc = upratefunc
                  self.downratefunc = downratefunc
                  self.ever_got_incoming = ever_got_incoming
                  self.last_failed = True
                  self.last_time = 0

          ④ def c(self):
            # 再調(diào)用一次 add_task(),把自身加入到客戶端的任務(wù)隊列中,這樣就形成了任務(wù)循環(huán),每隔一段時間,就會執(zhí)行這個函數(shù)。從而,客戶端與 tracker 的通信能夠持續(xù)的進(jìn)行下去。
                  self.sched(self.c, self.interval)
                 
                  if self.ever_got_incoming():
                      # 如果曾經(jīng)接受到過外來的連接,那么說明這個客戶端是可以接受外來連接的,也就是說很可能處于“公網(wǎng)”之中,所以并非很迫切的需要從 tracker 那里獲取 peers 列表。(既可以主動連接別人,也可以接受別人的連接,所以有點(diǎn)不緊不慢的味道)。
                      getmore = self.howmany() <= self.minpeers / 3
                  else:
                      # 如果從來沒有接受到過外來的連接,那么很有可能處于內(nèi)網(wǎng)之中,根本無法接受外來的連接。這種情況下,只有主動的去連接外部的 peers,所以要盡可能的向 tracker 請求更多的 peers。(只能去連接別人,所以更迫切一些)。
                      getmore = self.howmany() < self.minpeers
                  if getmore or time() - self.last_time > self.announce_interval:
                      # 如果有必要從 tracker 那里獲取 peers 列表,且任務(wù)執(zhí)行時間已到,則調(diào)用 announce() 開始與 tracker 通信
                      self.announce()

          ③    def begin(self):
                  # 調(diào)用前面提到的 RawServer::add_task(),把 Rerequester::c() 加入到 rawserver對象的任務(wù)隊列中
                  self.sched(self.c, self.interval)
            # 宣布與 tracker 的通信開始。。。。
                  self.announce(0)

          ⑤    def announce(self, event = None):
                  self.last_time = time()
            # 傳遞給 tracker 的信息,放在 HTTP GET 命令的參數(shù)中。所以,首先就是構(gòu)造這個參數(shù)。
            # 傳遞給 tracker 的信息,必須包括 uploaded、downloaded、left 信息。其它入 last、trackerid、numwant、compact、event 是可選的。這些參數(shù)的解釋請看 BT協(xié)議規(guī)范。
                  s = ('%s&uploaded=%s&downloaded=%s&left=%s' %
                      (self.url, str(self.up()), str(self.down()),
                      str(self.amount_left())))
                  if self.last is not None:
                      s += '&last=' + quote(str(self.last))
                  if self.trackerid is not None:
                      s += '&trackerid=' + quote(str(self.trackerid))
                  if self.howmany() >= self.maxpeers:
                      s += '&numwant=0'
                  else:
                      s += '&compact=1'
                  if event != None:
                      s += '&event=' + ['started', 'completed', 'stopped'][event]
                  set = SetOnce().set

            # 函數(shù)中可以嵌套定義函數(shù),這是 python 語法特別的地方。
            # 調(diào)用 RawServer::add_task() 把 checkfail() 加入到任務(wù)隊列中。在 timeout 時間之后,檢查向 tracker 發(fā)的GET命令是否失敗。
                  def checkfail(self = self, set = set):
                      if set():
                          if self.last_failed and self.upratefunc() < 100 and self.downratefunc() < 100:
                              self.errorfunc('Problem connecting to tracker - timeout exceeded')
                          self.last_failed = True
                  self.sched(checkfail, self.timeout)

                  # 創(chuàng)建一個線程,執(zhí)行 Rerequester::rerequest()
            # 可見,客戶端與 tracker之間的通信是由單獨(dú)的線程完成的
                  Thread(target = self.rerequest, args = [s, set]).start()

          ⑥    def rerequest(self, url, set):
                  try:
             # 調(diào)用 urlopen() ,向 tracker 發(fā)送命令
          h = urlopen(url)
          # read() 返回 tracker 的響應(yīng)數(shù)據(jù)。
                      r = h.read()
                      h.close()
                      if set():
                          def add(self = self, r = r):
          self.last_failed = False
          # 從 tracker 響應(yīng)回來的數(shù)據(jù),由 postrequest() 處理。
          self.postrequest(r)
          # externalsched() 同樣是調(diào)用 RawServer::add_task(),這樣,由主線程調(diào)用 postrequest() 函數(shù),處理 tracker 響應(yīng)的數(shù)據(jù)。(不要忘記,我們現(xiàn)在是處在一個單獨(dú)的子線程中。)
                          self.externalsched(add, 0)
                  except (IOError, error), e:
                      if set():
                          def fail(self = self, r = 'Problem connecting to tracker - ' + str(e)):
                              if self.last_failed:
                                  self.errorfunc(r)
                              self.last_failed = True
                          self.externalsched(fail, 0)

          ⑦    def postrequest(self, data):
                  try:
             # 對服務(wù)器響應(yīng)的數(shù)據(jù)解碼。請參看BT協(xié)議規(guī)范。
          r = bdecode(data)
          # 檢查 peers 有效性?
                      check_peers(r)
                      if r.has_key('failure reason'):
                          self.errorfunc('rejected by tracker - ' + r['failure reason'])
                      else:
                          if r.has_key('warning message'):
          self.errorfunc('warning from tracker - ' + r['warning message'])

           # 根據(jù) tracker 的響應(yīng),調(diào)整BT客戶端的參數(shù)。
                          self.announce_interval = r.get('interval', self.announce_interval)
                          self.interval = r.get('min interval', self.interval)
                          self.trackerid = r.get('tracker id', self.trackerid)
                          self.last = r.get('last')

              # 將其它下載者的信息保存到 peers 數(shù)組中。
                          p = r['peers']
                          peers = []
                          if type(p) == type(''):
                              for x in xrange(0, len(p), 6):
                                  ip = '.'.join([str(ord(i)) for i in p[x:x+4]])
                                  port = (ord(p[x+4]) << 8) | ord(p[x+5])
                                  peers.append((ip, port, None))
                          else:
                              for x in p:
                                  peers.append((x['ip'], x['port'], x.get('peer id')))
                          ps = len(peers) + self.howmany()
                          if ps < self.maxpeers:
                              if self.doneflag.isSet():
                                  if r.get('num peers', 1000) - r.get('done peers', 0) > ps * 1.2:
                                      self.last = None
                              else:
                                  if r.get('num peers', 1000) > ps * 1.2:
                                      self.last = None
                          # 這里的 connect,即前面的Encoder::start_connection(),
              # 至此,已經(jīng)成功的完成了一次與 tracker 服務(wù)器的通信,并且從它那里獲取了 peers 列表;下面就與這些 peers 挨個建立連接;至于建立連接的過程,就要追蹤到 Encoder 類中了,且聽下回分解。
                          for x in peers:
                              # x[0]:ip, x[1]:port, x[2]:id
                              self.connect((x[0], x[1]), x[2])
                  except Error, e:
                      if data != '':
                          self.errorfunc('bad data from tracker - ' + str(e))


          小結(jié):
            這篇文章,我們重點(diǎn)分析了BT客戶端與 tracker 服務(wù)器通信的過程。我們知道了,客戶端要每隔一段時間,就去連接 tracker 一次,它是以一個單獨(dú)的線程執(zhí)行的。這個線程在接受到 tracker 的響應(yīng)數(shù)據(jù)后,交給主線程(既主程序)來進(jìn)行分析。主程序從響應(yīng)數(shù)據(jù)中,獲得 peers 列表。然后調(diào)用 Encoder::start_connection() 挨個與這些 peers 嘗試建立連接。

          posted on 2007-01-19 00:23 苦笑枯 閱讀(538) 評論(0)  編輯  收藏 所屬分類: P2P
          收藏來自互聯(lián)網(wǎng),僅供學(xué)習(xí)。若有侵權(quán),請與我聯(lián)系!

          <2007年1月>
          31123456
          78910111213
          14151617181920
          21222324252627
          28293031123
          45678910

          常用鏈接

          留言簿(2)

          隨筆分類(56)

          隨筆檔案(56)

          搜索

          •  

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 泰宁县| 板桥市| 华安县| 汨罗市| 奉节县| 乐东| 繁峙县| 大同市| 安平县| 昌宁县| 东乌珠穆沁旗| 青海省| 浮山县| 赣州市| 岳阳县| 临清市| 双流县| 越西县| 万盛区| 祁门县| 资溪县| 泊头市| 望都县| 容城县| 喜德县| 青岛市| 福海县| 秦安县| 新余市| 三江| 石城县| 普定县| 溧水县| 资阳市| 长丰县| 广元市| 柞水县| 顺义区| 惠安县| 石台县| 招远市|