posts - 28, comments - 37, trackbacks - 0, articles - 0

          周末無事,故翻譯sheepdog design.

          原文地址: https://github.com/collie/sheepdog/wiki/Sheepdog-Design

          Sheepdog 設計

          Sheeepdog采用完全對稱的結構,沒有類似元數據服務的中心節點. 這種設計有以下的特點.

          1)       性能于容量的線性的可擴展性.

          當需要提升性能或容量時,只需向集群中增加新的機器便能使Sheeepdog線性成長.

          2)       沒有單點故障

          即使某臺機器發生故障,依然可以通過其他機器訪問數據.

          3)       容易管理

          不需要配置機器角色,當管理員在新增的機器開啟Sheepdog守護進程時, Sheepdog會自動檢測新加入的機器并配置它成為存儲系統中的一員.

          結構概述



          Sheepdog是一個分布式存儲系統.Sheepdog客戶端(QEMU的塊驅動程序)提供對象存儲(類似于簡單的鍵值對). 接下來幾章將更加詳細的闡述Sheepdog各個部分.

          1)       對象存儲(Object對象存儲(Object Storage)

          2)       )

          Sheepdog不同于一般的文件系統, Sheepdog進程為QEMU(Sheepdog進程名)創建一個分布式對象存儲系統,它可以存儲對象”.這里的對象數據大小可變,并有唯一的標識,通過標識可以進行讀//創建/刪除操作.對象存儲組成網關對象管理器”.

          3)       網關(getway)

          Getway接收QEMU塊驅動的I/O請求(對象id,偏移,長度和操作類型),通過一直散列算法獲得目標節點,然后轉發I/O請求至該節點.

          4)       對象管理器(Object manager)

          對象管理器接收getway轉發過來的I/O請求,然后對磁盤執行讀/寫操作.

          5)       集群管理器(Cluster manager)

          集群管理器管理管理節點成員(探測失敗/新增節點并報告節點成員的變化),以及一些需要節點一致的操作(vdi 創建, 快照 vdi).當前集群管理器使用corosync集群引擎.

          6)       QEMU 塊驅動

          QEMU塊驅動把VM image分為固定大小(默認4M),并通過其getway存儲到對象存儲中

          對象存儲(Object Storage)

          每個對象使用一個64bit的整數作為全局標識,并在多臺機器存有備份,QEMU塊驅動并不關心存儲的位置,對象存儲系統負責管理存儲的位置.

          對象類型(object types)

          Sheepdog的對象分為以下四種:

          1)       數據類型(data object)

          它包括虛擬磁盤映射的真實數據,虛擬磁盤映射分為固定大小的數據對象, Sheepdog客戶端訪問這個對象.

          2)       vdi object

          它包括虛擬磁盤映射的元數據(:映射名,磁盤大小,創建時間,vdi的數據對象ID).

          3)       vmstate object

          它存儲運行中的VM狀態映射.管理員通過它獲取實時快照信息.

          4)       vdi attr object

          使用它存儲各個vdi的屬性,屬性為鍵值對類型,類似于普通文件的擴展信息.

          對象ID規則(object ID rules)

          1) 0 - 31 (32 bits): 對象類型詳細信息

          2) 32 - 55 (24 bits): vdi id

          3) 56 - 59 ( 4 bits): 預留

          4) 60 - 63 ( 4 bits): 對象類型標識符

          每個VDI有一個全局唯一的ID(vdi id), 通過VDI名求得的散列值,低三十二位使用如下:

          對象類型

          32位的作用

          數據類型

          虛擬磁盤映射的索引號

          Vdi對象

          未使用(0)

          Vm狀態對象

          Vm狀態映射索引

          Vdi屬性對象

          鍵名的散列值

           

          對象格式(object format)

          1)       數據對象

          虛擬磁盤映射的塊

          2)       Vdi對象

           

           1  struct sheepdog_inode {
           2      char name[SD_MAX_VDI_LEN];               /* the name of this VDI*/
           3      char tag[SD_MAX_VDI_TAG_LEN];           /* the snapshot tag name */
           4      uint64_t ctime;                                    /* creation time of this VDI */
           5      uint64_t snap_ctime;                            /* the time snapshot is taken */
           6      uint64_t vm_clock_nsec;                       /* vm clock (used for live snapshot) */
           7      uint64_t vdi_size;                                 /* the size of VDI */
           8      uint64_t vm_state_size;                        /* the size of vm state (used for live snapshot) */
           9      uint16_t copy_policy;                           /* reserved */
          10      uint8_t  nr_copies;                              /* the number of object redundancy */
          11      uint8_t  block_size_shift;                      /* info about the size of the data object */
          12      uint32_t snap_id;                                /* the snapshot id */
          13      uint32_t vdi_id;                                  /* the vdi id */
          14      uint32_t parent_vdi_id;                        /* the parent snapshot vdi id of this VDI */
          15      uint32_t child_vdi_id[MAX_CHILDREN];    /* the children VDIs of this VDI */
          16      uint32_t data_vdi_id[MAX_DATA_OBJS];   /* the data object IDs this VDI contains*/
          17  };

          3)       Vm狀態對象

          Vm狀態映射塊

          4)       Vdi屬性對象

          SD_MAX_VDI_ATTR_KEY_LEN(256)為屬性的鍵名,余下的是屬性指.

          只讀/可寫對象(read-only/writable objects)

          從如何訪問對象的角度,我們還可以把Sheepdog對象分為以下兩類.

          1)       只讀對象(:VDI快照數據對象)

          只允許一個VM對其讀寫,其他vm無法訪問

          2)       可寫對象

          不允許任何VM對其寫,所有VM都可讀

          其他功能(other features)

          Sheepdog對象存儲可接收正在寫時復制(copy-on-write)的請求.當一個客戶端發送一個創建和寫的請求時,同時可以指定基本對象(CoW操作的來源),這用于快照和克隆操作.

          網關(Gateway)

          對象存在哪(where to store objects)

          Sheepdog使用一致性哈希算法決定存放對象的位置,一致性哈希算法提供哈希表,而且增加或介紹節點不回顯著的改變對象映射,通過哈希表能使I/O負載均衡.

          副本(replication)

          Sheepdog的數據副本很簡單,我們假設只有一個寫,所以寫沖突不會發生,客戶端可以并行發生請求到目標節點,發生讀請求到一個目標節點如果客戶端自己處理I/O請求順序.

          I/O(write I/O flow)

          Getway使用一致性哈希算法計算目標節點并發送寫請求到所有目標節點,只有所有副本均更新成功寫請求才會成功,這是因為如果一個副本未更新,getway有可能從未更新的節點讀取舊數據.

          I/O(read I/O flow)

          Getway使用一致性哈希算法計算目標節點,并發送讀請求到一個目標節點.

          1)       修復對象一致性

          當某節點在轉發I/O請求時crash,有可能破壞副本的一致性,所以當getway第一次讀取對象時會試圖修復其一致性,從一節點讀取整個對象并用它覆蓋所有副本.

          重試I/O請求(retrying I/O requests)

          Sheepdog存儲所有成員節點的歷史信息,我們把歷史版本號叫做”epoch”(詳見章節對象恢復’). 如果getway轉發I/O請求至目標節點并且getway與目標節點epoch號不相符,I/O請求失敗且getway重試請求直到epcho號相符,這就需要保持副本強一致性.

          I/O重試也可能發生在目標節點掛了導致無法完成I/O操作.

          對象管理器(Object Manager)

          對象管理器把對象存儲到本地磁盤,當前把每個對象存儲為一個文件,這中方法簡單.我們也可以使用DBMS(: BerkeleyDB, Tokyo Cabinet) 作為對象存儲器,但還不支持.

          路徑命名規則(path name rule)

          對象存儲成如下路徑:

                  /store_dir/obj/[epoch number]/[object ID]

          所有對象文件有一個擴展屬性: 副本數(sheepdog.copies),

          寫日志(write journaling)

          sheep進程在寫操作過程中失敗,對象有可能至少部分更新,一般情況這不會有問題,因為如果VM未接收成功消息,不保證所寫部分的內容.然而對于vdi對象,我們必須整體更新或整體未更新,因為如果vdi對象只是部分更新,VDI的元數據有可能被破壞. 為例防止這個問題,我們使用日志記錄對vdi對象的寫操作. 日志過程很簡單:

          1)       創建日志文件"/store_dir/journal/[epoch]/[vdi object id]"

          2)       首先寫數據到日志文件

          3)       寫一個數據到vdi對象

          4)       刪除日志文件

          集群管理器(Cluster Manager)

          大多情況, Sheepdo客戶端單獨訪問它們的映射因為我們不允許兩個客戶端同時訪問一個映射,但是某些VDI操作(:克隆VDI,創建VDI)必須做,因為這些操作更新全局信息,我們使用Corosync集群引擎完成而不是中心服務器.

          我們將擴展Sheepdog以支持其他集群管理系統.

          本章正在編輯

          QEMU 塊驅動(QEMU Block Driver)

          Sheepdog卷被分為4M的數據對象,剛創建的對象未分配,也就是說,只有寫對象被分配.

          Open

          首先QEMU塊驅動通過getwaybdrv_open()從對象存儲讀取vdi

          /(read/write)

          塊驅動通過請求的部分偏移量和大小計算數據對象id, 并向getway發送請求. 當塊驅動發送寫請求到那些不屬于其當前vdi的數據對象是,塊驅動發送CoW請求分配一個新的數據對象.

          寫入快照vdi(write to snapshot vdi)

          我們可以把快照VDI附加到QEMU, 當塊驅動第一次發送寫請求到快照VDI, 塊驅動創建一個新的可寫VDI作為子快照,并發送請求到新的VDI.

          VDI操作(VDI Operations)

          查找(lookup)

          當查找VDI對象時:

          1)       通過求vdi名的哈希值得到vdi id

          2)       通過vdi id計算di對象

          3)       發送讀請求到vdi對象

          4)       如果此vdi不是請求的那個,增加vdi id并重試發送讀請求

          快照,克隆(snapshot, cloning)

          快照可克隆操作很簡單,

          1)       讀目標VDI

          2)       創建一個與目標一樣的新VDI

          3)       把新vdi‘'parent_vdi_id''設為目標VDIid

          4)       設置目標vdi''child_vdi_id''為新vdiid.

          5)       設置目標vdi''snap_ctime''為當前時間, vdi變為當前vdi對象

          刪除(delete)

          TODO:當前,回收未使用的數據對象是不會被執行,直到所有相關VDI對象(相關的快照VDI和克隆VDI)被刪除.

          所有相關VDI被刪除后, Sheepdog刪除所有此VDI的數據對象,設置此VDI對象名為空字符串.

          對象恢復(Object Recovery)

          epoch

          Sheepdog把成員節點歷史存儲在存儲路徑, 路徑名如下:

                  /store_dir/epoch/[epoch number]

          每個文件包括節點在epoch的列表信息(IP地址,端口,虛擬節點個數).

          恢復過程(recovery process)

          1)       從所有節點接收存儲對象ID

          2)       計算選擇那個對象

          3)       創建對象ID list文件"/store_dir/obj/[the current epoch]/list"

          4)       發送一個讀請求以獲取id存在于list文件的對象. 這個請求被發送到包含前一次epoch的對象的節點.( The requests are sent to the node which had the object at the previous epoch.)

          5)       把對象存到當前epoch路徑.

          沖突的I/O(conflicts I/Os)

          如果QEMU發送I/O請求到某些未恢復的對象, Sheepdog阻塞此請求并優先恢復對象.

          協議(Protocol)

          Sheepdog的所有請求包含固定大小的頭部(48)和固定大小的數據部分,頭部包括協議版本,操作碼,epoch,數據長度等.

          between sheep and QEMU

          操作碼

          描述

          SD_OP_CREATE_AND_WRITE_OBJ

          發送請求以創建新對象并寫入數據,如果對象存在,操作失敗

          SD_OP_READ_OBJ

          讀取對象中的數據

          SD_OP_WRITE_OBJ

          向對象寫入數據,如果對象不存在,失敗

          SD_OP_NEW_VDI

          發送vdi名到對象存儲并創建新vdi對象, 返回應答vdi的唯一的vdi id

          SD_OP_LOCK_VDI

          SD_OP_GET_VDI_INFO相同

          SD_OP_RELEASE_VDI

          未使用

          SD_OP_GET_VDI_INFO

          獲取vdi信息(:vdi id)

          SD_OP_READ_VDIS

          獲取已經使用的vdi id

          between sheep and collie

          操作碼

          描述

          SD_OP_DEL_VDI

          刪除VDI

          SD_OP_GET_NODE_LIST

          獲取sheepdog的節點列表

          SD_OP_GET_VM_LIST

          未使用

          SD_OP_MAKE_FS

          創建sheepdog集群

          SD_OP_SHUTDOWN

          停止sheepdog集群

          SD_OP_STAT_SHEEP

          獲取本地磁盤使用量

          SD_OP_STAT_CLUSTER

          獲取sheepdog集群信息

          SD_OP_KILL_NODE

          退出sheep守護進程

          SD_OP_GET_VDI_ATTR

          獲取vdi屬性對象id

          between sheeps

          操作碼

          描述

          SD_OP_REMOVE_OBJ

          刪除對象

          SD_OP_GET_OBJ_LIST

          獲取對象id列表,并存儲到目標節點

           

          posted @ 2011-08-28 17:02 俞靈 閱讀(3284) | 評論 (0)編輯 收藏

          Ant是什么?
          Ant是一種基于Java和XML的build工具.
          1 編寫build.xml

          Ant的buildfile是用XML寫的.每個buildfile含有一個project.

          buildfile中每個task元素可以有一個id屬性,可以用這個id值引用指定的任務.這個值是唯一的.(詳情請參考下面的Task小節)

          1.1 Projects

          project有下面的屬性:
          Attribute Description Required
          name 項目名稱. No
          default 當沒有指定target時使用的缺省target Yes
          basedir 用于計算所有其他路徑的基路徑.該屬性可以被basedir property覆蓋.當覆蓋時,該屬性被忽略.如果屬性和basedir property都沒有設定,就使用buildfile文件的父目錄. No


          項目的描述以一個頂級的<description>元素的形式出現(參看description小節).

          一個項目可以定義一個或多個target.一個target是一系列你想要執行的.執行Ant時,你可以選擇執行那個target.當沒有給定target時,使用project的default屬性所確定的target.

          1.2 Targets

          一個target可以依賴于其他的target.例如,你可能會有一個target用于編譯程序,一個target用于生成可執行文件.你在生成可執行文件之前先編譯通過,生成可執行文件的target依賴于編譯target.Ant會處理這種依賴關系.

          然而,應當注意到,Ant的depends屬性只指定了target應該被執行的順序-如果被依賴的target無法運行,這種depends對于指定了依賴關系的target就沒有影響.

          Ant會依照depends屬性中target出現的順序(從左到右)依次執行每個target.然而,要記住的是只要某個target依賴于一個target,后者就會被先執行.
          <target name="A"/>
          <target name="B" depends="A"/>
          <target name="C" depends="B"/>
          <target name="D" depends="C,B,A"/>
          假定我們要執行target D.從它的依賴屬性來看,你可能認為先執行C,然后B,A被執行.錯了,C依賴于B,B依賴于A,先執行A,然后B,然后C,D被執行.

          一個target只能被執行一次,即時有多個target依賴于它(看上面的例子).

          如 果(或如果不)某些屬性被設定,才執行某個target.這樣,允許根據系統的狀態(java version, OS, 命令行屬性定義等等)來更好地控制build的過程.要想讓一個target這樣做,你就應該在target元素中,加入if(或unless)屬性,帶 上target因該有所判斷的屬性.例如:
          <target name="build-module-A" if="module-A-present"/>
          <target name="build-own-fake-module-A" unless="module-A-present"/>
          如果沒有if或unless屬性,target總會被執行.

          可選的description屬性可用來提供關于target的一行描述,這些描述可由-projecthelp命令行選項輸出.

          將你的tstamp task在一個所謂的初始化target是很好的做法,其他的target依賴這個初始化target.要確保初始化target是出現在其他target依賴表中的第一個target.在本手冊中大多數的初始化target的名字是"init".

          target有下面的屬性:
          Attribute Description Required
          name target的名字 Yes
          depends 用逗號分隔的target的名字列表,也就是依賴表. No
          if 執行target所需要設定的屬性名. No
          unless 執行target需要清除設定的屬性名. No
          description 關于target功能的簡短描述. No


          1.3 Tasks

          一個task是一段可執行的代碼.

          一個task可以有多個屬性(如果你愿意的話,可以將其稱之為變量).屬性只可能包含對property的引用.這些引用會在task執行前被解析.

          下面是Task的一般構造形式:
          <name attribute1="value1" attribute2="value2" ... />
          這里name是task的名字,attributeN是屬性名,valueN是屬性值.

          有一套內置的(built-in)task,以及一些可選task,但你也可以編寫自己的task.

          所有的task都有一個task名字屬性.Ant用屬性值來產生日志信息.

          可以給task賦一個id屬性:
          <taskname id="taskID" ... />
          這里taskname是task的名字,而taskID是這個task的唯一標識符.通過這個標識符,你可以在腳本中引用相應的task.例如,在腳本中你可以這樣:
          <script ... >
          task1.setFoo("bar");
          </script>
          設定某個task實例的foo屬性.在另一個task中(用java編寫),你可以利用下面的語句存取相應的實例.
          project.getReference("task1").
          注意1:如果task1還沒有運行,就不會被生效(例如:不設定屬性),如果你在隨后配置它,你所作的一切都會被覆蓋.

          注意2:未來的Ant版本可能不會兼容這里所提的屬性,很有可能根本沒有task實例,只有proxies.

          1.4 Properties

          一個project可以有很多的properties.可以在buildfile中用 property task來設定,或在Ant之外設定.一個property有一個名字和一個值.property可用于task的屬性值.這是通過將屬性名放在"${" 和"}"之間并放在屬性值的位置來實現的.例如如果有一個property builddir的值是"build",這個property就可用于屬性值:${builddir}/classes.這個值就可被解析為build /classes.

          內置屬性

          如果你使用了<property> task 定義了所有的系統屬性,Ant允許你使用這些屬性.例如,${os.name}對應操作系統的名字.

          要想得到系統屬性的列表可參考the Javadoc of System.getProperties.

          除了Java的系統屬性,Ant還定義了一些自己的內置屬性:
          basedir project基目錄的絕對路徑 (與<project>的basedir屬性一樣).
          ant.file buildfile的絕對路徑.
          ant.version Ant的版本.
          ant.project.name 當前執行的project的名字;由<project>的name屬性設定.
          ant.java.version Ant檢測到的JVM的版本; 目前的值有"1.1", "1.2", "1.3" and "1.4".

          例子
          <project name="MyProject" default="dist" basedir=".">

          <!-- set global properties for this build -->
          <property name="src" value="."/>
          <property name="build" value="build"/>
          <property name="dist" value="dist"/>

          <target name="init">
          <!-- Create the time stamp -->
          <tstamp/>
          <!-- Create the build directory structure used by compile -->
          <mkdir dir="${build}"/>
          </target>

          <target name="compile" depends="init">
          <!-- Compile the java code from ${src} into ${build} -->
          <javac srcdir="${src}" destdir="${build}"/>
          </target>

          <target name="dist" depends="compile">
          <!-- Create the distribution directory -->
          <mkdir dir="${dist}/lib"/>
          <!-- Put everything in ${build} into the MyProject-${DSTAMP}.jar file -->
          <jar jarfile="${dist}/lib/MyProject-${DSTAMP}.jar" basedir="${build}"/>
          </target>

          <target name="clean">
          <!-- Delete the ${build} and ${dist} directory trees -->
          <delete dir="${build}"/>
          <delete dir="${dist}"/>
          </target>

          </project>

          1.5 Path-like Structures
          你可以用":"和";"作為分隔符,指定類似PATH和CLASSPATH的引用.Ant會把分隔符轉換為當前系統所用的分隔符.

          當需要指定類似路徑的值時,可以使用嵌套元素.一般的形式是
          <classpath>
          <pathelement path="${classpath}"/>
          <pathelement location="lib/helper.jar"/>
          </classpath>
          location屬性指定了相對于project基目錄的一個文件和目錄,而path屬性接受逗號或分號分隔的一個位置列表.path屬性一般用作預定義的路徑--其他情況下,應該用多個location屬性.

          為簡潔起見,classpath標簽支持自己的path和location屬性.
          <classpath>
          <pathelement path="${classpath}"/>
          </classpath>
          可以被簡寫作:
          <classpath path="${classpath}"/>
          也可通過<fileset>元素指定路徑.構成一個fileset的多個文件加入path-like structure的順序是未定的.
          <classpath>
          <pathelement path="${classpath}"/>
          <fileset dir="lib">
          <include name="**/*.jar"/>
          </fileset>
          <pathelement location="classes"/>
          </classpath>
          上面的例子構造了一個路徑值包括:${classpath}的路徑,跟著lib目錄下的所有jar文件,接著是classes目錄.

          如果你想在多個task中使用相同的path-like structure,你可以用<path>元素定義他們(與target同級),然后通過id屬性引用--參考Referencs例子.

          path-like structure可能包括對另一個path-like structurede的引用(通過嵌套<path>元素):
          <path id="base.path">
          <pathelement path="${classpath}"/>
          <fileset dir="lib">
          <include name="**/*.jar"/>
          </fileset>
          <pathelement location="classes"/>
          </path>
          <path id="tests.path">
          <path refid="base.path"/>
          <pathelement location="testclasses"/>
          </path>
          前面所提的關于<classpath>的簡潔寫法對于<path>也是有效的,如:
          <path id="tests.path">
          <path refid="base.path"/>
          <pathelement location="testclasses"/>
          </path>
          可寫成:
          <path id="base.path" path="${classpath}"/>
          命令行變量

          有些task可接受參數,并將其傳遞給另一個進程.為了能在變量中包含空格字符,可使用嵌套的arg元素.
          Attribute Description Required
          value 一個命令行變量;可包含空格字符. 只能用一個
          line 空格分隔的命令行變量列表.
          file 作為命令行變量的文件名;會被文件的絕對名替代.
          path 一個作為單個命令行變量的path-like的字符串;或作為分隔符,Ant會將其轉變為特定平臺的分隔符.

          例子
          <arg value="-l -a"/>
          是一個含有空格的單個的命令行變量.
          <arg line="-l -a"/>
          是兩個空格分隔的命令行變量.
          <arg path="/dir;/dir2:dir3"/>
          是一個命令行變量,其值在DOS系統上為dir;dir2;dir3;在Unix系統上為/dir:/dir2:/dir3 .

          References

          buildfile元素的id屬性可用來引用這些元素.如果你需要一遍遍的復制相同的XML代碼塊,這一屬性就很有用--如多次使用<classpath>結構.

          下面的例子:
          <project ... >
          <target ... >
          <rmic ...>
          <classpath>
          <pathelement location="lib/"/>
          <pathelement path="${java.class.path}/"/>
          <pathelement path="${additional.path}"/>
          </classpath>
          </rmic>
          </target>
          <target ... >
          <javac ...>
          <classpath>
          <pathelement location="lib/"/>
          <pathelement path="${java.class.path}/"/>
          <pathelement path="${additional.path}"/>

          </classpath>
          </javac>
          </target>
          </project>
          可以寫成如下形式:
          <project ... >
          <path id="project.class.path">
          <pathelement location="lib/"/>
          <pathelement path="${java.class.path}/"/>
          <pathelement path="${additional.path}"/>
          </path>
          <target ... >
          <rmic ...>
          <classpath refid="project.class.path"/>
          </rmic>
          </target>
          <target ... >
          <javac ...>
          <classpath refid="project.class.path"/>
          </javac>
          </target>
          </project>
          所有使用PatternSets, FileSets 或 path-like structures嵌套元素的task也接受這種類型的引用.

          轉自:
          http://www.linux521.com/2009/java/200904/1760.html

          posted @ 2011-08-07 11:46 俞靈 閱讀(1149) | 評論 (0)編輯 收藏

          最近看了hadoop進度顯示部分代碼,做了個ppt算是一個總結吧.
          /Files/shenh062326/hadoop進度計算.ppt

          posted @ 2011-07-03 09:44 俞靈 閱讀(324) | 評論 (0)編輯 收藏

          本文轉自 http://coolshell.cn/articles/3463.html

          對于SQL的Join,在學習起來可能是比較亂的。我們知道,SQL的Join語法有很多inner的,有outer的,有left的,有時候,對于Select出來的結果集是什么樣子有點不是很清楚。Coding Horror上有一篇文章(實在不清楚為什么Coding Horror也被墻)通過 文氏圖 Venn diagrams 解釋了SQL的Join。我覺得清楚易懂,轉過來。

          假設我們有兩張表。

          • Table A 是左邊的表。
          • Table B 是右邊的表。

          其各有四條記錄,其中有兩條記錄是相同的,如下所示:

          id name       id  name
          -- ----       --  ----
          1  Pirate     1   Rutabaga
          2  Monkey     2   Pirate
          3  Ninja      3   Darth Vader
          4  Spaghetti  4   Ninja

          下面讓我們來看看不同的Join會產生什么樣的結果。

          SELECT * FROM TableA
          INNER JOIN TableB
          ON TableA.name = TableB.name
          
          id  name       id   name
          --  ----       --   ----
          1   Pirate     2    Pirate
          3   Ninja      4    Ninja

          Inner join
          產生的結果集中,是A和B的交集。

          Venn diagram of SQL inner join
          SELECT * FROM TableA
          FULL OUTER JOIN TableB
          ON TableA.name = TableB.name
          
          id    name       id    name
          --    ----       --    ----
          1     Pirate     2     Pirate
          2     Monkey     null  null
          3     Ninja      4     Ninja
          4     Spaghetti  null  null
          null  null       1     Rutabaga
          null  null       3     Darth Vader

          Full outer join 產生A和B的并集。但是需要注意的是,對于沒有匹配的記錄,則會以null做為值。

          Venn diagram of SQL cartesian join
          SELECT * FROM TableA
          LEFT OUTER JOIN TableB
          ON TableA.name = TableB.name
          
          id  name       id    name
          --  ----       --    ----
          1   Pirate     2     Pirate
          2   Monkey     null  null
          3   Ninja      4     Ninja
          4   Spaghetti  null  null

          Left outer join 產生表A的完全集,而B表中匹配的則有值,沒有匹配的則以null值取代。

          Venn diagram of SQL left join
          SELECT * FROM TableA
          LEFT OUTER JOIN TableB
          ON TableA.name = TableB.name
          WHERE TableB.id IS null 
          
          id  name       id     name
          --  ----       --     ----
          2   Monkey     null   null
          4   Spaghetti  null   null

          產生在A表中有而在B表中沒有的集合。

          join-left-outer.png
          SELECT * FROM TableA
          FULL OUTER JOIN TableB
          ON TableA.name = TableB.name
          WHERE TableA.id IS null
          OR TableB.id IS null
          
          id    name       id    name
          --    ----       --    ----
          2     Monkey     null  null
          4     Spaghetti  null  null
          null  null       1     Rutabaga
          null  null       3     Darth Vader

          產生A表和B表都沒有出現的數據集。

          join-outer.png

          還需要注冊的是我們還有一個是“交差集” cross join, 這種Join沒有辦法用文式圖表示,因為其就是把表A和表B的數據進行一個N*M的組合,即笛卡爾積。表達式如下:

          SELECT * FROM TableA
          CROSS JOIN TableB

          這個笛卡爾乘積會產生 4 x 4 = 16 條記錄,一般來說,我們很少用到這個語法。但是我們得小心,如果不是使用嵌套的select語句,一般系統都會產生笛卡爾乘積然再做過濾。這是對于性能來說是非常危險的,尤其是表很大的時候。

          posted @ 2011-07-01 15:06 俞靈 閱讀(302) | 評論 (0)編輯 收藏

          簡單的jar使用方法
          JAR files are packaged with the ZIP file format, so you can use them for "ZIP-like" tasks such as lossless data compression, archiving, decompression, and archive unpacking. These are among the most common uses of JAR files, and you can realize many JAR file benefits using only these basic features.

          Even if you want to take advantage of advanced functionality provided by the JAR file format such as electronic signing, you'll first need to become familiar with the fundamental operations.

          To perform basic tasks with JAR files, you use the Java Archive Tool provided as part of the Java Development Kit. Because the Java Archive tool is invoked by using the jar command, for convenience we'll call it the "Jar tool".

          As a synopsis and preview of some of the topics to be covered in this lesson, the following table summarizes common JAR-file operations:

          OperationCommand
          To create a JAR filejar cf jar-file input-file(s)
          To view the contents of a JAR filejar tf jar-file
          To extract the contents of a JAR filejar xf jar-file
          To extract specific files from a JAR filejar xf jar-file archived-file(s)
          To run an application packaged as a JAR file 
          (version 1.1)
          jre -cp app.jar MainClass
          To run an application packaged as a JAR file 
          (version 1.2 -- requires Main-Class
          manifest header)
          java -jar app.jar
          To invoke an applet packaged as a JAR file
          <applet code=AppletClassName.class
                  archive="JarFileName.jar"
                  width=width height=height>
          </applet>

          posted @ 2011-07-01 11:18 俞靈 閱讀(762) | 評論 (0)編輯 收藏

          SIEGE is an http load tester and benchmarking utility. It was designed to let web developers measure the performance of their code under duress, to see how it will stand up to load on the internet. It lets the user hit a web server with a configurable number of concurrent simulated users. Those users place the server "under siege." 

          posted @ 2011-03-23 11:15 俞靈 閱讀(170) | 評論 (0)編輯 收藏

          下載與安裝 
          去官網下載tar  或Google 找RPM 皆可,個人都習慣用tar  裝,安裝方法同一般的程式 
          $>./configure --prefix=/usr/local 
          $>make 
          $>make install 
          Complier 過程中會有幾個Warning,但是對整個環境并沒有影響.基本上安裝部份都不會有什么問題, rrdtool 的tarball 內即可附了libgd,zlib 等自用的lib,不會像mrtg FAQ一樣裝好了試一下打rrdtool ,看會不會出現類似訊息 
          以下是rrdtool中一些參數的解釋:
          1. DS :DS 用于定義 Data Soure 。也就是用于存放腳本的結果的變量名(DSN)。
          2. DST :DST 就是 Data Source Type 的意思。有 COUNTER、GUAGE、DERIVE、ABSOLUTE、COMPUTE 5種。
          3. RRA :RRA 用于指定數據如何存放。我們可以把一個RRA 看成一個表,各保存不同 interval 的統計結果
          4. PDP :Primary Data Point 。正常情況下每個 interval RRDtool 都會收到一個值;RRDtool 在收到腳本給來的值后 會計算出另外一個值(例如平均值),這個 值就是 PDP,這個值代表的一般是“xxx/秒”的含義
          5. CF :CF 就是 Consolidation Function 的縮寫。也就是合并(統計)功能。有 AVERAGE、MAX、MIN、LAST 四種分別表示對多個PDP 進行取平均、取最大值、取最小值、取當前值四種類型
          6. CDP :Consolidation Data Point 。RRDtool 使用多個 PDP 合并為(計算出)一個 CDP。也就是執行上面 的CF 操作后的結果。這個值就是存入 RRA的數據,繪圖時使用的也是這些數據
          7. xff:是 xfile factor 的縮寫。定義:he xfiles factor defines what part of a consolidation interval may be made up from *UNKNOWN* data while the consolidated value is still regarded as known. It is given as the ratio of allowed *UNKNOWN* PDPs to。Xff 字段實際就是一個比例值。0.5 表示一個 CDP 中的所有 PDP 如果超過一半的值為 UNKNOWN ,則該 CDP 的值就被標為 UNKNOWN。
          8. step :就是 RRDtool “期望” 每隔多長時間就收到一個值

          posted @ 2011-03-09 08:55 俞靈 閱讀(602) | 評論 (1)編輯 收藏

          最近看了hadoop的mapreduce部分代碼,看了之后總結了一下,算是成果吧。以下是程序執行的主要流程,其中參考了網上的一些文章。


          概括

          Hadoop包括hdfsmapreduce兩部分,在試用期期間我主要看了mapreduce部分,即hadoop執行作業的部分。

          1. mapreduce中幾個主要的概念

                 mapreduce整體上可以分為這么幾條執行的線索,jobclientJobTrackerTaskTracker

            1. JobClient

                         每一個job都會在客戶端通過JobClient類將應用程序以及配置參數打包成jar文件存儲在HDFS,然后向JobTracker提交作業,JobTracker創建Task(即MapTaskReduceTask)并將它們分發到各個TaskTracker服務中去執行。


            1. JobTracker

                          JobTracker是一個master服務,hadoop服務端啟動之后JobTracker接收job,負責調度job的每一個子任務task運行于TaskTracker上,并監控它們,如果發現有失敗的task就重新運行它。一般情況應該把JobTracker部署在單獨的機器上。


            1. TaskTracker

                         TaskTracker是運行于多個節點上的slaver服務。TaskTracker主動與JobTracker通信,接收作業,并負責直接執行每一個任務。

          下圖簡單的描述了三者之間的關系:(上傳不了圖片,抱歉!)


          1. 數據結構

          2.1 JobInProgress

          JobClient提交job后,JobTracker會創建一個JobInProgress來跟蹤和調度這個job,并把它添加到job隊列里。JobInProgress會根據提交的job jar中定義的輸入數據集(已分解成FileSplit)創建對應的一批TaskInProgress用于監控和調度MapTask,同時在創建指定數目的TaskInProgress用于監控和調度ReduceTask,缺省為1ReduceTask


          2.2 TaskInProgress

          JobTracker啟動任務時通過每一個TaskInProgress來launchTask,這時會把Task對象(即MapTaskReduceTask)序列化寫入相應的TaskTracker服務中,TaskTracker收到后會創建對應的TaskInProgress(此TaskInProgress實現非JobTracker中使用的TaskInProgress,作用類似)用于監控和調度該Task。啟動具體的Task進程是通過TaskInProgress管理的TaskRunner對象來運行的。TaskRunner會自動裝載job jar,并設置好環境變量后啟動一個獨立的java child進程來執行Task,即MapTask或者ReduceTask,但它們不一定運行在同一個TaskTracker中。


          2.3 MapTaskReduceTask

          一個完整的job會自動依次執行MapperCombiner(在JobConf指定了Combiner時執行)和Reducer,其中MapperCombiner是由MapTask調用執行,Reducer則由ReduceTask調用,Combiner實際也是Reducer接口類的實現。Mapper會根據job jar中定義的輸入數據集按<key1,value1>對讀入,處理完成生成臨時的<key2,value2>對,如果定義了CombinerMapTask會在Mapper完成調用該Combiner將相同key的值做合并處理,以減少輸出結果集。MapTask的任務全完成即交給ReduceTask進程調用Reducer處理,生成最終結果<key3,value3>對。

           

          1. 整體流程

          一道MapRedcue作業是通過JobClient.rubJob(job)master節點的JobTracker提交的, JobTracker接到JobClient的請求后把其加入作業隊列中。JobTracker一直在等待JobClient通過RPC提交作業,TaskTracker一直通過RPCJobTracker發送心跳heartbeat詢問有沒有任務可做,如果有,讓其派發任務給它執行。如果JobTracker的作業隊列不為空, TaskTracker發送的心跳將會獲得JobTracker給它派發的任務。這是一道pull過程。slave節點的TaskTracker接到任務后在其本地發起Task,執行任務。以下是簡略示意圖:



          下圖比較詳細的解釋了程序的流程:



           

          1. Jobclient

          在編寫MapReduce程序時通常是上是這樣寫的:

          Configuration conf = new Configuration(); // 讀取hadoop配置

          Job job = new Job(conf, "作業名稱"); // 實例化一道作業

          job.setMapperClass(Mapper類型);

          job.setCombinerClass(Combiner類型);

          job.setReducerClass(Reducer類型);

          job.setOutputKeyClass(輸出Key的類型);

          job.setOutputValueClass(輸出Value的類型);

          FileInputFormat.addInputPath(job, new Path(輸入hdfs路徑));

          FileOutputFormat.setOutputPath(job, new Path(輸出hdfs路徑));

          // 其它初始化配置

          JobClient.runJob(job);

          4.1 配置Job

          JobConf是用戶描述一個job的接口。下面的信息是MapReduce過程中一些較關鍵的定制信息:


          4.2 JobClient.runJob():運行Job并分解輸入數據集


          runJob()提交作業,如何等待返回的狀態,根據狀態返回不同的結構給客戶端。

          其中runJob()使用submitJob(job)方法向 master提交作業。

          submitJob(Job)方法的流程



           

          一個MapReduceJob會通過JobClient類根據用戶在JobConf類中定義的InputFormat實現類來將輸入的數據集分解成一批小的數據集,每一個小數據集會對應創建一個MapTask來處理。JobClient會使用缺省的FileInputFormat類調用FileInputFormat.getSplits()方法生成小數據集,如果判斷數據文件是isSplitable()的話,會將大的文件分解成小的FileSplit,當然只是記錄文件在HDFS里的路徑及偏移量和Split大小。這些信息會統一打包到jobFilejar中。


          hadoop分布系統文件系統hdfs依次上傳三個文件: job.jar, job.splitjob.xml。 

          job.xml: 作業配置,例如Mapper, Combiner, Reducer的類型,輸入輸出格式的類型等。 

          job.jar: jar,里面包含了執行此任務需要的各種類,比如 Mapper,Reducer等實現。 

          job.split: 文件分塊的相關信息,比如有數據分多少個塊,塊的大小(默認64m)等。 

          這三個文件在hdfs上的路徑由hadoop-default.xml文件中的mapreduce系統路徑mapred.system.dir屬性 + jobid決定。mapred.system.dir屬性默認是/tmp/hadoop-user_name/mapred/system。寫完這三個文 件之后, 此方法會通過RPC調用master節點上的JobTracker.submitJob(job)方法,等待返回狀態,此時作業已經提交完成。

          接下來轉到JobTracker上執行。

          (事實上這里還涉及到一些相關的類與方法)

          4.3 提交Job

          jobFile的提交過程是通過RPC(遠程進程調用)模塊來實現的。大致過程是,JobClient類中通過RPC實現的Proxy接口調用JobTrackersubmitJob()方法,而JobTracker必須實現JobSubmissionProtocol接口。

          JobTracker創建job成功后會給JobClient傳回一個JobStatus對象用于記錄job的狀態信息,如執行時間、MapReduce任務完成的比例等。JobClient會根據這個JobStatus對象創建一個NetworkedJobRunningJob對象,用于定時從JobTracker獲得執行過程的統計數據來監控并打印到用戶的控制臺。

          與創建Job過程相關的類和方法如下圖所示


           

          1. JobTracker

          5.1 JobTracker啟動

          JobTracker類中有一個main()函數,在軟件啟動的時候執行此main()函數啟動JobTracker進程,main()中生成一個JobTracker的對象,然后通過tracker.offerService()語句啟動服務,即啟動一些線程,下面是幾個主要的線程:

          taskScheduler:一個抽象類,被JobTracker用于安排執行在TaskTrackers上的task任務,它使用一個或多個JobInProgressListeners接收jobs的通知。另外一個任務是調用JobInProgress.initTask()job初始化tasks。啟動,提交作業,設置配置參數,終止等方法。


          completedJobsStoreThread對應completedJobStatusStoreCompletedJobStatusStore類:把JobInProgress中的job信息存儲到DFS中;提供一些讀取狀態信息的方法;是一個守護進程,用于刪除DFS中的保存時間超過規定時間的job status刪除,


          interTrackerServer,抽象類Server類型的實例。一個IPC (Inter-Process Communication,進程間通信)服務器,IPC調用一個以一個參數的形式調用Writable,然后返回一個Writable作為返回值,在某個端口上運行。提供了call,listener,responder,connection,handle類。包括start(),stop(),join(),getListenerAddress(),call()等方法。

          這些線程啟動之后,便可開始工作了。



          job是統一由JobTracker來調度的,把具體的Task分發給各個TaskTracker節點來執行。下面來詳細解析執行過程,首先先從JobTracker收到JobClient的提交請求開始。

            1. JobTracker初始化Job

          5.2.1 JobTracker.submitJob() 收到請求

          JobTracker接收到新的job請求(即submitJob()函數被調用)后,會創建一個JobInProgress對象并通過它來管理和調度任務。JobInProgress在創建的時候會初始化一系列與任務有關的參數,調用到FileSystem,把在JobClient端上傳的所有任務文件下載到本地的文件系統中的臨時目錄里。這其中包括上傳的*.jar文件包、記錄配置信息的xml、記錄分割信息的文件。

          5.2 JobTracker.JobInitThread 通知初始化線程

          JobTracker 中的監聽器類EagerTaskInitializationListener負責任務Task的初始化。JobTracker使用jobAdded(job)加入jobEagerTaskInitializationListener中一個專門管理需要初始化的隊列里,即一個list成員變量jobInitQueue里。resortInitQueue方法根據作業的優先級排序。然后調用notifyAll()函數,會喚起一個用于初始化job的線程JobInitThread來處理???JobInitThread收到信號后即取出最靠前的job,即優先級別最高的job,調用TaskTrackerManagerinitJob最終調用JobInProgress.initTasks()執行真正的初始化工作。

          5.3 JobInProgress.initTasks() 初始化TaskInProgress

          任務Task分兩種: MapTask reduceTask,它們的管理對象都是TaskInProgress

          首先JobInProgress會創建Map的監控對象。在initTasks()函數里通過調用JobClientreadSplitFile()獲得已分解的輸入數據的RawSplit列表,然后根據這個列表創建對應數目的Map執行管理對象TaskInProgress。在這個過程中,還會記錄該RawSplit塊對應的所有在HDFS里的blocks所在的DataNode節點的host,這個會在RawSplit創建時通過FileSplitgetLocations()函數獲取,該函數會調用DistributedFileSystem的getFileCacheHints()獲得。當然如果是存儲在本地文件系統中,即使用LocalFileSystem時當然只有一個location即“localhost”了。

          創建這些TaskInProgress對象完畢后,initTasks()方法會通 過createCache()方法為這些TaskInProgress對象產生一個未執行任務的Map緩存nonRunningMapCacheslave端的 TaskTrackermaster發送心跳時,就可以直接從這個cache中取任務去執行。

          其次JobInProgress會創建Reduce的監控對象,這個比較簡單,根據JobConf里指定的Reduce數目創建,缺省只創建1Reduce任務。監控和調度Reduce任務的是TaskInProgress類,不過構造方法有所不同,TaskInProgress會根據不同參數分別創建具體的MapTask或者ReduceTask。同樣地,initTasks()也會通過createCache()方法產生nonRunningReduces成員。

          JobInProgress創建完TaskInProgress后,最后構造JobStatus并記錄job正在執行中,然后再調用JobHistory.JobInfo.logStarted()記錄job的執行日志。到這里JobTracker里初始化job的過程全部結束。


          5.3.2 JobTracker調度Job

          hadoop默認的調度器是FIFO策略的JobQueueTaskScheduler,它有兩個成員變量 jobQueueJobInProgressListener與上面說的eagerTaskInitializationListener。JobQueueJobInProgressListener是JobTracker的另一個監聽器類,它包含了一個映射,用來管理和調度所有的JobInProgress。jobAdded(job)同時會加入job到JobQueueJobInProgressListener中的映射。

          JobQueueTaskScheduler最重要的方法是assignTasks ,他實現了工作調度。具體實現:JobTracker 接到TaskTracker heartbeat() 調用后,首先會檢查上一個心跳響應是否完成,是沒要求啟動或重啟任務,如果一切正常,則會處理心跳。首先它會檢查 TaskTracker 端還可以做多少個 map reduce 任務,將要派發的任務數是否超出這個數,是否超出集群的任務平均剩余可負載數。如果都沒超出,則為此 TaskTracker 分配一個 MapTask ReduceTask 。產生 Map 任務使用 JobInProgress obtainNewMapTask() 方法,實質上最后調用了 JobInProgress findNewMapTask() 訪問 nonRunningMapCache

          上面講解任務初始化時說過,createCache()方法會在網絡拓撲結構上掛上需要執行的TaskInProgressfindNewMapTask()從近到遠一層一層地尋找,首先是同一節點,然后在尋找同一機柜上的節點,接著尋找相同數據中心下的節點,直到找了maxLevel層結束。這樣的話,在JobTrackerTaskTracker派發任務的時候,可以迅速找到最近的TaskTracker,讓它執行任務。

          最終生成一個Task類對象,該對象被封裝在一個LanuchTaskAction 中,發回給TaskTracker,讓它去執行任務。

          產生 Reduce 任務過程類似,使用 JobInProgress.obtainNewReduceTask() 方法,實質上最后調用了 JobInProgress findNewReduceTask() 訪問 nonRunningReduces

          6. TaskTracker

          6.1 TaskTracker的啟動

          JobTracker一樣,里面包含一個main()方法,在hadoop啟動的時候啟動此進程。

          Main()方法最主要的一句話

          TaskTracker(conf).run()

          TaskTracker(conf)獲取本機的一些配置信息,初始化服務器并啟動服務器(StatusHttpServer);然后調用initialize(),這個方法才是真正構造TaskTracker的地方,把它作為一個單獨的方法便可以再次調用并可以在close()之后回收對象,就是初始化一些變量對象,最后啟動線程:

          taskMemoryManagerTaskMemoryManagerThread類的對象。管理本機上task運行時內存的使用,殺死任何溢出和超出內存限制的task-trees

          mapLauncherreduceLauncher都是TaskLauncher類的對象,其作用是啟動maptaskreducetask任務線程。根據tasksToLaunch判斷是否需要新建任務,其中的調用的關系為:run()startNewTask()localizeJob()launchTaskForJoblaunchTask()localizeTask


          run()方法中啟動TaskTracker服務器然后一直循環。循環會嘗試連接到的JobTracker。主要調用了兩個方法startCleanupThreads(),offerService()

          startCleanupThreads()啟動為守護進程,可以用來刪除一個獨立線程的路徑。

          offerService()類似于JobTracker中的offerService()方法,即服務器執行的主循環。規定的時間內給JobTracker發送心跳信息,并處理返回的命令。

          下面具體介紹流程中的每一步。

          6.2 TaskTracker加載Task到子進程

          Task的執行實際是由TaskTracker發起的,TaskTracker會定期與JobTracker進行一次通信,報告自己Task的執行狀態,接收JobTracker的指令等。如果發現有自己需要執行的新任務也會在這時啟動,即是在TaskTracker調用JobTrackerheartbeat()方法時進行,此調用底層是通過IPC層調用Proxy接口實現。

          6.2.1 TaskTracker.run() 連接JobTracker

          TaskTracker的啟動過程會初始化一系列參數和服務,然后嘗試連接JobTracker(即必須實現InterTrackerProtocol接口),如果連接斷開,則會循環嘗試連接JobTracker,并重新初始化所有成員和參數。

          6.2.2 TaskTracker.offerService() 主循環

          如果連接JobTracker服務成功,TaskTracker就會調用offerService()函數進入主執行循環中。這個循環會每隔10秒與JobTracker通訊一次,調用transmitHeartBeat(),獲得HeartbeatResponse信息。然后調用HeartbeatResponsegetActions()函數獲得JobTracker傳過來的所有指令即一個TaskTrackerAction數組。再遍歷這個數組,如果是一個新任務指令即LaunchTaskAction則調用調用addToTaskQueue加入到待執行

          隊列,否則加入到tasksToCleanup隊列,交給一個taskCleanupThread線程來處理,如執行KillJobAction或者KillTaskAction等。

          6.2.3 TaskTracker.transmitHeartBeat() 獲取JobTracker指令

          transmitHeartBeat()函數處理中,TaskTracker會創建一個新的TaskTrackerStatus對象記錄目前任務的執行狀況,檢查目前執行的Task數目以及本地磁盤的空間使用情況等,如果可以接收新的Task則設置heartbeat()askForNewTask參數為true。然后通過IPC接口調用JobTrackerheartbeat()方法發送過去,heartbeat()返回值TaskTrackerAction數組。

          6.2.4 TaskTracker.addToTaskQueue,交給TaskLauncher處理

          TaskLauncher是用來處理新任務的線程類,包含了一個待運行任務的隊列 tasksToLaunchTaskTracker.addToTaskQueue會調用TaskTrackerregisterTask,創建TaskInProgress對象來調度和監控任務,并把它加入到runningTasks隊列中。同時將這個TaskInProgress加到tasksToLaunch 中,并notifyAll()喚醒一個線程運行,該線程從隊列tasksToLaunch取出一個待運行任務,調用TaskTrackerstartNewTask運行任務。

          6.2.5 TaskTracker.startNewTask() 啟動新任務

          調用localizeJob()真正初始化Task并開始執行。

          6.2.6 TaskTracker.localizeJob() 初始化job目錄等

          此函數主要任務是初始化工作目錄workDir,再將job jar包從HDFS復制到本地文件系統中,調用RunJar.unJar()將包解壓到工作目錄。然后創建一個RunningJob并調用addTaskToJob()函數將它添加到runningJobs監控隊列中。addTaskToJob方法把一個任務加入到該任務屬于的runningJobtasks列表中。如果該任務屬于的runningJob不存在,先新建,加到runningJobs中。完成后即調用launchTaskForJob()開始執行Task

          6.2.7 TaskTracker.launchTaskForJob() 執行任務

          啟動Task的工作實際是調用TaskTracker$TaskInProgresslaunchTask()函數來執行的。

          6.2.8 TaskTracker$TaskInProgress.launchTask() 執行任務

          執行任務前先調用localizeTask()更新一下jobConf文件并寫入到本地目錄中。然后通過調用TaskcreateRunner()方法創建TaskRunner對象并調用其start()方法最后啟動Task獨立的java執行子進程。

          6.2.9 Task.createRunner() 創建啟動Runner對象

          Task有兩個實現版本,即MapTaskReduceTask,它們分別用于創建MapReduce任務。MapTask會創建MapTaskRunner來啟動Task子進程,而ReduceTask則創建ReduceTaskRunner來啟動。

          6.2.10 TaskRunner.start() 啟動子進程

          TaskRunner負責將一個任務放到一個進程里面來執行。它會調用run()函數來處理,主要的工作就是初始化啟動java子進程的一系列環境變量,包括設定工作目錄workDir,設置CLASSPATH環境變量等。然后裝載job jar包。JvmManager用于管理該TaskTracker上所有運行的Task子進程。每一個進程都是由JvmRunner來管理的,它也是位于單獨線程中的。JvmManagerlaunchJvm方法,根據任務是map還是reduce,生成對應的JvmRunner并放到對應JvmManagerForType的進程容器中進行管理。JvmManagerForTypereapJvm()

          分配一個新的JVM進程。如果JvmManagerForType槽滿,就尋找idle的進程,如果是同Job的直接放進去,否則殺死這個進程,用一個新的進程代替。如果槽沒有滿,那么就啟動新的子進程。生成新的進程使用spawnNewJvm方法。spawnNewJvm使用JvmRunner線程的run方法,run方法用于生成一個新的進程并運行它,具體實現是調用runChild

          6.3 子進程執行MapTask

          真實的執行載體,是Child,它包含一個 main函數,進程執行,會將相關參數傳進來,它會拆解這些參數,通過getTask(jvmId)向父進程索取任務,并且構造出相關的Task實例,然后使用Taskrun()啟動任務。

          6.3.1 run

          方法相當簡單,配置完系統的TaskReporter后,就根據情況執行runJobCleanupTaskrunJobSetupTaskrunTaskCleanupTask或執行map

          6.3.2 mapper

          首先構造Mapper的輸出,是通過MapOutputCollector進行的,也分兩種情況,如果沒有Reducer,那么,用DirectMapOutputCollector,否則,用MapOutputBuffer。然后構造Mapper處理的InputSplit,然后就開始創建MapperRecordReader,最終得到map的輸入。構造完Mapper的輸入輸出,通過構造配置文件中配置的MapRunnable,就可以執行Mapper了。目前系統有兩個MapRunnableMapRunnerMultithreadedMapRunnerMapRunner是單線程執行器,比較簡單,他會使用反射機制生成用戶定義的Mapper接口實現類,作為他的一個成員。

          6.3.3 MapRunnerrun方法

          會先創建對應的keyvalue對象,然后,對InputSplit的每一對<keyvalue>,調用用戶實現的Mapper接口實現類的map方法,每處理一個數據對,就要使用OutputCollector收集每次處理kv對后得到的新的kv對,把他們spill到文件或者放到內存,以做進一步的處理,比如排序,combine等。

          6.3.4 OutputCollector

          OutputCollector的作用是收集每次調用map后得到的新的kv對,并把他們spill到文件或者放到內存,以做進一步的處理,比如排序,combine等。

          MapOutputCollector 有兩個子類:MapOutputBufferDirectMapOutputCollectorDirectMapOutputCollector用在不需要Reduce階段的時候。如果Mapper后續有reduce任務,系統會使用MapOutputBuffer做為輸出, MapOutputBuffer使用了一個緩沖區對map的處理結果進行緩存,放在內存中,又使用幾個數組對這個緩沖區進行管理。



          在適當的時機,緩沖區中的數據會被spill到硬盤中。



          向硬盤中寫數據的時機:

          1)當內存緩沖區不能容下一個太大的k v對時。spillSingleRecord方法。

          2)內存緩沖區已滿時。SpillThread線程。

          3Mapper的結果都已經collect了,需要對緩沖區做最后的清理。Flush方法。

          2.5 spillThread線程:將緩沖區中的數據spill到硬盤中。

          1)需要spill時調用函數sortAndSpill,按照partitionkey做排序。默認使用的是快速排序QuickSort

          2)如果沒有combiner,則直接輸出記錄,否則,調用CombinerRunnercombine,先做combin然后輸出。

          6.4 子進程執行ReduceTask

          ReduceTask.run方法開始和MapTask類似,包括initialize()初始化 ,runJobCleanupTask()runJobSetupTask()runTaskCleanupTask()。之后進入正式的工作,主要有這么三個步驟:CopySortReduce

          6.4.1 Copy

          就是從執行各個Map任務的服務器那里,收羅到map的輸出文件。拷貝的任務,是由ReduceTask.ReduceCopier 類來負責。

          6.4.1.1 類圖:



          6.4.1.2 流程: 使用ReduceCopier.fetchOutputs開始

          1)索取任務。使用GetMapEventsThread線程。該線程的run方法不停的調用getMapCompletionEvents方法,該方法又使用RPC調用TaskUmbilicalProtocol協議的getMapCompletionEvents,方法使用所屬的jobID向其父TaskTracker詢問此作業個Map任務的完成狀況(TaskTracker要向JobTracker詢問后再轉告給它...)。返回一個數組TaskCompletionEvent events[]TaskCompletionEvent包含taskidip地址之類的信息。

          2)當獲取到相關Map任務執行服務器的信息后,有一個線程MapOutputCopier開啟,做具體的拷貝工作。 它會在一個單獨的線程內,負責某個Map任務服務器上文件的拷貝工作。MapOutputCopierrun循環調用copyOutputcopyOutput又調用getMapOutput,使用HTTP遠程拷貝。

          3getMapOutput遠程拷貝過來的內容(當然也可以是本地了...),作為MapOutput對象存在,它可以在內存中也可以序列化在磁盤上,這個根據內存使用狀況來自動調節。

          4) 同時,還有一個內存Merger線程InMemFSMergeThread和一個文件Merger線程LocalFSMerger在同步工作,它們將下載過來的文件(可能在內存中,簡單的統稱為文件...),做著歸并排序,以此,節約時間,降低輸入文件的數量,為后續的排序工作減 負。InMemFSMergeThreadrun循環調用doInMemMerge, 該方法使用工具類Merger實現歸并,如果需要combine,則combinerRunner.combine

          6.4.2 Sort

          排序工作,就相當于上述排序工作的一個延續。它會在所有的文件都拷貝完畢后進行。使用工具類Merger歸并所有的文件。經過這一個流程,一個合并了所有所需Map任務輸出文件的新文件產生了。而那些從其他各個服務器網羅過來的 Map任務輸出文件,全部刪除了。

          6.4.3 Reduce

          Reduce任務的最后一個階段。他會準備好 keyClass"mapred.output.key.class""mapred.mapoutput.key.class", valueClass("mapred.mapoutput.value.class""mapred.output.value.class")Comparator(“mapred.output.value.groupfn.class”或 “mapred.output.key.comparator.class”)。最后調用runOldReducer方法。(也是兩套API,我們分析runOldReducer

          6.4.4 runReducer

          1)輸出方面。它會準備一個OutputCollector收集輸出,與MapTask不同,這個OutputCollector更為簡單,僅僅是打開一個RecordWritercollect一次,write一次。最大的不同在于,這次傳入RecordWriter的文件系統,基本都是分布式文件系統, 或者說是HDFS

          2)輸入方面,ReduceTask會用準備好的KeyClassValueClassKeyComparator等等之類的自定義類,構造出Reducer所需的鍵類型, 和值的迭代類型Iterator(一個鍵到了這里一般是對應一組值)。

          3)有了輸入,有了輸出,不斷循環調用自定義的Reducer,最終,Reduce階段完成。



           

           

          posted @ 2011-01-14 09:05 俞靈 閱讀(8378) | 評論 (7)編輯 收藏

          僅列出標題
          共3頁: 上一頁 1 2 3 
          主站蜘蛛池模板: 保靖县| 偏关县| 上犹县| 红河县| 景德镇市| 塔河县| 观塘区| 余江县| 紫金县| 新平| 卓资县| 南康市| 堆龙德庆县| 扶绥县| 祁东县| 偃师市| 桦川县| 永康市| 平陆县| 富宁县| 剑阁县| 射阳县| 沁源县| 阿克陶县| 都江堰市| 五华县| 包头市| 南雄市| 榆树市| 东兴市| 东乡| 安义县| 民勤县| 新密市| 聂拉木县| 喀喇沁旗| 成武县| 长岛县| 平南县| 贵港市| 兴宁市|