聶永的博客

          記錄工作/學(xué)習(xí)的點(diǎn)點(diǎn)滴滴。

          為什么批量請(qǐng)求要盡可能的合并操作

          前言

          線上情況:

          1. 線上Redis集群,多個(gè)Twemproxy代理(nutcracker),LVS DR路由均衡調(diào)度
          2. 客戶端使用Jedis操作Redis集群,一個(gè)程序進(jìn)程實(shí)例使用原先1024個(gè)工作線程處理請(qǐng)求,若干個(gè)進(jìn)程實(shí)例
          3. 一天超過22億次請(qǐng)求,網(wǎng)絡(luò)一般情況下,一天超過上萬個(gè)連接失敗異常
          4. 運(yùn)維同學(xué)告知,LVS壓力較大

          改進(jìn)工作:

          1. 工作線程由原先1024改用16個(gè)
          2. 每個(gè)線程每次最多操作1000個(gè)Redis命令批量提交

          實(shí)際效果:

          1. 一天不到一億次的請(qǐng)求量
          2. LVS壓力大減
          3. CPU壓力降低到原先1/3以下
          4. 單個(gè)請(qǐng)求抽樣調(diào)研平均減少1-90毫秒時(shí)間(尤其是跨機(jī)房處理)

          Redis支持批量提交

          原生支持批量操作方式

          一般命令前綴若添加上m字符串,表示支持多個(gè)、批量命令提交了。

          顯式的...

          MSET key value [key value ...]
          MSETNX key value [key value ...]
          
          HMGET key field [field ...]
          HMSET key field value [field value ...]
          

          一般方式的...

          HDEL key field [field ...]
          SREM key member [member ...]
          RPUSH key value [value ...]
          ......
          

          更多,請(qǐng)參考:http://redis.cn/commands.html

          pipeline管道方式

          官方文檔:http://redis.io/topics/pipelining

          1. Redis Client把所有命令一起打包發(fā)送到Redis Server,然后阻塞等待處理結(jié)果
          2. Redis Server必須在處理完所有命令前先緩存起所有命令的處理結(jié)果
          3. 打包的命令越多,緩存消耗內(nèi)存也越多
          4. 不是打包的命令越多越好
          5. 實(shí)際環(huán)境需要根據(jù)命令執(zhí)行時(shí)間等各種因素選擇合并命令的個(gè)數(shù),以及測(cè)試效果等

          Java隊(duì)列支持

          一般業(yè)務(wù)、接入前端請(qǐng)求量過大,生產(chǎn)者速度過快,這時(shí)候使用隊(duì)列暫時(shí)緩存會(huì)比較好一些,消費(fèi)者直接直接從隊(duì)列獲取任務(wù),通過隊(duì)列讓生產(chǎn)者和消費(fèi)者進(jìn)行分離這也是業(yè)界普通采用的方式。

          監(jiān)控隊(duì)列

          有的時(shí)候,若可以監(jiān)控一下隊(duì)列消費(fèi)情況,可以監(jiān)控一下,就很直觀。同事為隊(duì)列添加了一個(gè)監(jiān)控線程,清晰明了了解隊(duì)列消費(fèi)情況。

          示范

          示范使用了Redis Pipeline,線程池,準(zhǔn)備數(shù)據(jù),生產(chǎn)者-消費(fèi)者隊(duì)列,隊(duì)列監(jiān)控等,消費(fèi)完畢,程序關(guān)閉。

          /**
           * 以下測(cè)試在Jedis 2.6下測(cè)試通過
           * 
           * @author nieyong
           * 
           */
          public class TestJedisPipeline {
              private static final int NUM = 512;
              private static final int MAX = 1000000; // 100W
          
              private static JedisPool redisPool;
              private static final ExecutorService pool = Executors.newCachedThreadPool();
              protected static final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(
                      MAX); // 100W
              private static boolean finished = false;
          
              static {
                  JedisPoolConfig config = new JedisPoolConfig();
                  config.setMaxActive(64);
                  config.setMaxIdle(64);
          
                  try {
                      redisPool = new JedisPool(config, "192.168.192.8", 6379, 10000,
                              null, 0);
                  } catch (Exception e) {
                      System.err.println("Init msg redis factory error! " + e.toString());
                  }
              }
          
              public static void main(String[] args) throws InterruptedException {
                  System.out.println("prepare test data 100W");
                  prepareTestData();
                  System.out.println("prepare test data done!");
          
                  // 生產(chǎn)者,模擬請(qǐng)求100W次
                  pool.execute(new Runnable() {
                      @Override
                      public void run() {
                          for (int i = 0; i < MAX; i++) {
                              if (i % 3 == 0) {
                                  queue.offer("del_key key_" + i);
                              } else {
                                  queue.offer("get_key key_" + i);
                              }
                          }
                      }
                  });
          
                  // CPU核數(shù)*2 個(gè)工作者線程
                  int threadNum = 2 * Runtime.getRuntime().availableProcessors();
          
                  for (int i = 0; i < threadNum; i++)
                      pool.execute(new ConsumerTask());
          
                  pool.execute(new MonitorTask());
          
                  Thread.sleep(10 * 1000);// 10sec
                  System.out.println("going to shutdown server ...");
                  setFinished(true);
                  pool.shutdown();
          
                  pool.awaitTermination(1, TimeUnit.MILLISECONDS);
          
                  System.out.println("colse!");
              }
          
              private static void prepareTestData() {
                  Jedis redis = redisPool.getResource();
                  Pipeline pipeline = redis.pipelined();
          
                  for (int i = 0; i < MAX; i++) {
                      pipeline.set("key_" + i, (i * 2 + 1) + "");
          
                      if (i % (NUM * 2) == 0) {
                          pipeline.sync();
                      }
                  }
                  pipeline.sync();
                  redisPool.returnResource(redis);
              }
          
              // queue monitor,生產(chǎn)者-消費(fèi)隊(duì)列監(jiān)控
              private static class MonitorTask implements Runnable {
          
                  @Override
                  public void run() {
                      while (!Thread.interrupted() && !isFinished()) {
                          System.out.println("queue.size = " + queue.size());
                          try {
                              Thread.sleep(500); // 0.5 second
                          } catch (InterruptedException e) {
                              break;
                          }
                      }
                  }
              }
          
              // consumer,消費(fèi)者
              private static class ConsumerTask implements Runnable {
                  @Override
                  public void run() {
                      while (!Thread.interrupted() && !isFinished()) {
                          if (queue.isEmpty()) {
                              try {
                                  Thread.sleep(100);
                              } catch (InterruptedException e) {
                              }
          
                              continue;
                          }
          
                          List<String> tasks = new ArrayList<String>(NUM);
                          queue.drainTo(tasks, NUM);
                          if (tasks.isEmpty()) {
                              continue;
                          }
          
                          Jedis jedis = redisPool.getResource();
                          Pipeline pipeline = jedis.pipelined();
          
                          try {
                              List<Response<String>> resultList = new ArrayList<Response<String>>(
                                      tasks.size());
          
                              List<String> waitDeleteList = new ArrayList<String>(
                                      tasks.size());
          
                              for (String task : tasks) {
                                  String key = task.split(" ")[1];
                                  if (task.startsWith("get_key")) {
                                      resultList.add(pipeline.get(key));
                                      waitDeleteList.add(key);
                                  } else if (task.startsWith("del_key")) {
                                      pipeline.del(key);
                                  }
                              }
          
                              pipeline.sync();
          
                              // 處理返回列表
                              for (int i = 0; i < resultList.size(); i++) {
                                  resultList.get(i).get();
                                  // handle value here ...
                                  // System.out.println("get value " + value);
                              }
          
                              // 讀取完畢,直接刪除之
                              for (String key : waitDeleteList) {
                                  pipeline.del(key);
                              }
          
                              pipeline.sync();
                          } catch (Exception e) {
                              redisPool.returnBrokenResource(jedis);
                          } finally {
                              redisPool.returnResource(jedis);
                          }
                      }
                  }
              }
          
              private static boolean isFinished(){
                  return finished;
              }
          
              private static void setFinished(boolean bool){
                  finished = bool;
              }
          }
          

          代碼作為示范。若線上則需要處理一些異常等。

          小結(jié)

          若能夠批量請(qǐng)求進(jìn)行合并操作,自然可以節(jié)省很多的網(wǎng)絡(luò)帶寬、CPU等資源。有類似問題的同學(xué),不妨考慮一下。

          posted on 2014-11-09 22:08 nieyong 閱讀(16189) 評(píng)論(17)  編輯  收藏 所屬分類: Socket

          評(píng)論

          # re: 為什么批量請(qǐng)求要盡可能的合并操作 2014-11-10 09:34 下巴長(zhǎng)痘痘是什么原因

          經(jīng)過樓主的講解,我現(xiàn)在才明白為什么批量請(qǐng)求要盡可能的合并操作  回復(fù)  更多評(píng)論   

          # re: 為什么批量請(qǐng)求要盡可能的合并操作[未登錄] 2014-11-11 13:36 劉洋

          好專業(yè)啊...每個(gè)月有一諞  回復(fù)  更多評(píng)論   

          # re: 為什么批量請(qǐng)求要盡可能的合并操作 2014-11-13 14:37 Stanley Xu

          線程數(shù)要根據(jù)cpu的情況而決定的,一臺(tái)4核的機(jī)器開40個(gè)線程就是蛋疼。同步、context switch的開銷已經(jīng)超過了線程帶來的優(yōu)勢(shì)。如果不合并,僅僅減少線程數(shù),性能也會(huì)有所優(yōu)化。  回復(fù)  更多評(píng)論   

          # re: 為什么批量請(qǐng)求要盡可能的合并操作 2014-11-16 09:12 妞妞寶貝衣間

          支持博主分享,歡迎到我的小店、、、、  回復(fù)  更多評(píng)論   

          # gank開黑吧 2014-11-16 22:13 gank開黑吧

          gank開黑吧http://www.kaihei8.com 贊一下博主  回復(fù)  更多評(píng)論   

          # re: 為什么批量請(qǐng)求要盡可能的合并操作 2014-11-16 23:37 網(wǎng)絡(luò)營(yíng)銷技巧

          看了樓主的講解,我現(xiàn)在才大致明白  回復(fù)  更多評(píng)論   

          # re: 為什么批量請(qǐng)求要盡可能的合并操作 2014-11-17 14:32 臉上長(zhǎng)粉刺是什么原因

          不錯(cuò)的文章,學(xué)習(xí)了  回復(fù)  更多評(píng)論   

          # 武岡SEO 2014-11-25 22:57 794680490@qq.com

          文章很實(shí)用,學(xué)習(xí)了,到時(shí)實(shí)踐下  回復(fù)  更多評(píng)論   

          # re: 為什么批量請(qǐng)求要盡可能的合并操作 2015-01-24 15:09 黎洪鑫

          請(qǐng)教一下,我們使用pipeline的方式后,出現(xiàn)了在一些閑時(shí),內(nèi)存暴漲。然后kill掉twemproxy之后就降下來了。然后查了相關(guān)的資料,把pipeline的數(shù)量降到500,甚至20了,仍然出現(xiàn)。而且11臺(tái)機(jī)器中,有一些機(jī)器經(jīng)常出現(xiàn),但是最近經(jīng)常出現(xiàn)的不出現(xiàn),從沒出現(xiàn)的又出現(xiàn)這情況了。不知道您是否有遇到過,如果解決。  回復(fù)  更多評(píng)論   

          # re: 為什么批量請(qǐng)求要盡可能的合并操作 2015-01-26 09:39 nieyong

          @黎洪鑫
          沒有遇見過類似問題,愛莫能助。
          因?yàn)閜ipeline是一個(gè)阻塞請(qǐng)求-響應(yīng)過程,這一點(diǎn)很重要;另外網(wǎng)絡(luò)機(jī)房擁塞會(huì)導(dǎo)致非常大的延遲,具體情況就是請(qǐng)求發(fā)出去,等待很長(zhǎng)時(shí)間響應(yīng)。若是機(jī)房網(wǎng)絡(luò)延遲問題,可以考慮把pipeline異步提交,不要阻塞當(dāng)前線程。
          以上都是建議,僅供參考!  回復(fù)  更多評(píng)論   

          # re: 為什么批量請(qǐng)求要盡可能的合并操作 2015-01-26 10:46 黎洪鑫

          多謝了,我先做一下升級(jí)看看情況會(huì)不會(huì)改善。@nieyong
            回復(fù)  更多評(píng)論   

          # re: 為什么批量請(qǐng)求要盡可能的合并操作 2015-05-28 10:05 tinsang

          @nieyong
          可以考慮把pipeline異步提交,不要阻塞當(dāng)前線程 ;

          這個(gè)異步是指?不是很明白  回復(fù)  更多評(píng)論   

          # re: 為什么批量請(qǐng)求要盡可能的合并操作 2015-05-28 17:04 nieyong

          @tinsang
          把較為耗時(shí)任務(wù)委派到其它線程處理,當(dāng)前業(yè)務(wù)線程繼續(xù)忙別的。  回復(fù)  更多評(píng)論   

          # re: 為什么批量請(qǐng)求要盡可能的合并操作 2015-05-28 19:01 tinsang

          @nieyong
          那我明白你的意思了  回復(fù)  更多評(píng)論   

          # re: 為什么批量請(qǐng)求要盡可能的合并操作 2015-05-28 19:02 tinsang

          @nieyong
          pipeline阻塞了,那其他請(qǐng)求redis不是一樣被阻塞了?  回復(fù)  更多評(píng)論   

          # re: 為什么批量請(qǐng)求要盡可能的合并操作 2015-05-29 09:52 nieyong

          @tinsang
          針對(duì)單臺(tái)Redis而言,單線程模型。一旦pipeline阻塞了,其它請(qǐng)求會(huì)被阻塞住。可考慮單線程操作管道,一個(gè)一個(gè)批處理。  回復(fù)  更多評(píng)論   

          # re: 為什么批量請(qǐng)求要盡可能的合并操作 2016-05-16 17:45 zhouwei

          private static boolean finished = false;
          finished變量應(yīng)該為volatile。
          樓主這么牛逼的人不應(yīng)該犯這種小錯(cuò)誤 ^_^
          好文章,學(xué)習(xí)了~  回復(fù)  更多評(píng)論   

          公告

          所有文章皆為原創(chuàng),若轉(zhuǎn)載請(qǐng)標(biāo)明出處,謝謝~

          新浪微博,歡迎關(guān)注:

          導(dǎo)航

          <2014年11月>
          2627282930311
          2345678
          9101112131415
          16171819202122
          23242526272829
          30123456

          統(tǒng)計(jì)

          常用鏈接

          留言簿(58)

          隨筆分類(130)

          隨筆檔案(151)

          個(gè)人收藏

          最新隨筆

          搜索

          最新評(píng)論

          閱讀排行榜

          評(píng)論排行榜

          主站蜘蛛池模板: 沅陵县| 县级市| 新津县| 托克托县| 新乡市| 德保县| 闸北区| 忻州市| 安远县| 张家口市| 颍上县| 云安县| 峨山| 花莲县| 天水市| 井陉县| 武强县| 镇安县| 镇原县| 南江县| 莲花县| 遂昌县| 潞城市| 手游| 金秀| 南乐县| 柏乡县| 宜兰县| 栾川县| 宿迁市| 资阳市| 塘沽区| 同德县| 宿州市| 久治县| 疏勒县| 青冈县| 茶陵县| 中卫市| 台东县| 巨鹿县|