測試工具——Distcp工具深入分析
引言
DistCp命令是hadoop用戶最常使用的命令之一,它位于hadoop tools包中,代碼不多,約1300多行,主要用于在兩個HDFS集群之間快速拷貝數據。DistCp工具代碼結構清晰易懂,通過分析該工具的代碼有助于我們更好的理解MR編程框架,并可以對hdfs文件系統有一個初步的了解。
用法
DistCp使用方法如下表所示:
OPTIONS: -p[rbugp] Preserve status r: replication number b: block size u: user g: group p: permission -p alone is equivalent to -prbugp -i Ignore failures -log <logdir> Write logs to <logdir> -m <num_maps> Maximum number of simultaneous copies -overwrite Overwrite destination -update Overwrite if src size different from dst size -f <urilist_uri> Use list at <urilist_uri> as src list -filelimit <n> Limit the total number of files to be <= n -sizelimit <n> Limit the total size to be <= n bytes -delete Delete the files existing in the dst but not in src |
這里-p、-m、-overwrite都是常用參數,大多數情況下我們期望拷貝后數據權限保持一致,通過-p參數來完成權限一致性,拷貝并行度則由-m參數來調節。至于-overwrite往往和-delete合用,用來起到dst和src的一個diff功能。至于-update是很不靠譜的參數,因為只有當源和目標文件的大小不一致時distcp才會覆蓋拷貝,如果大小一致,雖然內容不同distcp也依然會跳過這個文件不做拷貝。
源代碼與過程分析
DistCp實現了org.apache.hadoop.util.Tool這個接口,這個接口實際只有一個有用的方法聲明,即“int run(InputStream in, OutputStream out, OutputStream err,String... arguments);”通過ToolRunner這個類調度運行。
DistCp解析完參數后,首先通過源路徑檢測并獲得文件系統句柄。然后進入setup方法:
private static void setup(Configuration conf, JobConf jobConf,
final Arguments args)
該方法是DistCp做準備工作的地方,首先是結合一個隨機數生成一個工作目錄,并將該目錄路徑作為參數傳遞給Mapper,在這個目錄下會生成兩個文件“_distcp_src_files”和“_distcp_dst_files”,這兩個文件都是SequenceFile,即Key/Value結構的序列化文件,這里將記錄所有需要拷貝的源目錄/文件信息列表。其中_distcp_src_files 的key是源文件的size,如果是目錄則記錄為0,value是自己實現的Writable接口類FilePair,記錄目標節點的org.apache.hadoop.fs.FileStatus和路徑。_distcp_dst_files的key是目標路徑,和節點的FileStatus。這兩個文件是DistCp工具的關鍵點,在setup方法中,DistCp通過遞歸遍歷了要拷貝的所有源頭數據列表,生成了這兩個文件。
隨后,DistCp會以268435456字節(256MB)為切分單位計算map數,這個數值可以通過-sizelimit參數進行人為修改。DistCp構造了自己的InputSplit,將_distcp_src_files文件以剛才所說的值為單位進行切分,如果設定了-m參數,則會按照該參數設定的map數為基準進行切分。這里需要注意切分的map數不會恰好等于-m參數設定的值,由于不能整除的原因,總會或多或少的偏離一點設定值。
map數的確定算法如下:
private static void setMapCount(long totalBytes, JobConf job) throws IOException { int numMaps = (int)(totalBytes / job.getLong(BYTES_PER_MAP_LABEL, BYTES_PER_MAP)); numMaps = Math.min(numMaps, job.getInt(MAX_MAPS_LABEL, MAX_MAPS_PER_NODE * new JobClient(job).getClusterStatus().getTaskTrackers())); job.setNumMapTasks(Math.max(numMaps, 1)); } |
這里可以看到,DistCp其實還判斷了集群實際tasktracker數量,防止map數設置的太多,導致很多map需要等待一輪輪的調度。
切分代碼如下:
SequenceFile.Reader sl = null; try { sl = new SequenceFile.Reader(fs, src, job); for (; sl.next(key, value); last = sl.getPosition()) { // if adding this split would put this split past the target size, // cut the last split and put this next file in the next split. if (acc + key.get() > targetsize && acc != 0) { long splitsize = last - pos; splits.add(new FileSplit(src, pos, splitsize, (String[])null)); cbrem -= splitsize; pos = last; acc = 0L; } acc += key.get(); } } finally { checkAndClose(sl); } |
split之后就進入Mapper執行階段,map task起來后就會根據自己分配到的那段文件列表來進行點對點的拷貝,拷貝過程會保持Permission、Replication、Block Size的一致性,如果設定了-update則會做一下是否需要update的判斷,如果設定了-overwrite則會刪除已有的文件。這里Owner信息沒有保持一致,而是放到了服務端所有map執行完之后,這一點很讓我覺得糾結,為什么不在map里面拷貝完之后直接同步文件Owner呢?如果有哪位大師知道希望可以提點我一下。因為是實現了Tool接口,而Tool接口是繼承了Configurable接口的,所以-D指定的值對于DistCp來說也是可以生效的。例如設定“-Ddfs.replication=1”,那么拷貝時目標文件的replication數就將保持為1,這樣一來由于目標端需要寫入的數據變少了,拷貝速度就可以大大加快,但是不推薦這么做,因為拷貝過程中如果有一臺Datanode掛了,那么丟失的數據由于無備份,就將真正丟了,這臺機器恢復不了的話,整個distcp過程就會因為丟數據而失敗了。
拷貝過程關鍵代碼如下:
FSDataInputStream in = null; FSDataOutputStream out = null; try { // open src file in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath()); reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen()); // open tmp file out = create(tmpfile, reporter, srcstat); // copy file for(int cbread; (cbread = in.read(buffer)) >= 0; ) { out.write(buffer, 0, cbread); cbcopied += cbread; reporter.setStatus( String.format("%.2f ", cbcopied*100.0/srcstat.getLen()) + absdst + " [ " + StringUtils.humanReadableInt(cbcopied) + " / " + StringUtils.humanReadableInt(srcstat.getLen()) + " ]"); } } finally { checkAndClose(in); checkAndClose(out); } |
Mapper執行完之后,DistCp工具的服務端執行過程就全部完成了,回到客戶端還會做一些掃尾的工作,例如同步Owner權限。這里會有一些問題,稍后我們一并分析。
問題分析
DistCp存在三大問題,下面來一一剖析:
1.任務失敗,map task報“DFS Read: java.io.IOException: Could not obtain block”
這是由于“_distcp_src_files”這個文件的備份數是系統默認值,例如hadoop-site.xml里面設置了dfs.replication=3,那么_distcp_src_files文件的備份數則創建之后就為3了。當map數非常多,以至于超過了_distcp_src_files文件三個副本所在datanode最大容納上限的時候,部分map task就會出現獲取不了block的問題。對于DistCp來說“-i”參數一般是絕對不能使用的,因為設置了該參數,這個問題就會被掩蓋,帶來的后果就是拷貝完缺失了部分數據。比較好的做法是在計算了總map數之后,自動增加_distcp_src_files這個文件的備份數,這樣一來訪問容納上限也會跟著提高,上述問題就不會再出現了。當前社區已對此有了簡單fix,直接將備份數設置成了一個較高的數值。一般說來對于計算資源有限的集群來說,過多的maptask并不會提高拷貝的效率,因此我們可以通過-m參數來設定合理的map數量。一般說來通過觀察ganglia,bytes_in、bytes_out達到上限就可以了。
2.Owner同步問題
DistCp工具的提示信息非常少,對于海量數據來說,DistCp初始階段準備拷貝文件列表和結束階段設定Owner同步耗時都比較長,但卻沒有任何提示信息。這是一個很奇怪的地方,拷貝過程中,mapred會打印進度信息到客戶端,這時候可以看到百分比,等結束的時候可以看到過程中的一些統計信息。如果你設置了-p參數,此時就會處于一個停滯的狀態,沒有任何輸出了。由于Owner同步沒有在map task里面去做,放在客戶端就必然成為一個單線程的工作,耗時也會比較長。我以前犯過的錯誤就是啟動distcp后看jobtracker頁面出現作業了,就kill了客戶端的進程,這樣一來就導致Owner不會同步?,F在做法都是用“nohup nice -n 0”把進程放到后臺讓其自動結束。
3.長尾問題
DistCp切分map的時候,充分考慮了每個map需要拷貝的數據量,盡量保持平均,但是卻完全沒有考慮碎文件和整塊文件拷貝耗時不同的問題。此外,某些task所在tasktracker機器由于故障之類原因也會導致性能較差,拖慢了整體節奏??截惔罅繑祿臅r候總會因為這些原因出現長尾。通過在InputSplit的時候同時考慮數據量和文件個數的均衡可以解決碎文件和整文件拷貝耗時不同的問題。而部分task運行慢的問題,目前看起來則沒有很好的解決方案。
用途
DistCp這個工具不僅可以用來做數據拷貝遷移工作,同時也是一個很好的制造集群負載的工具。用來模擬一定壓力下的集成測試是非常有效的。在跨機房項目中,我們使用該工具負載兩個機房之間的帶寬,通過控制同時工作map數來調整帶寬的增減是非常有效的。拓展該工具的代碼思路,我們在跨機房項目中制作出來的很多壓力測試、性能測試工具也都發揮了作用。下面簡單用一幅流程圖來說明一下distcp工具的思想:
總結
DistCp工具是一個非常易于使用的拷貝工具,在Hadoop生態圈眾多怪獸級應用中,DistCp的代碼是優美且短小精悍的。也因為其代碼易讀性非常好,因此作為MR編程框架的入門教材也非常適合。小心的使用這個工具,我們可以在很多測試場景下模擬真實的線上情況。因此建議每位剛入Hadoop門的碼農都能鉆研一下DistCp的源碼,增加對MR編程框架和HDFS文件系統的深入理解。