qileilove

          blog已經轉移至github,大家請訪問 http://qaseven.github.io/

          應用MapReduce制作壓測利器

           引言
            眾所周知,MapReduce編程框架(以下簡稱MR)一直是大并發運算以及海量數據讀寫應用設計的利器。在MR編程體系下,一個job通常會把輸入的數據集切分為若干塊,由map task以完全并行的方式處理消化這些數據塊。框架會對map的輸出先進行排序,然后把結果作為輸入提交給reduce任務。通常作業的輸入和輸出都會被存儲在文件系統中。整個框架負責任務的調度和監控,以及重新執行已經失敗的任務。典型的MR程序有如下重要模塊結構構成:
            類似hive這樣的應用,拿到用戶一句簡單sql查詢(假設無需二次執行的簡單sql)后,將hdfs上的海量數據進行切分,然后每個map task分別對自己負責的那部分數據執行相同的sql查詢,最后將各自獲得的結果匯總輸出給用戶,這便可以保證在海量數據中以較快的速度獲得查詢結果。
            簡單介紹完MR編程框架后,我們再來談談常規壓力測試的特點和需求。
            以LoadRunner和JMeter為例,這兩種工具都可以對web應用進行大并發訪問,模擬線上的高并發壓力測試,并且也都相應的提供了多機聯合產生負載這樣的方式進一步模擬現實情況增大被測對象的壓力。這是為了解決“如果一臺測試機器模擬的虛擬用戶數過多,他本身性能的下降也會直接影響到測試效果”這個問題。分析LR和JMeter的多機聯合產生負載這種測試方式,我們不難發現類似MR框架的一些特點,即測試分作如下幾步(以LR為例):
            1. 設置測試機,即在多臺用于測試的機器上安裝Load Generator
            2. 設置測試任務,即各種configure
            3. 同時調度測試任務,通過agent執行對web應用的訪問
            4. Controller負責統一調度運行場景并收集測試信息和執行結果
            無論是LR還是JMeter都是優秀的壓測工具,但是總有一些非常規的壓力測試場景無法通過LR或JMeter方便的實現,例如對分布式系統做數據讀寫壓力測試,被測目標并非一個單獨的節點,而是由很多節點組成的,這樣的壓力測試場景意味著多機聯合對單一節點產生的負載被分擔到了很多個節點上。LR和JMeter針對這樣的場景往往在設置上就很復雜。
            此外,對很多特定的壓測目標,測試人員在設計了專屬測試工具之后,往往也需要有一個類似上述LR測試步驟的過程,即工具分發、調度執行、收集結果和過程信息這樣一個測試執行框架,如果自己去實現這一套框架,耗費的人月數都是相當可觀的,且復用程度有限。于是在云梯項目中我通過自己的實踐,想到了將MR編程框架體系與壓力測試需求相結合。
            從事例說起
            先從簡單實現類似LR多機聯合負載這樣一個壓測場景展開。被測目標是這樣的:一個web應用服務,用于收集分布式系統的跨機房流量信息,后端采用hbase作為存儲數據庫,接口為單一節點的http listen端口,需要模擬真實跨機房場景,利用較少的機器數量(約真實系統的50分之一)模擬線上系統的并發度。


          測試工具代碼是發送http request部分,出于安全考慮,重要部分略過:
          while(System.currentTimeMillis() - start <= runtime){
          StringBuffer sb = new StringBuffer();
          List<String> data = new ArrayList<String>();
          HttpURLConnection httpurlconnection = null;
          try{
          URL url = new URL(this.reportAd);
          httpurlconnection = (HttpURLConnection) url.openConnection();
          httpurlconnection.setConnectTimeout(5000);
          httpurlconnection.setReadTimeout(5000);
          httpurlconnection.setDoOutput(true);
          httpurlconnection.setRequestMethod("POST");
          httpurlconnection.setRequestProperty("Content-type", "text/plain");
          for(long i=0; i<this.recordnum; i++){
          。。。。。。
          s = Math.abs(R.nextLong())%102400000+1024;
          staticWriteSize += s;
          reporter.incrCounter("TestTool", "Write Size", s);
          staticWriteTime += (endTime - startTime);
          reporter.incrCounter("TestTool", "Write Time", endTime - startTime);
          。。。。。。
          }else{
          。。。。。。
          reporter.incrCounter("TestTool", "Read Size", s);
          staticReadTime += (endTime - startTime);
          reporter.incrCounter("TestTool", "Read Time", endTime - startTime);
          。。。。。。
          }
          Pair p = value.get(R.nextInt(value.size()));
          。。。。。。
          staticCount++;
          }
          reporter.incrCounter("TestTool", "Record num", this.recordnum);
          reporter.setStatus("Record: "+staticCount+"("+staticWrite+"w, "+staticRead+"r), Write Size: "
          +staticWriteSize+", Write Time: "+staticWriteTime
          +", Read Size: "+staticReadSize+", Read Time: "+staticReadTime);
          httpurlconnection.getOutputStream().write(sb.toString().getBytes());
          httpurlconnection.getOutputStream().flush();
          httpurlconnection.getOutputStream().close();
          int code = httpurlconnection.getResponseCode();
          if(code != 200) {
          LOG.warn("send data to master server failed, code=" + code);
          }
          reporter.incrCounter("TestTool", "Http Post num", 1);
          map.staticPost.addAndGet(1);
          Thread.sleep(interval);
          } catch (Exception e) {
          map.staticPost.addAndGet(1);
          reporter.incrCounter("TestTool", e.getClass().toString(), 1);
          LOG.warn(e.getMessage(), e);
          } finally {
          if (httpurlconnection != null) {
          httpurlconnection.disconnect();
          }
          }
            有了工具代碼之后,我們通過實現Mapper來封裝該工具,因此上述代碼中我使用了“org.apache.hadoop.mapred.Reporter”的方法“incrCounter(Stringarg0, String arg1, long arg2)”來對測試中的重要過程數據進行計數,該方法會將所有map/reduce task中匯報的arg0|arg1定義的值arg2進行相加,輸出到MR的jobtracker頁面上,通過觀察作業執行頁面可以實時獲取這些測試執行過程信息。此外,我還調度了“setStatus(String arg0)”方法,該方法可以實時更新當前所處task的頁面信息,提供更詳細的單個task執行情況信息。jobdetails.jsp頁面觀察結果如下所示:



           更多的執行過程信息,我們則通過“org.apache.commons.logging.Log”來收集,通過tasklog頁面可以查閱到這些詳細的日志信息。
            作為map task的輸入,我通過在hdfs上生成的一堆隨機數據來實現,InputSplit類讀取了hdfs上作為模擬真實數據的輸入后,將其根據map數切分成n份(n=自定義的map數量),并將其分發給對應的map task,map task拿到自己那份數據后,立即啟動多個線程執行上述測試工具代碼:
          public void map(LongWritable key, List<Pair> value,
          OutputCollector<Text, LongWritable> context, Reporter reporter)
          throws IOException {
          。。。。。。
          for(int i=0; i < this.threadnum; i++){
          SCNThread t = new SCNThread(value, reporter, start, this);
          t.start();
          this.alivethread.addAndGet(1);
          }
            。。。。。。
            各task自行同步線程啟動數量,當所有線程都啟動之后,輸出收集器開始運作:
          long now = System.currentTimeMillis();
          if (now > this.start + this.interval) {
          this.start += this.interval;
          context.collect(new LongWritable(this.start), new LongWritable(
          this.writeBlocks));
          }
            。。。。。。
            可以看到key是時間戳,value是我們想收集的數值,收集器收集到的數據將進一步提供給Reducer來分析,這里有一個壓力測試的關鍵點,即最大并發開始時間點和結束時間點的判斷。觀察Reducer類的reduce方法:
            public void reduce(Text key, Iterator<LongWritable> value,
            OutputCollector<Text, Text> context, Reporter reporter)
            由于所有map都以相同的時間戳作為key,因此同一時刻迭代器value的size代表了有多少個map已經達到了最大并發度,我們判斷這個size,當其與我們預期的map總數一致時,則可以將該時間戳作為最大并發壓力的開始時間點,當size開始小于預期map總數時,則代表最大并發壓力的結束時間點,測試結果分析時可以掐取這一段數據作為測試結果,免去開始準備階段和快結束階段壓力變小對測試結果的干擾。
            更進一步我們可以在hdfs上設計一個標志位,當一個maptask執行完畢之后,通過該標志位通知到其他所有map task,以便快速結束當前的測試。
            測試結果被reducer分析匯總后輸出到hdfs上,最終我們只需要查看一下這個輸出文件的內容就可以得到我們需要的測試結果了。
            其實我們不難發現,這種測試框架與DDoS攻擊很類似,當手握數千臺機器之后,基本上就具備了指哪毀哪的能力,MR框架體系蘊藏的能量的確是非常巨大的。
          流程圖
            沒有流程圖,上述文字描述終歸不夠直觀,因此詳細流程請看下圖所示:
            多語言測試工具的支持
            對于java類測試工具,我們可以應用該流程圖所示方案進行大并發度的壓力測試,對于非java語言類的測試工具,我們一方面可以自行撰寫其他編程語言的進程調度和收集器,另一方面也可以使用hadoop streaming這個編程工具來實現。Hadoop Streaming是 Hadoop提供的一個編程工具,它允許用戶使用任何可執行文件或腳本文件作為 Mapper和 Reducer。這樣一來我們用python或shell編寫的測試工具也可以通過streaming簡單的調度起來執行。有關streaming編程工具的技術細節請自行搜索腦補,這里不再贅述。
            第二個例子
            第二個例子是關于如何使用MR編程框架壓測HDFS文件系統的,該例子涉及到Hadoop更底層的技術細節,以及對性能指標的分析等內容,因此留作《工欲善其事必先利其器》系列的下一彈專門介紹一下,敬請期待。
            總結
            為什么把本篇作為系列文章的首彈,是因為Hadoop相關的測試工具大多不離該篇所使用到的技術。Hadoop是一個生態圈,因此測試工具作為生態圈的一部分也沒必要且不應該脫離這個生態圈去獨立生存。MR編程框架大大縮減了分布式應用程序的開發周期,其編程思想更值得每個碼農去深挖學習。本彈是筆者一個非常粗淺的思考開端,期望能夠拋磚引玉與更多碼農進行分享和探討。系列文章之二《HDFS性能壓測工具淺析》將進一步分享一下有關HDFS相關的技術細節。

          posted on 2013-11-07 11:15 順其自然EVO 閱讀(238) 評論(0)  編輯  收藏


          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          <2013年11月>
          272829303112
          3456789
          10111213141516
          17181920212223
          24252627282930
          1234567

          導航

          統計

          常用鏈接

          留言簿(55)

          隨筆分類

          隨筆檔案

          文章分類

          文章檔案

          搜索

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 唐河县| 驻马店市| 德庆县| 姜堰市| 屯留县| 德安县| 宁化县| 岑巩县| 宜川县| 延长县| 伊宁市| 河曲县| 耒阳市| 邓州市| 中江县| 大埔县| 双流县| 西昌市| 泗洪县| 观塘区| 简阳市| 新建县| 巢湖市| 会昌县| 台州市| 磐石市| 和龙市| 西和县| 德昌县| 靖宇县| 宝兴县| 环江| 沂南县| 海淀区| 东安县| 黑水县| 通渭县| 凉山| 确山县| 璧山县| 甘泉县|