隨筆-314  評論-209  文章-0  trackbacks-0

          我們每次執行hive的hql時,shell里都會提示一段話:

          [python] view plaincopy
          1. ...  
          2. Number of reduce tasks not specified. Estimated from input data size: 500  
          3. In order to change the average load for a reducer (in bytes):  
          4.   set hive.exec.reducers.bytes.per.reducer=<number>  
          5. In order to limit the maximum number of reducers:  
          6.   set hive.exec.reducers.max=<number>  
          7. In order to set a constant number of reducers:  
          8.   set mapred.reduce.tasks=<number>  
          9. ...  

          這個是調優的經常手段,主要有一下三個屬性來決定

          hive.exec.reducers.bytes.per.reducer    這個參數控制一個job會有多少個reducer來處理,依據的是輸入文件的總大小。默認1GB。

           This controls how many reducers a map-reduce job should have, depending on the total size of input files to the job. Default is 1GB
          hive.exec.reducers.max     這個參數控制最大的reducer的數量, 如果 input / bytes per reduce > max  則會啟動這個參數所指定的reduce個數。  這個并不會影響mapre.reduce.tasks參數的設置。默認的max是999。

          This controls the maximum number of reducers a map-reduce job can have.  If input_file_size divided by "hive.exec.bytes.per.reducer" is greater than this value, the map-reduce job will have this value as the number reducers.  Note this does not affect the number of reducers directly specified by the user through "mapred.reduce.tasks" and query hints

          mapred.reduce.tasks  這個參數如果指定了,hive就不會用它的estimation函數來自動計算reduce的個數,而是用這個參數來啟動reducer。默認是-1.

          This overrides the hadoop configuration to make sure we enable the estimation of the number of reducers by the size of the input files. If this value is non-negative, then hive will pass this number directly to map-reduce jobs instead of doing the estimation.

          reduce的個數設置其實對執行效率有很大的影響:

          1、如果reduce太少:  如果數據量很大,會導致這個reduce異常的慢,從而導致這個任務不能結束,也有可能會OOM

          2、如果reduce太多:  產生的小文件太多,合并起來代價太高,namenode的內存占用也會增大。


          如果我們不指定mapred.reduce.tasks, hive會自動計算需要多少個reducer。


          計算的公式:  reduce個數 =  InputFileSize   /   bytes per reducer

          這個數個粗略的公式,詳細的公式在:

          common/src/java/org/apache/hadoop/hive/conf/HiveConf.java

          我們先看下: 

          1、計算輸入文件大小的方法:其實很簡單,遍歷每個路徑獲取length,累加。

          [python] view plaincopy
          1. +   * Calculate the total size of input files.  
          2. +   * @param job the hadoop job conf.  
          3. +   * @return the total size in bytes.  
          4. +   * @throws IOException   
          5. +   */  
          6. +  public static long getTotalInputFileSize(JobConf job, mapredWork work) throws IOException {  
          7. +    long r = 0;  
          8. +    FileSystem fs = FileSystem.get(job);  
          9. +    // For each input path, calculate the total size.  
          10. +    for (String path: work.getPathToAliases().keySet()) {  
          11. +      ContentSummary cs = fs.getContentSummary(new Path(path));  
          12. +      r += cs.getLength();  
          13. +    }  
          14. +    return r;  
          15. +  }  


          2、估算reducer的個數,及計算公式:

          注意最重要的一句話:  int reducers = (int)((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);

          [python] view plaincopy
          1. +  /**  
          2. +   * Estimate the number of reducers needed for this job, based on job input,  
          3. +   * and configuration parameters.  
          4. +   * @return the number of reducers.  
          5. +   */  
          6. +  public int estimateNumberOfReducers(HiveConf hive, JobConf job, mapredWork work) throws IOException {  
          7. +    long bytesPerReducer = hive.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);  
          8. +    int maxReducers = hive.getIntVar(HiveConf.ConfVars.MAXREDUCERS);  
          9. +    long totalInputFileSize = getTotalInputFileSize(job, work);  
          10. +  
          11. +    LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers=" + maxReducers   
          12. +        + " totalInputFileSize=" + totalInputFileSize);  
          13. +    int reducers = (int)((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);  
          14. +    reducers = Math.max(1, reducers);  
          15. +    reducers = Math.min(maxReducers, reducers);  
          16. +    return reducers;      
          17. +  }  

          3、真正的計算流程代碼:

          [python] view plaincopy
          1. +  /**  
          2. +   * Set the number of reducers for the mapred work.  
          3. +   */  
          4. +  protected void setNumberOfReducers() throws IOException {  
          5. +    // this is a temporary hack to fix things that are not fixed in the compiler  
          6. +    Integer numReducersFromWork = work.getNumReduceTasks();  
          7. +      
          8. +    if (numReducersFromWork != null && numReducersFromWork >= 0) {  
          9. +      LOG.info("Number of reduce tasks determined at compile: " + work.getNumReduceTasks());  
          10. +    } else if(work.getReducer() == null) {  
          11. +      LOG.info("Number of reduce tasks not specified. Defaulting to 0 since there's no reduce operator");  
          12. +      work.setNumReduceTasks(Integer.valueOf(0));  
          13. +    } else {  
          14. +      int reducers = estimateNumberOfReducers(conf, job, work);  
          15. +      work.setNumReduceTasks(reducers);  
          16. +      LOG.info("Number of reduce tasks not specified. Estimated from input data size: " + reducers);  
          17.      }  
          18.    }  

          這就是reduce個數計算的原理。


          By the way :

          今天中午在群里看到一位朋友問到:

          當前hive的reduce個數的設定是依據map階段輸入的數據量大小來除以每一個reduce能夠處理的數據量來決定有多少個的,但是考慮到經過map階段處理的數據很可能可輸入數據相差很大,這樣子的話,當初設定的reduce個數感覺不太合理。。。請問hive當前能支持依據map階段輸出數據量的大小決定reduce個數么?(但是,reduce任務的開啟是在有某些map任務完成就會開始的,所以要等到所有map全部執行完成再統計數據量來決定reduce個數感覺也不太合理)  有沒有什么好方法?謝謝


          這個問題的大意是,reducer個數是根據輸入文件的大小來估算出來的,但是實際情況下,Map的輸出文件才是真正要到reduce上計算的數據量,如何依據Map的階段輸出數據流覺得reduce的個數,才是實際的問題。


          我給出的思路是:

          1、hack下源碼,計算下每個map輸出的大小×map個數不就估算出map總共輸出的數據量了嗎?不用等它結束,因為每個map的處理量是一定的。

          2、你把源碼的 總輸入量 / 每個reduce處理量  改成 總輸出量 / 每個reduce處理量不就行了?(總輸出=每個Map輸出文件的大小×map個數)


          Ps:最后朋友提到:

          建議不錯,雖然有一定誤差。 謝謝。   不過,如果filter push down的話,每一個map的輸出大小差別可能比較大。。。而且filter push down 現在應該是hive默認支持的了


          大意是,還是會有一些誤差,謂詞下推可能會影響Map的輸出大小。


          本文權且當作回顧加備忘,如有不對之處,請高手指正。

          —EOF——




          posted on 2018-03-07 11:21 xzc 閱讀(1515) 評論(1)  編輯  收藏 所屬分類: hadoop

          評論:
          # re: Hive中reduce個數設定 2018-03-07 11:43 | xzc
          use DBHIVE;
          set mapreduce.job.queuename=root.xqueue;
          set hive.exec.stagingdir=/tmp/hive-staging;
          #set hive.exec.reducers.bytes.per.reducer=1024000000;
          set mapred.reduce.tasks = 50;
          INSERT OVERWRITE DIRECTORY '/user/gxhb/BILL_FEE_1707/EVENT_CDR' row format delimited fields terminated by '#' select rowkeyid,event_type_id,select_col,merge_col,base_table_name from tmp_bill_fee_1707 a where base_table_name='EVENT_CDR';
            回復  更多評論
            
          主站蜘蛛池模板: 石城县| 墨玉县| 浑源县| 山东省| 广水市| 石家庄市| 昭通市| 抚顺市| 廉江市| 宜春市| 隆安县| 论坛| 通化县| 丁青县| 莫力| 饶阳县| 肃宁县| 磴口县| 白城市| 福安市| 阳山县| 隆回县| 项城市| 光山县| 调兵山市| 泰来县| 汉阴县| 德钦县| 德兴市| 定兴县| 怀集县| 集安市| 雷州市| 崇文区| 邵阳县| 合山市| 哈巴河县| 青阳县| 视频| 凉城县| 安丘市|