posts - 56,  comments - 12,  trackbacks - 0

          Overview

          Cindy是一個Java異步I/O框架,提供了一個統(tǒng)一高效的模型,同時支持TCP、UDP以及Pipe,并能夠方便的在異步和同步操作之間進(jìn)行 切換。目前其實(shí)現(xiàn)是基于Java NIO,并計(jì)劃通過JNI來支持各操作系統(tǒng)上本身提供的異步I/O功能,應(yīng)用可以方便的通過運(yùn)行期屬性來方便的切換到更為高效的實(shí)現(xiàn)上。

          為什么不使用Java IO?

          Java IO包采用阻塞模型來處理網(wǎng)絡(luò)操作。假設(shè)應(yīng)用調(diào)用了read方法讀取來自網(wǎng)絡(luò)流上的數(shù)據(jù),而數(shù)據(jù)尚未到達(dá)本機(jī),則對read方法的調(diào)用將一直等到數(shù)據(jù)到達(dá) 并成功接收后才返回。由于IO包的所有操作會一直阻塞當(dāng)前線程,為了不影響其他事務(wù)的處理,在一般情況下應(yīng)用總是會用一個獨(dú)立線程或線程池來處理這些 I/O操作。

          Java IO包阻塞模型的優(yōu)點(diǎn)就是非常簡單易用,如果配合上線程池,效率也非常不錯,所以得到了廣泛的應(yīng)用。但這種簡單模型也有其固有的缺點(diǎn):擴(kuò)展性不足。如果應(yīng) 用只需要進(jìn)行少量的網(wǎng)絡(luò)操作,那么開啟若干個單獨(dú)的I/O線程無傷大雅;但是如果是實(shí)現(xiàn)一個服務(wù)端的應(yīng)用,需要同時處理成千上萬個網(wǎng)絡(luò)連接,采用阻塞模型 的話就要同時開啟上千個線程。雖然現(xiàn)在強(qiáng)勁的服務(wù)器能夠負(fù)擔(dān)起這么多線程,但系統(tǒng)花在線程調(diào)度上的時間也會遠(yuǎn)遠(yuǎn)多于用于處理網(wǎng)絡(luò)操作上的時間。

          為什么要使用Java NIO?

          采用IO包的阻塞模型,如果數(shù)據(jù)量不大的話,則線程的大部分時間都會浪費(fèi)在等待上。對于稀缺的服務(wù)器資源而言,這是一種極大的浪費(fèi)。

          在Java 1.4中引入的NIO包里,最引人注目的就是提供了非阻塞I/O的實(shí)現(xiàn)。和IO包提供的阻塞模型不同的是,對一個非阻塞的連接進(jìn)行操作,如果此時相應(yīng)的狀 態(tài)還未就緒,則調(diào)用會立即返回,而不是等待狀態(tài)就緒后才返回。假設(shè)應(yīng)用調(diào)用了read方法讀取來自網(wǎng)絡(luò)流上的數(shù)據(jù),而此刻數(shù)據(jù)尚未到達(dá)本機(jī),則對read 方法的調(diào)用將立即返回,并通知應(yīng)用目前只能讀到0個字節(jié)。應(yīng)用可以根據(jù)自身的策略來進(jìn)行處理,比如讀取其他網(wǎng)絡(luò)連接的數(shù)據(jù)等等,這就使得一個線程管理多個 連接成為可能。

          NIO包還提供了Selector機(jī)制,將一個非阻塞連接注冊在Selector上,應(yīng)用就不需去輪詢該連接當(dāng)前是否可以讀取或?qū)懭霐?shù)據(jù),在相應(yīng)狀 態(tài)就緒后Selector會通知該連接。由于一個Selector上可以注冊多個非阻塞連接,這樣就使得可以用更少的線程數(shù)來管理更多的連接。

          為什么選擇Cindy,而不直接使用NIO?

          Java NIO包雖然提供了非阻塞I/O模型,但是直接使用NIO的非阻塞I/O需要成熟的網(wǎng)絡(luò)編程經(jīng)驗(yàn),處理眾多底層的網(wǎng)絡(luò)異常,以及維護(hù)連接狀態(tài),判斷連接超 時等等。對于關(guān)注于其業(yè)務(wù)邏輯的應(yīng)用而言,這些復(fù)雜性都是不必要的。不同Java版本的NIO實(shí)現(xiàn)也會有一些Bug,Cindy會巧妙的繞開這些已知的 Bug并完成相應(yīng)功能。并且NIO本身也在不斷發(fā)展中,Java 1.4的NIO包中只實(shí)現(xiàn)了TCP/UDP單播/Pipe,Java 5.0中引入的SSLEngine類使得基于非阻塞的流協(xié)議(TCP/Pipe)支持SSL/TLS成為可能,在未來的版本中還可能會加入非阻塞多播的實(shí) 現(xiàn)。Cindy會關(guān)注這些新功能,并將其納入到統(tǒng)一的框架中來。

          Cindy雖然目前的實(shí)現(xiàn)是基于NIO,但它會不僅僅局限于NIO。等到一些基于操作系統(tǒng)本身實(shí)現(xiàn)的AIO(Asynchronous IO)類庫成熟后,它也會加入對這些類庫的支持,通過操作系統(tǒng)本身實(shí)現(xiàn)的AIO來提高效率。

          如果應(yīng)用程序只想使用一種高效的模型,而不想關(guān)心直接使用NIO所帶來的這些限制,或希望將來無需更改代碼就切換到更高效率的AIO實(shí)現(xiàn)上,那么 Cindy會是一個很好的選擇。并且使用Cindy,應(yīng)用可以在同步和異步之間進(jìn)行無縫切換,對于大部分操作是異步,可某些特殊操作需要同步的應(yīng)用而言, 這極大的提高了易用性。

          Hello world example

          場景:服務(wù)端監(jiān)聽本地的1234端口,打印任何收到的消息到控制臺上;客戶端建立TCP連接到本地的1234端口,發(fā)送完"Hello world!",然后斷開連接。

          基于Java IO包的客戶端示例

          基于Java IO包的阻塞模型的示例,用于對比,不做額外說明。(異常處理代碼略)

          Socket socket = new Socket("localhost",1234);
          OutputStream os = new BufferedOutputStream(socket.getOutputStream());
          os.write("Hello world!".getBytes());
          os.close();
          socket.close();

          基于Java IO包的服務(wù)端示例

          基于Java IO包的阻塞模型的示例,用于對比,不做額外說明。(異常處理代碼略)

          ServerSocket ss = new ServerSocket(1234);
          while (true) {
          ? final Socket socket = ss.accept();
          ? newThread() {

          ??? public void run() {
          ????? BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
          ????? String line = null;
          ????? while ((line = br.readLine()) != null) {
          ??????? System.out.println(line);
          ????? }
          }

          }.start();
          }

          基于Cindy的同步客戶端示例

          								//create tcp session
          Session session = SessionFactory.createSession(SessionType.TCP);

          //set remote address
          session.setRemoteAddress(new InetSocketAddress("localhost", 1234));

          //start session and wait until completed
          session.start().complete();

          //create send packet
          Packet packet = new DefaultPacket(BufferFactory.wrap("Hello world\!".getBytes()));

          //send packet and wait until completed
          session.flush(packet).complete();

          //close session and wait until completed
          session.close().complete();

          Step 1: create session

          Session是連接的抽象,一個Session代表著一個連接。連接有多種類型,SessionType.TCP代表著TCP連接,SessionType.UDP代表著UDP連接,SessionType.PIPE代表著Pipe連接等等。

          在這里通過SessionFactory.createSession(SessionType.TCP)創(chuàng)建了一個TCP連接。如果要創(chuàng)建UDP連接,可以使用類似的代碼:

          Session session = SessionFactory.createSession(SessionType.UDP);

          連接創(chuàng)建后可以通過返回Session實(shí)例的getSessionType方法來得到該連接的類型。

          Step 2: set the remote address

          TCP需要先設(shè)置好要連接的地址才能開始連接,在這里是連接到本機(jī)的1234端口。對于UDP來說,這一步設(shè)置不是必須的。

          Step 3: start session

          參數(shù)設(shè)置完成后就可以通過session.start方法啟動Session了。*和Java IO中的阻塞同步調(diào)用不同,由于Cindy是一個異步I/O框架,在調(diào)用session.start()方法并返回后,連接可能還沒有建立成功。*如果我 們想要切換到同步阻塞的方式,比如等連接建立成功后才返回,則可以通過start方法返回的Future對象來進(jìn)行切換。

          Session中所有的異步方法(如start/close/flush/send等)的返回值均為Future對象,該對象代表著所進(jìn)行的異步操作。我們可以通過三種方式來處理Future對象:

          • 阻塞:調(diào)用Future.complete(),則該方法會一直等到異步操作完成后才返回;或者可以調(diào)用Future.complete(int timeout)來等待異步操作完成,如果在指定時間內(nèi)操作未完成,則該方法也會返回。值得注意的是操作完成(isCompleted)包括操作成功和操作失敗兩種狀態(tài)(isSucceeded)。
          • 輪詢:周期性的調(diào)用Future.isCompleted()方法來查詢異步操作是否完成。
          • 回調(diào):通過Future.addListener()方法加入自定義的FutureListener,在異步操作結(jié)束后,F(xiàn)uture會觸發(fā)FutureListener的futureCompleted事件。對任何已完成的Future對象調(diào)用addListener方法會馬上觸發(fā)FutureListener的futureCompleted事件。

          在該示例中我們采用的是第一種方式——阻塞,等待連接建立完成后才返回。可以通過Future.complete()方法的返回值或者 Session.isStarted()來判斷連接是否建立成功。在這個簡單的示例里假設(shè)不會出現(xiàn)任何連接錯誤,所以就不再判斷異常情況了,不過在正式的 應(yīng)用中應(yīng)當(dāng)注意相關(guān)的判斷。

          Step 4: the Packet/Buffer interface

          任何數(shù)據(jù)要在網(wǎng)絡(luò)上傳輸最終都得轉(zhuǎn)換成字節(jié)流,Buffer接口就是字節(jié)流的抽象。注意的是這里使用的Buffer是net.sf.cindy.Buffer,而沒有采用java.nio.Buffer,這個設(shè)計(jì)上的取舍可以參考以后的介紹。

          而數(shù)據(jù)傳輸除了要有內(nèi)容外,還得有目的地,這樣才能把內(nèi)容送到正確的地址上。Packet接口就代表著要發(fā)送的數(shù)據(jù),包括一個Buffer對象和一 個目的地址------SocketAddress。對于流協(xié)議(如TCP/Pipe)而言,目的地址在建立連接的時候就已經(jīng)設(shè)置好了,所以發(fā)送過程中無 需再進(jìn)行設(shè)置,用默認(rèn)的null即可;對于消息協(xié)議(比如非連接型的UDP),則需要在發(fā)送時指定每個包的目的地址。

          在這里要將"Hello world!"轉(zhuǎn)換為Buffer對象,可以通過BufferFactory來完成:BufferFactory.wrap("Hello world!".getBytes()),將一個字節(jié)數(shù)組包裝成Buffer。

          由于是TCP連接,目的地址在建立連接時就已經(jīng)指定好了,所以可以簡單的構(gòu)造一個只有Buffer而沒有SocketAddress的Packet對象:new DefaultPacket(buffer)。(DefaultPacket是Packet接口的默認(rèn)實(shí)現(xiàn))

          Step 5: send packet

          Session的flush方法和start方法一樣都是異步方法,在調(diào)用flush并返回時,數(shù)據(jù)可能并沒有發(fā)送完成,在該示例中仍然采用阻塞方式等待數(shù)據(jù)發(fā)送完成后才返回:session.flush(packet).complete()。

          flush和send方法的區(qū)別在于:flush方法接受的參數(shù)是Packet對象,send方法接受的參數(shù)是Object對象,通過send方法發(fā)送的對象會被Session所關(guān)聯(lián)的PacketEncoder轉(zhuǎn)換為Packet對象再進(jìn)行發(fā)送。

          Step 6: close session

          發(fā)送完成后需要關(guān)閉連接,close方法同樣是一個異步方法,在這里等待連接完全關(guān)閉后才返回:session.close().complete()。

          從這個同步的示例可以看到,雖然Cindy是一個異步I/O框架,但用它來完成同步的I/O操作也是一件非常容易的事情。

          基于Cindy的異步客戶端示例

          								final Session session = SessionFactory.createSession(SessionType.TCP);
          session.setRemoteAddress(new InetSocketAddress("localhost", 1234));
          Future future = session.start();
          future.addListener(new FutureListener() {

          public void futureCompleted(Future future) throws Exception {
          Packet packet = new DefaultPacket("Hello world!".getBytes());
          session.flush(packet).complete();
          session.close();
          }

          });

          前面的三行代碼和同步示例沒有區(qū)別,只是在第四行代碼中采用了回調(diào)的方式來處理Future對象,而不是通過阻塞的方式。當(dāng)連接建立完成后,F(xiàn)utureListener的futureCompleted事件被觸發(fā),應(yīng)用可以在該方法中做相應(yīng)的事件處理。

          在發(fā)送Packet的過程中,其實(shí)也可以采用回調(diào)的方式來處理。在這里仍采用同步方法處理,一方面是減少內(nèi)嵌類的數(shù)量,另一方面是示例Cindy可以非常容易的切換同步和異步操作。后文對異步處理會有更詳細(xì)的介紹。

          基于Cindy的服務(wù)端示例

          SessionAcceptor acceptor = SessionFactory.createSessionAcceptor(SessionType.TCP);
          acceptor.setListenPort(1234);
          acceptor.setAcceptorHandler(new SessionAcceptorHandlerAdapter() {

          public void sessionAccepted(SessionAcceptor acceptor, Session session) throws Exception {
          session.setSessionHandler(new LogSessionHandler());
          session.start();
          }

          });
          acceptor.start();

          Step 1: create SessionAcceptor

          SessionAcceptor代表著連接的服務(wù)端,SessionAcceptor和Session是一對多關(guān)系,類似于IO包的ServerSocket和Socket。這里創(chuàng)建的是TCP服務(wù)端。

          Step 2: set listen port

          設(shè)置服務(wù)端的監(jiān)聽端口,在這里是1234端口;如果要設(shè)置監(jiān)聽地址,則可以通過setListenAddress來進(jìn)行設(shè)置。

          Step 3: set acceptor handler

          每當(dāng)SessionAcceptor上有連接建立成功,將會觸發(fā)該SessionAcceptor所關(guān)聯(lián)SessionAcceptorHandler的sessionAccepted事件,應(yīng)用可以在該事件中為連接上的Session設(shè)置一些屬性并開始或關(guān)閉連接。

          SessionAcceptorHandler接口是處理SessionAcceptor產(chǎn)生的各種事件,SessionHandler接口則用于 處理Session產(chǎn)生的各種事件。在這里我們僅僅關(guān)心對象接收事件(objectReceived),當(dāng)接收到對象后,將該對象打印到控制臺上。(注: LogSessionHandler的代碼沒有列出來)

          SessionHandler/SessionAcceptorHandler的更多介紹請參閱后面章節(jié)。

          PacketEncoder/PacketDecoder

          PacketEncoder

          在前面的Hello world示例中,我們都是通過session.flush方法來發(fā)送數(shù)據(jù),而該方法只接收Packet類型的參數(shù),這就要求我們在發(fā)送任何數(shù)據(jù)前都要先 進(jìn)行轉(zhuǎn)換。比如在前面示例中,我們就得先將"Hello world!"字符串轉(zhuǎn)換成一個代表"Hello world!"的Packet,然后再進(jìn)行發(fā)送。

          這種做法沒有什么問題,其唯一的缺陷在于將發(fā)送邏輯和序列化邏輯耦合在一起。比如在上面的示例中,發(fā)送邏輯是將"Hello world!"發(fā)送出去,序列化邏輯是"Hello world"字符串的字節(jié)表示。雖然有一定的關(guān)聯(lián),但的確是兩種不同的邏輯,比如我們可以更改序列化邏輯,通過Serializable接口來序列化 "Hello world",但這并不影響發(fā)送邏輯。

          神說,要有光,就有了光。你知道,神不關(guān)心光是怎么來的。PacketEncoder的作用就是分離發(fā)送邏輯和序列化邏輯。對于應(yīng)用而言,在發(fā)送時 它只需要把要發(fā)送的對象傳遞給Session,至于怎么序列化,則由Session所關(guān)聯(lián)的PacketEncoder來處理。所以在上面的Hello world示例中,發(fā)送邏輯可以改為:

          session.send("Hello world!");

          序列化邏輯可以通過PacketEncoder來設(shè)置:

          session.setPacketEncoder(new PacketEncoder() {

          public Packet encode(Session session, Object obj) throws Exception {
          returnnew DefaultPacket(BufferFactory.wrap(obj.toString().getBytes()));
          }

          });

          如果要改變序列化邏輯,比如通過Serializable接口來序列化,則只需要更改PacketEncoder,而不需要改動發(fā)送代碼:

          session.setPacketEncoder(new SerialEncoder());

          通過Cindy內(nèi)置的PacketEncoderChain,應(yīng)用可通過Session發(fā)送任意對象,把序列化邏輯完全交給PacketEncoder。如下面的偽碼所示:

          PacketEncoderChain chain = new PacketEncoderChain();
          chain.addPacketEncoder(new Message1Encoder());
          chain.addPacketEncoder(new Message2Encoder());

          session.setPacketEncoder(chain);
          session.send(new Message1());
          session.send(new Message2());

          PacketDecoder

          PacketEncoder是用來處理序列化邏輯的,相應(yīng)的,PacketDecoder則是用來處理反序列化邏輯的。

          發(fā)送方通過session.send方法可以發(fā)送任意對象,Session關(guān)聯(lián)的PacketEncoder會將該對象轉(zhuǎn)換為Packet發(fā)送出 去;接收方收到了Packet后,也可以通過其關(guān)聯(lián)的PacketDecoder將該P(yáng)acket轉(zhuǎn)換為一個對象,再通知應(yīng)用。假設(shè)發(fā)送方用了 SerialEncoder來發(fā)送序列化對象,則接收方就可以使用SerialDecoder來進(jìn)行反序列化,然后應(yīng)用就可以直接對對象進(jìn)行處理。

          session.setPacketDecoder(new SerialDecoder());
          session.setSessionHandler(new SessionHandlerAdapter() {

          public void objectReceived(Session session, Object obj) throws Exception {
          //obj即為反序列化后得到的對象
          }

          }

          Cindy源代碼example目錄下net.sf.cindy.example.helloworld包下提供了一個非常簡單的示例。

          應(yīng)用在實(shí)現(xiàn)PacketDecoder的時候要注意判斷當(dāng)前已接收到的內(nèi)容長度。

          比如TCP是一個流協(xié)議,一方發(fā)送了1000個字節(jié),另一方可能在接收的時候只收到了前200個字節(jié),剩下的800個字節(jié)要在下次接收時才收到,可是需要1000個字節(jié)才能構(gòu)造出一個完整的對象,則應(yīng)用的PacketDecoder實(shí)現(xiàn)可能會類似于:

          								public
          Object decode(Session session, Packet packet) throws Exception {
          Buffer content = packet.getContent();
          if (content.remaining() >= 1000) {
          //如果接收到的長度大于1000,則從Buffer中取出內(nèi)容并返回decode結(jié)果
          }
          returnnull; //接收長度不足,等待全部接收完成后再decode,返回null不會觸發(fā)objectReceived事件
          }

          SessionHandler/SessionHandlerAdapter

          SessionHandler接口用于處理Session的各種事件。比如當(dāng)連接建立成功后,會觸發(fā)SessionHandler的 sessionStarted事件;連接關(guān)閉后,會觸發(fā)SessionHandler的sessionClosed事件;對象發(fā)送成功后會觸發(fā) SessionHandler的objectSent事件;對象接收成功后會觸發(fā)SessionHandler的objectReceived事件等等。

          SessionHandlerAdapter是SessionHandler的空實(shí)現(xiàn)。即如果你僅僅對SessionHandler中某幾個事件感 興趣,就不用全部實(shí)現(xiàn)SessionHandler中定義的各種方法,而只需要繼承自SessionHandlerAdapter,實(shí)現(xiàn)感興趣的事件即 可。

          通過SessionHandler,可以極大的減少內(nèi)嵌類的數(shù)量。如前面的異步Hello world示例,如果用SessionHandler來改寫,則會是:

          Session session = SessionFactory.createSession(SessionType.TCP);
          session.setRemoteAddress(new InetSocketAddress("localhost", 1234));
          session.setSessionHandler(new SessionHandlerAdapter() {

          public void sessionStarted(Session session) throws Exception {
          // session started
          Buffer buffer = BufferFactory.wrap("Hello world!".getBytes());
          Packet packet = new DefaultPacket(buffer);
          session.send(packet);
          }

          public void objectSent(Session session, Object obj) throws Exception {
          // the packet have been sent, close current session
          session.close();
          };
          };
          session.start();

          在上面的代碼中,當(dāng)sessionStarted事件觸發(fā)后,即Session成功建立后,會發(fā)送"Hello world!"消息;當(dāng)objectSent事件觸發(fā)后,即消息發(fā)送成功,調(diào)用session.close()異步關(guān)閉連接。這些處理全部都是異步的,并 且僅僅使用了一個內(nèi)嵌類。

          SessionHandler中一共定義了如下幾個方法:

          • void sessionStarted(Session session) throws Exception //連接已建立
          • void sessionClosed(Session session) throws Exception //連接已關(guān)閉
          • void sessionTimeout(Session session) throws Exception //連接超時
          • void objectReceived(Session session, Object obj) throws Exception //接收到了對象
          • void objectSent(Session session, Object obj) throws Exception //發(fā)送了對象
          • void exceptionCaught(Session session, Throwable cause) //捕捉到異常

          如果通過session.setSessionTimeout方法設(shè)置了超時時間,則在指定的時間內(nèi)沒有接收或發(fā)送任何數(shù)據(jù)就會觸發(fā) sessionTimeout事件。發(fā)生了該事件并不代表著連接被關(guān)閉,應(yīng)用可以選擇關(guān)閉該空閑連接或者發(fā)送某些消息來檢測網(wǎng)絡(luò)連接是否暢通。默認(rèn)情況下 sessionTimeout為0,即從不觸發(fā)sessionTimeout事件。

          如果通過PacketEncoder發(fā)送了任何對象,則objectSent事件將被觸發(fā)(注:通過Session.flush方法發(fā)送的 Packet不會觸發(fā)objectSent事件,只有通過Session.send方法發(fā)送的對象才會觸發(fā)objectSent事件);通過 PacketDecoder接收到任何對象,則objectReceived事件將被觸發(fā),一般情況下應(yīng)用都通過監(jiān)聽該事件來對接收到的對象做相應(yīng)處理。

          exceptionCaught事件代表著session捕捉到了一個異常,這個異??赡苁怯捎诘讓泳W(wǎng)絡(luò)所導(dǎo)致,也可能是應(yīng)用在處理SessionHandler事件時所拋出來的異常。請注意,如果應(yīng)用在處理exceptionCaught事件中拋出運(yùn)行期異常,則該異常不會再度觸發(fā)exceptionCaught事件,否則可能出現(xiàn)死循環(huán)。 由于應(yīng)用無法處理底層網(wǎng)絡(luò)所引發(fā)的異常,所以在部署穩(wěn)定后,可以通過指定運(yùn)行期參數(shù)- Dnet.sf.cindy.disableInnerException來取消對底層網(wǎng)絡(luò)異常的分發(fā),或者判斷異常類型——所有的內(nèi)部異常都是從 SessionException基類繼承下來的,應(yīng)用可以根據(jù)這個特性來判斷是底層網(wǎng)絡(luò)出現(xiàn)了異常,還是SessionHandler或 SessionFilter中拋出了異常。

          SessionFilter/SessionFilterAdapter

          SessionFilter與SessionHandler有些類似,均用于處理Session的各種事件,不同的則是SessionFilter 先處理這些事件,并判斷是否需要把該事件傳遞給下一個SessionFilter。等到所有SessionFilter處理完成后,事件才會傳遞給 SessionHandler由其來處理。

          SessionFilterAdapter是SessionFilter的空實(shí)現(xiàn),默認(rèn)是把事件傳遞給下一個SessionFilter。SessionFilter中定義了如下幾個方法:

          • void sessionStarted(SessionFilterChain filterChain) throws Exception;
          • void sessionClosed(SessionFilterChain filterChain) throws Exception;
          • void sessionTimeout(SessionFilterChain filterChain) throws Exception;
          • void objectReceived(SessionFilterChain filterChain, Object obj) throws Exception;
          • void objectSent(SessionFilterChain filterChain, Object obj) throws Exception;
          • void exceptionCaught(SessionFilterChain filterChain, Throwable cause);
          • void packetReceived(SessionFilterChain filterChain, Packet packet) throws Exception;
          • void packetSend(SessionFilterChain filterChain, Packet packet) throws Exception;
          • void packetSent(SessionFilterChain filterChain, Packet packet) throws Exception;

          其中前六種方法和SessionHandler的作用一致,后三種方法則是用于處理接收和發(fā)送的Packet的。

          可以看到SessionFilter可以算做SessionHandler的超集,那為什么需要引入SessionHandler呢?

          雖然SessionFilter和SessionHandler在表現(xiàn)形式上有很多接近的地方,但是在應(yīng)用邏輯上卻是處于不同的地位。 SessionHandler目的就是處理應(yīng)用最為核心的業(yè)務(wù)邏輯,這些邏輯都是基于Object的,和網(wǎng)絡(luò)層沒有太大的關(guān)系;而 SessionFilter和網(wǎng)絡(luò)層的關(guān)聯(lián)就比較大了,一般用來處理網(wǎng)絡(luò)相關(guān)的一些邏輯(如包壓縮/解壓縮、包加密/解密)或者是核心業(yè)務(wù)邏輯外的一些分 支邏輯(如記錄日志、黑名單處理)。

          基于SessionFilter,應(yīng)用可以做很多擴(kuò)展而不影響核心的業(yè)務(wù)處理(核心的業(yè)務(wù)處理應(yīng)該放在SessionHandler中)。比如數(shù)據(jù) 包相關(guān)的擴(kuò)展:加入SSLFilter,則所發(fā)送的數(shù)據(jù)都會被SSL編碼后才發(fā)送,接收的數(shù)據(jù)會先被解碼成明文才接收;加入ZipFilter,則可以壓 縮所發(fā)送的數(shù)據(jù),接收時再解壓縮。比如統(tǒng)計(jì)的擴(kuò)展:加入StatisticFilter,則可以統(tǒng)計(jì)發(fā)送和接收的字節(jié)數(shù),以及發(fā)送速率。比如ACL的擴(kuò) 展:加入AllowListFilter/BlockListFilter,則可以允許指定或限制某些IP地址訪問;加入LoginFilter,如果用 戶沒有登錄,則不把事件傳遞給后面處理業(yè)務(wù)邏輯的SessionFilter或SessionHandler。比如線程處理的擴(kuò)展:加入 ThreadPoolFilter,可以指定讓某個線程池來進(jìn)行后面事件的處理。比如日志記錄的擴(kuò)展:加入LogFilter,則可以記錄相應(yīng)的事件信 息。

          所列舉的這些只是一些基于SessionFilter的常見應(yīng)用,應(yīng)用可以根據(jù)自身的業(yè)務(wù)需要來進(jìn)行選擇。Cindy所推薦的實(shí)踐是將不同的業(yè)務(wù)邏輯分散到不同的SessionFilter中,在SessionHandler中只處理核心邏輯。

          在這里可以示范一個ZipFilter的偽碼:

          								public class ZipFilter extends SessionFilterAdapter {

          public void packetReceived(SessionFilterChain filterChain, Packet packet) throws Exception {
          Packet unzippedPacket = unzip(packet); //解壓縮
          super.packetReceived(filterChain, unzippedPacket);//把解壓縮后的包傳遞給下一個Filter
          }

          public void packetSend(SessionFilterChain filterChain, Packet packet) throws Exception {
          Packet zippedPacket = zip(packet); //壓縮
          super.packetSend(filterChain, zippedPacket); //把壓縮后的包傳遞給下一個Filter
          }
          }

          Buffer/Packet

          在前面的示例中我們已經(jīng)接觸到了Buffer/Packet,Buffer是數(shù)據(jù)流的抽象,Packet是網(wǎng)絡(luò)包的抽象。

          Java NIO中已經(jīng)提供了java.nio.ByteBuffer類用于表示字節(jié)流,為什么不直接使用java.nio.ByteBuffer?

          java.nio.ByteBuffer雖然是NIO中表示字節(jié)流的標(biāo)準(zhǔn)類,但是對于高負(fù)荷的網(wǎng)絡(luò)應(yīng)用而言,其設(shè)計(jì)上存在著以下缺陷:

          • ByteBuffer并不是一個接口,而是一個抽象類。最為關(guān)鍵的地方是其構(gòu)造函數(shù)為包級私有,這意味著我們無法繼承自ByteBuffer構(gòu)造子類。
          • ByteBuffer 僅僅是其所持有內(nèi)容的一個外部包裝,多個不同的ByteBuffer可以共享相同的內(nèi)容。比如通過slice、duplicate等方法構(gòu)造一個新的 ByteBuffer,該新ByteBuffer和原ByteBuffer共享的是同一份內(nèi)容。這就意味著無法構(gòu)造基于ByteBuffer的緩存機(jī)制。

          比如如下代碼:

          ByteBuffer buffer1 = ByteBuffer.allocate(100);
          ByteBuffer buffer2 = buffer1.slice();
          System.out.println(buffer1 == buffer2);
          System.out.println(buffer1.equals(buffer2));

          打印出來的都是false,而實(shí)際上兩個ByteBuffer共享的是同一份數(shù)據(jù)。在不經(jīng)意的情況下,可能發(fā)生應(yīng)用把兩個ByteBuffer返回緩存中,被緩存當(dāng)成是不同的對象進(jìn)行處理,可能破壞數(shù)據(jù)完整性。

          為什么Cindy使用自定義的Buffer接口?

          • Buffer是一個接口,如果對現(xiàn)有的實(shí)現(xiàn)類不滿意,應(yīng)用可以方便的加入自己的實(shí)現(xiàn)
          • 支持一系列的工具方法,比如indexOf/getString/putString/getUnsignedXXX等等,加入這些常用方法會給應(yīng)用帶來很大的方便
          • 可以非常方便的與nio中的ByteBuffer做轉(zhuǎn)換,并且效率上不會有太大損失。由于大部分的網(wǎng)絡(luò)類庫都是基于nio的ByteBuffer來設(shè)計(jì)的,這樣保證了兼容性
          • NIO的ByteBuffer無法表示ByteBuffer數(shù)組,而Cindy中提供了工具類把Buffer數(shù)組包裝成一個Buffer
          • Cindy的Buffer是可緩存的,對于高負(fù)荷的網(wǎng)絡(luò)應(yīng)用而言,這會帶來性能上優(yōu)勢

          在一般情況下,應(yīng)用應(yīng)該直接通過BufferFactory類來構(gòu)造Buffer實(shí)例。目前BufferFactory類中有如下方法:

          • Buffer wrap(byte[] array)
          • Buffer wrap(byte[] array, int offset, int length)
          • Buffer wrap(ByteBuffer buffer)
          • Buffer wrap(Buffer[] buffers)
          • Buffer allocate(int capacity)
          • Buffer allocate(int capacity, boolean direct)

          前四種方法都是包裝方法,將現(xiàn)有的字節(jié)數(shù)組、ByteBuffer以及Buffer數(shù)組包裝成一個單一的Buffer對象。第五和第六種方法是構(gòu)造 一個新的Buffer對象,新構(gòu)造Buffer對象的內(nèi)容是不確定的(java.nio.ByteBuffer.allocate得到的內(nèi)容是全0),可 能來自緩存或直接生成,依賴于對Buffer緩存的配置。一般情況下都是通過第五種方法構(gòu)造新的Buffer對象,通過運(yùn)行期參數(shù)- Dnet.sf.cindy.useDirectBuffer可以改變默認(rèn)行為,是構(gòu)造Non-Direct Buffer還是構(gòu)造Direct Buffer。

          默認(rèn)情況下生成的Buffer是可以被緩存,在調(diào)用了session.send/flush方法后,Buffer的內(nèi)容就會被釋放掉,或者手動調(diào)用 Buffer.release也能釋放Buffer所持有的內(nèi)容。通過Buffer.isReleased方法可以可以判斷當(dāng)前的Buffer是否被釋放 掉。對已經(jīng)釋放掉的Buffer進(jìn)行g(shù)et/put操作都會導(dǎo)致ReleasedBufferException,但是對 position/limit/capacity的操作還是有效的。如果應(yīng)用不希望Buffer的內(nèi)容被釋放,可以通過設(shè)置permanent屬性為 true來使得Buffer的內(nèi)容不被釋放。

          是否所有Buffer都需要手動調(diào)用release來釋放?

          這里有一個基本的原則:誰生成,誰釋放。比如應(yīng)用調(diào)用BufferFactory.allocate(1024)得到了一個Buffer實(shí)例,這個Buffer實(shí)例是由應(yīng)用生成出來的,那么應(yīng)用在使用完該Buffer實(shí)例后就應(yīng)該負(fù)責(zé)調(diào)用buffer.release將它釋放掉。

          不過框架為了提供應(yīng)用的方便,所有通過session.send/flush方法發(fā)送的Buffer都會在發(fā)送完成后被釋放。如果上面得到的這個 Buffer實(shí)例通過調(diào)用session.flush方法被發(fā)送出去,那么該Buffer實(shí)例就不需要再通過release方法手工釋放了。

          如果應(yīng)用不手動release自己生成出來的Buffer,也不會造成內(nèi)存泄漏,因?yàn)檫@些Buffer會通過Java的垃圾回收機(jī)制被回收掉。唯一的缺陷在于由于無法重用對象,性能可能會有少許的降低。

          舉個例子,假設(shè)你實(shí)現(xiàn)了一個特定的PacketDecoder:

          								public class MyPacketDecoder implements PacketDecoder {

          publicObject decode(Session session, Packet packet) throws Exception {
          Buffer content = packet.getContent();
          Buffer result = allocateBuffer();
          decodeContent(content, result);
          return result;
          }

          }

          可以看到,在MyPacketDecoder的實(shí)現(xiàn)當(dāng)中,并沒有對接收到的Buffer調(diào)用release方法,因?yàn)檫@個Buffer并不是應(yīng)用生 成出來的,不應(yīng)該由應(yīng)用來釋放。在該MyPacketDecoder實(shí)現(xiàn)中生成了一個新的Buffer實(shí)例,并當(dāng)作PacketDecoder的返回值。 這個新的實(shí)例是應(yīng)用生成出來的,則應(yīng)該由應(yīng)用來釋放,所以應(yīng)用應(yīng)該在相應(yīng)的objectReceived事件中釋放該Buffer實(shí)例,如:

          								public void objectReceived(Session session, Object obj) throws Exception {
          Buffer buffer = (Buffer) obj;
          try {
          process(buffer);
          } finally {
          buffer.release();
          }
          }

          Advanced Topic

          JMX支持

          通過運(yùn)行期屬性-Dnet.sf.cindy.useJmx可以開啟JMX支持(Java 5.0中內(nèi)置了JMX支持,如果運(yùn)行在Java 1.4版本中,則需要手工下載相應(yīng)類庫)。

          下圖是通過jconsole進(jìn)行JMX管理的示例:

          流量控制

          當(dāng)網(wǎng)絡(luò)接收的速度大于應(yīng)用的處理速度時,如果不控制接收速率,則收到的消息會在隊(duì)列中堆積,應(yīng)用無法及時處理而造成內(nèi)存溢出。Cindy 3.0中加入了流量控制功能,當(dāng)接收隊(duì)列中消息超過指定數(shù)量時,Cindy會放慢網(wǎng)絡(luò)接收速度。該功能對于防止內(nèi)存溢出以及應(yīng)用程序調(diào)試有很大幫助。

          可以通過運(yùn)行期參數(shù)-Dnet.sf.cindy.dispatcher.capacity來指定隊(duì)列中最多可以堆積的消息數(shù)。默認(rèn)值是1000,即隊(duì)列中消息數(shù)超過1000,網(wǎng)絡(luò)接收速度就會放緩,等到消息數(shù)少于1000后,接收速度就會恢復(fù)正常。

          Read packet size

          Read packet size是指接收時每次讀包所構(gòu)造的緩沖區(qū)大小。對于TCP而言,這個值影響到的僅僅是效率;對于UDP則影響到數(shù)據(jù)正確性。

          對于UDP應(yīng)用,假設(shè)發(fā)送方發(fā)送的包所攜帶數(shù)據(jù)大小為1000字節(jié),接收方的read packet size設(shè)置為600字節(jié),則后面400個字節(jié)會被丟棄,這樣
          構(gòu)造出來的邏輯包可能會不正確。

          對于TCP應(yīng)用,如果邏輯包是定長的,這個值最好也設(shè)為邏輯包的長度。該值太少,則可能導(dǎo)致在過多的包組合操作(比如邏輯包長度為1000字節(jié), read packet size設(shè)置為100,則可能需要把10次接收到的packet組合成一個packet才能構(gòu)造出一個邏輯包);該值太大,可能造成內(nèi)存的浪費(fèi)(不過由于 Buffer緩存的存在會緩解這一情況)。

          默認(rèn)的read packet size是8192字節(jié),可以通過運(yùn)行期屬性-Dnet.sf.cindy.session.readPacketSize來更改。如果對單獨(dú)的 Session進(jìn)行更改,則可以通過session的readPacketSize屬性進(jìn)行設(shè)置。

          Direct ByteBuffer vs Non-Direct ByteBuffer

          java.nio.ByteBuffer引入了這兩種類型的ByteBuffer,在Cindy中也有基于這兩種ByteBuffer的Buffer包裝類。那么這兩者的區(qū)別在什么地方?在什么環(huán)境下采用哪種類型的ByteBuffer會更有效率?

          Non-direct ByteBuffer內(nèi)存是分配在堆上的,直接由Java虛擬機(jī)負(fù)責(zé)垃圾收集,你可以把它想象成一個字節(jié)數(shù)組的包裝類,如下偽碼所示:

          HeapByteBuffer extends ByteBuffer {
          byte[] content;
          int position, limit, capacity;
          ......
          }

          而Direct ByteBuffer是通過JNI在Java虛擬機(jī)外的內(nèi)存中分配了一塊,該內(nèi)存塊并不直接由Java虛擬機(jī)負(fù)責(zé)垃圾收集,但是在Direct ByteBuffer包裝類被回收時,會通過Java Reference機(jī)制來釋放該內(nèi)存塊。如下偽碼所示:

          DirectByteBuffer extends ByteBuffer {
          long address;
          int position, limit, capacity;

          protected void finalize() throws Throwable{
          //釋放內(nèi)存塊,該段代碼僅僅用于演示,真正的Direct ByteBuffer并不是通過finalize來釋放的
          releaseAddress();
          ......
          }
          ......
          }

          除開以上的這些外,如果我們查找Java實(shí)現(xiàn)類的代碼,就可以了解到這兩者之間更深入的區(qū)別。比如在Sun的Java實(shí)現(xiàn)中,絕大部分 Channel類都是通過sun.nio.ch.IOUtil這個工具類和外界進(jìn)行通訊的,如FileChannel/SocketChannel等等。 簡單的用偽碼把write方法給表達(dá)出來(read方法也差不多,就不多做說明了):

          								int write(ByteBuffer src, ......) {
          if (src instanceof DirectBuffer)
          return writeFromNativeBuffer(...);
          ByteBuffer direct = getTemporaryDirectBuffer(src);
          writeFromNativeBuffer(direct,......);
          updatePosition(src);
          releaseTemporaryDirectBuffer(direct);
          }

          是的,在發(fā)送和接收前會把Non-direct ByteBuffer轉(zhuǎn)換為Direct ByteBuffer,然后再進(jìn)行相關(guān)的操作,最后更新原始ByteBuffer的position。這意味著什么?假設(shè)我們要從網(wǎng)絡(luò)中讀入一段數(shù)據(jù),再 把這段數(shù)據(jù)發(fā)送出去的話,采用Non-direct ByteBuffer的流程是這樣的:

          								
          網(wǎng)絡(luò) --> 臨時的Direct ByteBuffer --> 應(yīng)用 Non-direct ByteBuffer --> 臨時的Direct ByteBuffer --> 網(wǎng)絡(luò)

          而采用Direct ByteBuffer的流程是這樣的:

          網(wǎng)絡(luò) --> 應(yīng)用 Direct ByteBuffer --> 網(wǎng)絡(luò)

          可以看到,除開構(gòu)造和析構(gòu)臨時Direct ByteBuffer的時間外,起碼還能節(jié)約兩次內(nèi)存拷貝的時間。那么是否在任何情況下都采用Direct Buffer呢?

          答案是否定的。對于大部分應(yīng)用而言,兩次內(nèi)存拷貝的時間幾乎可以忽略不計(jì),而構(gòu)造和析構(gòu)Direct Buffer的時間卻相對較長。在JVM的實(shí)現(xiàn)當(dāng)中,某些方法會緩存一部分臨時Direct ByteBuffer,意味著如果采用Direct ByteBuffer僅僅能節(jié)約掉兩次內(nèi)存拷貝的時間,而無法節(jié)約構(gòu)造和析構(gòu)的時間。就用Sun的實(shí)現(xiàn)來說,write(ByteBuffer)和 read(ByteBuffer)方法都會緩存臨時Direct ByteBuffer,而write(ByteBuffer[])和read(ByteBuffer[])每次都生成新的臨時Direct ByteBuffer。

          根據(jù)這些區(qū)別,在選擇ByteBuffer類型上有如下的建議:

          • 如果你做中小規(guī)模的應(yīng)用(在這里,應(yīng)用大小是按照使用ByteBuffer的次數(shù)和規(guī)模來做劃分的),并不在乎這些細(xì)節(jié)問題,請選擇Non-direct ByteBuffer
          • 如果采用Direct ByteBuffer后性能并沒有出現(xiàn)你所期待的變化,請選擇Non-direct ByteBuffer
          • 如果沒有Direct ByteBuffer Pool,盡量不要使用Direct ByteBuffer
          • 除非你確定該ByteBuffer會長時間存在,并且和外界有頻繁交互,可采用Direct ByteBuffer
          • 如 果采用Non-direct ByteBuffer,那么采用非聚集(gather)的write/read(ByteBuffer)效果反而*可能*超出聚集的write/read (ByteBuffer[]),因?yàn)榫奂膚rite/read的臨時Direct ByteBuffer是非緩存的(在Sun的實(shí)現(xiàn)上是這樣,其他的實(shí)現(xiàn)則不確定)

          基本上,采用Non-direct ByteBuffer總是對的!因?yàn)閮?nèi)存拷貝需要的開銷對大部分應(yīng)用而言都可以忽略不計(jì)。在Cindy中,一般的應(yīng)用只需要通過 BufferFactory.allocate方法來得到Buffer實(shí)例即可,默認(rèn)設(shè)置下采用的是Non-Direct Buffer;通過BufferFactory.wrap方法包裝字節(jié)數(shù)組或ByteBuffer得到的Buffer類也有很高的效率;只有通過 BufferFactory.wrap(Buffer[])方法目前還處于實(shí)驗(yàn)階段,其效率不一定比生成一個大的Buffer,然后拷貝現(xiàn)有內(nèi)容更快。如 果應(yīng)用非常注重效率,要使用該方法上要多加注意。

          默認(rèn)參數(shù)設(shè)置

          對Cindy中一些默認(rèn)參數(shù)的配置可以通過以下兩種方式:

          • 配置文件配置

          Cindy在啟動時會在當(dāng)前classpath上尋找cindy.properties文件,如果找到后會把該屬性配置文件讀入到緩存中。在該 properties中的配置無需net.sf.cindy.前綴,即如果要配置net.sf.cindy.enableJmx=true并且使用 DirectBuffer,則只需要在cindy.properties中加入:

          enableJmx=true
          useDirectBuffer=true
          • 運(yùn)行期屬性配置

          運(yùn)行時通過-D參數(shù)指定的配置,如果和上面配置文件中的Key相同,則會覆蓋配置文件的配置。通過-D參數(shù)指定配置時不能省略net.sf.cindy.前綴。

          當(dāng)前版本全部可以配置的屬性請參見Cindy源代碼包的readme.txt。

          posted on 2007-01-19 00:09 苦笑枯 閱讀(2264) 評論(0)  編輯  收藏 所屬分類: Java
          收藏來自互聯(lián)網(wǎng),僅供學(xué)習(xí)。若有侵權(quán),請與我聯(lián)系!

          <2007年1月>
          31123456
          78910111213
          14151617181920
          21222324252627
          28293031123
          45678910

          常用鏈接

          留言簿(2)

          隨筆分類(56)

          隨筆檔案(56)

          搜索

          •  

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 天全县| 望江县| 琼海市| 南投县| 波密县| 万全县| 资阳市| 敦化市| 西乌珠穆沁旗| 浠水县| 阿合奇县| 休宁县| 介休市| 江安县| 乌什县| 井冈山市| 宁阳县| 商丘市| 桂平市| 庆城县| 裕民县| 共和县| 汉中市| 桓仁| 天峻县| 三门县| 灵寿县| 库尔勒市| 高州市| 霍邱县| 开化县| 贞丰县| 芦溪县| 巍山| 荥经县| 泽州县| 岑溪市| 绥芬河市| 长垣县| 杂多县| 彩票|