JDK1.4提供的無(wú)阻塞I/O(NIO)有效解決了多線程服務(wù)器存在的線程開(kāi)銷問(wèn)題,但在使用上略顯得復(fù)雜一些。許多基于 NIO的多線程服務(wù)器程序往往直接基于選擇器(Selector)的Reactor模式實(shí)現(xiàn)。這種簡(jiǎn)單的事件機(jī)制對(duì)于較復(fù)雜的服務(wù)器應(yīng)用,顯然缺乏擴(kuò)展性 和可維護(hù)性,而且缺乏直觀清晰的結(jié)構(gòu)層次。本文將通過(guò)一個(gè)基于事件回調(diào)的NIO多線程服務(wù)器的設(shè)計(jì),試圖提供一個(gè)簡(jiǎn)潔、直觀、易于擴(kuò)展的NIO多線程服務(wù) 器模型。
JDK1.4的NIO有效解決了原有流式IO存在的線程開(kāi)銷的問(wèn)題,在NIO中使用多線程,主要目的已不是為了應(yīng)對(duì)每個(gè)客戶端請(qǐng)求而分配獨(dú)立的服務(wù)線程,而是通過(guò)多線程充分使用用多個(gè)CPU的處理能力和處理中的等待時(shí)間,達(dá)到提高服務(wù)能力的目的。
多線程的引入,容易為本來(lái)就略顯復(fù)雜的NIO代碼進(jìn)一步降低可讀性和可維護(hù)性。引入良好的設(shè)計(jì)模型,將不僅帶來(lái)高性能、高可靠的代碼,也將帶來(lái)一個(gè)愜意的開(kāi)發(fā)過(guò)程。
線程模型
NIO
的選擇器采用了多路復(fù)用(Multiplexing)技術(shù),可在一個(gè)選擇器上處理多個(gè)套接字,通過(guò)獲取讀寫通道來(lái)進(jìn)行IO操作。由于網(wǎng)絡(luò)帶寬等原因,在通
道的讀、寫操作中是容易出現(xiàn)等待的,所以在讀、寫操作中引入多線程,對(duì)性能提高明顯,而且可以提高客戶端的感知服務(wù)質(zhì)量。所以本文的模型將主要通過(guò)使用
讀、寫線程池來(lái)提高與客戶端的數(shù)據(jù)交換能力。
如下圖所示,服務(wù)端接受客戶端請(qǐng)求后,控制線程將該請(qǐng)求的讀通道交給讀線程池,由讀線程池分配 線程完成對(duì)客戶端數(shù)據(jù)的讀取操作;當(dāng)讀線程完成讀操作后,將數(shù)據(jù)返回控制線程,進(jìn)行服務(wù)端的業(yè)務(wù)處理;完成業(yè)務(wù)處理后,將需回應(yīng)給客戶端的數(shù)據(jù)和寫通道提 交給寫線程池,由寫線程完成向客戶端發(fā)送回應(yīng)數(shù)據(jù)的操作。
同時(shí)整個(gè)服務(wù)端的流程處理,建立于事件機(jī)制上。在 [接受連接->讀->業(yè)務(wù)處理->寫 ->關(guān)閉連接 ]這個(gè)過(guò)程中,觸發(fā)器將觸發(fā)相應(yīng)事件,由事件處理器對(duì)相應(yīng)事件分別響應(yīng),完成服務(wù)器端的業(yè)務(wù)處理。
下面我們就來(lái)詳細(xì)看一下這個(gè)模型的各個(gè)組成部分。
相關(guān)事件定義 在這個(gè)模型中,我們定義了一些基本的事件:
(1)
onAccept:當(dāng)服務(wù)端收到客戶端連接請(qǐng)求時(shí),觸發(fā)該事件。通過(guò)該事件我們可以知道有新的客戶端呼入。該事件可用來(lái)控制服務(wù)端的負(fù)載。例如,服務(wù)器可
設(shè)定同時(shí)只為一定數(shù)量客戶端提供服務(wù),當(dāng)同時(shí)請(qǐng)求數(shù)超出數(shù)量時(shí),可在響應(yīng)該事件時(shí)直接拋出異常,以拒絕新的連接。
(2)onAccepted:當(dāng)客戶端請(qǐng)求被服務(wù)器接受后觸發(fā)該事件。該事件表明一個(gè)新的客戶端與服務(wù)器正式建立連接。
(3)
onRead:當(dāng)客戶端發(fā)來(lái)數(shù)據(jù),并已被服務(wù)器控制線程正確讀取時(shí),觸發(fā)該事件。該事件通知各事件處理器可以對(duì)客戶端發(fā)來(lái)的數(shù)據(jù)進(jìn)行實(shí)際處理了。需要注意
的是,在本模型中,客戶端的數(shù)據(jù)讀取是由控制線程交由讀線程完成的,事件處理器不需要在該事件中進(jìn)行專門的讀操作,而只需將控制線程傳來(lái)的數(shù)據(jù)進(jìn)行直接處
理即可。
(4)onWrite:當(dāng)客戶端可以開(kāi)始接受服務(wù)端發(fā)送數(shù)據(jù)時(shí)觸發(fā)該事件,通過(guò)該事件,我們可以向客戶端發(fā)送回應(yīng)數(shù)據(jù)。在本模型中,事件處理器只需要在該事件中設(shè)置
(5)onClosed:當(dāng)客戶端與服務(wù)器斷開(kāi)連接時(shí)觸發(fā)該事件。
(6)onError:當(dāng)客戶端與服務(wù)器從連接開(kāi)始到最后斷開(kāi)連接期間發(fā)生錯(cuò)誤時(shí)觸發(fā)該事件。通過(guò)該事件我們可以知道有什么錯(cuò)誤發(fā)生。
事件回調(diào)機(jī)制的實(shí)現(xiàn)
在這個(gè)模型中,事件采用廣播方式,也就是所有在冊(cè)的事件處理器都能獲得事件通知。這樣可以將不同性質(zhì)的業(yè)務(wù)處理,分別用不同的處理器實(shí)現(xiàn),使每個(gè)處理器的業(yè)務(wù)功能盡可能單一。
如下圖:整個(gè)事件模型由監(jiān)聽(tīng)器、事件適配器、事件觸發(fā)器、事件處理器組成。
- 監(jiān)聽(tīng)器(Serverlistener):這是一個(gè)事件接口,定義需監(jiān)聽(tīng)的服務(wù)器事件,如果您需要定義更多的事件,可在這里進(jìn)行擴(kuò)展。
public interface Serverlistener {
public void onError(String error);
public void onAccept() throws Exception;
public void onAccepted(Request request) throws Exception;
public void onRead(Request request) throws Exception;
public void onWrite(Request request, Response response) throws Exception;
public void onClosed(Request request) throws Exception;
} - 事件適配器(EventAdapter):對(duì)Serverlistener接口實(shí)現(xiàn)一個(gè)適配器(EventAdapter),這樣的好處是最終的事件處理器可以只處理所關(guān)心的事件。
public abstract class EventAdapter implements Serverlistener {
public EventAdapter() {
}
public void onError(String error) {}
public void onAccept() throws Exception {}
public void onAccepted(Request request) throws Exception {}
public void onRead(Request request) throws Exception {}
public void onWrite(Request request, Response response) throws Exception {}
public void onClosed(Request request) throws Exception {}
} - 事件觸發(fā)器(Notifier):用于在適當(dāng)?shù)臅r(shí)候通過(guò)觸發(fā)服務(wù)器事件,通知在冊(cè)的事件處理器對(duì)事件做出響應(yīng)。觸發(fā)器以Singleton模式實(shí)現(xiàn),統(tǒng)一控制整個(gè)服務(wù)器端的事件,避免造成混亂。
public class Notifier {
private static Arraylist listeners = null;
private static Notifier instance = null;
private Notifier() {
listeners = new Arraylist();
}
/**
* 獲取事件觸發(fā)器
* @return 返回事件觸發(fā)器
*/
public static synchronized Notifier getNotifier() {
if (instance == null) {
instance = new Notifier();
return instance;
}
else return instance;
}
/**
* 添加事件監(jiān)聽(tīng)器
* @param l 監(jiān)聽(tīng)器
*/
public void addlistener(Serverlistener l) {
synchronized (listeners) {
if (!listeners.contains(l))
listeners.add(l);
}
}
public void fireOnAccept() throws Exception {
for (int i = listeners.size() - 1; i >= 0; i--)
( (Serverlistener) listeners.get(i)).onAccept();
}
....// other fire method
} - 事件處理器(Handler):繼承事件適配器,對(duì)感興趣的事件進(jìn)行響應(yīng)處理,實(shí)現(xiàn)業(yè)務(wù)處理。以下是一個(gè)簡(jiǎn)單的事件處理器實(shí)現(xiàn),它響應(yīng)onRead事件,在終端打印出從客戶端讀取的數(shù)據(jù)。
public class ServerHandler extends EventAdapter {
public ServerHandler() {
}
public void onRead(Request request) throws Exception {
System.out.println("Received: " + new String(data));
}
} - 事件處理器的注冊(cè)。為了能讓事件處理器獲得服務(wù)線程的事件通知,事件處理器需在觸發(fā)器中注冊(cè)。
ServerHandler handler = new ServerHandler();
Notifier.addlistener(handler);
實(shí)現(xiàn)NIO多線程服務(wù)器
NIO多線程服務(wù)器主要由主控服務(wù)線程、讀線程和寫線程組成。
- 主控服務(wù)線程(Server):主控線程將創(chuàng)建讀、寫線程池,實(shí)現(xiàn)監(jiān)聽(tīng)、接受客戶端請(qǐng)求,同時(shí)將讀、寫通道提交由相應(yīng)的讀線程(Reader)和寫服務(wù)線程(Writer),由讀寫線程分別完成對(duì)客戶端數(shù)據(jù)的讀取和對(duì)客戶端的回應(yīng)操作。
public class Server implements Runnable {
....
private static int MAX_THREADS = 4;
public Server(int port) throws Exception {
....
// 創(chuàng)建無(wú)阻塞網(wǎng)絡(luò)套接
selector = Selector.open();
sschannel = ServerSocketChannel.open();
sschannel.configureBlocking(false);
address = new InetSocketAddress(port);
ServerSocket ss = sschannel.socket();
ss.bind(address);
sschannel.register(selector, SelectionKey.OP_ACCEPT);
}
public void run() {
System.out.println("Server started ...");
System.out.println("Server listening on port: " + port);
// 監(jiān)聽(tīng)
while (true) {
try {
int num = 0;
num = selector.select();
if (num > 0) {
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
it.remove();
// 處理IO事件
if ( (key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
// Accept the new connection
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
notifier.fireOnAccept();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// 觸發(fā)接受連接事件
Request request = new Request(sc);
notifier.fireOnAccepted(request);
// 注冊(cè)讀操作,以進(jìn)行下一步的讀操作
sc.register(selector, SelectionKey.OP_READ, request);
}
else if ( (key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ ) {
Reader.processRequest(key); // 提交讀服務(wù)線程讀取客戶端數(shù)據(jù)
key.cancel();
}
else if ( (key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE ) {
Writer.processRequest(key); // 提交寫服務(wù)線程向客戶端發(fā)送回應(yīng)數(shù)據(jù)
key.cancel();
}
}
}
else {
addRegister(); // 在Selector中注冊(cè)新的寫通道
}
}
catch (Exception e) {
notifier.fireOnError("Error occured in Server: " + e.getMessage());
continue;
}
}
}
....
} - 讀線程(Reader):使用線程池技術(shù),通過(guò)多個(gè)線程讀取客戶端數(shù)據(jù),以充分利用網(wǎng)絡(luò)數(shù)據(jù)傳輸?shù)臅r(shí)間,提高讀取效率。
public class Reader extends Thread {
public void run() {
while (true) {
try {
SelectionKey key;
synchronized (pool) {
while (pool.isEmpty()) {
pool.wait();
}
key = (SelectionKey) pool.remove(0);
}
// 讀取客戶端數(shù)據(jù),并觸發(fā)onRead事件
read(key);
}
catch (Exception e) {
continue;
}
}
}
....
} - 寫線程(Writer):和讀操作一樣,使用線程池,負(fù)責(zé)將服務(wù)器端的數(shù)據(jù)發(fā)送回客戶端。
public final class Writer extends Thread {
public void run() {
while (true) {
try {
SelectionKey key;
synchronized (pool) {
while (pool.isEmpty()) {
pool.wait();
}
key = (SelectionKey) pool.remove(0);
}
// 向客戶端發(fā)送數(shù)據(jù),然后關(guān)閉連接,并分別觸發(fā)onWrite,onClosed事件
write(key);
}
catch (Exception e) {
continue;
}
}
}
....
}
具體應(yīng)用
NIO多線程模型的實(shí)現(xiàn)告一段落,現(xiàn)在我們可以暫且將NIO的各個(gè)API和煩瑣的調(diào)用方法拋于腦后,專心于我們的實(shí)際應(yīng)用中。
我們用一個(gè)簡(jiǎn)單的TimeServer(時(shí)間查詢服務(wù)器)來(lái)看看該模型能帶來(lái)多么簡(jiǎn)潔的開(kāi)發(fā)方式。
在
這個(gè)TimeServer中,將提供兩種語(yǔ)言(中文、英文)的時(shí)間查詢服務(wù)。我們將讀取客戶端的查詢命令(GB/EN),并回應(yīng)相應(yīng)語(yǔ)言格式的當(dāng)前時(shí)間。
在應(yīng)答客戶的請(qǐng)求的同時(shí),服務(wù)器將進(jìn)行日志記錄。做為示例,對(duì)日志記錄,我們只是簡(jiǎn)單地將客戶端的訪問(wèn)時(shí)間和IP地址輸出到服務(wù)器的終端上。
- 實(shí)現(xiàn)時(shí)間查詢服務(wù)的事件處理器(TimeHandler):
public class TimeHandler extends EventAdapter {
public TimeHandler() {
}
public void onWrite(Request request, Response response) throws Exception {
String command = new String(request.getDataInput());
String time = null;
Date date = new Date();
// 判斷查詢命令
if (command.equals("GB")) {
// 中文格式
DateFormat cnDate = DateFormat.getDateTimeInstance(DateFormat.FulL,
DateFormat.FulL, Locale.CHINA);
time = cnDate.format(date);
}
else {
// 英文格式
DateFormat enDate = DateFormat.getDateTimeInstance(DateFormat.FulL,
DateFormat.FulL, Locale.US);
time = enDate.format(date);
}
response.send(time.getBytes());
}
} - 實(shí)現(xiàn)日志記錄服務(wù)的事件處理器(LogHandler):
public class LogHandler extends EventAdapter {
public LogHandler() {
}
public void onClosed(Request request) throws Exception {
String log = new Date().toString() + " from " + request.getAddress().toString();
System.out.println(log);
}
public void onError(String error) {
System.out.println("Error: " + error);
}
} - 啟動(dòng)程序:
public class Start {
public static void main(String[] args) {
try {
LogHandler loger = new LogHandler();
TimeHandler timer = new TimeHandler();
Notifier notifier = Notifier.getNotifier();
notifier.addlistener(loger);
notifier.addlistener(timer);
System.out.println("Server starting ...");
Server server = new Server(5100);
Thread tServer = new Thread(server);
tServer.start();
}
catch (Exception e) {
System.out.println("Server error: " + e.getMessage());
System.exit(-1);
}
}
}
通過(guò)例子我們可以看到,基于事件回調(diào)的NIO多線程服務(wù)器模型,提供了清晰直觀的實(shí)現(xiàn)方式,可讓開(kāi)發(fā)者從NIO及多線程的技術(shù)細(xì)節(jié)中擺脫出來(lái),集中精力關(guān)注具體的業(yè)務(wù)實(shí)現(xiàn)。