環(huán)境是這樣的:服務(wù)器是用Java做的, 數(shù)據(jù)庫(kù)是MongoDB
需求是這樣的:我們的系統(tǒng)里要生成一個(gè)唯一ID,前面的部分有一定的格式,并和時(shí)間關(guān)聯(lián), 精確到微秒,考慮到同一微秒內(nèi)有可能存在并發(fā)情況, 所以后面在加兩位序列號(hào), 系統(tǒng)需要定義為1毫秒內(nèi)的并發(fā)小于100個(gè),所以后面兩位就夠用了。 Java服務(wù)器端有多臺(tái)機(jī)器都可以用來(lái)生成這個(gè)唯一ID,所以需要在不同的機(jī)器上不能生成相同的序列號(hào),所以需要在某一點(diǎn)上做全局的范圍同步來(lái)保存這序列 號(hào)的唯一性。 其實(shí)如果不考慮需求里的唯一ID是有一定意義的格式的, 用UUID或MongoDB的ObjectId都是更好的選擇,完全不需要在某一點(diǎn)上進(jìn)行同步,性能會(huì)更好。
這個(gè)可以生成序列號(hào)的點(diǎn), 我們可以做一個(gè)序列號(hào)生成服務(wù)器來(lái)對(duì)應(yīng), 也可以用數(shù)據(jù)庫(kù)來(lái)對(duì)應(yīng)。 單單為這個(gè)簡(jiǎn)單的功能準(zhǔn)備一個(gè)服務(wù)器來(lái)做顯然不合適。 但是我們用的MongoDB并沒(méi)有類似于MySQL或Oracle中的SELECT FOR UPDATE這樣的鎖機(jī)制。 所以沒(méi)有辦法簡(jiǎn)單的對(duì)這個(gè)序列號(hào)做原子操作。 但是MongoDB的對(duì)單個(gè)document進(jìn)行update操作中有很是具有原子性的, 例如
- $set
- $unset
- $inc
- $push
- $pushAll
- $pull
- $pullAll
我們可以利用這些原子操作,在數(shù)據(jù)庫(kù)層以樂(lè)觀鎖的形式來(lái)實(shí)現(xiàn)循環(huán)序列字段。為了方便調(diào)用我把這段邏輯做成數(shù)據(jù)庫(kù)中的Javascript函數(shù)。 類似與MySQL中的存儲(chǔ)過(guò)程。
首先我們需要一個(gè)collection來(lái)存放序列號(hào),并對(duì)需要的需要的序列號(hào)進(jìn)行初始化。我們叫它c(diǎn)ounters。
- db.counters.save({_id:"SerialNo1", val:0, maxval:99})
然后我們想system.js里添加一個(gè)Javascript函數(shù)
- db.system.js.save({_id:"getNextUniqueSeq",
- value:function (keyName) {
- var seqObj = db.counters.findOne({_id:keyName});
- if (seqObj == null) {
- print("can not find record with key: " + keyName);
- return -1;
- }
- // the max value of sequence
- var maxVal = seqObj.maxval;
- // the current value of sequence
- var curVal = seqObj.val;
- while(true){
- // if curVal reach max, reset it
- if(curVal >= maxVal){
- db.counters.update({_id : keyName, val : curVal}, { $set : { val : 0 }}, false, false);
- var err = db.getLastErrorObj();
- if( err && err.code ) {
- print( "unexpected error reset data: " + tojson( err ) );
- return -2;
- } else if (err.n == 0){
- // fail to reset value, may be reseted by others
- print("fail to reset value: ");
- }
- // get current value again.
- seqObj = db.counters.findOne({_id:keyName});
- maxVal = seqObj.maxval;
- curVal = seqObj.val;
- continue;
- }
- // if curVal not reach the max, inc it;
- // increase
- db.counters.update({_id : keyName, val : curVal}, { $inc : { val : 1 }}, false, false);
- var err = db.getLastErrorObj();
- if( err && err.code ) {
- print( "unexpected error inc val: " + tojson( err ) );
- return -3;
- } else if (err.n == 0){
- // fail to reset value, may be increased by others
- print("fail to inc value: ");
- // get current value again.
- seqObj = db.counters.findOne({_id:keyName});
- maxVal = seqObj.maxval;
- curVal = seqObj.val;
- continue;
- } else {
- var retVal = curVal + 1;
- print("success to get seq : " + retVal);
- // increase successful
- return retVal;
- }
- }
- }
- });
上面這段會(huì)把指定的序列號(hào)的val值+1,如果val達(dá)到上限則從0開(kāi)始。所以叫循環(huán)序列。
其實(shí)上面的實(shí)現(xiàn)在原理上和Java里的AtomicInteger系列的功能實(shí)現(xiàn)是類似的,利用循環(huán)重試和原子性的CAS來(lái)實(shí)現(xiàn)。這種實(shí)現(xiàn)方式在多線程的環(huán)境里由于鎖(Monitor)的范圍很小,所以并發(fā)性上比排他鎖要好一些。
下面我們用Java來(lái)測(cè)試一下這個(gè)函數(shù)的正確性。 即在多線程的情況下會(huì)不會(huì)得到重復(fù)的序列號(hào)。
第一個(gè)測(cè)試,val=0, maxval=2000, Java端20個(gè)線程每個(gè)線程循環(huán)調(diào)用100次。 共2000次。 所以正確的情況下,從0到1999應(yīng)該每個(gè)數(shù)字只出現(xiàn)一次。
- @Test
- public void testGetNextUniqueSeq1() throws Exception {
- final int THREAD_COUNT = 20;
- final int LOOP_COUNT = 100;
- Mongo mongoClient = new Mongo("172.17.2.100", 27017);
- DB db = mongoClient.getDB("im");
- db.authenticate("imadmin", "imadmin".toCharArray());
- BasicDBObject q = new BasicDBObject();
- q.put("_id", "UNIQUE_KEY");
- BasicDBObject upd = new BasicDBObject();
- BasicDBObject set = new BasicDBObject();
- set.put("val", 0);
- set.put("maxval", THREAD_COUNT * LOOP_COUNT);
- upd.put("$set", set);
- db.getCollection("counters").update(q, upd);
- Thread[] threads = new Thread[THREAD_COUNT];
- final int[][] results = new int[THREAD_COUNT][LOOP_COUNT];
- for (int i = 0; i < THREAD_COUNT; i++) {
- final int temp_i = i;
- threads[i] = new Thread("" + i) {
- @Override
- public void run() {
- try {
- Mongo mongoClient = new Mongo("172.17.2.100", 27017);
- DB db = mongoClient.getDB("im");
- db.authenticate("imadmin", "imadmin".toCharArray());
- for (int j = 0; j < LOOP_COUNT; j++) {
- Object result = db.eval("getNextUniqueSeq(\"UNIQUE_KEY\")");
- System.out.printf("Thread %s, seq=%d\n", Thread.currentThread().getName(), ((Double) result).intValue());
- results[temp_i][j] = ((Double) result).intValue();
- }
- } catch (UnknownHostException e) {
- e.printStackTrace();
- }
- }
- };
- }
- for (Thread thread : threads) {
- thread.start();
- }
- for (Thread thread : threads) {
- thread.join();
- }
- for (int num = 1; num <= LOOP_COUNT * THREAD_COUNT; num++) {
- // every number appear 1 times only!
- int times = 0;
- for (int j = 0; j < THREAD_COUNT; j++) {
- for (int k = 0; k < LOOP_COUNT; k++) {
- if (results[j][k] == num)
- times++;
- }
- }
- assertEquals(1, times);
- }
- }
然后我們?cè)贉y(cè)試一下循環(huán)的情況。 val=0, maxval=99。 同樣是Java端20個(gè)線程每個(gè)線程循環(huán)調(diào)用100次。 共2000次。這次從0到99的數(shù)字每個(gè)應(yīng)該取得20次。
- @Test
- public void testGetNextUniqueSeq2() throws Exception {
- final int THREAD_COUNT = 20;
- final int LOOP_COUNT = 100;
- Mongo mongoClient = new Mongo("172.17.2.100", 27017);
- DB db = mongoClient.getDB("im");
- db.authenticate("imadmin", "imadmin".toCharArray());
- BasicDBObject q = new BasicDBObject();
- q.put("_id", "UNIQUE_KEY");
- BasicDBObject upd = new BasicDBObject();
- BasicDBObject set = new BasicDBObject();
- set.put("val", 0);
- set.put("maxval", LOOP_COUNT);
- upd.put("$set", set);
- db.getCollection("counters").update(q, upd);
- Thread[] threads = new Thread[THREAD_COUNT];
- final int[][] results = new int[THREAD_COUNT][LOOP_COUNT];
- for (int i = 0; i < THREAD_COUNT; i++) {
- final int temp_i = i;
- threads[i] = new Thread("" + i) {
- @Override
- public void run() {
- try {
- Mongo mongoClient = new Mongo("172.17.2.100", 27017);
- DB db = mongoClient.getDB("im");
- db.authenticate("imadmin", "imadmin".toCharArray());
- for (int j = 0; j < LOOP_COUNT; j++) {
- Object result = db.eval("getNextUniqueSeq(\"UNIQUE_KEY\")");
- System.out.printf("Thread %s, seq=%d\n", Thread.currentThread().getName(), ((Double) result).intValue());
- results[temp_i][j] = ((Double) result).intValue();
- }
- } catch (UnknownHostException e) {
- e.printStackTrace();
- }
- }
- };
- }
- for (Thread thread : threads) {
- thread.start();
- }
- for (Thread thread : threads) {
- thread.join();
- }
- for (int num = 1; num <= LOOP_COUNT; num++) {
- // every number appear 20 times only!
- int times = 0;
- for (int j = 0; j < THREAD_COUNT; j++) {
- for (int k = 0; k < LOOP_COUNT; k++) {
- if (results[j][k] == num)
- times++;
- }
- }
- assertEquals(20, times);
- }
- }
這個(gè)測(cè)試跑了幾次都是正確的。
由于沒(méi)有可以進(jìn)行對(duì)比其他的實(shí)現(xiàn)方式(例如排他鎖)所以沒(méi)有做性能測(cè)試。
寫(xiě)在最后。 雖然MongoDB支持類似于存儲(chǔ)過(guò)程的Stored Javascript,但是其實(shí)不建議使用這個(gè)來(lái)解決復(fù)雜問(wèn)題。主要原因是沒(méi)法調(diào)試,維護(hù)起來(lái)太不方便。而且在2.4之前MongoDB對(duì)服務(wù)端 Javascript支持并不是很好, 一個(gè)mongod進(jìn)程同時(shí)只能執(zhí)行一段Javascript。如果能在應(yīng)用層解決掉還是在應(yīng)用層里實(shí)現(xiàn)邏輯比較好。
如果采用時(shí)間戳在加上一個(gè)隨機(jī)數(shù)是不是也可以解決這個(gè)問(wèn)題,碰撞的概率微乎其微。
想法二:
ID = 時(shí)間戳+機(jī)器ID(2位數(shù)字,可以在程序啟動(dòng)的時(shí)候設(shè)置好)+本機(jī)自增序列
這樣也不需要每次都使用一個(gè)全局的ID