應用MapReduce制作壓測利器
引言
眾所周知,MapReduce編程框架(以下簡稱MR)一直是大并發運算以及海量數據讀寫應用設計的利器。在MR編程體系下,一個job通常會把輸入的數據集切分為若干塊,由map task以完全并行的方式處理消化這些數據塊。框架會對map的輸出先進行排序,然后把結果作為輸入提交給reduce任務。通常作業的輸入和輸出都會被存儲在文件系統中。整個框架負責任務的調度和監控,以及重新執行已經失敗的任務。典型的MR程序有如下重要模塊結構構成:
類似hive這樣的應用,拿到用戶一句簡單sql查詢(假設無需二次執行的簡單sql)后,將hdfs上的海量數據進行切分,然后每個map task分別對自己負責的那部分數據執行相同的sql查詢,最后將各自獲得的結果匯總輸出給用戶,這便可以保證在海量數據中以較快的速度獲得查詢結果。
簡單介紹完MR編程框架后,我們再來談談常規壓力測試的特點和需求。
以LoadRunner和JMeter為例,這兩種工具都可以對web應用進行大并發訪問,模擬線上的高并發壓力測試,并且也都相應的提供了多機聯合產生負載這樣的方式進一步模擬現實情況增大被測對象的壓力。這是為了解決“如果一臺測試機器模擬的虛擬用戶數過多,他本身性能的下降也會直接影響到測試效果”這個問題。分析LR和JMeter的多機聯合產生負載這種測試方式,我們不難發現類似MR框架的一些特點,即測試分作如下幾步(以LR為例):
1. 設置測試機,即在多臺用于測試的機器上安裝Load Generator
2. 設置測試任務,即各種configure
3. 同時調度測試任務,通過agent執行對web應用的訪問
4. Controller負責統一調度運行場景并收集測試信息和執行結果
無論是LR還是JMeter都是優秀的壓測工具,但是總有一些非常規的壓力測試場景無法通過LR或JMeter方便的實現,例如對分布式系統做數據讀寫壓力測試,被測目標并非一個單獨的節點,而是由很多節點組成的,這樣的壓力測試場景意味著多機聯合對單一節點產生的負載被分擔到了很多個節點上。LR和JMeter針對這樣的場景往往在設置上就很復雜。
此外,對很多特定的壓測目標,測試人員在設計了專屬測試工具之后,往往也需要有一個類似上述LR測試步驟的過程,即工具分發、調度執行、收集結果和過程信息這樣一個測試執行框架,如果自己去實現這一套框架,耗費的人月數都是相當可觀的,且復用程度有限。于是在云梯項目中我通過自己的實踐,想到了將MR編程框架體系與壓力測試需求相結合。
從事例說起
先從簡單實現類似LR多機聯合負載這樣一個壓測場景展開。被測目標是這樣的:一個web應用服務,用于收集分布式系統的跨機房流量信息,后端采用hbase作為存儲數據庫,接口為單一節點的http listen端口,需要模擬真實跨機房場景,利用較少的機器數量(約真實系統的50分之一)模擬線上系統的并發度。
有了工具代碼之后,我們通過實現Mapper來封裝該工具,因此上述代碼中我使用了“org.apache.hadoop.mapred.Reporter”的方法“incrCounter(Stringarg0, String arg1, long arg2)”來對測試中的重要過程數據進行計數,該方法會將所有map/reduce task中匯報的arg0|arg1定義的值arg2進行相加,輸出到MR的jobtracker頁面上,通過觀察作業執行頁面可以實時獲取這些測試執行過程信息。此外,我還調度了“setStatus(String arg0)”方法,該方法可以實時更新當前所處task的頁面信息,提供更詳細的單個task執行情況信息。jobdetails.jsp頁面觀察結果如下所示:
測試工具代碼是發送http request部分,出于安全考慮,重要部分略過:
while(System.currentTimeMillis() - start <= runtime){ StringBuffer sb = new StringBuffer(); List<String> data = new ArrayList<String>(); HttpURLConnection httpurlconnection = null; try{ URL url = new URL(this.reportAd); httpurlconnection = (HttpURLConnection) url.openConnection(); httpurlconnection.setConnectTimeout(5000); httpurlconnection.setReadTimeout(5000); httpurlconnection.setDoOutput(true); httpurlconnection.setRequestMethod("POST"); httpurlconnection.setRequestProperty("Content-type", "text/plain"); for(long i=0; i<this.recordnum; i++){ 。。。。。。 s = Math.abs(R.nextLong())%102400000+1024; staticWriteSize += s; reporter.incrCounter("TestTool", "Write Size", s); staticWriteTime += (endTime - startTime); reporter.incrCounter("TestTool", "Write Time", endTime - startTime); 。。。。。。 }else{ 。。。。。。 reporter.incrCounter("TestTool", "Read Size", s); staticReadTime += (endTime - startTime); reporter.incrCounter("TestTool", "Read Time", endTime - startTime); 。。。。。。 } Pair p = value.get(R.nextInt(value.size())); 。。。。。。 staticCount++; } reporter.incrCounter("TestTool", "Record num", this.recordnum); reporter.setStatus("Record: "+staticCount+"("+staticWrite+"w, "+staticRead+"r), Write Size: " +staticWriteSize+", Write Time: "+staticWriteTime +", Read Size: "+staticReadSize+", Read Time: "+staticReadTime); httpurlconnection.getOutputStream().write(sb.toString().getBytes()); httpurlconnection.getOutputStream().flush(); httpurlconnection.getOutputStream().close(); int code = httpurlconnection.getResponseCode(); if(code != 200) { LOG.warn("send data to master server failed, code=" + code); } reporter.incrCounter("TestTool", "Http Post num", 1); map.staticPost.addAndGet(1); Thread.sleep(interval); } catch (Exception e) { map.staticPost.addAndGet(1); reporter.incrCounter("TestTool", e.getClass().toString(), 1); LOG.warn(e.getMessage(), e); } finally { if (httpurlconnection != null) { httpurlconnection.disconnect(); } } |