本文將告訴你如何使用Netty2來編一個網(wǎng)絡(luò)應(yīng)用程序(包括客戶端和服務(wù)端)。我會介紹一個簡單的SumUp協(xié)議,用來對整數(shù)求和。通過源代碼的一步步講解,你會了解到Netty2的每個特性。
SumUp 協(xié)議
SumUp服務(wù)會加總從客戶端送來的ADD消息中的所有值,并且為每個ADD消息返回一個RESULT消息。所有消息都是由header和body兩部分組成:
header包含type和sequence兩個字段。type表示消息的類型(0是RESULT消息,1是ADD消息)。sequence用來表示一組對應(yīng)的ADD和RESULT(也就是說,服務(wù)器回應(yīng)ADD消息時,應(yīng)在RESULT中使用與ADD一樣的sequence值)。
ADD 消息
ADD消息包含了要被求和的值。
RESULT消息
RESULT具有不固定長度的消息體。當計算沒問題時,body內(nèi)容是加總的值(4bytes),如果有錯誤或溢位,則是2bytes. 見下圖:
實現(xiàn)MessageRecognizer
MessageRecognizer從送來的數(shù)據(jù)中重組出Message對象。這兒我們實現(xiàn)了一個SumUpMessageRecognizer,用于客戶端和服務(wù)端的信息重組。
public class SumUpMessageRecognizer implements MessageRecognizer { public static final int CLIENT_MODE = 1; public static final int SERVER_MODE = 2; private int mode; public SumUpMessageRecognizer(int mode) { switch (mode) { case CLIENT_MODE: case SERVER_MODE: this.mode = mode; break; default: throw new IllegalArgumentException("invalid mode: " + mode); } } public Message recognize(ByteBuffer buf) throws MessageParseException { // return null if message type is not arrived yet. if (buf.remaining() < Constants.TYPE_LEN) return null; int type = buf.getShort(); switch (mode) { // 如果是server模式,只讓它接收ADD消息. case SERVER_MODE: switch (type) { case Constants.ADD: return new AddMessage(); default: throw new MessageParseException("unknown type: " + type); } //如果是客戶端模式,只讓它接收RESULT消息. case CLIENT_MODE: switch (type) { case Constants.RESULT: return new ResultMessage(); default: throw new MessageParseException("unknown type: " + type); } default: throw new InternalError(); // this cannot happen } } }
實現(xiàn)ADD和RESULT消息
我們必須實現(xiàn)ADD和RESULT消息: ADD和RESULT。 它們都有公共的header,最好的方式是實現(xiàn)一個AbstractMessage,并且從它繼承出Add和Result消息。
源代碼:
實現(xiàn)協(xié)議處理流程
實現(xiàn)了Messagerecognizer和Message之后,要實現(xiàn)Server和Client是非常容易的事情,通過下面的代碼,你會很容易理解如何去實現(xiàn)協(xié)議的處理流程。
實現(xiàn)Server
實現(xiàn)服務(wù)端兩個主要的類,一個是Server類,另一個是ServerSessionListener. Server類負責(zé)啟動主程序并監(jiān)聽連接。而ServerSessionListener用于處理和發(fā)送消息。
public class Server { private static final int SERVER_PORT = 8080; private static final int DISPATCHER_THREAD_POOL_SIZE = 16; public static void main(String[] args) throws Throwable { // 初始化 I/O processor 和 event dispatcher IoProcessor ioProcessor = new IoProcessor(); ThreadPooledEventDispatcher eventDispatcher = new OrderedEventDispatcher(); // 啟動缺省數(shù)量的I/O工作線程 ioProcessor.start(); // 啟動指定數(shù)量的event dispatcher 線程 eventDispatcher.setThreadPoolSize(DISPATCHER_THREAD_POOL_SIZE); eventDispatcher.start(); // 準備 message recognizer MessageRecognizer recognizer = new SumUpMessageRecognizer( SumUpMessageRecognizer.SERVER_MODE); // 準備session監(jiān)聽器,用于處理通訊過程. ServerSessionListener listener = new ServerSessionListener(); // 開啟server socket通道 ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress(SERVER_PORT)); // 監(jiān)聽連接,并開始通訊 System.out.println("listening on port " + SERVER_PORT); for (;;) { // 接受connection SocketChannel channel = ssc.accept(); // 建立新的session Session session = new Session(ioProcessor, channel, recognizer, eventDispatcher); // 添加session監(jiān)聽器 session.addSessionListener(listener); // 開始通訊 session.start(); } } } public class ServerSessionListener implements SessionListener { public ServerSessionListener() { } public void connectionEstablished(Session session) { System.out.println(session.getSocketAddress() + " connected"); // 設(shè)置空閑時間為60秒 session.getConfig().setIdleTime(60); // 設(shè)置sum的初始值為0。 session.setAttachment(new Integer(0)); } public void connectionClosed(Session session) { System.out.println(session.getSocketAddress() + " closed"); } // 當收到client發(fā)來的消息時,此方法被調(diào)用 public void messageReceived(Session session, Message message) { System.out.println(session.getSocketAddress() + " RCVD: " + message); // client端只發(fā)送AddMessage. 其它情況要另作處理 // 在這里只是簡單的進行類型轉(zhuǎn)換處理 AddMessage am = (AddMessage) message; // 將收到的消息里的值加上當前sum的值. int sum = ((Integer) session.getAttachment()).intValue(); int value = am.getValue(); long expectedSum = (long) sum + value; if (expectedSum > Integer.MAX_VALUE || expectedSum < Integer.MIN_VALUE) { // 如果溢位返回錯誤消息 ResultMessage rm = new ResultMessage(); rm.setSequence(am.getSequence()); // 從送來的Add消息中得到sequence值。 rm.setOk(false); session.write(rm); } else { // 加總 sum = (int) expectedSum; session.setAttachment(new Integer(sum)); // 返回結(jié)果消息 ResultMessage rm = new ResultMessage(); rm.setSequence(am.getSequence()); // 從送來的Add消息中得到sequence值。 rm.setOk(true); rm.setValue(sum); session.write(rm); } } public void messageSent(Session session, Message message) { System.out.println(session.getSocketAddress() + " SENT: " + message); } public void sessionIdle(Session session) { System.out.println(session.getSocketAddress() + " disconnecting the idle"); // 關(guān)閉空閑的會話。 session.close(); } // 異常發(fā)生時,將調(diào)用此方法 public void exceptionCaught(Session session, Throwable cause) { System.out.println(Thread.currentThread().getName() + session.getSocketAddress() + " exception:"); cause.printStackTrace(System.out); if (cause instanceof MessageParseException) { // 印出錯誤信息內(nèi)容,便于調(diào)試 MessageParseException mpe = (MessageParseException) cause; ByteBuffer buf = mpe.getBuffer(); System.out.println(buf); System.out.print("Buffer Content: "); while (buf.remaining() > 0) { System.out.print(buf.get() & 0xFF); System.out.print(' '); } System.out.println(); } // 關(guān)閉會話 session.close(); } }
服務(wù)端運行后,其輸出的內(nèi)容示例如下:
listening on port 8080 /127.0.0.1:4753 connected /127.0.0.1:4753 RCVD: 0:ADD(4) /127.0.0.1:4753 RCVD: 1:ADD(6) /127.0.0.1:4753 RCVD: 2:ADD(2) /127.0.0.1:4753 RCVD: 3:ADD(7) /127.0.0.1:4753 RCVD: 4:ADD(8) /127.0.0.1:4753 RCVD: 5:ADD(1) /127.0.0.1:4753 SENT: 0:RESULT(4) /127.0.0.1:4753 SENT: 1:RESULT(10) /127.0.0.1:4753 SENT: 2:RESULT(12) /127.0.0.1:4753 SENT: 3:RESULT(19) /127.0.0.1:4753 SENT: 4:RESULT(27) /127.0.0.1:4753 SENT: 5:RESULT(28) /127.0.0.1:4753 closed
實現(xiàn)客戶端
跟服務(wù)端對應(yīng),主要由Client和ClientSessionListener組成。
public class Client { private static final String HOSTNAME = "localhost"; private static final int PORT = 8080; private static final int CONNECT_TIMEOUT = 30; // seconds private static final int DISPATCHER_THREAD_POOL_SIZE = 4; public static void main(String[] args) throws Throwable { // 預(yù)備要加總的值。 int[] values = new int[args.length]; for (int i = 0; i < args.length; i++) { values[i] = Integer.parseInt(args[i]); } // 初始化 I/O processor 和 event dispatcher IoProcessor ioProcessor = new IoProcessor(); ThreadPooledEventDispatcher eventDispatcher = new OrderedEventDispatcher(); // 開始缺省數(shù)量的I/O工作線程 ioProcessor.start(); // 啟動指定數(shù)量的event dispatcher線程 eventDispatcher.setThreadPoolSize(DISPATCHER_THREAD_POOL_SIZE eventDispatcher.start(); // 準備 message recognizer MessageRecognizer recognizer = new SumUpMessageRecognizer( SumUpMessageRecognizer.CLIENT_MODE); // 準備客戶端會話。 Session session = new Session(ioProcessor, new InetSocketAddress( HOSTNAME, PORT), recognizer, eventDispatcher); session.getConfig().setConnectTimeout(CONNECT_TIMEOUT); // 開始會話,并使用ClientSessionListener監(jiān)聽。 ClientSessionListener listener = new ClientSessionListener(values); session.addSessionListener(listener); session.start(); // 一直等到加總完成 while ( !listener.isComplete() ) { Thread.sleep(1000); } // 停止 I/O processor 和 event dispatcher eventDispatcher.stop(); ioProcessor.stop(); } } public class ClientSessionListener implements SessionListener { private final int[] values; private boolean complete; public ClientSessionListener(int[] values) { this.values = values; } public boolean isComplete() { return complete; } // 當連接建立好后會調(diào)用此方法。 public void connectionEstablished(Session session) { System.out.println("connected to " + session.getSocketAddress()); // 發(fā)送加總請求。 for (int i = 0; i < values.length; i++) { AddMessage m = new AddMessage(); m.setSequence(i); m.setValue(values[i]); session.write(m); } } public void connectionClosed(Session session) { System.out.println("disconnected from " + session.getSocketAddress()); } // 當收到server的回應(yīng)信息時,會調(diào)用此方法 public void messageReceived(Session session, Message message) { System.out.println("RCVD: " + message); // 服務(wù)端只發(fā)送ResultMessage. 其它情況下 // 要通過instanceOf來判斷它的類型. ResultMessage rm = (ResultMessage) message; if (rm.isOk()) { // 如果ResultMessage是OK的. // 根據(jù)ResultMessage的sequence值來判斷如果, // 一次消息的sequence值,則 if (rm.getSequence() == values.length - 1) { // 打印出結(jié)果. System.out.println("The sum: " + rm.getValue()); // 關(guān)閉會話 session.close(); complete = true; } } else { // 如有錯誤,則打印錯誤信息,并結(jié)束會話. System.out.println("server error, disconnecting..."); session.close(); complete = true; } } public void messageSent(Session session, Message message) { System.out.println("SENT: " + message); } public void sessionIdle(Session session) { } public void exceptionCaught(Session session, Throwable cause) { cause.printStackTrace(System.out); if (cause instanceof ConnectException) { // 如果連接server失敗, 則間隔5秒重試連接. System.out.println("sleeping..."); try { Thread.sleep(5000); } catch (InterruptedException e) { } System.out.println("reconnecting... " + session.getSocketAddress()); session.start(); } else { session.close(); } } }
通過上面的例子,你也許會發(fā)現(xiàn)實現(xiàn)一個自定義的協(xié)議原來如此簡單。你如果用Netty試著去實現(xiàn)自己的smtp或pop協(xié)議,我想也不會是一件難事了。
Netty2的首頁在http://gleamynode.net/dev/projects/netty2/index.html,你可以在這找到本文的全部源碼。