聶永的博客

          記錄工作/學習的點點滴滴。

          為什么批量請求要盡可能的合并操作

          前言

          線上情況:

          1. 線上Redis集群,多個Twemproxy代理(nutcracker),LVS DR路由均衡調度
          2. 客戶端使用Jedis操作Redis集群,一個程序進程實例使用原先1024個工作線程處理請求,若干個進程實例
          3. 一天超過22億次請求,網絡一般情況下,一天超過上萬個連接失敗異常
          4. 運維同學告知,LVS壓力較大

          改進工作:

          1. 工作線程由原先1024改用16個
          2. 每個線程每次最多操作1000個Redis命令批量提交

          實際效果:

          1. 一天不到一億次的請求量
          2. LVS壓力大減
          3. CPU壓力降低到原先1/3以下
          4. 單個請求抽樣調研平均減少1-90毫秒時間(尤其是跨機房處理)

          Redis支持批量提交

          原生支持批量操作方式

          一般命令前綴若添加上m字符串,表示支持多個、批量命令提交了。

          顯式的...

          MSET key value [key value ...]
          MSETNX key value [key value ...]
          
          HMGET key field [field ...]
          HMSET key field value [field value ...]
          

          一般方式的...

          HDEL key field [field ...]
          SREM key member [member ...]
          RPUSH key value [value ...]
          ......
          

          更多,請參考:http://redis.cn/commands.html

          pipeline管道方式

          官方文檔:http://redis.io/topics/pipelining

          1. Redis Client把所有命令一起打包發送到Redis Server,然后阻塞等待處理結果
          2. Redis Server必須在處理完所有命令前先緩存起所有命令的處理結果
          3. 打包的命令越多,緩存消耗內存也越多
          4. 不是打包的命令越多越好
          5. 實際環境需要根據命令執行時間等各種因素選擇合并命令的個數,以及測試效果等

          Java隊列支持

          一般業務、接入前端請求量過大,生產者速度過快,這時候使用隊列暫時緩存會比較好一些,消費者直接直接從隊列獲取任務,通過隊列讓生產者和消費者進行分離這也是業界普通采用的方式。

          監控隊列

          有的時候,若可以監控一下隊列消費情況,可以監控一下,就很直觀。同事為隊列添加了一個監控線程,清晰明了了解隊列消費情況。

          示范

          示范使用了Redis Pipeline,線程池,準備數據,生產者-消費者隊列,隊列監控等,消費完畢,程序關閉。

          /**
           * 以下測試在Jedis 2.6下測試通過
           * 
           * @author nieyong
           * 
           */
          public class TestJedisPipeline {
              private static final int NUM = 512;
              private static final int MAX = 1000000; // 100W
          
              private static JedisPool redisPool;
              private static final ExecutorService pool = Executors.newCachedThreadPool();
              protected static final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(
                      MAX); // 100W
              private static boolean finished = false;
          
              static {
                  JedisPoolConfig config = new JedisPoolConfig();
                  config.setMaxActive(64);
                  config.setMaxIdle(64);
          
                  try {
                      redisPool = new JedisPool(config, "192.168.192.8", 6379, 10000,
                              null, 0);
                  } catch (Exception e) {
                      System.err.println("Init msg redis factory error! " + e.toString());
                  }
              }
          
              public static void main(String[] args) throws InterruptedException {
                  System.out.println("prepare test data 100W");
                  prepareTestData();
                  System.out.println("prepare test data done!");
          
                  // 生產者,模擬請求100W次
                  pool.execute(new Runnable() {
                      @Override
                      public void run() {
                          for (int i = 0; i < MAX; i++) {
                              if (i % 3 == 0) {
                                  queue.offer("del_key key_" + i);
                              } else {
                                  queue.offer("get_key key_" + i);
                              }
                          }
                      }
                  });
          
                  // CPU核數*2 個工作者線程
                  int threadNum = 2 * Runtime.getRuntime().availableProcessors();
          
                  for (int i = 0; i < threadNum; i++)
                      pool.execute(new ConsumerTask());
          
                  pool.execute(new MonitorTask());
          
                  Thread.sleep(10 * 1000);// 10sec
                  System.out.println("going to shutdown server ...");
                  setFinished(true);
                  pool.shutdown();
          
                  pool.awaitTermination(1, TimeUnit.MILLISECONDS);
          
                  System.out.println("colse!");
              }
          
              private static void prepareTestData() {
                  Jedis redis = redisPool.getResource();
                  Pipeline pipeline = redis.pipelined();
          
                  for (int i = 0; i < MAX; i++) {
                      pipeline.set("key_" + i, (i * 2 + 1) + "");
          
                      if (i % (NUM * 2) == 0) {
                          pipeline.sync();
                      }
                  }
                  pipeline.sync();
                  redisPool.returnResource(redis);
              }
          
              // queue monitor,生產者-消費隊列監控
              private static class MonitorTask implements Runnable {
          
                  @Override
                  public void run() {
                      while (!Thread.interrupted() && !isFinished()) {
                          System.out.println("queue.size = " + queue.size());
                          try {
                              Thread.sleep(500); // 0.5 second
                          } catch (InterruptedException e) {
                              break;
                          }
                      }
                  }
              }
          
              // consumer,消費者
              private static class ConsumerTask implements Runnable {
                  @Override
                  public void run() {
                      while (!Thread.interrupted() && !isFinished()) {
                          if (queue.isEmpty()) {
                              try {
                                  Thread.sleep(100);
                              } catch (InterruptedException e) {
                              }
          
                              continue;
                          }
          
                          List<String> tasks = new ArrayList<String>(NUM);
                          queue.drainTo(tasks, NUM);
                          if (tasks.isEmpty()) {
                              continue;
                          }
          
                          Jedis jedis = redisPool.getResource();
                          Pipeline pipeline = jedis.pipelined();
          
                          try {
                              List<Response<String>> resultList = new ArrayList<Response<String>>(
                                      tasks.size());
          
                              List<String> waitDeleteList = new ArrayList<String>(
                                      tasks.size());
          
                              for (String task : tasks) {
                                  String key = task.split(" ")[1];
                                  if (task.startsWith("get_key")) {
                                      resultList.add(pipeline.get(key));
                                      waitDeleteList.add(key);
                                  } else if (task.startsWith("del_key")) {
                                      pipeline.del(key);
                                  }
                              }
          
                              pipeline.sync();
          
                              // 處理返回列表
                              for (int i = 0; i < resultList.size(); i++) {
                                  resultList.get(i).get();
                                  // handle value here ...
                                  // System.out.println("get value " + value);
                              }
          
                              // 讀取完畢,直接刪除之
                              for (String key : waitDeleteList) {
                                  pipeline.del(key);
                              }
          
                              pipeline.sync();
                          } catch (Exception e) {
                              redisPool.returnBrokenResource(jedis);
                          } finally {
                              redisPool.returnResource(jedis);
                          }
                      }
                  }
              }
          
              private static boolean isFinished(){
                  return finished;
              }
          
              private static void setFinished(boolean bool){
                  finished = bool;
              }
          }
          

          代碼作為示范。若線上則需要處理一些異常等。

          小結

          若能夠批量請求進行合并操作,自然可以節省很多的網絡帶寬、CPU等資源。有類似問題的同學,不妨考慮一下。

          posted on 2014-11-09 22:08 nieyong 閱讀(16190) 評論(17)  編輯  收藏 所屬分類: Socket

          評論

          # re: 為什么批量請求要盡可能的合并操作 2014-11-10 09:34 下巴長痘痘是什么原因

          經過樓主的講解,我現在才明白為什么批量請求要盡可能的合并操作  回復  更多評論   

          # re: 為什么批量請求要盡可能的合并操作[未登錄] 2014-11-11 13:36 劉洋

          好專業啊...每個月有一諞  回復  更多評論   

          # re: 為什么批量請求要盡可能的合并操作 2014-11-13 14:37 Stanley Xu

          線程數要根據cpu的情況而決定的,一臺4核的機器開40個線程就是蛋疼。同步、context switch的開銷已經超過了線程帶來的優勢。如果不合并,僅僅減少線程數,性能也會有所優化。  回復  更多評論   

          # re: 為什么批量請求要盡可能的合并操作 2014-11-16 09:12 妞妞寶貝衣間

          支持博主分享,歡迎到我的小店、、、、  回復  更多評論   

          # gank開黑吧 2014-11-16 22:13 gank開黑吧

          gank開黑吧http://www.kaihei8.com 贊一下博主  回復  更多評論   

          # re: 為什么批量請求要盡可能的合并操作 2014-11-16 23:37 網絡營銷技巧

          看了樓主的講解,我現在才大致明白  回復  更多評論   

          # re: 為什么批量請求要盡可能的合并操作 2014-11-17 14:32 臉上長粉刺是什么原因

          不錯的文章,學習了  回復  更多評論   

          # 武岡SEO 2014-11-25 22:57 794680490@qq.com

          文章很實用,學習了,到時實踐下  回復  更多評論   

          # re: 為什么批量請求要盡可能的合并操作 2015-01-24 15:09 黎洪鑫

          請教一下,我們使用pipeline的方式后,出現了在一些閑時,內存暴漲。然后kill掉twemproxy之后就降下來了。然后查了相關的資料,把pipeline的數量降到500,甚至20了,仍然出現。而且11臺機器中,有一些機器經常出現,但是最近經常出現的不出現,從沒出現的又出現這情況了。不知道您是否有遇到過,如果解決。  回復  更多評論   

          # re: 為什么批量請求要盡可能的合并操作 2015-01-26 09:39 nieyong

          @黎洪鑫
          沒有遇見過類似問題,愛莫能助。
          因為pipeline是一個阻塞請求-響應過程,這一點很重要;另外網絡機房擁塞會導致非常大的延遲,具體情況就是請求發出去,等待很長時間響應。若是機房網絡延遲問題,可以考慮把pipeline異步提交,不要阻塞當前線程。
          以上都是建議,僅供參考!  回復  更多評論   

          # re: 為什么批量請求要盡可能的合并操作 2015-01-26 10:46 黎洪鑫

          多謝了,我先做一下升級看看情況會不會改善。@nieyong
            回復  更多評論   

          # re: 為什么批量請求要盡可能的合并操作 2015-05-28 10:05 tinsang

          @nieyong
          可以考慮把pipeline異步提交,不要阻塞當前線程 ;

          這個異步是指?不是很明白  回復  更多評論   

          # re: 為什么批量請求要盡可能的合并操作 2015-05-28 17:04 nieyong

          @tinsang
          把較為耗時任務委派到其它線程處理,當前業務線程繼續忙別的。  回復  更多評論   

          # re: 為什么批量請求要盡可能的合并操作 2015-05-28 19:01 tinsang

          @nieyong
          那我明白你的意思了  回復  更多評論   

          # re: 為什么批量請求要盡可能的合并操作 2015-05-28 19:02 tinsang

          @nieyong
          pipeline阻塞了,那其他請求redis不是一樣被阻塞了?  回復  更多評論   

          # re: 為什么批量請求要盡可能的合并操作 2015-05-29 09:52 nieyong

          @tinsang
          針對單臺Redis而言,單線程模型。一旦pipeline阻塞了,其它請求會被阻塞住。可考慮單線程操作管道,一個一個批處理。  回復  更多評論   

          # re: 為什么批量請求要盡可能的合并操作 2016-05-16 17:45 zhouwei

          private static boolean finished = false;
          finished變量應該為volatile。
          樓主這么牛逼的人不應該犯這種小錯誤 ^_^
          好文章,學習了~  回復  更多評論   

          公告

          所有文章皆為原創,若轉載請標明出處,謝謝~

          新浪微博,歡迎關注:

          導航

          <2015年5月>
          262728293012
          3456789
          10111213141516
          17181920212223
          24252627282930
          31123456

          統計

          常用鏈接

          留言簿(58)

          隨筆分類(130)

          隨筆檔案(151)

          個人收藏

          最新隨筆

          搜索

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 清原| 台湾省| 潍坊市| 黄龙县| 阳谷县| 河池市| 宁河县| 吉水县| 疏附县| 政和县| 扶沟县| 宣武区| 云南省| 东乌| 延安市| 宣化县| 多伦县| 清镇市| 嘉义县| 兴仁县| 巴塘县| 赣州市| 抚顺市| 泾阳县| 桐梓县| 井冈山市| 黄浦区| 景洪市| 金湖县| 东至县| 高雄市| 衡阳县| 新河县| 衡阳市| 通道| 台安县| 莆田市| 新津县| 陕西省| 嘉定区| 义乌市|