posts - 56,  comments - 12,  trackbacks - 0
          一.    它要能適應(yīng)不同類(lèi)型的請(qǐng)求:
          本節(jié)用 makeString來(lái)說(shuō)明要求有返回值的請(qǐng)求.用displayString來(lái)說(shuō)明不需要返回值的請(qǐng)求.
          二.    要能同時(shí)并發(fā)處理多個(gè)請(qǐng)求,并能按一定機(jī)制調(diào)度:
          本節(jié)將用一個(gè)隊(duì)列來(lái)存放請(qǐng)求,所以只能按FIFO機(jī)制調(diào)度,你可以改用LinkedList,就可以簡(jiǎn)單實(shí)現(xiàn)一個(gè)優(yōu)先級(jí)(優(yōu)先級(jí)高的addFirst,低的addLast).
          三.    有能力將調(diào)用的邊界從線(xiàn)程擴(kuò)展到機(jī)器間(RMI)
          四.    分離過(guò)度耦合,如分離調(diào)用句柄(取貨憑證)和真實(shí)數(shù)據(jù)的實(shí)現(xiàn).分離調(diào)用和執(zhí)行的過(guò)程,可以盡快地將調(diào)返回.

          現(xiàn)在看具體的實(shí)現(xiàn):
          public interface Axman {
            Result resultTest(int count,char c);
            void noResultTest(String str);
          }
          這個(gè)接口有兩個(gè)方法要實(shí)現(xiàn),就是有返回值的調(diào)用resultTest和不需要返回值的調(diào)用
          noResultTest, 我們把這個(gè)接口用一個(gè)代理類(lèi)來(lái)實(shí)現(xiàn),目的是將方法調(diào)用轉(zhuǎn)化為對(duì)象,這樣就可以將多個(gè)請(qǐng)求(多個(gè)方法調(diào))放到一個(gè)容器中緩存起來(lái),然后統(tǒng)一處理,因?yàn)? Java不支持方法指針,所以把方法調(diào)用轉(zhuǎn)換為對(duì)象,然后在這個(gè)對(duì)象上統(tǒng)一執(zhí)行它們的方法,不僅可以做到異步處理,而且可以將代表方法調(diào)用的請(qǐng)求對(duì)象序列 化后通過(guò)網(wǎng)絡(luò)傳遞到另一個(gè)機(jī)器上執(zhí)行(RMI).這也是Java回調(diào)機(jī)制最有力的實(shí)現(xiàn).
              一個(gè)簡(jiǎn)單的例子.
              如果 1: 做A
              如果 2: 做B
          如果 3: 做C
          如果有1000個(gè)情況,你不至于用1000個(gè)case吧?以后再增加呢?
          所以如果C/C++程序員,會(huì)這樣實(shí)現(xiàn): (c和c++定義結(jié)構(gòu)不同)

          type define struct MyStruct{
          int mark;
          (*fn) ();
          } MyList;
              
              然后你可以聲明這個(gè)結(jié)構(gòu)數(shù)據(jù):
              {1,A,
               2,B
               3,C
          }
          做一個(gè)循環(huán):
          for(i=0;i<length;i++) {
              if(數(shù)據(jù)組[i].mark == 傳入的值) (數(shù)據(jù)組[i].*fn)();
          }
          簡(jiǎn)單說(shuō)c/c++中將要被調(diào)用的涵數(shù)可以被保存起來(lái),然后去訪(fǎng)問(wèn),調(diào)用,而Java中,我們無(wú)法將一個(gè)方法保存,除了直接調(diào)用,所以將要調(diào)用的方法用子類(lèi)來(lái)實(shí)現(xiàn),然后把這些子類(lèi)實(shí)例保存起來(lái),然后在這些子類(lèi)的實(shí)現(xiàn)上調(diào)用方法:
          interface My{
              void test();
          }

          class A implements My{
              public void test(){
                  System.out.println(“A”):
          }
          }
          class B implements My{
              public void test(){
                  System.out.println(“B”):
          }
          }

          class C implements My{
              public void test(){
                  System.out.println(“C”):
          }
          }

          class MyStruct {
              
              int mark;
              My m;
              public MyStruct(int mark,My m){this.mark = amrk;this.m = m}
          }
          數(shù)組:
          { new MyStruct(1,new A()),new MyStruct(2,new B()),new MyStruct(3,new C())}
          for(xxxxxxxxx) if(參數(shù) ==數(shù)組[i].mark) 數(shù)組[i].m.test();

          這樣把要調(diào)用的方法轉(zhuǎn)換為對(duì)象的保程不僅僅是可以對(duì)要調(diào)用的方法進(jìn)行調(diào)度,而且可以把對(duì)象序列化后在另一臺(tái)機(jī)器上執(zhí)行,這樣就把調(diào)用邊界從線(xiàn)程擴(kuò)展到了機(jī)器.

          回到我們的例子:
          class Proxy implements Axman{
            private final Scheduler scheduler;
            private final Servant servant;

            public Proxy(Scheduler scheduler,Servant servant){
              this.scheduler = scheduler;
              this.servant = servant;
            }
            public Result resultTest(int count,char c){
              FutureResult futrue = new FutureResult();
              this.scheduler.invoke(new ResultRequest(servant,futrue,count,c));
              return futrue;
            }

            public void noResultTest(String str){
              this.scheduler.invoke(new NoResultRequest(this.servant,str));
            }
          }

          其中scheduler是管理對(duì)調(diào)用的調(diào)度, servant是真正的對(duì)方法的執(zhí)行:

          Servant就是去真實(shí)地實(shí)現(xiàn)方法:

          class Servant implements Axman{
            public Result resultTest(int count,char c){
              char[] buf = new char[count];
              for(int i = 0;i < count;i++){
                buf[i] = c;
                try{
                  Thread.sleep(100);
                }catch(Throwable t){}
              }
              return new RealResult(new String(buf));
            }

            public void noResultTest(String str){
              try{
                System.out.println("displayString :" + str);
                Thread.sleep(10);
              }catch(Throwable t){}
            }
          }
          在scheduler 將方法的調(diào)用(invkoe)和執(zhí)行(execute)進(jìn)行了分離,調(diào)用就是開(kāi)始”注冊(cè)”方法到要執(zhí)行的容器中,這樣就可以立即返回出來(lái).真正執(zhí)行多久就 是execute的事了,就象一個(gè)人點(diǎn)燃爆竹的引信就跑了,至于那個(gè)爆竹什么時(shí)候爆炸就不是他能控制的了.
          public class Scheduler extends Thread {
            private final ActivationQueue queue;
            public Scheduler(ActivationQueue queue){
              this.queue = queue;
            }

            public void invoke(MethodRequest request){
              this.queue.putRequest(request);
            }

            public void run(){
              while(true){

                //如果隊(duì)列中有請(qǐng)求線(xiàn)程,測(cè)開(kāi)始執(zhí)行請(qǐng)求
                MethodRequest request = this.queue.takeRequest();
                request.execute();
              }
            }
          }
          在scheduler中只用一個(gè)隊(duì)列來(lái)保存代表方法和請(qǐng)求對(duì)象,實(shí)行簡(jiǎn)單的FIFO調(diào)用,你要實(shí)更復(fù)雜的調(diào)度就要在這里重新實(shí)現(xiàn):
          class ActivationQueue{
            private static final int MAX_METHOD_REQUEST = 100;
            private final MethodRequest[] requestQueue;
            private int tail;
            private int head;
            private int count;

            public ActivationQueue(){
              this.requestQueue = new MethodRequest[MAX_METHOD_REQUEST];
              this.head = this.count = this.tail = 0;
            }

            public synchronized void putRequest(MethodRequest request){
              while(this.count >= this.requestQueue.length){
                try {
                  this.wait();
                }
                catch (Throwable t) {}
              }
              this.requestQueue[this.tail] = request;
              tail = (tail + 1)%this.requestQueue.length;
              count ++ ;
              this.notifyAll();

            }


            public synchronized MethodRequest takeRequest(){
              while(this.count <= 0){
                try {
                  this.wait();
                }
                catch (Throwable t) {}
           
              }

              MethodRequest request = this.requestQueue[this.head];
              this.head = (this.head + 1) % this.requestQueue.length;
              count --;
              this.notifyAll();
              return request;
            }
          }

          為了將方法調(diào)用轉(zhuǎn)化為對(duì)象,我們通過(guò)實(shí)現(xiàn)MethodRequest對(duì)象的execute方法來(lái)方法具體方法轉(zhuǎn)換成具體對(duì)象:
          abstract class MethodRequest{
            protected final Servant servant;
            protected final FutureResult future;

            protected MethodRequest(Servant servant,FutureResult future){
              this.servant = servant;
              this.future = future;
            }

            public abstract void execute();
          }

          class ResultRequest extends MethodRequest{
            private final int count;
            private final char c;
            public ResultRequest(Servant servant,FutureResult future,int count,char c){
              super(servant,future);
              this.count = count;
              this.c = c;
            }
            public void execute(){
              Result result = servant.resultTest(this.count,this.c);
              this.future.setResult(result);
            }
          }

          class NoResultRequest extends MethodRequest{
            private String str;
            public NoResultRequest(Servant servant,String str){
              super(servant,null);
              this.str = str;
            }

            public void execute(){
              this.servant.noResultTest(str);
            }
          }

          而返回的數(shù)據(jù)我們也將真實(shí)數(shù)據(jù)的獲取和取貨憑證邏輯分離:
          package com.axman.jasync;

          public abstract class Result {
            public abstract Object getResult();
          }

          class FutureResult extends Result{
            private Result result;
            private boolean completed;

            public synchronized void setResult(Result result){
              this.result = result;
              this.completed = true;
              this.notifyAll();
            }

            public synchronized Object getResult(){
              while(!this.completed){
                try{
                  this.wait();
                }catch(Throwable t){}
              }
              return this.result.getResult();
            }
          }

          class RealResult extends Result{
            private final Object result;

            public RealResult(Object result){
              this.result = result;
            }
            public Object getResult(){
              return this.result;
            }
          }
          OK,現(xiàn)在這個(gè)異步消息處理器已經(jīng)有了模型,這個(gè)異步處理器中有昭雪些對(duì)象參與呢?
              Servant 忠心做真實(shí)的事務(wù)
              ActivationQueue將請(qǐng)求緩存起來(lái)以便調(diào)度
              Scheduler對(duì)容器中的請(qǐng)求根據(jù)一定原則進(jìn)行調(diào)度執(zhí)行
              Proxy將特定方法請(qǐng)求轉(zhuǎn)換為特定對(duì)象
          所有這些都是這個(gè)異步處理器的核心部件,雖然是核心部件,我們就要進(jìn)行封裝而不能隨便讓調(diào)用者來(lái)修改,所以我們用工廠(chǎng)模式(我KAO,我實(shí)在不想提模式但有時(shí)找不到其它詞來(lái)表述)來(lái)產(chǎn)生處理器Axman對(duì)象:
          package com.axman.jasync;

          public class AxmanFactory {
            public static Axman createAxman() {
              Servant s = new Servant();
              ActivationQueue queue = new ActivationQueue();
              Scheduler st = new Scheduler(queue);
              Proxy p = new Proxy(st,s);
              st.start();
              return p;
            }
          }
          好了,我們現(xiàn)在用兩個(gè)請(qǐng)求的產(chǎn)生者不停產(chǎn)生請(qǐng)求:
          ResultInvokeThreadv 發(fā)送有返回值的請(qǐng)求:
          package com.axman.jasync;

          public class ResultInvokeThread extends Thread{
            private final Axman ao;
            private final char c;
            public ResultInvokeThread(String name,Axman ao){
              this.ao = ao;
              this.c = name.charAt(0);
            }

            public void run(){
              try{
                int i = 0;
                while(true){
                  Result result  = this.ao.resultTest(i++,c);
                  Thread.sleep(10);
                  String  = (String)result.getResult();
                  System.out.println(Thread.currentThread().getName() + "  = " + );
                }
              }
              catch(Throwable t){}
            }
          }

          NoResultInvokeThread發(fā)送無(wú)返回值的請(qǐng)求:
          package com.axman.jasync;

          public class NoResultInvokeThread extends Thread{
            private final Axman ao;
            public NoResultInvokeThread(String name,Axman ao){
              super(name);
              this.ao = ao;
            }

            public void run(){
              try{
                int i = 0;
                while(true){
                  String s = Thread.currentThread().getName() + i++;
                  ao.noResultTest(s);
                  Thread.sleep(20);
                }
              }
              catch(Throwable t){}
            }
          }

          對(duì)了,我們還需要一個(gè)什么東西來(lái)產(chǎn)生一個(gè)演示:
          package com.axman.jasync;

          public class Program {
            public static void main(String[] args) {
              Axman ao = AxmanFactory.createAxman();
              new ResultInvokeThread("Axman",ao).start();
              new ResultInvokeThread("Sager",ao).start();
              new NoResultInvokeThread("Macke",ao).start();
            }
          }
          看看結(jié)果吧.你可以把不同類(lèi)型的請(qǐng)求不斷地向處理器發(fā)送,處理器會(huì)不斷地接收請(qǐng)求,放到隊(duì)列中,并同時(shí)不斷從隊(duì)列中提出請(qǐng)求進(jìn)行處理.
          posted on 2007-01-19 00:10 苦笑枯 閱讀(5731) 評(píng)論(2)  編輯  收藏 所屬分類(lèi): Java

          FeedBack:
          # re: Java 異步消息處理
          2008-09-24 22:12 | ozhibin
          請(qǐng)問(wèn)用到了哪些包?FutureResult 同Result等我這都找不到引用包  回復(fù)  更多評(píng)論
            
          # re: Java 異步消息處理[未登錄](méi)
          2012-01-10 09:59 | 樂(lè)樂(lè)
          @ozhibin
          都是自定義的,仔細(xì)找一下,文章中都寫(xiě)出來(lái)了  回復(fù)  更多評(píng)論
            
          收藏來(lái)自互聯(lián)網(wǎng),僅供學(xué)習(xí)。若有侵權(quán),請(qǐng)與我聯(lián)系!

          <2007年1月>
          31123456
          78910111213
          14151617181920
          21222324252627
          28293031123
          45678910

          常用鏈接

          留言簿(2)

          隨筆分類(lèi)(56)

          隨筆檔案(56)

          搜索

          •  

          最新評(píng)論

          閱讀排行榜

          評(píng)論排行榜

          主站蜘蛛池模板: 桂东县| 湟中县| 高碑店市| 盐亭县| 旅游| 赤壁市| 皮山县| 榆树市| 柞水县| 吴旗县| 香河县| 满城县| 奈曼旗| 西贡区| 永年县| 淳化县| 正宁县| 梁山县| 淮安市| 邳州市| 隆化县| 太白县| 惠安县| 东丽区| 绵竹市| 右玉县| 越西县| 扎赉特旗| 同仁县| 扶余县| 玛沁县| 东辽县| 衡东县| 会宁县| 黄浦区| 峡江县| 资溪县| 凤台县| 山东省| 玉环县| 霍林郭勒市|