posts - 66,  comments - 40,  trackbacks - 0

          本文將告訴你如何使用Netty2來編一個網絡應用程序(包括客戶端和服務端)。我會介紹一個簡單的SumUp協議,用來對整數求和。通過源代碼的一步步講解,你會了解到Netty2的每個特性。

          SumUp 協議

          SumUp服務會加總從客戶端送來的ADD消息中的所有值,并且為每個ADD消息返回一個RESULT消息。所有消息都是由header和body兩部分組成:

          header包含type和sequence兩個字段。type表示消息的類型(0是RESULT消息,1是ADD消息)。sequence用來表示一組對應的ADD和RESULT(也就是說,服務器回應ADD消息時,應在RESULT中使用與ADD一樣的sequence值)。

          ADD 消息

          ADD消息包含了要被求和的值。

          RESULT消息

          RESULT具有不固定長度的消息體。當計算沒問題時,body內容是加總的值(4bytes),如果有錯誤或溢位,則是2bytes. 見下圖:

          ?

          實現MessageRecognizer

          MessageRecognizer從送來的數據中重組出Message對象。這兒我們實現了一個SumUpMessageRecognizer,用于客戶端和服務端的信息重組。

          public class SumUpMessageRecognizer implements MessageRecognizer {
          ??
          ?? publicstaticfinalint CLIENT_MODE = 1;
          ??
          ?? publicstaticfinalint SERVER_MODE = 2;
          ??
          ?? privateint mode;
          ??
          ?? publicSumUpMessageRecognizer(int mode) {
          ??? ??? switch (mode) {
          ??? ??????? case CLIENT_MODE:
          ??????? ??? ??????? case SERVER_MODE:
          ???? ??????????? this.mode = mode;
          ???? ??????????? break;
          ???????????????????? default:
          ???? ??????????? thrownew IllegalArgumentException("invalid mode: " + mode);
          ??? ??? }
          ?? }
          ??
          ?? public Message recognize(ByteBuffer buf) throws MessageParseException {
          ??? ??? // return null if message type is not arrived yet.
          ??? ??? if (buf.remaining() < Constants.TYPE_LEN)
          ???? ??????? returnnull;
          ????
          ??? ??? int type = buf.getShort();
          ??? ??? switch (mode) {
          ??? ?????? ?// 如果是server模式,只讓它接收ADD消息.
          ??? ??????? case SERVER_MODE:
          ???? ?????????? ?switch (type) {
          ???? ??????????????? case Constants.ADD:
          ????? ??????????????? returnnewAddMessage();
          ???? ??????????????? default:
          ????? ??????????????????? thrownew MessageParseException("unknown type: " + type);
          ???? ??????????? }
          ??? ??????? //如果是客戶端模式,只讓它接收RESULT消息.
          ??? ??????? case CLIENT_MODE:
          ???????????????????????? switch (type) {
          ???? ??????????????? case Constants.RESULT:
          ????? ??????????????????? returnnewResultMessage();
          ???? ??????????????? default:
          ????? ??????????????????????? thrownew MessageParseException("unknown type: " + type);
          ???? ??????????? }
          ??? ??????? default:
          ???? ??????????? thrownew InternalError(); // this cannot happen
          ??? ??? }
          ?? }
          ? }

          實現ADD和RESULT消息

          我們必須實現ADD和RESULT消息: ADD和RESULT。 它們都有公共的header,最好的方式是實現一個AbstractMessage,并且從它繼承出Add和Result消息。

          源代碼:

        1. AbstractMessage.java
          						1
          						/*
          				
        2. 						2
          						 * @(#) $Id: AbstractMessage.java 11 2005-04-18 03:42:45Z trustin $
          						 */
          						package 
          						net.gleamynode.netty2.example.sumup;
          56import java.nio.ByteBuffer;
          78import net.gleamynode.netty2.Message;
          9import net.gleamynode.netty2.MessageParseException;
          
        3. 						17
          						public
          						abstract
          						class
          						AbstractMessage implements Message {
          1819privatefinalint type;
          2021privateint sequence;
          2223privateboolean readHeader;
          2425privateboolean wroteHeader;
          2627protectedAbstractMessage(int type) {
          28this.type = type;
          29  	}
          3031publicint getSequence() {
          32return sequence;
          33  	}
          3435publicvoid setSequence(int sequence) {
          36this.sequence = sequence;
          37  	}
          3839publicfinalboolean read(ByteBuffer buf) throws MessageParseException {
          40// read a header if not read yet.41if (!readHeader) {
          42  			readHeader = readHeader(buf);
          43if (!readHeader)
          44return false;
          45  		}
          4647// Header is read, now try to read body48if (readBody(buf)) {
          49// finished reading single complete message50
        4. 		readHeader = false; // reset state51returntrue;
          52  		} else53return false;
          54  	}
          5556privateboolean readHeader(ByteBuffer buf) throws MessageParseException {
          57// if header is not fully read, don't read it.58if (buf.remaining() < Constants.HEADER_LEN)
          59return false;
          6061// read header and validate the message62int readType = buf.getShort();
          63if (type != readType)
          64thrownew MessageParseException("type mismatches: " + readType
          65  					+ " (expected: " + type + ')');
          6667// read sequence number of the message68  		sequence = buf.getInt();
          69returntrue;
          70  	}
          7172protectedabstractboolean readBody(ByteBuffer buf)
          73  			throws MessageParseException;
          7475publicboolean write(ByteBuffer buf) {
          76// write a header if not written yet.77if (!wroteHeader) {
          78  			wroteHeader = writeHeader(buf);
          79if (!wroteHeader)
          80return false; // buffer is almost full perhaps81  		}
          8283// Header is written, now try to write body84if (writeBody(buf)) {
          85// finished writing single complete message86  			wroteHeader = false;
          87returntrue;
          88  		} else {
          89return false;
          90  		}
          91  	}
          9293privateboolean writeHeader(ByteBuffer buf) {
          94// check if there is enough space to write header95if (buf.remaining() < Constants.HEADER_LEN) return false;
          96  		buf.putShort((short) type);
          97  		buf.putInt(sequence);
          98returntrue;
          99  	}
          100101protectedabstractboolean writeBody(ByteBuffer buf);
          102 }
          



        5. ?

        6. AddMessage.java
          						
          						
          						package 
          						
          								net.gleamynode.netty2.example.sumup
          						
          						
          								5
          						
          						
          								6
          						
          						import java.nio.ByteBuffer;
          78import net.gleamynode.netty2.MessageParseException;
          publicclassAddMessageextendsAbstractMessage {
          1718privateint value;
          1920publicAddMessage() {
          21super(Constants.ADD);
          22  	}
          2324publicint getValue() {
          25return value;
          26  	}
          2728publicvoid setValue(int value) {
          29this.value = value;
          30  	}
          3132protectedboolean readBody(ByteBuffer buf) throws MessageParseException {
          33// don't read body if it is partially readable34if (buf.remaining() < Constants.ADD_BODY_LEN) return false;
          35  		value = buf.getInt();
          36returntrue;
          37  	}
          3839protectedboolean writeBody(ByteBuffer buf) {
          40// check if there is enough space to write body41if (buf.remaining() < Constants.ADD_BODY_LEN)
          42return false;
          4344  		buf.putInt(value);
          4546returntrue;
          47  	}
          4849public String toString() {
          50// it is a good practice to create toString() method on message classes.51return getSequence() + ":ADD(" + value + ')';
          52  	}
          53  }
          


          ?

        7. ResultMessage.jav a?

          ?

        8. 								package 
          								
          										net.gleamynode.netty2.example.sumup
          								
          								
          										5
          								
          								
          										6
          								
          								import java.nio.ByteBuffer;
          78import net.gleamynode.netty2.MessageParseException;
          • 16 public class ResultMessage extends AbstractMessage { 1718privateboolean ok; 1920privateint value; 2122privateboolean processedResultCode; 2324publicResultMessage() { 25super(Constants.RESULT); 26 } 2728publicboolean isOk() { 29return ok; 30 } 3132publicvoid setOk(boolean ok) { 33this.ok = ok; 34 } 3536publicint getValue() { 37return value; 38 } 3940publicvoid setValue(int value) { 41this.value = value; 42 } 4344protectedboolean readBody(ByteBuffer buf) throws MessageParseException { 45if (!processedResultCode) { 46 processedResultCode = readResultCode(buf); 47if (!processedResultCode) 48return false; 49 } 5051if (ok) { 52if (readValue(buf)) { 53 processedResultCode = false; 54returntrue; 55 } else56return false; 57 } else { 58 processedResultCode = false; 59returntrue; 60 } 61 } 6263privateboolean readResultCode(ByteBuffer buf) { 64if (buf.remaining() < Constants.RESULT_CODE_LEN) 65return false; 66 ok = buf.getShort() == Constants.RESULT_OK; 67returntrue; 68 } 6970privateboolean readValue(ByteBuffer buf) { 71if (buf.remaining() < Constants.RESULT_VALUE_LEN) 72return false; 73 value = buf.getInt(); 74returntrue; 7576 } 7778protectedboolean writeBody(ByteBuffer buf) { 79// check if there is enough space to write body80if (buf.remaining() < Constants.RESULT_CODE_LEN 81 + Constants.RESULT_VALUE_LEN) 82return false; 8384 buf.putShort((short) (ok ? Constants.RESULT_OK 86 : Constants.RESULT_ERROR)); 87if (ok) 88 buf.putInt(value); 8990returntrue; 91 } 9293public String toString() { 94if (ok) { 95return getSequence() + ":RESULT(" + value + ')'; 96 } else { 97return getSequence() + ":RESULT(ERROR)"; 98 } 99 } 100 }

        9. ?

        10. 實現協議處理流程

          實現了Messagerecognizer和Message之后,要實現Server和Client是非常容易的事情,通過下面的代碼,你會很容易理解如何去實現協議的處理流程。

          實現Server

          實現服務端兩個主要的類,一個是Server類,另一個是ServerSessionListener. Server類負責啟動主程序并監聽連接。而ServerSessionListener用于處理和發送消息。

          public class Server {

          privatestaticfinalint SERVER_PORT = 8080;
          privatestaticfinalint DISPATCHER_THREAD_POOL_SIZE = 16;

          publicstaticvoid main(String[] args) throws Throwable {
          //?初始化 I/O processor?和 event dispatcher
          IoProcessor ioProcessor = new IoProcessor();
          ThreadPooledEventDispatcher eventDispatcher = new OrderedEventDispatcher();
          ?
          ? // 啟動缺省數量的I/O工作線程
          ? ioProcessor.start();
          ?
          ? // 啟動指定數量的event dispatcher?線程
          ? eventDispatcher.setThreadPoolSize(DISPATCHER_THREAD_POOL_SIZE);
          ? eventDispatcher.start();
          ?
          ? //?準備 message recognizer
          ? MessageRecognizer recognizer = new SumUpMessageRecognizer(
          ? SumUpMessageRecognizer.SERVER_MODE);
          ?
          ? // 準備session監聽器,用于處理通訊過程.
          ?ServerSessionListener listener = new ServerSessionListener();
          ?
          ?// 開啟server socket通道
          ?ServerSocketChannel ssc = ServerSocketChannel.open();
          ?ssc.socket().bind(new InetSocketAddress(SERVER_PORT));
          ?
          ?// 監聽連接,并開始通訊
          ?System.out.println("listening on port " + SERVER_PORT);
          ?for (;;) {
          ?// 接受connection
          ?SocketChannel channel = ssc.accept();
          ?
          ?//?建立新的session
          ?Session session = new Session(ioProcessor, channel, recognizer, eventDispatcher);
          ?
          ?// 添加session監聽器
          ?session.addSessionListener(listener);
          ?
          ?//?開始通訊
          ?session.start();
          ?}
          ?}
          ?}
          ?

          public class ServerSessionListener implements SessionListener {

          ?

          ??????? public ServerSessionListener() {

          ??????? }

          ?

          ??????? public void connectionEstablished(Session session) {

          ?????????????? System.out.println(session.getSocketAddress() + " connected");

          ?

          ?????????????? //?設置空閑時間為60秒

          ?????????????? session.getConfig().setIdleTime(60);

          ?

          ?????????????? // 設置sum的初始值為0。

          ?????????????? session.setAttachment(new Integer(0));

          ??????? }

          ?

          ??????? public void connectionClosed(Session session) {

          ?????????????? System.out.println(session.getSocketAddress() + " closed");

          ? ?????? }

          ??????? // 當收到client發來的消息時,此方法被調用

          ? ?????? public void messageReceived(Session session, Message message) {

          ? ????????????? System.out.println(session.getSocketAddress() + " RCVD: " + message);

          ?

          ?????????????? // client端只發送AddMessage. 其它情況要另作處理

          ?????????????? // 在這里只是簡單的進行類型轉換處理

          ?????????????? AddMessage am = (AddMessage) message;

          ?

          ?????????????? // 將收到的消息里的值加上當前sum的值.

          ?????????????? int sum = ((Integer) session.getAttachment()).intValue();

          ?????????????? int value = am.getValue();

          ?????????????? long expectedSum = (long) sum + value;

          ?????????????? if (expectedSum > Integer.MAX_VALUE || expectedSum < Integer.MIN_VALUE) {

          ?????????????????????? // 如果溢位返回錯誤消息

          ??????? ?????????????? ResultMessage rm = newResultMessage();

          ??????? ?????????????? rm.setSequence(am.getSequence()); // 從送來的Add消息中得到sequence值。

          ?????????????????????? rm.setOk(false);

          ?????????????????????? session.write(rm);

          ?????????????? } else {

          ?????????????????????? //? 加總

          ?????????????????????? sum = (int) expectedSum;

          ?????????????????????? session.setAttachment(new Integer(sum));

          ?

          ?????????????????????? // 返回結果消息

          ?????????????????????? ResultMessage rm = newResultMessage();

          ?????????????????????? rm.setSequence(am.getSequence()); // 從送來的Add消息中得到sequence值。

          ?????????????????????? rm.setOk(true);

          ?????????????????????? rm.setValue(sum);

          ?????????????????????? session.write(rm);

          ?????????????? }

          ??????? }

          ?

          ??????? public void messageSent(Session session, Message message) {

          ?????????????? System.out.println(session.getSocketAddress() + " SENT: " + message);

          ??????? }

          ?

          ??????? public void sessionIdle(Session session) {

          ?????????????? System.out.println(session.getSocketAddress()

          ?????????????????????????????? + " disconnecting the idle");

          ?

          ?????????????? // 關閉空閑的會話。

          ?????????????? session.close();

          ??????? }

          ??????? // 異常發生時,將調用此方法

          ??????? public void exceptionCaught(Session session, Throwable cause) {

          ?????????????? System.out.println(Thread.currentThread().getName()

          ?????????????????????????????? + session.getSocketAddress() + " exception:");

          ? ????????????? cause.printStackTrace(System.out);

          ?

          ? ????????????? if (cause instanceof MessageParseException) {

          ? ????????????????????? //?印出錯誤信息內容,便于調試

          ?????????????????????? MessageParseException mpe = (MessageParseException) cause;

          ?????????????????????? ByteBuffer buf = mpe.getBuffer();

          ?????????????????????? System.out.println(buf);

          ?????????????????????? System.out.print("Buffer Content: ");

          ?????????????????????? while (buf.remaining() > 0) {

          ?????????????????????????????? System.out.print(buf.get() & 0xFF);

          ?????????????????????????????? System.out.print(' ');

          ?????????????????????? }

          ?????????????????????? System.out.println();

          ?????????????? }

          ?

          ?????????????? // 關閉會話

          ?????????????? session.close();

          ??????? }

          }

          ?

          服務端運行后,其輸出的內容示例如下:

          listening on port 8080
          /127.0.0.1:4753 connected
          /127.0.0.1:4753 RCVD: 0:ADD(4)
          /127.0.0.1:4753 RCVD: 1:ADD(6)
          /127.0.0.1:4753 RCVD: 2:ADD(2)
          /127.0.0.1:4753 RCVD: 3:ADD(7)
          /127.0.0.1:4753 RCVD: 4:ADD(8)
          /127.0.0.1:4753 RCVD: 5:ADD(1)
          /127.0.0.1:4753 SENT: 0:RESULT(4)
          /127.0.0.1:4753 SENT: 1:RESULT(10)
          /127.0.0.1:4753 SENT: 2:RESULT(12)
          /127.0.0.1:4753 SENT: 3:RESULT(19)
          /127.0.0.1:4753 SENT: 4:RESULT(27)
          /127.0.0.1:4753 SENT: 5:RESULT(28)
          /127.0.0.1:4753 closed
          														實現客戶端
          												
          跟服務端對應,主要由Client和ClientSessionListener組成。
          														

          public class Client {

          ??????? private static final String HOSTNAME = "localhost";

          ?

          ??????? private static final int PORT = 8080;

          ?

          ??????? private static final int CONNECT_TIMEOUT = 30; // seconds

          ?

          ??????? private static final int DISPATCHER_THREAD_POOL_SIZE = 4;

          ?

          ??????? public static void main(String[] args) throws Throwable {

          ?????????????? // 預備要加總的值。

          ?????????????? int[] values = newint[args.length];

          ?????????????? for (int i = 0; i < args.length; i++) {

          ?????????????????????? values[i] = Integer.parseInt(args[i]);

          ?????????????? }

          ?

          ?????????????? // 初始化 I/O processor 和 event dispatcher

          ?????????????? IoProcessor ioProcessor = new IoProcessor();

          ?????????????? ThreadPooledEventDispatcher eventDispatcher = new OrderedEventDispatcher();

          ?

          ?????????????? // 開始缺省數量的I/O工作線程

          ?????????????? ioProcessor.start();

          ?

          ?????????????? // 啟動指定數量的event dispatcher線程

          ??????? eventDispatcher.setThreadPoolSize(DISPATCHER_THREAD_POOL_SIZE

          ?????????????? eventDispatcher.start();

          ?

          ?????????????? // 準備 message recognizer

          ?????????????? MessageRecognizer recognizer = new SumUpMessageRecognizer(

          ?????????????????????????????? SumUpMessageRecognizer.CLIENT_MODE);

          ?

          ?????????????? // 準備客戶端會話。

          ?????????????? Session session = new Session(ioProcessor, new InetSocketAddress(

          ?????????????????????????????? HOSTNAME, PORT), recognizer, eventDispatcher);

          ??????????????

          ??????????????

          ?????????????? session.getConfig().setConnectTimeout(CONNECT_TIMEOUT);

          ??????????????

          ?????????????? // 開始會話,并使用ClientSessionListener監聽。

          ?????????????? ClientSessionListener listener = new ClientSessionListener(values);

          ?????????????? session.addSessionListener(listener);

          ?????????????? session.start();

          ??????????????

          ?????????????? // 一直等到加總完成

          ?????????????? while ( !listener.isComplete() ) {

          ?????????????????????? Thread.sleep(1000);

          ?????????????? }

          ??????????????

          ?????????????? // 停止 I/O processor 和 event dispatcher

          ?????????????? eventDispatcher.stop();

          ?????????????? ioProcessor.stop();

          ? ?????? }

          ?}

          public class ClientSessionListener implements SessionListener {

          ?

          ? ?????? private final int[] values;

          ? ?????? private boolean complete;

          ?

          ? ?????? public ClientSessionListener(int[] values) {

          ? ????????????? this.values = values;

          ? ?????? }

          ? ??????

          ? ?????? public boolean isComplete() {

          ? ????????????? return complete;

          ? ?????? }

          ??????? // 當連接建立好后會調用此方法。

          ? ?????? public void connectionEstablished(Session session) {

          ? ????????????? System.out.println("connected to " + session.getSocketAddress());

          ?

          ? ????????????? // 發送加總請求。

          ? ????????????? for (int i = 0; i < values.length; i++) {

          ? ????????????????????? AddMessage m = new AddMessage();

          ? ????????????????????? m.setSequence(i);

          ? ????????????????????? m.setValue(values[i]);

          ? ????????????????????? session.write(m);

          ? ????????????? }

          ? ?????? }

          ?

          ? ?????? public void connectionClosed(Session session) {

          ? ????????????? System.out.println("disconnected from " + session.getSocketAddress());

          ??????? }

          ??????? // 當收到server的回應信息時,會調用此方法

          ??????? public void messageReceived(Session session, Message message) {

          ?????????????? System.out.println("RCVD: " + message);

          ?

          ?????????????? // 服務端只發送ResultMessage. 其它情況下

          ? ????????????? // 要通過instanceOf來判斷它的類型.

          ? ????????????? ResultMessage rm = (ResultMessage) message;

          ? ????????????? if (rm.isOk()) {

          ? ????????????????????? // 如果ResultMessage是OK的.

          ? ????????????????????? // 根據ResultMessage的sequence值來判斷如果,

          ? ????????????????????? // 一次消息的sequence值,則

          ? ????????????????????? if (rm.getSequence() == values.length - 1) {

          ? ????????????????????????????? // 打印出結果.

          ? ????????????????????????????? System.out.println("The sum: " + rm.getValue());
          ???????????????????????????????// 關閉會話

          ?????????????????????????????? session.close();

          ?????????????????????????????? complete = true;

          ?????????????????????? }

          ?????????????? } else {

          ?????????????????????? //?如有錯誤,則打印錯誤信息,并結束會話.

          ? ????????????????????? System.out.println("server error, disconnecting...");

          ?????????????????????? session.close();

          ?????????????????????? complete = true;

          ?????????????? }

          ??????? }

          ?

          ??????? public void messageSent(Session session, Message message) {

          ?????????????? System.out.println("SENT: " + message);

          ??????? }

          ?

          ??????? public void sessionIdle(Session session) {

          ??????????????

          ??????? }

          ?

          ??????? public void exceptionCaught(Session session, Throwable cause) {

          ?????????????? cause.printStackTrace(System.out);

          ?

          ?????????????? if (cause instanceof ConnectException) {

          ?????????????????????? // 如果連接server失敗, 則間隔5秒重試連接.

          ? ????????????????????? System.out.println("sleeping...");

          ? ????????????????????? try {

          ? ????????????????????????????? Thread.sleep(5000);

          ? ????????????????????? } catch (InterruptedException e) {

          ? ????????????????????? }

          ? ?????????????????????

          ? ????????????????????? System.out.println("reconnecting... " + session.getSocketAddress());

          ? ????????????????????? session.start();

          ? ????????????? } else {

          ? ????????????????????? session.close();

          ? ????????????? }

          ? ?????? }

          ?}

          通過上面的例子,你也許會發現實現一個自定義的協議原來如此簡單。你如果用Netty試著去實現自己的smtp或pop協議,我想也不會是一件難事了。

          ?

          Netty2的首頁在http://gleamynode.net/dev/projects/netty2/index.html,你可以在這找到本文的全部源碼。

        11. posted on 2006-06-20 19:45 happytian 閱讀(372) 評論(0)  編輯  收藏

          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          <2025年7月>
          293012345
          6789101112
          13141516171819
          20212223242526
          272829303112
          3456789

          Welcome here, my friend!

          常用鏈接

          留言簿(12)

          隨筆檔案(66)

          文章分類

          文章檔案(63)

          web

          最新隨筆

          搜索

          •  

          積分與排名

          • 積分 - 89726
          • 排名 - 647

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 兴安盟| 女性| 石阡县| 蒙自县| 永靖县| 施秉县| 曲阳县| 孟津县| 普宁市| 剑川县| 左权县| 沈阳市| 新野县| 泰宁县| 嘉祥县| 湖北省| 龙岩市| 固始县| 瓦房店市| 东乌珠穆沁旗| 濮阳县| 永年县| 延安市| 鹤壁市| 屏边| 阳山县| 南汇区| 河西区| 光山县| 元氏县| 济源市| 桃园市| 神农架林区| 郑州市| 临高县| 阿拉尔市| 永嘉县| 大埔县| 年辖:市辖区| 宝清县| 德化县|