環境是這樣的:服務器是用Java做的, 數據庫是MongoDB
需求是這樣的:我們的系統里要生成一個唯一ID,前面的部分有一定的格式,并和時間關聯, 精確到微秒,考慮到同一微秒內有可能存在并發情況, 所以后面在加兩位序列號, 系統需要定義為1毫秒內的并發小于100個,所以后面兩位就夠用了。 Java服務器端有多臺機器都可以用來生成這個唯一ID,所以需要在不同的機器上不能生成相同的序列號,所以需要在某一點上做全局的范圍同步來保存這序列 號的唯一性。 其實如果不考慮需求里的唯一ID是有一定意義的格式的, 用UUID或MongoDB的ObjectId都是更好的選擇,完全不需要在某一點上進行同步,性能會更好。
這個可以生成序列號的點, 我們可以做一個序列號生成服務器來對應, 也可以用數據庫來對應。 單單為這個簡單的功能準備一個服務器來做顯然不合適。 但是我們用的MongoDB并沒有類似于MySQL或Oracle中的SELECT FOR UPDATE這樣的鎖機制。 所以沒有辦法簡單的對這個序列號做原子操作。 但是MongoDB的對單個document進行update操作中有很是具有原子性的, 例如
- $set
- $unset
- $inc
- $push
- $pushAll
- $pull
- $pullAll
我們可以利用這些原子操作,在數據庫層以樂觀鎖的形式來實現循環序列字段。為了方便調用我把這段邏輯做成數據庫中的Javascript函數。 類似與MySQL中的存儲過程。
首先我們需要一個collection來存放序列號,并對需要的需要的序列號進行初始化。我們叫它counters。
- db.counters.save({_id:"SerialNo1", val:0, maxval:99})
然后我們想system.js里添加一個Javascript函數
- db.system.js.save({_id:"getNextUniqueSeq",
- value:function (keyName) {
- var seqObj = db.counters.findOne({_id:keyName});
- if (seqObj == null) {
- print("can not find record with key: " + keyName);
- return -1;
- }
- // the max value of sequence
- var maxVal = seqObj.maxval;
- // the current value of sequence
- var curVal = seqObj.val;
- while(true){
- // if curVal reach max, reset it
- if(curVal >= maxVal){
- db.counters.update({_id : keyName, val : curVal}, { $set : { val : 0 }}, false, false);
- var err = db.getLastErrorObj();
- if( err && err.code ) {
- print( "unexpected error reset data: " + tojson( err ) );
- return -2;
- } else if (err.n == 0){
- // fail to reset value, may be reseted by others
- print("fail to reset value: ");
- }
- // get current value again.
- seqObj = db.counters.findOne({_id:keyName});
- maxVal = seqObj.maxval;
- curVal = seqObj.val;
- continue;
- }
- // if curVal not reach the max, inc it;
- // increase
- db.counters.update({_id : keyName, val : curVal}, { $inc : { val : 1 }}, false, false);
- var err = db.getLastErrorObj();
- if( err && err.code ) {
- print( "unexpected error inc val: " + tojson( err ) );
- return -3;
- } else if (err.n == 0){
- // fail to reset value, may be increased by others
- print("fail to inc value: ");
- // get current value again.
- seqObj = db.counters.findOne({_id:keyName});
- maxVal = seqObj.maxval;
- curVal = seqObj.val;
- continue;
- } else {
- var retVal = curVal + 1;
- print("success to get seq : " + retVal);
- // increase successful
- return retVal;
- }
- }
- }
- });
上面這段會把指定的序列號的val值+1,如果val達到上限則從0開始。所以叫循環序列。
其實上面的實現在原理上和Java里的AtomicInteger系列的功能實現是類似的,利用循環重試和原子性的CAS來實現。這種實現方式在多線程的環境里由于鎖(Monitor)的范圍很小,所以并發性上比排他鎖要好一些。
下面我們用Java來測試一下這個函數的正確性。 即在多線程的情況下會不會得到重復的序列號。
第一個測試,val=0, maxval=2000, Java端20個線程每個線程循環調用100次。 共2000次。 所以正確的情況下,從0到1999應該每個數字只出現一次。
- @Test
- public void testGetNextUniqueSeq1() throws Exception {
- final int THREAD_COUNT = 20;
- final int LOOP_COUNT = 100;
- Mongo mongoClient = new Mongo("172.17.2.100", 27017);
- DB db = mongoClient.getDB("im");
- db.authenticate("imadmin", "imadmin".toCharArray());
- BasicDBObject q = new BasicDBObject();
- q.put("_id", "UNIQUE_KEY");
- BasicDBObject upd = new BasicDBObject();
- BasicDBObject set = new BasicDBObject();
- set.put("val", 0);
- set.put("maxval", THREAD_COUNT * LOOP_COUNT);
- upd.put("$set", set);
- db.getCollection("counters").update(q, upd);
- Thread[] threads = new Thread[THREAD_COUNT];
- final int[][] results = new int[THREAD_COUNT][LOOP_COUNT];
- for (int i = 0; i < THREAD_COUNT; i++) {
- final int temp_i = i;
- threads[i] = new Thread("" + i) {
- @Override
- public void run() {
- try {
- Mongo mongoClient = new Mongo("172.17.2.100", 27017);
- DB db = mongoClient.getDB("im");
- db.authenticate("imadmin", "imadmin".toCharArray());
- for (int j = 0; j < LOOP_COUNT; j++) {
- Object result = db.eval("getNextUniqueSeq(\"UNIQUE_KEY\")");
- System.out.printf("Thread %s, seq=%d\n", Thread.currentThread().getName(), ((Double) result).intValue());
- results[temp_i][j] = ((Double) result).intValue();
- }
- } catch (UnknownHostException e) {
- e.printStackTrace();
- }
- }
- };
- }
- for (Thread thread : threads) {
- thread.start();
- }
- for (Thread thread : threads) {
- thread.join();
- }
- for (int num = 1; num <= LOOP_COUNT * THREAD_COUNT; num++) {
- // every number appear 1 times only!
- int times = 0;
- for (int j = 0; j < THREAD_COUNT; j++) {
- for (int k = 0; k < LOOP_COUNT; k++) {
- if (results[j][k] == num)
- times++;
- }
- }
- assertEquals(1, times);
- }
- }
然后我們再測試一下循環的情況。 val=0, maxval=99。 同樣是Java端20個線程每個線程循環調用100次。 共2000次。這次從0到99的數字每個應該取得20次。
- @Test
- public void testGetNextUniqueSeq2() throws Exception {
- final int THREAD_COUNT = 20;
- final int LOOP_COUNT = 100;
- Mongo mongoClient = new Mongo("172.17.2.100", 27017);
- DB db = mongoClient.getDB("im");
- db.authenticate("imadmin", "imadmin".toCharArray());
- BasicDBObject q = new BasicDBObject();
- q.put("_id", "UNIQUE_KEY");
- BasicDBObject upd = new BasicDBObject();
- BasicDBObject set = new BasicDBObject();
- set.put("val", 0);
- set.put("maxval", LOOP_COUNT);
- upd.put("$set", set);
- db.getCollection("counters").update(q, upd);
- Thread[] threads = new Thread[THREAD_COUNT];
- final int[][] results = new int[THREAD_COUNT][LOOP_COUNT];
- for (int i = 0; i < THREAD_COUNT; i++) {
- final int temp_i = i;
- threads[i] = new Thread("" + i) {
- @Override
- public void run() {
- try {
- Mongo mongoClient = new Mongo("172.17.2.100", 27017);
- DB db = mongoClient.getDB("im");
- db.authenticate("imadmin", "imadmin".toCharArray());
- for (int j = 0; j < LOOP_COUNT; j++) {
- Object result = db.eval("getNextUniqueSeq(\"UNIQUE_KEY\")");
- System.out.printf("Thread %s, seq=%d\n", Thread.currentThread().getName(), ((Double) result).intValue());
- results[temp_i][j] = ((Double) result).intValue();
- }
- } catch (UnknownHostException e) {
- e.printStackTrace();
- }
- }
- };
- }
- for (Thread thread : threads) {
- thread.start();
- }
- for (Thread thread : threads) {
- thread.join();
- }
- for (int num = 1; num <= LOOP_COUNT; num++) {
- // every number appear 20 times only!
- int times = 0;
- for (int j = 0; j < THREAD_COUNT; j++) {
- for (int k = 0; k < LOOP_COUNT; k++) {
- if (results[j][k] == num)
- times++;
- }
- }
- assertEquals(20, times);
- }
- }
這個測試跑了幾次都是正確的。
由于沒有可以進行對比其他的實現方式(例如排他鎖)所以沒有做性能測試。
寫在最后。 雖然MongoDB支持類似于存儲過程的Stored Javascript,但是其實不建議使用這個來解決復雜問題。主要原因是沒法調試,維護起來太不方便。而且在2.4之前MongoDB對服務端 Javascript支持并不是很好, 一個mongod進程同時只能執行一段Javascript。如果能在應用層解決掉還是在應用層里實現邏輯比較好。
如果采用時間戳在加上一個隨機數是不是也可以解決這個問題,碰撞的概率微乎其微。
想法二:
ID = 時間戳+機器ID(2位數字,可以在程序啟動的時候設置好)+本機自增序列
這樣也不需要每次都使用一個全局的ID