為什么批量請(qǐng)求要盡可能的合并操作
前言
線上情況:
- 線上Redis集群,多個(gè)Twemproxy代理(nutcracker),LVS DR路由均衡調(diào)度
- 客戶端使用Jedis操作Redis集群,一個(gè)程序進(jìn)程實(shí)例使用原先1024個(gè)工作線程處理請(qǐng)求,若干個(gè)進(jìn)程實(shí)例
- 一天超過22億次請(qǐng)求,網(wǎng)絡(luò)一般情況下,一天超過上萬個(gè)連接失敗異常
- 運(yùn)維同學(xué)告知,LVS壓力較大
改進(jìn)工作:
- 工作線程由原先1024改用16個(gè)
- 每個(gè)線程每次最多操作1000個(gè)Redis命令批量提交
實(shí)際效果:
- 一天不到一億次的請(qǐng)求量
- LVS壓力大減
- CPU壓力降低到原先1/3以下
- 單個(gè)請(qǐng)求抽樣調(diào)研平均減少1-90毫秒時(shí)間(尤其是跨機(jī)房處理)
Redis支持批量提交
原生支持批量操作方式
一般命令前綴若添加上m字符串,表示支持多個(gè)、批量命令提交了。
顯式的...
MSET key value [key value ...]
MSETNX key value [key value ...]
HMGET key field [field ...]
HMSET key field value [field value ...]
一般方式的...
HDEL key field [field ...]
SREM key member [member ...]
RPUSH key value [value ...]
......
更多,請(qǐng)參考:http://redis.cn/commands.html
pipeline管道方式
官方文檔:http://redis.io/topics/pipelining
- Redis Client把所有命令一起打包發(fā)送到Redis Server,然后阻塞等待處理結(jié)果
- Redis Server必須在處理完所有命令前先緩存起所有命令的處理結(jié)果
- 打包的命令越多,緩存消耗內(nèi)存也越多
- 不是打包的命令越多越好
- 實(shí)際環(huán)境需要根據(jù)命令執(zhí)行時(shí)間等各種因素選擇合并命令的個(gè)數(shù),以及測(cè)試效果等
Java隊(duì)列支持
一般業(yè)務(wù)、接入前端請(qǐng)求量過大,生產(chǎn)者速度過快,這時(shí)候使用隊(duì)列暫時(shí)緩存會(huì)比較好一些,消費(fèi)者直接直接從隊(duì)列獲取任務(wù),通過隊(duì)列讓生產(chǎn)者和消費(fèi)者進(jìn)行分離這也是業(yè)界普通采用的方式。
監(jiān)控隊(duì)列
有的時(shí)候,若可以監(jiān)控一下隊(duì)列消費(fèi)情況,可以監(jiān)控一下,就很直觀。同事為隊(duì)列添加了一個(gè)監(jiān)控線程,清晰明了了解隊(duì)列消費(fèi)情況。
示范
示范使用了Redis Pipeline,線程池,準(zhǔn)備數(shù)據(jù),生產(chǎn)者-消費(fèi)者隊(duì)列,隊(duì)列監(jiān)控等,消費(fèi)完畢,程序關(guān)閉。
/**
* 以下測(cè)試在Jedis 2.6下測(cè)試通過
*
* @author nieyong
*
*/
public class TestJedisPipeline {
private static final int NUM = 512;
private static final int MAX = 1000000; // 100W
private static JedisPool redisPool;
private static final ExecutorService pool = Executors.newCachedThreadPool();
protected static final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(
MAX); // 100W
private static boolean finished = false;
static {
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxActive(64);
config.setMaxIdle(64);
try {
redisPool = new JedisPool(config, "192.168.192.8", 6379, 10000,
null, 0);
} catch (Exception e) {
System.err.println("Init msg redis factory error! " + e.toString());
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println("prepare test data 100W");
prepareTestData();
System.out.println("prepare test data done!");
// 生產(chǎn)者,模擬請(qǐng)求100W次
pool.execute(new Runnable() {
@Override
public void run() {
for (int i = 0; i < MAX; i++) {
if (i % 3 == 0) {
queue.offer("del_key key_" + i);
} else {
queue.offer("get_key key_" + i);
}
}
}
});
// CPU核數(shù)*2 個(gè)工作者線程
int threadNum = 2 * Runtime.getRuntime().availableProcessors();
for (int i = 0; i < threadNum; i++)
pool.execute(new ConsumerTask());
pool.execute(new MonitorTask());
Thread.sleep(10 * 1000);// 10sec
System.out.println("going to shutdown server ...");
setFinished(true);
pool.shutdown();
pool.awaitTermination(1, TimeUnit.MILLISECONDS);
System.out.println("colse!");
}
private static void prepareTestData() {
Jedis redis = redisPool.getResource();
Pipeline pipeline = redis.pipelined();
for (int i = 0; i < MAX; i++) {
pipeline.set("key_" + i, (i * 2 + 1) + "");
if (i % (NUM * 2) == 0) {
pipeline.sync();
}
}
pipeline.sync();
redisPool.returnResource(redis);
}
// queue monitor,生產(chǎn)者-消費(fèi)隊(duì)列監(jiān)控
private static class MonitorTask implements Runnable {
@Override
public void run() {
while (!Thread.interrupted() && !isFinished()) {
System.out.println("queue.size = " + queue.size());
try {
Thread.sleep(500); // 0.5 second
} catch (InterruptedException e) {
break;
}
}
}
}
// consumer,消費(fèi)者
private static class ConsumerTask implements Runnable {
@Override
public void run() {
while (!Thread.interrupted() && !isFinished()) {
if (queue.isEmpty()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
continue;
}
List<String> tasks = new ArrayList<String>(NUM);
queue.drainTo(tasks, NUM);
if (tasks.isEmpty()) {
continue;
}
Jedis jedis = redisPool.getResource();
Pipeline pipeline = jedis.pipelined();
try {
List<Response<String>> resultList = new ArrayList<Response<String>>(
tasks.size());
List<String> waitDeleteList = new ArrayList<String>(
tasks.size());
for (String task : tasks) {
String key = task.split(" ")[1];
if (task.startsWith("get_key")) {
resultList.add(pipeline.get(key));
waitDeleteList.add(key);
} else if (task.startsWith("del_key")) {
pipeline.del(key);
}
}
pipeline.sync();
// 處理返回列表
for (int i = 0; i < resultList.size(); i++) {
resultList.get(i).get();
// handle value here ...
// System.out.println("get value " + value);
}
// 讀取完畢,直接刪除之
for (String key : waitDeleteList) {
pipeline.del(key);
}
pipeline.sync();
} catch (Exception e) {
redisPool.returnBrokenResource(jedis);
} finally {
redisPool.returnResource(jedis);
}
}
}
}
private static boolean isFinished(){
return finished;
}
private static void setFinished(boolean bool){
finished = bool;
}
}
代碼作為示范。若線上則需要處理一些異常等。
小結(jié)
若能夠批量請(qǐng)求進(jìn)行合并操作,自然可以節(jié)省很多的網(wǎng)絡(luò)帶寬、CPU等資源。有類似問題的同學(xué),不妨考慮一下。
posted on 2014-11-09 22:08 nieyong 閱讀(16189) 評(píng)論(17) 編輯 收藏 所屬分類: Socket