环境是这L(fng)Q服务器是用Java做的Q?数据库是MongoDB
需求是q样的:(x)我们的系l里要生成一个唯一IDQ前面的部分有一定的格式Qƈ和时间关联, _到微U,考虑到同一微秒内有可能存在q发情况Q? 所以后面在加两位序列号Q?pȝ需要定义ؓ(f)1毫秒内的q发于100个,所以后面两位就够用?jin)? Java服务器端有多台机器都可以用来生成q个唯一IDQ所以需要在不同的机器上不能生成相同的序列号Q所以需要在某一点上做全局的范围同步来保存q序? L(fng)唯一性?其实如果不考虑需求里的唯一ID是有一定意义的格式的, 用UUID或MongoDB的ObjectId都是更好的选择Q完全不需要在某一点上q行同步Q性能?x)更好?/p>
q个可以生成序列L(fng)点, 我们可以做一个序列号生成服务器来对应Q?也可以用数据库来对应? 单单个简单的功能准备一个服务器来做昄不合适?但是我们用的MongoDBq没有类gMySQL或Oracle中的SELECT FOR UPDATEq样的锁机制?所以没有办法简单的对这个序列号做原子操作? 但是MongoDB的对单个documentq行update操作中有很是h原子性的Q?例如
- $set
- $unset
- $inc
- $push
- $pushAll
- $pull
- $pullAll
我们可以利用q些原子操作Q在数据库层以乐观锁的Ş式来实现循环序列字段。ؓ(f)?jin)方便调用我把这D逻辑做成数据库中的Javascript函数?cM与MySQL中的存储q程?/p>
首先我们需要一个collection来存攑ֺ列号Qƈ寚w要的需要的序列可行初始化。我们叫它counters?/p>
- db.counters.save({_id:"SerialNo1", val:0, maxval:99})
然后我们想system.js里添加一个Javascript函数
- db.system.js.save({_id:"getNextUniqueSeq",
- value:function (keyName) {
- var seqObj = db.counters.findOne({_id:keyName});
- if (seqObj == null) {
- print("can not find record with key: " + keyName);
- return -1;
- }
- // the max value of sequence
- var maxVal = seqObj.maxval;
- // the current value of sequence
- var curVal = seqObj.val;
- while(true){
- // if curVal reach max, reset it
- if(curVal >= maxVal){
- db.counters.update({_id : keyName, val : curVal}, { $set : { val : 0 }}, false, false);
- var err = db.getLastErrorObj();
- if( err && err.code ) {
- print( "unexpected error reset data: " + tojson( err ) );
- return -2;
- } else if (err.n == 0){
- // fail to reset value, may be reseted by others
- print("fail to reset value: ");
- }
- // get current value again.
- seqObj = db.counters.findOne({_id:keyName});
- maxVal = seqObj.maxval;
- curVal = seqObj.val;
- continue;
- }
- // if curVal not reach the max, inc it;
- // increase
- db.counters.update({_id : keyName, val : curVal}, { $inc : { val : 1 }}, false, false);
- var err = db.getLastErrorObj();
- if( err && err.code ) {
- print( "unexpected error inc val: " + tojson( err ) );
- return -3;
- } else if (err.n == 0){
- // fail to reset value, may be increased by others
- print("fail to inc value: ");
- // get current value again.
- seqObj = db.counters.findOne({_id:keyName});
- maxVal = seqObj.maxval;
- curVal = seqObj.val;
- continue;
- } else {
- var retVal = curVal + 1;
- print("success to get seq : " + retVal);
- // increase successful
- return retVal;
- }
- }
- }
- });
上面q段?x)把指定的序列号的val?1Q如果val辑ֈ上限则从0开始。所以叫循环序列?/p>
其实上面的实现在原理上和Java里的AtomicIntegerpd的功能实现是cM的,利用循环重试和原子性的CAS来实现。这U实现方式在多线E的环境里由于锁QMonitorQ的范围很小Q所以ƈ发性上比排他锁要好一些?/p>
下面我们用Java来测试一下这个函数的正确性?卛_多线E的情况下会(x)不会(x)得到重复的序列号?/p>
W一个测试,val=0Q?maxval=2000Q?Java?0个线E每个线E@环调?00ơ??000ơ?所以正的情况下,??999应该每个数字只出Cơ?/p>
- @Test
- public void testGetNextUniqueSeq1() throws Exception {
- final int THREAD_COUNT = 20;
- final int LOOP_COUNT = 100;
- Mongo mongoClient = new Mongo("172.17.2.100", 27017);
- DB db = mongoClient.getDB("im");
- db.authenticate("imadmin", "imadmin".toCharArray());
- BasicDBObject q = new BasicDBObject();
- q.put("_id", "UNIQUE_KEY");
- BasicDBObject upd = new BasicDBObject();
- BasicDBObject set = new BasicDBObject();
- set.put("val", 0);
- set.put("maxval", THREAD_COUNT * LOOP_COUNT);
- upd.put("$set", set);
- db.getCollection("counters").update(q, upd);
- Thread[] threads = new Thread[THREAD_COUNT];
- final int[][] results = new int[THREAD_COUNT][LOOP_COUNT];
- for (int i = 0; i < THREAD_COUNT; i++) {
- final int temp_i = i;
- threads[i] = new Thread("" + i) {
- @Override
- public void run() {
- try {
- Mongo mongoClient = new Mongo("172.17.2.100", 27017);
- DB db = mongoClient.getDB("im");
- db.authenticate("imadmin", "imadmin".toCharArray());
- for (int j = 0; j < LOOP_COUNT; j++) {
- Object result = db.eval("getNextUniqueSeq(\"UNIQUE_KEY\")");
- System.out.printf("Thread %s, seq=%d\n", Thread.currentThread().getName(), ((Double) result).intValue());
- results[temp_i][j] = ((Double) result).intValue();
- }
- } catch (UnknownHostException e) {
- e.printStackTrace();
- }
- }
- };
- }
- for (Thread thread : threads) {
- thread.start();
- }
- for (Thread thread : threads) {
- thread.join();
- }
- for (int num = 1; num <= LOOP_COUNT * THREAD_COUNT; num++) {
- // every number appear 1 times only!
- int times = 0;
- for (int j = 0; j < THREAD_COUNT; j++) {
- for (int k = 0; k < LOOP_COUNT; k++) {
- if (results[j][k] == num)
- times++;
- }
- }
- assertEquals(1, times);
- }
- }
然后我们再测试一下@环的情况?val=0, maxval=99?同样是Java?0个线E每个线E@环调?00ơ??000ơ。这ơ从0?9的数字每个应该取?0ơ?/p>
- @Test
- public void testGetNextUniqueSeq2() throws Exception {
- final int THREAD_COUNT = 20;
- final int LOOP_COUNT = 100;
- Mongo mongoClient = new Mongo("172.17.2.100", 27017);
- DB db = mongoClient.getDB("im");
- db.authenticate("imadmin", "imadmin".toCharArray());
- BasicDBObject q = new BasicDBObject();
- q.put("_id", "UNIQUE_KEY");
- BasicDBObject upd = new BasicDBObject();
- BasicDBObject set = new BasicDBObject();
- set.put("val", 0);
- set.put("maxval", LOOP_COUNT);
- upd.put("$set", set);
- db.getCollection("counters").update(q, upd);
- Thread[] threads = new Thread[THREAD_COUNT];
- final int[][] results = new int[THREAD_COUNT][LOOP_COUNT];
- for (int i = 0; i < THREAD_COUNT; i++) {
- final int temp_i = i;
- threads[i] = new Thread("" + i) {
- @Override
- public void run() {
- try {
- Mongo mongoClient = new Mongo("172.17.2.100", 27017);
- DB db = mongoClient.getDB("im");
- db.authenticate("imadmin", "imadmin".toCharArray());
- for (int j = 0; j < LOOP_COUNT; j++) {
- Object result = db.eval("getNextUniqueSeq(\"UNIQUE_KEY\")");
- System.out.printf("Thread %s, seq=%d\n", Thread.currentThread().getName(), ((Double) result).intValue());
- results[temp_i][j] = ((Double) result).intValue();
- }
- } catch (UnknownHostException e) {
- e.printStackTrace();
- }
- }
- };
- }
- for (Thread thread : threads) {
- thread.start();
- }
- for (Thread thread : threads) {
- thread.join();
- }
- for (int num = 1; num <= LOOP_COUNT; num++) {
- // every number appear 20 times only!
- int times = 0;
- for (int j = 0; j < THREAD_COUNT; j++) {
- for (int k = 0; k < LOOP_COUNT; k++) {
- if (results[j][k] == num)
- times++;
- }
- }
- assertEquals(20, times);
- }
- }
q个试跑了(jin)几次都是正确的?/p>
׃没有可以q行Ҏ(gu)其他的实现方式(例如排他锁)(j)所以没有做性能试?/p>
写在最后?虽然MongoDB支持cM于存储过E的Stored JavascriptQ但是其实不使用q个来解军_杂问题。主要原因是没法调试Q维护v来太不方ѝ而且?.4之前MongoDBҎ(gu)务端 Javascript支持q不是很好, 一个mongodq程同时只能执行一DJavascript。如果能在应用层解决掉还是在应用层里实现逻辑比较好?/p>