測(cè)試工具——Distcp工具深入分析
引言
DistCp命令是hadoop用戶最常使用的命令之一,它位于hadoop tools包中,代碼不多,約1300多行,主要用于在兩個(gè)HDFS集群之間快速拷貝數(shù)據(jù)。DistCp工具代碼結(jié)構(gòu)清晰易懂,通過(guò)分析該工具的代碼有助于我們更好的理解MR編程框架,并可以對(duì)hdfs文件系統(tǒng)有一個(gè)初步的了解。
用法
DistCp使用方法如下表所示:
OPTIONS: -p[rbugp] Preserve status r: replication number b: block size u: user g: group p: permission -p alone is equivalent to -prbugp -i Ignore failures -log <logdir> Write logs to <logdir> -m <num_maps> Maximum number of simultaneous copies -overwrite Overwrite destination -update Overwrite if src size different from dst size -f <urilist_uri> Use list at <urilist_uri> as src list -filelimit <n> Limit the total number of files to be <= n -sizelimit <n> Limit the total size to be <= n bytes -delete Delete the files existing in the dst but not in src |
這里-p、-m、-overwrite都是常用參數(shù),大多數(shù)情況下我們期望拷貝后數(shù)據(jù)權(quán)限保持一致,通過(guò)-p參數(shù)來(lái)完成權(quán)限一致性,拷貝并行度則由-m參數(shù)來(lái)調(diào)節(jié)。至于-overwrite往往和-delete合用,用來(lái)起到dst和src的一個(gè)diff功能。至于-update是很不靠譜的參數(shù),因?yàn)橹挥挟?dāng)源和目標(biāo)文件的大小不一致時(shí)distcp才會(huì)覆蓋拷貝,如果大小一致,雖然內(nèi)容不同distcp也依然會(huì)跳過(guò)這個(gè)文件不做拷貝。
源代碼與過(guò)程分析
DistCp實(shí)現(xiàn)了org.apache.hadoop.util.Tool這個(gè)接口,這個(gè)接口實(shí)際只有一個(gè)有用的方法聲明,即“int run(InputStream in, OutputStream out, OutputStream err,String... arguments);”通過(guò)ToolRunner這個(gè)類調(diào)度運(yùn)行。
DistCp解析完參數(shù)后,首先通過(guò)源路徑檢測(cè)并獲得文件系統(tǒng)句柄。然后進(jìn)入setup方法:
private static void setup(Configuration conf, JobConf jobConf,
final Arguments args)
該方法是DistCp做準(zhǔn)備工作的地方,首先是結(jié)合一個(gè)隨機(jī)數(shù)生成一個(gè)工作目錄,并將該目錄路徑作為參數(shù)傳遞給Mapper,在這個(gè)目錄下會(huì)生成兩個(gè)文件“_distcp_src_files”和“_distcp_dst_files”,這兩個(gè)文件都是SequenceFile,即Key/Value結(jié)構(gòu)的序列化文件,這里將記錄所有需要拷貝的源目錄/文件信息列表。其中_distcp_src_files 的key是源文件的size,如果是目錄則記錄為0,value是自己實(shí)現(xiàn)的Writable接口類FilePair,記錄目標(biāo)節(jié)點(diǎn)的org.apache.hadoop.fs.FileStatus和路徑。_distcp_dst_files的key是目標(biāo)路徑,和節(jié)點(diǎn)的FileStatus。這兩個(gè)文件是DistCp工具的關(guān)鍵點(diǎn),在setup方法中,DistCp通過(guò)遞歸遍歷了要拷貝的所有源頭數(shù)據(jù)列表,生成了這兩個(gè)文件。
隨后,DistCp會(huì)以268435456字節(jié)(256MB)為切分單位計(jì)算map數(shù),這個(gè)數(shù)值可以通過(guò)-sizelimit參數(shù)進(jìn)行人為修改。DistCp構(gòu)造了自己的InputSplit,將_distcp_src_files文件以剛才所說(shuō)的值為單位進(jìn)行切分,如果設(shè)定了-m參數(shù),則會(huì)按照該參數(shù)設(shè)定的map數(shù)為基準(zhǔn)進(jìn)行切分。這里需要注意切分的map數(shù)不會(huì)恰好等于-m參數(shù)設(shè)定的值,由于不能整除的原因,總會(huì)或多或少的偏離一點(diǎn)設(shè)定值。
map數(shù)的確定算法如下:
private static void setMapCount(long totalBytes, JobConf job) throws IOException { int numMaps = (int)(totalBytes / job.getLong(BYTES_PER_MAP_LABEL, BYTES_PER_MAP)); numMaps = Math.min(numMaps, job.getInt(MAX_MAPS_LABEL, MAX_MAPS_PER_NODE * new JobClient(job).getClusterStatus().getTaskTrackers())); job.setNumMapTasks(Math.max(numMaps, 1)); } |
這里可以看到,DistCp其實(shí)還判斷了集群實(shí)際tasktracker數(shù)量,防止map數(shù)設(shè)置的太多,導(dǎo)致很多map需要等待一輪輪的調(diào)度。
切分代碼如下:
SequenceFile.Reader sl = null; try { sl = new SequenceFile.Reader(fs, src, job); for (; sl.next(key, value); last = sl.getPosition()) { // if adding this split would put this split past the target size, // cut the last split and put this next file in the next split. if (acc + key.get() > targetsize && acc != 0) { long splitsize = last - pos; splits.add(new FileSplit(src, pos, splitsize, (String[])null)); cbrem -= splitsize; pos = last; acc = 0L; } acc += key.get(); } } finally { checkAndClose(sl); } |
split之后就進(jìn)入Mapper執(zhí)行階段,map task起來(lái)后就會(huì)根據(jù)自己分配到的那段文件列表來(lái)進(jìn)行點(diǎn)對(duì)點(diǎn)的拷貝,拷貝過(guò)程會(huì)保持Permission、Replication、Block Size的一致性,如果設(shè)定了-update則會(huì)做一下是否需要update的判斷,如果設(shè)定了-overwrite則會(huì)刪除已有的文件。這里Owner信息沒(méi)有保持一致,而是放到了服務(wù)端所有map執(zhí)行完之后,這一點(diǎn)很讓我覺(jué)得糾結(jié),為什么不在map里面拷貝完之后直接同步文件Owner呢?如果有哪位大師知道希望可以提點(diǎn)我一下。因?yàn)槭菍?shí)現(xiàn)了Tool接口,而Tool接口是繼承了Configurable接口的,所以-D指定的值對(duì)于DistCp來(lái)說(shuō)也是可以生效的。例如設(shè)定“-Ddfs.replication=1”,那么拷貝時(shí)目標(biāo)文件的replication數(shù)就將保持為1,這樣一來(lái)由于目標(biāo)端需要寫(xiě)入的數(shù)據(jù)變少了,拷貝速度就可以大大加快,但是不推薦這么做,因?yàn)榭截愡^(guò)程中如果有一臺(tái)Datanode掛了,那么丟失的數(shù)據(jù)由于無(wú)備份,就將真正丟了,這臺(tái)機(jī)器恢復(fù)不了的話,整個(gè)distcp過(guò)程就會(huì)因?yàn)閬G數(shù)據(jù)而失敗了。
拷貝過(guò)程關(guān)鍵代碼如下:
FSDataInputStream in = null; FSDataOutputStream out = null; try { // open src file in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath()); reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen()); // open tmp file out = create(tmpfile, reporter, srcstat); // copy file for(int cbread; (cbread = in.read(buffer)) >= 0; ) { out.write(buffer, 0, cbread); cbcopied += cbread; reporter.setStatus( String.format("%.2f ", cbcopied*100.0/srcstat.getLen()) + absdst + " [ " + StringUtils.humanReadableInt(cbcopied) + " / " + StringUtils.humanReadableInt(srcstat.getLen()) + " ]"); } } finally { checkAndClose(in); checkAndClose(out); } |
Mapper執(zhí)行完之后,DistCp工具的服務(wù)端執(zhí)行過(guò)程就全部完成了,回到客戶端還會(huì)做一些掃尾的工作,例如同步Owner權(quán)限。這里會(huì)有一些問(wèn)題,稍后我們一并分析。
問(wèn)題分析
DistCp存在三大問(wèn)題,下面來(lái)一一剖析:
1.任務(wù)失敗,map task報(bào)“DFS Read: java.io.IOException: Could not obtain block”
這是由于“_distcp_src_files”這個(gè)文件的備份數(shù)是系統(tǒng)默認(rèn)值,例如hadoop-site.xml里面設(shè)置了dfs.replication=3,那么_distcp_src_files文件的備份數(shù)則創(chuàng)建之后就為3了。當(dāng)map數(shù)非常多,以至于超過(guò)了_distcp_src_files文件三個(gè)副本所在datanode最大容納上限的時(shí)候,部分map task就會(huì)出現(xiàn)獲取不了block的問(wèn)題。對(duì)于DistCp來(lái)說(shuō)“-i”參數(shù)一般是絕對(duì)不能使用的,因?yàn)樵O(shè)置了該參數(shù),這個(gè)問(wèn)題就會(huì)被掩蓋,帶來(lái)的后果就是拷貝完缺失了部分?jǐn)?shù)據(jù)。比較好的做法是在計(jì)算了總map數(shù)之后,自動(dòng)增加_distcp_src_files這個(gè)文件的備份數(shù),這樣一來(lái)訪問(wèn)容納上限也會(huì)跟著提高,上述問(wèn)題就不會(huì)再出現(xiàn)了。當(dāng)前社區(qū)已對(duì)此有了簡(jiǎn)單fix,直接將備份數(shù)設(shè)置成了一個(gè)較高的數(shù)值。一般說(shuō)來(lái)對(duì)于計(jì)算資源有限的集群來(lái)說(shuō),過(guò)多的maptask并不會(huì)提高拷貝的效率,因此我們可以通過(guò)-m參數(shù)來(lái)設(shè)定合理的map數(shù)量。一般說(shuō)來(lái)通過(guò)觀察ganglia,bytes_in、bytes_out達(dá)到上限就可以了。
2.Owner同步問(wèn)題
DistCp工具的提示信息非常少,對(duì)于海量數(shù)據(jù)來(lái)說(shuō),DistCp初始階段準(zhǔn)備拷貝文件列表和結(jié)束階段設(shè)定Owner同步耗時(shí)都比較長(zhǎng),但卻沒(méi)有任何提示信息。這是一個(gè)很奇怪的地方,拷貝過(guò)程中,mapred會(huì)打印進(jìn)度信息到客戶端,這時(shí)候可以看到百分比,等結(jié)束的時(shí)候可以看到過(guò)程中的一些統(tǒng)計(jì)信息。如果你設(shè)置了-p參數(shù),此時(shí)就會(huì)處于一個(gè)停滯的狀態(tài),沒(méi)有任何輸出了。由于Owner同步?jīng)]有在map task里面去做,放在客戶端就必然成為一個(gè)單線程的工作,耗時(shí)也會(huì)比較長(zhǎng)。我以前犯過(guò)的錯(cuò)誤就是啟動(dòng)distcp后看jobtracker頁(yè)面出現(xiàn)作業(yè)了,就kill了客戶端的進(jìn)程,這樣一來(lái)就導(dǎo)致Owner不會(huì)同步。現(xiàn)在做法都是用“nohup nice -n 0”把進(jìn)程放到后臺(tái)讓其自動(dòng)結(jié)束。
3.長(zhǎng)尾問(wèn)題
DistCp切分map的時(shí)候,充分考慮了每個(gè)map需要拷貝的數(shù)據(jù)量,盡量保持平均,但是卻完全沒(méi)有考慮碎文件和整塊文件拷貝耗時(shí)不同的問(wèn)題。此外,某些task所在tasktracker機(jī)器由于故障之類原因也會(huì)導(dǎo)致性能較差,拖慢了整體節(jié)奏。拷貝大量數(shù)據(jù)的時(shí)候總會(huì)因?yàn)檫@些原因出現(xiàn)長(zhǎng)尾。通過(guò)在InputSplit的時(shí)候同時(shí)考慮數(shù)據(jù)量和文件個(gè)數(shù)的均衡可以解決碎文件和整文件拷貝耗時(shí)不同的問(wèn)題。而部分task運(yùn)行慢的問(wèn)題,目前看起來(lái)則沒(méi)有很好的解決方案。
用途
DistCp這個(gè)工具不僅可以用來(lái)做數(shù)據(jù)拷貝遷移工作,同時(shí)也是一個(gè)很好的制造集群負(fù)載的工具。用來(lái)模擬一定壓力下的集成測(cè)試是非常有效的。在跨機(jī)房項(xiàng)目中,我們使用該工具負(fù)載兩個(gè)機(jī)房之間的帶寬,通過(guò)控制同時(shí)工作map數(shù)來(lái)調(diào)整帶寬的增減是非常有效的。拓展該工具的代碼思路,我們?cè)诳鐧C(jī)房項(xiàng)目中制作出來(lái)的很多壓力測(cè)試、性能測(cè)試工具也都發(fā)揮了作用。下面簡(jiǎn)單用一幅流程圖來(lái)說(shuō)明一下distcp工具的思想:
總結(jié)
DistCp工具是一個(gè)非常易于使用的拷貝工具,在Hadoop生態(tài)圈眾多怪獸級(jí)應(yīng)用中,DistCp的代碼是優(yōu)美且短小精悍的。也因?yàn)槠浯a易讀性非常好,因此作為MR編程框架的入門(mén)教材也非常適合。小心的使用這個(gè)工具,我們可以在很多測(cè)試場(chǎng)景下模擬真實(shí)的線上情況。因此建議每位剛?cè)際adoop門(mén)的碼農(nóng)都能鉆研一下DistCp的源碼,增加對(duì)MR編程框架和HDFS文件系統(tǒng)的深入理解。
posted on 2013-11-04 13:13 順其自然EVO 閱讀(4138) 評(píng)論(0) 編輯 收藏