環(huán)境是這樣的:服務(wù)器是用Java做的, 數(shù)據(jù)庫(kù)是MongoDB

 

需求是這樣的:我們的系統(tǒng)里要生成一個(gè)唯一ID,前面的部分有一定的格式,并和時(shí)間關(guān)聯(lián), 精確到微秒,考慮到同一微秒內(nèi)有可能存在并發(fā)情況, 所以后面在加兩位序列號(hào), 系統(tǒng)需要定義為1毫秒內(nèi)的并發(fā)小于100個(gè),所以后面兩位就夠用了。 Java服務(wù)器端有多臺(tái)機(jī)器都可以用來(lái)生成這個(gè)唯一ID,所以需要在不同的機(jī)器上不能生成相同的序列號(hào),所以需要在某一點(diǎn)上做全局的范圍同步來(lái)保存這序列 號(hào)的唯一性。 其實(shí)如果不考慮需求里的唯一ID是有一定意義的格式的, 用UUID或MongoDB的ObjectId都是更好的選擇,完全不需要在某一點(diǎn)上進(jìn)行同步,性能會(huì)更好。

 

這個(gè)可以生成序列號(hào)的點(diǎn), 我們可以做一個(gè)序列號(hào)生成服務(wù)器來(lái)對(duì)應(yīng), 也可以用數(shù)據(jù)庫(kù)來(lái)對(duì)應(yīng)。 單單為這個(gè)簡(jiǎn)單的功能準(zhǔn)備一個(gè)服務(wù)器來(lái)做顯然不合適。 但是我們用的MongoDB并沒(méi)有類似于MySQL或Oracle中的SELECT FOR UPDATE這樣的鎖機(jī)制。 所以沒(méi)有辦法簡(jiǎn)單的對(duì)這個(gè)序列號(hào)做原子操作。 但是MongoDB的對(duì)單個(gè)document進(jìn)行update操作中有很是具有原子性的, 例如

  • $set
  • $unset
  • $inc
  • $push
  • $pushAll
  • $pull
  • $pullAll

我們可以利用這些原子操作,在數(shù)據(jù)庫(kù)層以樂(lè)觀鎖的形式來(lái)實(shí)現(xiàn)循環(huán)序列字段。為了方便調(diào)用我把這段邏輯做成數(shù)據(jù)庫(kù)中的Javascript函數(shù)。 類似與MySQL中的存儲(chǔ)過(guò)程。

 

首先我們需要一個(gè)collection來(lái)存放序列號(hào),并對(duì)需要的需要的序列號(hào)進(jìn)行初始化。我們叫它c(diǎn)ounters。

Js代碼  收藏代碼
  1. db.counters.save({_id:"SerialNo1", val:0, maxval:99})  

 

然后我們想system.js里添加一個(gè)Javascript函數(shù)

Js代碼  收藏代碼
  1. db.system.js.save({_id:"getNextUniqueSeq",  
  2. value:function (keyName) {  
  3.     var seqObj = db.counters.findOne({_id:keyName});  
  4.     if (seqObj == null) {  
  5.         print("can not find record with key: " + keyName);  
  6.         return -1;  
  7.     }  
  8.       
  9.     // the max value of sequence  
  10.     var maxVal = seqObj.maxval;  
  11.     // the current value of sequence  
  12.     var curVal = seqObj.val;  
  13.       
  14.     while(true){  
  15.         // if curVal reach max, reset it  
  16.         if(curVal >= maxVal){  
  17.             db.counters.update({_id : keyName, val : curVal}, { $set : { val : 0 }}, falsefalse);  
  18.             var err = db.getLastErrorObj();  
  19.             if( err && err.code ) {  
  20.                 print( "unexpected error reset data: " + tojson( err ) );  
  21.                 return -2;  
  22.             } else if (err.n == 0){  
  23.                 // fail to reset value, may be reseted by others  
  24.                 print("fail to reset value: ");  
  25.             }   
  26.   
  27.             // get current value again.  
  28.             seqObj = db.counters.findOne({_id:keyName});  
  29.             maxVal = seqObj.maxval;  
  30.             curVal = seqObj.val;  
  31.             continue;  
  32.         }   
  33.           
  34.         // if curVal not reach the max, inc it;  
  35.         // increase   
  36.         db.counters.update({_id : keyName, val : curVal}, { $inc : { val : 1 }}, falsefalse);  
  37.         var err = db.getLastErrorObj();  
  38.         if( err && err.code ) {  
  39.             print( "unexpected error inc val: " + tojson( err ) );  
  40.                return -3;  
  41.         } else if (err.n == 0){  
  42.             // fail to reset value, may be increased by others  
  43.             print("fail to inc value: ");  
  44.               
  45.             // get current value again.  
  46.             seqObj = db.counters.findOne({_id:keyName});  
  47.             maxVal = seqObj.maxval;  
  48.             curVal = seqObj.val;  
  49.             continue;  
  50.         } else {  
  51.             var retVal = curVal + 1;  
  52.             print("success to get seq : " + retVal);  
  53.             // increase successful  
  54.             return retVal;  
  55.         }  
  56.     }  
  57. }  
  58. });  

上面這段會(huì)把指定的序列號(hào)的val值+1,如果val達(dá)到上限則從0開(kāi)始。所以叫循環(huán)序列。

 

其實(shí)上面的實(shí)現(xiàn)在原理上和Java里的AtomicInteger系列的功能實(shí)現(xiàn)是類似的,利用循環(huán)重試和原子性的CAS來(lái)實(shí)現(xiàn)。這種實(shí)現(xiàn)方式在多線程的環(huán)境里由于鎖(Monitor)的范圍很小,所以并發(fā)性上比排他鎖要好一些。

 

下面我們用Java來(lái)測(cè)試一下這個(gè)函數(shù)的正確性。 即在多線程的情況下會(huì)不會(huì)得到重復(fù)的序列號(hào)。

 

第一個(gè)測(cè)試,val=0, maxval=2000, Java端20個(gè)線程每個(gè)線程循環(huán)調(diào)用100次。 共2000次。 所以正確的情況下,從0到1999應(yīng)該每個(gè)數(shù)字只出現(xiàn)一次。

 

Java代碼  收藏代碼
  1. @Test  
  2. public void testGetNextUniqueSeq1() throws Exception {  
  3.   
  4.     final int THREAD_COUNT = 20;  
  5.     final int LOOP_COUNT = 100;  
  6.   
  7.     Mongo mongoClient = new Mongo("172.17.2.100"27017);  
  8.     DB db = mongoClient.getDB("im");  
  9.     db.authenticate("imadmin""imadmin".toCharArray());  
  10.     BasicDBObject q = new BasicDBObject();  
  11.     q.put("_id""UNIQUE_KEY");  
  12.   
  13.     BasicDBObject upd = new BasicDBObject();  
  14.     BasicDBObject set = new BasicDBObject();  
  15.     set.put("val"0);  
  16.     set.put("maxval", THREAD_COUNT * LOOP_COUNT);  
  17.     upd.put("$set", set);  
  18.   
  19.     db.getCollection("counters").update(q, upd);  
  20.   
  21.     Thread[] threads = new Thread[THREAD_COUNT];  
  22.     final int[][] results = new int[THREAD_COUNT][LOOP_COUNT];  
  23.     for (int i = 0; i < THREAD_COUNT; i++) {  
  24.         final int temp_i = i;  
  25.         threads[i] = new Thread("" + i) {  
  26.             @Override  
  27.             public void run() {  
  28.                 try {  
  29.                     Mongo mongoClient = new Mongo("172.17.2.100"27017);  
  30.                     DB db = mongoClient.getDB("im");  
  31.                     db.authenticate("imadmin""imadmin".toCharArray());  
  32.                     for (int j = 0; j < LOOP_COUNT; j++) {  
  33.                         Object result = db.eval("getNextUniqueSeq(\"UNIQUE_KEY\")");  
  34.                         System.out.printf("Thread %s, seq=%d\n", Thread.currentThread().getName(), ((Double) result).intValue());  
  35.                         results[temp_i][j] = ((Double) result).intValue();  
  36.                     }  
  37.                 } catch (UnknownHostException e) {  
  38.                     e.printStackTrace();  
  39.                 }  
  40.             }  
  41.         };  
  42.     }  
  43.   
  44.     for (Thread thread : threads) {  
  45.         thread.start();  
  46.     }  
  47.   
  48.     for (Thread thread : threads) {  
  49.         thread.join();  
  50.     }  
  51.   
  52.     for (int num = 1; num <= LOOP_COUNT * THREAD_COUNT; num++) {  
  53.         // every number appear 1 times only!  
  54.         int times = 0;  
  55.         for (int j = 0; j < THREAD_COUNT; j++) {  
  56.             for (int k = 0; k < LOOP_COUNT; k++) {  
  57.                 if (results[j][k] == num)  
  58.                     times++;  
  59.             }  
  60.         }  
  61.   
  62.         assertEquals(1, times);  
  63.     }  
  64. }  

 

然后我們?cè)贉y(cè)試一下循環(huán)的情況。 val=0, maxval=99。 同樣是Java端20個(gè)線程每個(gè)線程循環(huán)調(diào)用100次。 共2000次。這次從0到99的數(shù)字每個(gè)應(yīng)該取得20次。

 

Java代碼  收藏代碼
  1. @Test  
  2. public void testGetNextUniqueSeq2() throws Exception {  
  3.   
  4.     final int THREAD_COUNT = 20;  
  5.     final int LOOP_COUNT = 100;  
  6.   
  7.     Mongo mongoClient = new Mongo("172.17.2.100"27017);  
  8.     DB db = mongoClient.getDB("im");  
  9.     db.authenticate("imadmin""imadmin".toCharArray());  
  10.     BasicDBObject q = new BasicDBObject();  
  11.     q.put("_id""UNIQUE_KEY");  
  12.   
  13.     BasicDBObject upd = new BasicDBObject();  
  14.     BasicDBObject set = new BasicDBObject();  
  15.     set.put("val"0);  
  16.     set.put("maxval", LOOP_COUNT);  
  17.     upd.put("$set", set);  
  18.   
  19.     db.getCollection("counters").update(q, upd);  
  20.   
  21.     Thread[] threads = new Thread[THREAD_COUNT];  
  22.     final int[][] results = new int[THREAD_COUNT][LOOP_COUNT];  
  23.     for (int i = 0; i < THREAD_COUNT; i++) {  
  24.         final int temp_i = i;  
  25.         threads[i] = new Thread("" + i) {  
  26.             @Override  
  27.             public void run() {  
  28.                 try {  
  29.                     Mongo mongoClient = new Mongo("172.17.2.100"27017);  
  30.                     DB db = mongoClient.getDB("im");  
  31.                     db.authenticate("imadmin""imadmin".toCharArray());  
  32.                     for (int j = 0; j < LOOP_COUNT; j++) {  
  33.                         Object result = db.eval("getNextUniqueSeq(\"UNIQUE_KEY\")");  
  34.                         System.out.printf("Thread %s, seq=%d\n", Thread.currentThread().getName(), ((Double) result).intValue());  
  35.                         results[temp_i][j] = ((Double) result).intValue();  
  36.                     }  
  37.                 } catch (UnknownHostException e) {  
  38.                     e.printStackTrace();  
  39.                 }  
  40.             }  
  41.         };  
  42.     }  
  43.   
  44.     for (Thread thread : threads) {  
  45.         thread.start();  
  46.     }  
  47.   
  48.     for (Thread thread : threads) {  
  49.         thread.join();  
  50.     }  
  51.   
  52.     for (int num = 1; num <= LOOP_COUNT; num++) {  
  53.         // every number appear 20 times only!  
  54.         int times = 0;  
  55.         for (int j = 0; j < THREAD_COUNT; j++) {  
  56.             for (int k = 0; k < LOOP_COUNT; k++) {  
  57.                 if (results[j][k] == num)  
  58.                     times++;  
  59.             }  
  60.         }  
  61.   
  62.         assertEquals(20, times);  
  63.     }  
  64. }  

 

這個(gè)測(cè)試跑了幾次都是正確的。

 

由于沒(méi)有可以進(jìn)行對(duì)比其他的實(shí)現(xiàn)方式(例如排他鎖)所以沒(méi)有做性能測(cè)試。

 

寫(xiě)在最后。 雖然MongoDB支持類似于存儲(chǔ)過(guò)程的Stored Javascript,但是其實(shí)不建議使用這個(gè)來(lái)解決復(fù)雜問(wèn)題。主要原因是沒(méi)法調(diào)試,維護(hù)起來(lái)太不方便。而且在2.4之前MongoDB對(duì)服務(wù)端 Javascript支持并不是很好, 一個(gè)mongod進(jìn)程同時(shí)只能執(zhí)行一段Javascript。如果能在應(yīng)用層解決掉還是在應(yīng)用層里實(shí)現(xiàn)邏輯比較好。