隨筆-314  評論-209  文章-0  trackbacks-0

          我們每次執(zhí)行hive的hql時,shell里都會提示一段話:

          [python] view plaincopy
          1. ...  
          2. Number of reduce tasks not specified. Estimated from input data size: 500  
          3. In order to change the average load for a reducer (in bytes):  
          4.   set hive.exec.reducers.bytes.per.reducer=<number>  
          5. In order to limit the maximum number of reducers:  
          6.   set hive.exec.reducers.max=<number>  
          7. In order to set a constant number of reducers:  
          8.   set mapred.reduce.tasks=<number>  
          9. ...  

          這個是調(diào)優(yōu)的經(jīng)常手段,主要有一下三個屬性來決定

          hive.exec.reducers.bytes.per.reducer    這個參數(shù)控制一個job會有多少個reducer來處理,依據(jù)的是輸入文件的總大小。默認1GB。

           This controls how many reducers a map-reduce job should have, depending on the total size of input files to the job. Default is 1GB
          hive.exec.reducers.max     這個參數(shù)控制最大的reducer的數(shù)量, 如果 input / bytes per reduce > max  則會啟動這個參數(shù)所指定的reduce個數(shù)。  這個并不會影響mapre.reduce.tasks參數(shù)的設(shè)置。默認的max是999。

          This controls the maximum number of reducers a map-reduce job can have.  If input_file_size divided by "hive.exec.bytes.per.reducer" is greater than this value, the map-reduce job will have this value as the number reducers.  Note this does not affect the number of reducers directly specified by the user through "mapred.reduce.tasks" and query hints

          mapred.reduce.tasks  這個參數(shù)如果指定了,hive就不會用它的estimation函數(shù)來自動計算reduce的個數(shù),而是用這個參數(shù)來啟動reducer。默認是-1.

          This overrides the hadoop configuration to make sure we enable the estimation of the number of reducers by the size of the input files. If this value is non-negative, then hive will pass this number directly to map-reduce jobs instead of doing the estimation.

          reduce的個數(shù)設(shè)置其實對執(zhí)行效率有很大的影響:

          1、如果reduce太少:  如果數(shù)據(jù)量很大,會導致這個reduce異常的慢,從而導致這個任務(wù)不能結(jié)束,也有可能會OOM

          2、如果reduce太多:  產(chǎn)生的小文件太多,合并起來代價太高,namenode的內(nèi)存占用也會增大。


          如果我們不指定mapred.reduce.tasks, hive會自動計算需要多少個reducer。


          計算的公式:  reduce個數(shù) =  InputFileSize   /   bytes per reducer

          這個數(shù)個粗略的公式,詳細的公式在:

          common/src/java/org/apache/hadoop/hive/conf/HiveConf.java

          我們先看下: 

          1、計算輸入文件大小的方法:其實很簡單,遍歷每個路徑獲取length,累加。

          [python] view plaincopy
          1. +   * Calculate the total size of input files.  
          2. +   * @param job the hadoop job conf.  
          3. +   * @return the total size in bytes.  
          4. +   * @throws IOException   
          5. +   */  
          6. +  public static long getTotalInputFileSize(JobConf job, mapredWork work) throws IOException {  
          7. +    long r = 0;  
          8. +    FileSystem fs = FileSystem.get(job);  
          9. +    // For each input path, calculate the total size.  
          10. +    for (String path: work.getPathToAliases().keySet()) {  
          11. +      ContentSummary cs = fs.getContentSummary(new Path(path));  
          12. +      r += cs.getLength();  
          13. +    }  
          14. +    return r;  
          15. +  }  


          2、估算reducer的個數(shù),及計算公式:

          注意最重要的一句話:  int reducers = (int)((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);

          [python] view plaincopy
          1. +  /**  
          2. +   * Estimate the number of reducers needed for this job, based on job input,  
          3. +   * and configuration parameters.  
          4. +   * @return the number of reducers.  
          5. +   */  
          6. +  public int estimateNumberOfReducers(HiveConf hive, JobConf job, mapredWork work) throws IOException {  
          7. +    long bytesPerReducer = hive.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);  
          8. +    int maxReducers = hive.getIntVar(HiveConf.ConfVars.MAXREDUCERS);  
          9. +    long totalInputFileSize = getTotalInputFileSize(job, work);  
          10. +  
          11. +    LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers=" + maxReducers   
          12. +        + " totalInputFileSize=" + totalInputFileSize);  
          13. +    int reducers = (int)((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);  
          14. +    reducers = Math.max(1, reducers);  
          15. +    reducers = Math.min(maxReducers, reducers);  
          16. +    return reducers;      
          17. +  }  

          3、真正的計算流程代碼:

          [python] view plaincopy
          1. +  /**  
          2. +   * Set the number of reducers for the mapred work.  
          3. +   */  
          4. +  protected void setNumberOfReducers() throws IOException {  
          5. +    // this is a temporary hack to fix things that are not fixed in the compiler  
          6. +    Integer numReducersFromWork = work.getNumReduceTasks();  
          7. +      
          8. +    if (numReducersFromWork != null && numReducersFromWork >= 0) {  
          9. +      LOG.info("Number of reduce tasks determined at compile: " + work.getNumReduceTasks());  
          10. +    } else if(work.getReducer() == null) {  
          11. +      LOG.info("Number of reduce tasks not specified. Defaulting to 0 since there's no reduce operator");  
          12. +      work.setNumReduceTasks(Integer.valueOf(0));  
          13. +    } else {  
          14. +      int reducers = estimateNumberOfReducers(conf, job, work);  
          15. +      work.setNumReduceTasks(reducers);  
          16. +      LOG.info("Number of reduce tasks not specified. Estimated from input data size: " + reducers);  
          17.      }  
          18.    }  

          這就是reduce個數(shù)計算的原理。


          By the way :

          今天中午在群里看到一位朋友問到:

          當前hive的reduce個數(shù)的設(shè)定是依據(jù)map階段輸入的數(shù)據(jù)量大小來除以每一個reduce能夠處理的數(shù)據(jù)量來決定有多少個的,但是考慮到經(jīng)過map階段處理的數(shù)據(jù)很可能可輸入數(shù)據(jù)相差很大,這樣子的話,當初設(shè)定的reduce個數(shù)感覺不太合理。。。請問hive當前能支持依據(jù)map階段輸出數(shù)據(jù)量的大小決定reduce個數(shù)么?(但是,reduce任務(wù)的開啟是在有某些map任務(wù)完成就會開始的,所以要等到所有map全部執(zhí)行完成再統(tǒng)計數(shù)據(jù)量來決定reduce個數(shù)感覺也不太合理)  有沒有什么好方法?謝謝


          這個問題的大意是,reducer個數(shù)是根據(jù)輸入文件的大小來估算出來的,但是實際情況下,Map的輸出文件才是真正要到reduce上計算的數(shù)據(jù)量,如何依據(jù)Map的階段輸出數(shù)據(jù)流覺得reduce的個數(shù),才是實際的問題。


          我給出的思路是:

          1、hack下源碼,計算下每個map輸出的大小×map個數(shù)不就估算出map總共輸出的數(shù)據(jù)量了嗎?不用等它結(jié)束,因為每個map的處理量是一定的。

          2、你把源碼的 總輸入量 / 每個reduce處理量  改成 總輸出量 / 每個reduce處理量不就行了?(總輸出=每個Map輸出文件的大小×map個數(shù))


          Ps:最后朋友提到:

          建議不錯,雖然有一定誤差。 謝謝。   不過,如果filter push down的話,每一個map的輸出大小差別可能比較大。。。而且filter push down 現(xiàn)在應該是hive默認支持的了


          大意是,還是會有一些誤差,謂詞下推可能會影響Map的輸出大小。


          本文權(quán)且當作回顧加備忘,如有不對之處,請高手指正。

          —EOF——




          posted on 2018-03-07 11:21 xzc 閱讀(1515) 評論(1)  編輯  收藏 所屬分類: hadoop

          評論:
          # re: Hive中reduce個數(shù)設(shè)定 2018-03-07 11:43 | xzc
          use DBHIVE;
          set mapreduce.job.queuename=root.xqueue;
          set hive.exec.stagingdir=/tmp/hive-staging;
          #set hive.exec.reducers.bytes.per.reducer=1024000000;
          set mapred.reduce.tasks = 50;
          INSERT OVERWRITE DIRECTORY '/user/gxhb/BILL_FEE_1707/EVENT_CDR' row format delimited fields terminated by '#' select rowkeyid,event_type_id,select_col,merge_col,base_table_name from tmp_bill_fee_1707 a where base_table_name='EVENT_CDR';
            回復  更多評論
            
          主站蜘蛛池模板: 图们市| 玉山县| 若羌县| 秦安县| 黑水县| 和静县| 高州市| 名山县| 尼勒克县| 铁力市| 青浦区| 肃宁县| 宁陵县| 田林县| 修文县| 资源县| 保德县| 历史| 彭泽县| 北安市| 剑阁县| 乡宁县| 子长县| 繁峙县| 南陵县| 宜黄县| 明水县| 崇文区| 金塔县| 博白县| 确山县| 克东县| 昭苏县| 樟树市| 扎兰屯市| 南投县| 遂昌县| 江北区| 读书| 瑞昌市| 彝良县|