環境是這樣的:服務器是用Java做的, 數據庫是MongoDB

 

需求是這樣的:我們的系統里要生成一個唯一ID,前面的部分有一定的格式,并和時間關聯, 精確到微秒,考慮到同一微秒內有可能存在并發情況, 所以后面在加兩位序列號, 系統需要定義為1毫秒內的并發小于100個,所以后面兩位就夠用了。 Java服務器端有多臺機器都可以用來生成這個唯一ID,所以需要在不同的機器上不能生成相同的序列號,所以需要在某一點上做全局的范圍同步來保存這序列 號的唯一性。 其實如果不考慮需求里的唯一ID是有一定意義的格式的, 用UUID或MongoDB的ObjectId都是更好的選擇,完全不需要在某一點上進行同步,性能會更好。

 

這個可以生成序列號的點, 我們可以做一個序列號生成服務器來對應, 也可以用數據庫來對應。 單單為這個簡單的功能準備一個服務器來做顯然不合適。 但是我們用的MongoDB并沒有類似于MySQL或Oracle中的SELECT FOR UPDATE這樣的鎖機制。 所以沒有辦法簡單的對這個序列號做原子操作。 但是MongoDB的對單個document進行update操作中有很是具有原子性的, 例如

  • $set
  • $unset
  • $inc
  • $push
  • $pushAll
  • $pull
  • $pullAll

我們可以利用這些原子操作,在數據庫層以樂觀鎖的形式來實現循環序列字段。為了方便調用我把這段邏輯做成數據庫中的Javascript函數。 類似與MySQL中的存儲過程。

 

首先我們需要一個collection來存放序列號,并對需要的需要的序列號進行初始化。我們叫它counters。

Js代碼  收藏代碼
  1. db.counters.save({_id:"SerialNo1", val:0, maxval:99})  

 

然后我們想system.js里添加一個Javascript函數

Js代碼  收藏代碼
  1. db.system.js.save({_id:"getNextUniqueSeq",  
  2. value:function (keyName) {  
  3.     var seqObj = db.counters.findOne({_id:keyName});  
  4.     if (seqObj == null) {  
  5.         print("can not find record with key: " + keyName);  
  6.         return -1;  
  7.     }  
  8.       
  9.     // the max value of sequence  
  10.     var maxVal = seqObj.maxval;  
  11.     // the current value of sequence  
  12.     var curVal = seqObj.val;  
  13.       
  14.     while(true){  
  15.         // if curVal reach max, reset it  
  16.         if(curVal >= maxVal){  
  17.             db.counters.update({_id : keyName, val : curVal}, { $set : { val : 0 }}, falsefalse);  
  18.             var err = db.getLastErrorObj();  
  19.             if( err && err.code ) {  
  20.                 print( "unexpected error reset data: " + tojson( err ) );  
  21.                 return -2;  
  22.             } else if (err.n == 0){  
  23.                 // fail to reset value, may be reseted by others  
  24.                 print("fail to reset value: ");  
  25.             }   
  26.   
  27.             // get current value again.  
  28.             seqObj = db.counters.findOne({_id:keyName});  
  29.             maxVal = seqObj.maxval;  
  30.             curVal = seqObj.val;  
  31.             continue;  
  32.         }   
  33.           
  34.         // if curVal not reach the max, inc it;  
  35.         // increase   
  36.         db.counters.update({_id : keyName, val : curVal}, { $inc : { val : 1 }}, falsefalse);  
  37.         var err = db.getLastErrorObj();  
  38.         if( err && err.code ) {  
  39.             print( "unexpected error inc val: " + tojson( err ) );  
  40.                return -3;  
  41.         } else if (err.n == 0){  
  42.             // fail to reset value, may be increased by others  
  43.             print("fail to inc value: ");  
  44.               
  45.             // get current value again.  
  46.             seqObj = db.counters.findOne({_id:keyName});  
  47.             maxVal = seqObj.maxval;  
  48.             curVal = seqObj.val;  
  49.             continue;  
  50.         } else {  
  51.             var retVal = curVal + 1;  
  52.             print("success to get seq : " + retVal);  
  53.             // increase successful  
  54.             return retVal;  
  55.         }  
  56.     }  
  57. }  
  58. });  

上面這段會把指定的序列號的val值+1,如果val達到上限則從0開始。所以叫循環序列。

 

其實上面的實現在原理上和Java里的AtomicInteger系列的功能實現是類似的,利用循環重試和原子性的CAS來實現。這種實現方式在多線程的環境里由于鎖(Monitor)的范圍很小,所以并發性上比排他鎖要好一些。

 

下面我們用Java來測試一下這個函數的正確性。 即在多線程的情況下會不會得到重復的序列號。

 

第一個測試,val=0, maxval=2000, Java端20個線程每個線程循環調用100次。 共2000次。 所以正確的情況下,從0到1999應該每個數字只出現一次。

 

Java代碼  收藏代碼
  1. @Test  
  2. public void testGetNextUniqueSeq1() throws Exception {  
  3.   
  4.     final int THREAD_COUNT = 20;  
  5.     final int LOOP_COUNT = 100;  
  6.   
  7.     Mongo mongoClient = new Mongo("172.17.2.100"27017);  
  8.     DB db = mongoClient.getDB("im");  
  9.     db.authenticate("imadmin""imadmin".toCharArray());  
  10.     BasicDBObject q = new BasicDBObject();  
  11.     q.put("_id""UNIQUE_KEY");  
  12.   
  13.     BasicDBObject upd = new BasicDBObject();  
  14.     BasicDBObject set = new BasicDBObject();  
  15.     set.put("val"0);  
  16.     set.put("maxval", THREAD_COUNT * LOOP_COUNT);  
  17.     upd.put("$set", set);  
  18.   
  19.     db.getCollection("counters").update(q, upd);  
  20.   
  21.     Thread[] threads = new Thread[THREAD_COUNT];  
  22.     final int[][] results = new int[THREAD_COUNT][LOOP_COUNT];  
  23.     for (int i = 0; i < THREAD_COUNT; i++) {  
  24.         final int temp_i = i;  
  25.         threads[i] = new Thread("" + i) {  
  26.             @Override  
  27.             public void run() {  
  28.                 try {  
  29.                     Mongo mongoClient = new Mongo("172.17.2.100"27017);  
  30.                     DB db = mongoClient.getDB("im");  
  31.                     db.authenticate("imadmin""imadmin".toCharArray());  
  32.                     for (int j = 0; j < LOOP_COUNT; j++) {  
  33.                         Object result = db.eval("getNextUniqueSeq(\"UNIQUE_KEY\")");  
  34.                         System.out.printf("Thread %s, seq=%d\n", Thread.currentThread().getName(), ((Double) result).intValue());  
  35.                         results[temp_i][j] = ((Double) result).intValue();  
  36.                     }  
  37.                 } catch (UnknownHostException e) {  
  38.                     e.printStackTrace();  
  39.                 }  
  40.             }  
  41.         };  
  42.     }  
  43.   
  44.     for (Thread thread : threads) {  
  45.         thread.start();  
  46.     }  
  47.   
  48.     for (Thread thread : threads) {  
  49.         thread.join();  
  50.     }  
  51.   
  52.     for (int num = 1; num <= LOOP_COUNT * THREAD_COUNT; num++) {  
  53.         // every number appear 1 times only!  
  54.         int times = 0;  
  55.         for (int j = 0; j < THREAD_COUNT; j++) {  
  56.             for (int k = 0; k < LOOP_COUNT; k++) {  
  57.                 if (results[j][k] == num)  
  58.                     times++;  
  59.             }  
  60.         }  
  61.   
  62.         assertEquals(1, times);  
  63.     }  
  64. }  

 

然后我們再測試一下循環的情況。 val=0, maxval=99。 同樣是Java端20個線程每個線程循環調用100次。 共2000次。這次從0到99的數字每個應該取得20次。

 

Java代碼  收藏代碼
  1. @Test  
  2. public void testGetNextUniqueSeq2() throws Exception {  
  3.   
  4.     final int THREAD_COUNT = 20;  
  5.     final int LOOP_COUNT = 100;  
  6.   
  7.     Mongo mongoClient = new Mongo("172.17.2.100"27017);  
  8.     DB db = mongoClient.getDB("im");  
  9.     db.authenticate("imadmin""imadmin".toCharArray());  
  10.     BasicDBObject q = new BasicDBObject();  
  11.     q.put("_id""UNIQUE_KEY");  
  12.   
  13.     BasicDBObject upd = new BasicDBObject();  
  14.     BasicDBObject set = new BasicDBObject();  
  15.     set.put("val"0);  
  16.     set.put("maxval", LOOP_COUNT);  
  17.     upd.put("$set", set);  
  18.   
  19.     db.getCollection("counters").update(q, upd);  
  20.   
  21.     Thread[] threads = new Thread[THREAD_COUNT];  
  22.     final int[][] results = new int[THREAD_COUNT][LOOP_COUNT];  
  23.     for (int i = 0; i < THREAD_COUNT; i++) {  
  24.         final int temp_i = i;  
  25.         threads[i] = new Thread("" + i) {  
  26.             @Override  
  27.             public void run() {  
  28.                 try {  
  29.                     Mongo mongoClient = new Mongo("172.17.2.100"27017);  
  30.                     DB db = mongoClient.getDB("im");  
  31.                     db.authenticate("imadmin""imadmin".toCharArray());  
  32.                     for (int j = 0; j < LOOP_COUNT; j++) {  
  33.                         Object result = db.eval("getNextUniqueSeq(\"UNIQUE_KEY\")");  
  34.                         System.out.printf("Thread %s, seq=%d\n", Thread.currentThread().getName(), ((Double) result).intValue());  
  35.                         results[temp_i][j] = ((Double) result).intValue();  
  36.                     }  
  37.                 } catch (UnknownHostException e) {  
  38.                     e.printStackTrace();  
  39.                 }  
  40.             }  
  41.         };  
  42.     }  
  43.   
  44.     for (Thread thread : threads) {  
  45.         thread.start();  
  46.     }  
  47.   
  48.     for (Thread thread : threads) {  
  49.         thread.join();  
  50.     }  
  51.   
  52.     for (int num = 1; num <= LOOP_COUNT; num++) {  
  53.         // every number appear 20 times only!  
  54.         int times = 0;  
  55.         for (int j = 0; j < THREAD_COUNT; j++) {  
  56.             for (int k = 0; k < LOOP_COUNT; k++) {  
  57.                 if (results[j][k] == num)  
  58.                     times++;  
  59.             }  
  60.         }  
  61.   
  62.         assertEquals(20, times);  
  63.     }  
  64. }  

 

這個測試跑了幾次都是正確的。

 

由于沒有可以進行對比其他的實現方式(例如排他鎖)所以沒有做性能測試。

 

寫在最后。 雖然MongoDB支持類似于存儲過程的Stored Javascript,但是其實不建議使用這個來解決復雜問題。主要原因是沒法調試,維護起來太不方便。而且在2.4之前MongoDB對服務端 Javascript支持并不是很好, 一個mongod進程同時只能執行一段Javascript。如果能在應用層解決掉還是在應用層里實現邏輯比較好。