一. 它要能適應(yīng)不同類(lèi)型的請(qǐng)求:
本節(jié)用 makeString來(lái)說(shuō)明要求有返回值的請(qǐng)求.用displayString來(lái)說(shuō)明不需要返回值的請(qǐng)求.
二. 要能同時(shí)并發(fā)處理多個(gè)請(qǐng)求,并能按一定機(jī)制調(diào)度:
本節(jié)將用一個(gè)隊(duì)列來(lái)存放請(qǐng)求,所以只能按FIFO機(jī)制調(diào)度,你可以改用LinkedList,就可以簡(jiǎn)單實(shí)現(xiàn)一個(gè)優(yōu)先級(jí)(優(yōu)先級(jí)高的addFirst,低的addLast).
三. 有能力將調(diào)用的邊界從線(xiàn)程擴(kuò)展到機(jī)器間(RMI)
四. 分離過(guò)度耦合,如分離調(diào)用句柄(取貨憑證)和真實(shí)數(shù)據(jù)的實(shí)現(xiàn).分離調(diào)用和執(zhí)行的過(guò)程,可以盡快地將調(diào)返回.
現(xiàn)在看具體的實(shí)現(xiàn):
public interface Axman {
Result resultTest(int count,char c);
void noResultTest(String str);
}
這個(gè)接口有兩個(gè)方法要實(shí)現(xiàn),就是有返回值的調(diào)用resultTest和不需要返回值的調(diào)用
noResultTest,
我們把這個(gè)接口用一個(gè)代理類(lèi)來(lái)實(shí)現(xiàn),目的是將方法調(diào)用轉(zhuǎn)化為對(duì)象,這樣就可以將多個(gè)請(qǐng)求(多個(gè)方法調(diào))放到一個(gè)容器中緩存起來(lái),然后統(tǒng)一處理,因?yàn)?
Java不支持方法指針,所以把方法調(diào)用轉(zhuǎn)換為對(duì)象,然后在這個(gè)對(duì)象上統(tǒng)一執(zhí)行它們的方法,不僅可以做到異步處理,而且可以將代表方法調(diào)用的請(qǐng)求對(duì)象序列
化后通過(guò)網(wǎng)絡(luò)傳遞到另一個(gè)機(jī)器上執(zhí)行(RMI).這也是Java回調(diào)機(jī)制最有力的實(shí)現(xiàn).
一個(gè)簡(jiǎn)單的例子.
如果 1: 做A
如果 2: 做B
如果 3: 做C
如果有1000個(gè)情況,你不至于用1000個(gè)case吧?以后再增加呢?
所以如果C/C++程序員,會(huì)這樣實(shí)現(xiàn): (c和c++定義結(jié)構(gòu)不同)
type define struct MyStruct{
int mark;
(*fn) ();
} MyList;
然后你可以聲明這個(gè)結(jié)構(gòu)數(shù)據(jù):
{1,A,
2,B
3,C
}
做一個(gè)循環(huán):
for(i=0;i<length;i++) {
if(數(shù)據(jù)組[i].mark == 傳入的值) (數(shù)據(jù)組[i].*fn)();
}
簡(jiǎn)單說(shuō)c/c++中將要被調(diào)用的涵數(shù)可以被保存起來(lái),然后去訪(fǎng)問(wèn),調(diào)用,而Java中,我們無(wú)法將一個(gè)方法保存,除了直接調(diào)用,所以將要調(diào)用的方法用子類(lèi)來(lái)實(shí)現(xiàn),然后把這些子類(lèi)實(shí)例保存起來(lái),然后在這些子類(lèi)的實(shí)現(xiàn)上調(diào)用方法:
interface My{
void test();
}
class A implements My{
public void test(){
System.out.println(“A”):
}
}
class B implements My{
public void test(){
System.out.println(“B”):
}
}
class C implements My{
public void test(){
System.out.println(“C”):
}
}
class MyStruct {
int mark;
My m;
public MyStruct(int mark,My m){this.mark = amrk;this.m = m}
}
數(shù)組:
{ new MyStruct(1,new A()),new MyStruct(2,new B()),new MyStruct(3,new C())}
for(xxxxxxxxx) if(參數(shù) ==數(shù)組[i].mark) 數(shù)組[i].m.test();
這樣把要調(diào)用的方法轉(zhuǎn)換為對(duì)象的保程不僅僅是可以對(duì)要調(diào)用的方法進(jìn)行調(diào)度,而且可以把對(duì)象序列化后在另一臺(tái)機(jī)器上執(zhí)行,這樣就把調(diào)用邊界從線(xiàn)程擴(kuò)展到了機(jī)器.
回到我們的例子:
class Proxy implements Axman{
private final Scheduler scheduler;
private final Servant servant;
public Proxy(Scheduler scheduler,Servant servant){
this.scheduler = scheduler;
this.servant = servant;
}
public Result resultTest(int count,char c){
FutureResult futrue = new FutureResult();
this.scheduler.invoke(new ResultRequest(servant,futrue,count,c));
return futrue;
}
public void noResultTest(String str){
this.scheduler.invoke(new NoResultRequest(this.servant,str));
}
}
其中scheduler是管理對(duì)調(diào)用的調(diào)度, servant是真正的對(duì)方法的執(zhí)行:
Servant就是去真實(shí)地實(shí)現(xiàn)方法:
class Servant implements Axman{
public Result resultTest(int count,char c){
char[] buf = new char[count];
for(int i = 0;i < count;i++){
buf[i] = c;
try{
Thread.sleep(100);
}catch(Throwable t){}
}
return new RealResult(new String(buf));
}
public void noResultTest(String str){
try{
System.out.println("displayString :" + str);
Thread.sleep(10);
}catch(Throwable t){}
}
}
在scheduler
將方法的調(diào)用(invkoe)和執(zhí)行(execute)進(jìn)行了分離,調(diào)用就是開(kāi)始”注冊(cè)”方法到要執(zhí)行的容器中,這樣就可以立即返回出來(lái).真正執(zhí)行多久就
是execute的事了,就象一個(gè)人點(diǎn)燃爆竹的引信就跑了,至于那個(gè)爆竹什么時(shí)候爆炸就不是他能控制的了.
public class Scheduler extends Thread {
private final ActivationQueue queue;
public Scheduler(ActivationQueue queue){
this.queue = queue;
}
public void invoke(MethodRequest request){
this.queue.putRequest(request);
}
public void run(){
while(true){
//如果隊(duì)列中有請(qǐng)求線(xiàn)程,測(cè)開(kāi)始執(zhí)行請(qǐng)求
MethodRequest request = this.queue.takeRequest();
request.execute();
}
}
}
在scheduler中只用一個(gè)隊(duì)列來(lái)保存代表方法和請(qǐng)求對(duì)象,實(shí)行簡(jiǎn)單的FIFO調(diào)用,你要實(shí)更復(fù)雜的調(diào)度就要在這里重新實(shí)現(xiàn):
class ActivationQueue{
private static final int MAX_METHOD_REQUEST = 100;
private final MethodRequest[] requestQueue;
private int tail;
private int head;
private int count;
public ActivationQueue(){
this.requestQueue = new MethodRequest[MAX_METHOD_REQUEST];
this.head = this.count = this.tail = 0;
}
public synchronized void putRequest(MethodRequest request){
while(this.count >= this.requestQueue.length){
try {
this.wait();
}
catch (Throwable t) {}
}
this.requestQueue[this.tail] = request;
tail = (tail + 1)%this.requestQueue.length;
count ++ ;
this.notifyAll();
}
public synchronized MethodRequest takeRequest(){
while(this.count <= 0){
try {
this.wait();
}
catch (Throwable t) {}
}
MethodRequest request = this.requestQueue[this.head];
this.head = (this.head + 1) % this.requestQueue.length;
count --;
this.notifyAll();
return request;
}
}
為了將方法調(diào)用轉(zhuǎn)化為對(duì)象,我們通過(guò)實(shí)現(xiàn)MethodRequest對(duì)象的execute方法來(lái)方法具體方法轉(zhuǎn)換成具體對(duì)象:
abstract class MethodRequest{
protected final Servant servant;
protected final FutureResult future;
protected MethodRequest(Servant servant,FutureResult future){
this.servant = servant;
this.future = future;
}
public abstract void execute();
}
class ResultRequest extends MethodRequest{
private final int count;
private final char c;
public ResultRequest(Servant servant,FutureResult future,int count,char c){
super(servant,future);
this.count = count;
this.c = c;
}
public void execute(){
Result result = servant.resultTest(this.count,this.c);
this.future.setResult(result);
}
}
class NoResultRequest extends MethodRequest{
private String str;
public NoResultRequest(Servant servant,String str){
super(servant,null);
this.str = str;
}
public void execute(){
this.servant.noResultTest(str);
}
}
而返回的數(shù)據(jù)我們也將真實(shí)數(shù)據(jù)的獲取和取貨憑證邏輯分離:
package com.axman.jasync;
public abstract class Result {
public abstract Object getResult();
}
class FutureResult extends Result{
private Result result;
private boolean completed;
public synchronized void setResult(Result result){
this.result = result;
this.completed = true;
this.notifyAll();
}
public synchronized Object getResult(){
while(!this.completed){
try{
this.wait();
}catch(Throwable t){}
}
return this.result.getResult();
}
}
class RealResult extends Result{
private final Object result;
public RealResult(Object result){
this.result = result;
}
public Object getResult(){
return this.result;
}
}
OK,現(xiàn)在這個(gè)異步消息處理器已經(jīng)有了模型,這個(gè)異步處理器中有昭雪些對(duì)象參與呢?
Servant 忠心做真實(shí)的事務(wù)
ActivationQueue將請(qǐng)求緩存起來(lái)以便調(diào)度
Scheduler對(duì)容器中的請(qǐng)求根據(jù)一定原則進(jìn)行調(diào)度執(zhí)行
Proxy將特定方法請(qǐng)求轉(zhuǎn)換為特定對(duì)象
所有這些都是這個(gè)異步處理器的核心部件,雖然是核心部件,我們就要進(jìn)行封裝而不能隨便讓調(diào)用者來(lái)修改,所以我們用工廠(chǎng)模式(我KAO,我實(shí)在不想提模式但有時(shí)找不到其它詞來(lái)表述)來(lái)產(chǎn)生處理器Axman對(duì)象:
package com.axman.jasync;
public class AxmanFactory {
public static Axman createAxman() {
Servant s = new Servant();
ActivationQueue queue = new ActivationQueue();
Scheduler st = new Scheduler(queue);
Proxy p = new Proxy(st,s);
st.start();
return p;
}
}
好了,我們現(xiàn)在用兩個(gè)請(qǐng)求的產(chǎn)生者不停產(chǎn)生請(qǐng)求:
ResultInvokeThreadv 發(fā)送有返回值的請(qǐng)求:
package com.axman.jasync;
public class ResultInvokeThread extends Thread{
private final Axman ao;
private final char c;
public ResultInvokeThread(String name,Axman ao){
this.ao = ao;
this.c = name.charAt(0);
}
public void run(){
try{
int i = 0;
while(true){
Result result = this.ao.resultTest(i++,c);
Thread.sleep(10);
String = (String)result.getResult();
System.out.println(Thread.currentThread().getName() + " = " + );
}
}
catch(Throwable t){}
}
}
NoResultInvokeThread發(fā)送無(wú)返回值的請(qǐng)求:
package com.axman.jasync;
public class NoResultInvokeThread extends Thread{
private final Axman ao;
public NoResultInvokeThread(String name,Axman ao){
super(name);
this.ao = ao;
}
public void run(){
try{
int i = 0;
while(true){
String s = Thread.currentThread().getName() + i++;
ao.noResultTest(s);
Thread.sleep(20);
}
}
catch(Throwable t){}
}
}
對(duì)了,我們還需要一個(gè)什么東西來(lái)產(chǎn)生一個(gè)演示:
package com.axman.jasync;
public class Program {
public static void main(String[] args) {
Axman ao = AxmanFactory.createAxman();
new ResultInvokeThread("Axman",ao).start();
new ResultInvokeThread("Sager",ao).start();
new NoResultInvokeThread("Macke",ao).start();
}
}
看看結(jié)果吧.你可以把不同類(lèi)型的請(qǐng)求不斷地向處理器發(fā)送,處理器會(huì)不斷地接收請(qǐng)求,放到隊(duì)列中,并同時(shí)不斷從隊(duì)列中提出請(qǐng)求進(jìn)行處理.