夢(mèng)幻之旅

          DEBUG - 天道酬勤

             :: 首頁(yè) :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理 ::
            671 隨筆 :: 6 文章 :: 256 評(píng)論 :: 0 Trackbacks
          最近做的一個(gè)項(xiàng)目用到了開(kāi)源的C/S應(yīng)用的服務(wù)器框架MINA,當(dāng)初做的時(shí)候資料非常少,只能自己不停的測(cè)試,總結(jié)出了一些規(guī)律經(jīng)驗(yàn)。

          從網(wǎng)上看的資料上看,這個(gè)服務(wù)器框架還是比較穩(wěn)定和支持的并發(fā)數(shù)還是很不錯(cuò)的,不過(guò)沒(méi)有準(zhǔn)確的數(shù)據(jù),而且我做完的時(shí)候也沒(méi)有拿到真正的實(shí)際環(huán)境中測(cè)試過(guò),用的時(shí)候也發(fā)現(xiàn)了很多優(yōu)點(diǎn)和缺點(diǎn),使用者可以自己去根據(jù)自己的使用需求去衡量是否使用該框架。

          服務(wù)器是商業(yè)系統(tǒng)很重要的一部分,主要負(fù)責(zé)數(shù)據(jù)采集,文件分發(fā),與端機(jī)的通信,和自動(dòng)作業(yè)等任務(wù),服務(wù)器大多是24小時(shí)運(yùn)行的,因此服務(wù)器的實(shí)現(xiàn)必須強(qiáng)壯、穩(wěn)定、安全,而速度雖然也是很重要,不過(guò)最重要的還是前三者。服務(wù)器框架MINA就是要為這樣的服務(wù)器提供了一個(gè)網(wǎng)絡(luò)應(yīng)用框架,當(dāng)然這樣的服務(wù)器框架也可以自己去實(shí)現(xiàn)。MINA為我們封裝了socket的底層通信實(shí)現(xiàn),提供了日志,線程池等功能,使用起來(lái)非常簡(jiǎn)單、方便。

          MINA是一個(gè)異步框架,是通過(guò)網(wǎng)絡(luò)事件激發(fā)的,它包含兩層:IO層和協(xié)議層。首先介紹IO層,要說(shuō)明的是我用的版本是0.
          8.2,可能不同版本會(huì)稍有不同。

          Client產(chǎn)生一個(gè)底層IO事件,比如說(shuō)連接和發(fā)送數(shù)據(jù)包,IoAcceptor執(zhí)行所有底層IO,將他們翻譯成抽象的IO事件,接著這里可以添加(也可以部添加)一個(gè)IoFilters對(duì)IO事件進(jìn)行過(guò)濾,并把翻譯過(guò)的事件或過(guò)濾過(guò)的事件和關(guān)聯(lián)的IoSession 發(fā)送給IoHandler。IoSession是一個(gè)有效的網(wǎng)絡(luò)連接會(huì)話,此會(huì)話將一直保持連接,除非網(wǎng)絡(luò)斷開(kāi)或用戶主動(dòng)斷開(kāi)連接(session.close()),用戶可以通過(guò)IoSession獲得有關(guān)該會(huì)話連接的信息和配置會(huì)話的對(duì)象和屬性;IoHandler是網(wǎng)絡(luò)事件的監(jiān)聽(tīng)器,也就是說(shuō)當(dāng)有網(wǎng)絡(luò)事件發(fā)生時(shí)會(huì)通知IoHandler,用戶不用去主動(dòng)接受數(shù)據(jù)。用戶只要實(shí)現(xiàn)此接口愛(ài)干嗎干嗎去吧。IoFilter:Io過(guò)濾器,對(duì)Io事件進(jìn)行過(guò)濾,比如添加日志過(guò)濾器和線程池過(guò)濾器。

          使用說(shuō)明:

          import java.util.logging.Level;

          import org.apache.mina.common.ByteBuffer;
          import org.apache.mina.common.IdleStatus;
          import org.apache.mina.common.SessionConfig;
          import org.apache.mina.io.IoHandlerAdapter;
          import org.apache.mina.io.IoSession;
          import org.apache.mina.io.socket.SocketSessionConfig;
          import org.apache.mina.util.SessionLog;

          public class ServerHandler extends IoHandlerAdapter {

            
          public ServerHandler() {

           }


           
          public void dataRead(IoSession session, ByteBuffer buffer) throws Exception {

          //當(dāng)有數(shù)據(jù)讀入時(shí)此方法被調(diào)用,數(shù)據(jù)封裝在ByteBuffer中,可以用以下方法對(duì)出buffer的數(shù)據(jù),ByteBuffer的數(shù)據(jù)讀出后內(nèi)存中就沒(méi)有了。
          //String message = "";
          //  byte[] bytes = new byte[rb.remaining()];
          //  int j = 0;                                                                   //  while (rb.hasRemaining()) {
           
          //  bytes[j++] = rb.get();
           
          // }
           
          //  message = new String(bytes); 

          //接著可以進(jìn)行邏輯處理

          }


           
          public void dataWritten(IoSession session, Object mark) throws Exception {

          //當(dāng)數(shù)據(jù)被寫(xiě)入通道時(shí)此方法被調(diào)用,實(shí)際就是調(diào)用了session.write(IoSession,Object)方法
            SessionLog.log(Level.INFO,session,mark.toString());//必要時(shí)打印所寫(xiě)入的內(nèi)容,mark的內(nèi)容就是session.write(session,mark)中的第二個(gè)參數(shù)
           }


           
          public void exceptionCaught(IoSession session, Throwable arg1)
             
          throws Exception {

          //當(dāng)出現(xiàn)異常時(shí)此方法被調(diào)用,從而進(jìn)行各種異常處理,該方法可以捕捉網(wǎng)絡(luò)異常(如連接非正常關(guān)閉)和所有其他方法產(chǎn)生的異常,這里要注意如果客戶端要保持與服務(wù)器端的連接時(shí)不要在這里馬上重新連接不然會(huì)拋出CancelKeyException運(yùn)行期異常直接導(dǎo)致程序死掉(特別是與服務(wù)器端有超過(guò)兩個(gè)連接時(shí)一定會(huì)發(fā)生并且此異常無(wú)法捕獲),建議的方法是啟動(dòng)一個(gè)單獨(dú)的線程來(lái)完成與服務(wù)器端的重新連接,還有要注意的是如果網(wǎng)絡(luò)是正常關(guān)閉的,比如說(shuō)是客戶端正常關(guān)閉連接,而此時(shí)服務(wù)器端是不愿意關(guān)閉的話,這個(gè)異常本方法是捕捉不了的,因此只能在session.close()方法中處理這種情況。
            session.close();
            
           }


           
          public void sessionClosed(IoSession session) throws Exception {

          //當(dāng)網(wǎng)絡(luò)連接被關(guān)閉是此方法被調(diào)用
            SessionLog.log(Level.INFO,session,"Close a Session");//必要時(shí)打印出信息
           }


           
          public void sessionCreated(IoSession session) throws Exception {

          //當(dāng)網(wǎng)絡(luò)連接被創(chuàng)建時(shí)此方法被調(diào)用(這個(gè)肯定在sessionOpened(IoSession session)方法之前被調(diào)用),這里可以對(duì)Socket設(shè)置一些網(wǎng)絡(luò)參數(shù)
            SessionConfig cfg = session.getConfig();
            
          if (cfg instanceof SocketSessionConfig) {
             ((SocketSessionConfig) cfg).setSessionReceiveBufferSize(
          2048);
             ((SocketSessionConfig) cfg).setKeepAlive(
          true);
             ((SocketSessionConfig) cfg).setSoLinger(
          true0);
             ((SocketSessionConfig) cfg).setTcpNoDelay(
          true);
             ((SocketSessionConfig) cfg).setWriteTimeout(
          1000 * 5);
            }

           }


           
          public void sessionIdle(IoSession arg0, IdleStatus arg1) throws Exception {
            
          // 當(dāng)網(wǎng)絡(luò)通道空閑時(shí)此方法被調(diào)用,在這里可以判斷是讀空閑、寫(xiě)空閑還是兩個(gè)都空閑,以便做出正確的處理

          一半的網(wǎng)絡(luò)通訊程序都要與服務(wù)器端保持長(zhǎng)連接,所以這里可以發(fā)一下網(wǎng)絡(luò)測(cè)試數(shù)據(jù)以保持與服務(wù)器端的連接
           }


           
          public void sessionOpened(IoSession session) throws Exception {

          //當(dāng)網(wǎng)絡(luò)連接被打開(kāi)時(shí)此方法被調(diào)用,這里可以對(duì)session設(shè)置一些參數(shù)或者添加一些IoFilter的實(shí)現(xiàn),也可以對(duì)客戶端做一些認(rèn)證之類(lèi)的工作
            session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 60);
           }


          }


          //啟動(dòng)監(jiān)聽(tīng)連接的服務(wù)器

          import org.apache.mina.common.*;
          import org.apache.mina.io.*;
          import org.apache.mina.io.filter.*;
          import org.apache.mina.registry.*;

          public class Server{
              
          /** Choose your favorite port number. */
              
          private static final int PORT = 8080;
              
              
          public static void main( String[] args ) throws Exception
              
          {
                  ServiceRegistry registry 
          = new SimpleServiceRegistry();

          //可以添加各種過(guò)濾器,比如線程池過(guò)濾器,增加一個(gè)線程池處理來(lái)自不同的連接

             IoAcceptor ioAcceptor 
          = registry.getIoAcceptor();
             IoThreadPoolFilter ioThreadPoolFilter 
          = new IoThreadPoolFilter();
             ioThreadPoolFilter.setMaximumPoolSize(
          10);
             ioThreadPoolFilter.start();
             ioAcceptor.getFilterChain().addLast(
          "IoThreadPool",
               ioThreadPoolFilter);
                 
          // Bind
                  Service service = new Service( "serviceName",
          TransportType.SOCKET, PORT );
                  registry.bind( service, 
          new ServerHandler() );

                  System.out.println( 
          "Listening on port " + PORT );
              }

          }


          //如果是連接服務(wù)器的可以如下啟動(dòng)連接請(qǐng)求

          import org.apache.mina.io.filter.IoThreadPoolFilter;
          import org.apache.mina.io.socket.SocketConnector;
          import java.net.InetSocketAddress;

          public class Client{
                
          public static void main( String[] args ) throws Exception
              
          {
              
          private static final int CONNECT_TIMEOUT = 3//設(shè)置超時(shí)連接時(shí)間

          //可以添加各種過(guò)濾器,比如線程池過(guò)濾器,增加一個(gè)線程池處理來(lái)自不同的連接

          IoThreadPoolFilter ioThreadPoolFilter 
          = new IoThreadPoolFilter();
             ioThreadPoolFilter.setMaximumPoolSize(
          10);
             ioThreadPoolFilter.start();
             SocketConnector connector 
          = new SocketConnector();

             connector.getFilterChain().addFirst(
          "threadPool",
               ioThreadPoolFilter);

          //初始化客戶端的監(jiān)聽(tīng)處理器
             ClientHandler clientHandler = new ClientHandler();  

            InetSocketAddress address 
          = new InetSocketAddress("serverIp",serverPort);

            
          try {

          connector.connect(address, CONNECT_TIMEOUT,
                  clientHandler);      

          System.out.println(
          "connect sucessfully!");

             }
           catch(Exception e){

              System.err.println(
          "Failed to connect.");

          }
             
          }


          如果一個(gè)協(xié)議非常復(fù)雜,如果只用一個(gè)Io層是非常復(fù)雜的,因?yàn)镮O層沒(méi)有幫助你分離‘message解析’和‘實(shí)際的業(yè)務(wù)邏輯,MINA提供了一個(gè)協(xié)議層來(lái)解決這個(gè)問(wèn)題。

           

          使用協(xié)議層必須實(shí)現(xiàn)5個(gè)接口:ProtocolHandler, ProtocolProvider, ProtocolCodecFactory, ProtocolEncoder, 和 ProtocolDecoder:

          第一步:實(shí)現(xiàn)ProtocolDecoder和ProtocolEncoder,當(dāng)有IO事件時(shí)便先調(diào)用這兩個(gè)類(lèi)的方法

          import org.apache.mina.common.ByteBuffer;
          import org.apache.mina.protocol.ProtocolDecoderOutput;
          import org.apache.mina.protocol.ProtocolSession;
          import org.apache.mina.protocol.ProtocolViolationException;
          import org.apache.mina.protocol.codec.MessageDecoder;
          import org.apache.mina.protocol.codec.MessageDecoderResult;
          import java.util.*;

          public class ServerDecoder implements MessageDecoder {

           
          public ServerTranInfoDecoder() {

           }



           
          public MessageDecoderResult decodable(ProtocolSession session, ByteBuffer in) {

          //對(duì)接受的數(shù)據(jù)判斷是否與協(xié)議相同,如果相同返回MessageDecoderResult.OK,否則返回MessageDecoderResult.NOT_OK,這里如果要從ByteBuffer讀出數(shù)據(jù),需要重新用ByteBuffer.put(ByteBuffer)放回內(nèi)存中,以便decode方法使用;
            return MessageDecoderResult.OK;
            
          return MessageDecoderResult.NOT_OK;
           }


            
          public MessageDecoderResult decode(ProtocolSession session, ByteBuffer in,
             ProtocolDecoderOutput out) 
          throws ProtocolViolationException {

          //根據(jù)協(xié)議將介紹到的數(shù)據(jù)(放在ByteBuffer中)組裝成相對(duì)應(yīng)的實(shí)體,調(diào)用out.write(Object)方法發(fā)送給協(xié)議層進(jìn)行業(yè)務(wù)邏輯的處理,如果成功返回MessageDecoderResult.OK,否則返回MessageDecoderResult.NOT_OK;

          out.write(object);

            }


          import org.apache.mina.common.ByteBuffer;
          import org.apache.mina.protocol.ProtocolEncoderOutput;
          import org.apache.mina.protocol.ProtocolSession;
          import org.apache.mina.protocol.ProtocolViolationException;
          import org.apache.mina.protocol.codec.MessageEncoder;
          import java.util.*;

          public class ServerEncoder implements MessageEncoder{
           
          public static Set TYPES;
           
              
          public ServerEncoder(){
               
              }

             
              
          public Set getMessageTypes(){
               HashSet set 
          = new HashSet();
               set.add(Send.
          class);  //增加要進(jìn)行編碼的實(shí)體類(lèi)
               TYPES = Collections.unmodifiableSet( set );
                  
          return TYPES; 
              }

           

          public void encode( ProtocolSession session, Object message, ProtocolEncoderOutput out )
                                                              
          throws ProtocolViolationException {
              
          //將回應(yīng)報(bào)文實(shí)體message編碼層returnStr后發(fā)送到客戶端  

            
          byte[] bytes = returnStr.getBytes();
            ByteBuffer rb 
          = ByteBuffer.allocate(bytes.length);
            rb.put(bytes);
            rb.flip();
            out.write(rb);
             }

          }


          第二步:實(shí)現(xiàn)ProtocolCodecFactory

          import org.apache.mina.protocol.codec.DemuxingProtocolCodecFactory;

          public class ServerProtocolCodecFactory extends DemuxingProtocolCodecFactory {
           
          public ServerProtocolCodecFactory(boolean server) {
            
          if (server) {
             
          super.register(ServerDecoder.class);
             
          super.register(ServerEncoder.class);
            }

           }

          }


          第三步:實(shí)現(xiàn)ProtocolHandler,在有IO事件發(fā)生后,經(jīng)過(guò)decode和encode的處理后就把協(xié)議實(shí)體交個(gè)這個(gè)處理器進(jìn)行業(yè)務(wù)邏輯的處理,因此實(shí)現(xiàn)了協(xié)議解釋和業(yè)務(wù)邏輯的分離,它與IoHandler非常相似,不過(guò)這里處理的是經(jīng)過(guò)編碼與解碼后的對(duì)象實(shí)體。

          import org.apache.mina.common.IdleStatus;
          import org.apache.mina.protocol.ProtocolSession;
          import org.apache.mina.util.SessionLog;
          import org.apache.mina.protocol.handler.DemuxingProtocolHandler;

          public class ServerSessionHandler extends DemuxingProtocolHandler {
           
          public ServerSessionHandler() {
           }


           
          public void sessionCreated(ProtocolSession session) throws Exception {
           }


           
          public void sessionOpened(ProtocolSession session) throws Exception {
              session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 
          60);
            }


           
          public void sessionClosed(ProtocolSession session) {
            }


            
          public void messageReceived(ProtocolSession session, Object message)
             
          throws Exception {

          //根據(jù)解碼后的message,進(jìn)行業(yè)務(wù)邏輯的處理
          session.close();
             }


           
          public void messageSent(ProtocolSession session, Object message) {
             }


            
          public void sessionIdle(ProtocolSession session, IdleStatus status)
             
          throws Exception {

          //網(wǎng)絡(luò)出現(xiàn)空閑時(shí)進(jìn)行處理,并關(guān)掉連接
              session.close();
            }


           
          public void exceptionCaught(ProtocolSession session, Throwable cause) {
            cause.printStackTrace();

          //處理所有Handler方法拋出的異常,和Mina架構(gòu)拋出的異常,并關(guān)掉連接
              session.close();
           }

          }


          第四步:實(shí)現(xiàn)ProtocolProvider

          import org.apache.mina.protocol.*;

          public class ServerProtocolProvider implements ProtocolProvider {
           
          private static final ProtocolCodecFactory CODEC_FACTORY = new SemsProtocolCodecFactory(
             
          true);

           
          private static final ProtocolHandler HANDLER = new ServerSessionHandler();

           
          public ServerProtocolProvider() {

           }


           
          public ProtocolCodecFactory getCodecFactory() {
            
          return CODEC_FACTORY;
           }


           
          public ProtocolHandler getHandler() {
            
          return HANDLER;
           }

          }


          這樣協(xié)議層便完成了,啟動(dòng)時(shí)跟IO層的差不多,不過(guò)我們還可以在IO層和協(xié)議層用兩個(gè)線程池,如下:

          public class Server {
           
          //服務(wù)器的監(jiān)聽(tīng)端口號(hào)
           public static final int SERVER_PORT = 8000;

           
          public static void main(String[] args) {
              
              
          //進(jìn)行服務(wù)器的相關(guān)配置
             ServiceRegistry registry = new SimpleServiceRegistry();
             IoProtocolAcceptor protocolAcceptor 
          = (IoProtocolAcceptor) registry
               .getProtocolAcceptor(TransportType.SOCKET);
             ProtocolThreadPoolFilter protocolThreadPoolFilter 
          = new ProtocolThreadPoolFilter();
             protocolThreadPoolFilter.setMaximumPoolSize(
          10);
             protocolThreadPoolFilter.start();
             protocolAcceptor.getFilterChain().addLast(
          "IoProtocolThreadPool",
               protocolThreadPoolFilter);

             IoAcceptor ioAcceptor 
          = protocolAcceptor.getIoAcceptor();
             IoThreadPoolFilter ioThreadPoolFilter 
          = new IoThreadPoolFilter();
             ioThreadPoolFilter.setMaximumPoolSize(
          10);
             ioThreadPoolFilter.start();
             ioAcceptor.getFilterChain().addLast(
          "IoThreadPool",
               ioThreadPoolFilter);

             Service service 
          = new Service("TranServer", TransportType.SOCKET,
               SERVER_PORT);

             
          //綁定了剛剛實(shí)現(xiàn)的ServerProtocolProvider
             registry.bind(service, new ServerProtocolProvider());

             }


          整個(gè)MINA框架經(jīng)常用到的就是這些了,這樣的事件觸發(fā)框架和兩層框架使用起來(lái)非常方便,不過(guò)這種異步框架還是有些非常明顯的缺陷:

          第一,MINA只會(huì)為每個(gè)Session分配一個(gè)線程,也就是只能一個(gè)一個(gè)事件按順序執(zhí)行,就算你在某個(gè)方法執(zhí)行時(shí)產(chǎn)生了新的事件,比如收到新的數(shù)據(jù),MINA也會(huì)先將該事件緩沖起來(lái),所以你在執(zhí)行某個(gè)方法時(shí)是不可能執(zhí)行dataRead方法的,所以MINA框架是不會(huì)阻塞的,要想在一個(gè)邏輯方法中實(shí)現(xiàn)交互是實(shí)現(xiàn)不了的,因此要想出另外的實(shí)現(xiàn)方法。

          第二,如果客戶端發(fā)完一個(gè)數(shù)據(jù)給服務(wù)器就想馬上得到回復(fù),而不等整個(gè)業(yè)務(wù)邏輯執(zhí)行完,也是實(shí)現(xiàn)不到的,因?yàn)镸INA框架要將整個(gè)接收事件處理完了,再把回復(fù)信息發(fā)給客戶端。

          第三,如果MINA是作為服務(wù)器端等待連接的,當(dāng)客戶端正常關(guān)閉后業(yè)務(wù)邏輯也可繼續(xù)正常執(zhí)行,但是如果MINA是連接服務(wù)器的客戶端,則當(dāng)服務(wù)器關(guān)閉后,MINA的session也會(huì)關(guān)閉。

          最后要說(shuō)明的是MINA使用的線程池是用Leader
          /Followers Tread Pool實(shí)現(xiàn)的,默認(rèn)最大支持2G的線程。當(dāng)然MINA框架是開(kāi)源的,用戶可以根據(jù)自己的需要改寫(xiě)代碼,而其MINA的功能也是不斷可以擴(kuò)展的。

          以上是我使用MINA的經(jīng)驗(yàn)總結(jié),其實(shí)MINA的相關(guān)文檔和例子也介紹了很多了,我這里算是一個(gè)總結(jié)吧,不過(guò)有很多地方只是我的個(gè)人見(jiàn)解,不一定正確,如果有不對(duì)的,希望高手可以提出。
          http://riddickbryant.iteye.com/blog/564415
          posted on 2011-09-05 00:40 HUIKK 閱讀(4621) 評(píng)論(0)  編輯  收藏 所屬分類(lèi): java Net
          主站蜘蛛池模板: 上思县| 缙云县| 泸西县| 尚志市| 前郭尔| 河东区| 安多县| 嘉善县| 江永县| 甘谷县| 东兴市| 东宁县| 临漳县| 大港区| 岳普湖县| 德阳市| 新龙县| 镇江市| 滕州市| 贵溪市| 铜山县| 北宁市| 建平县| 措美县| 永春县| 福海县| 雷州市| 忻州市| 新和县| 常宁市| 突泉县| 永仁县| 固安县| 札达县| 漠河县| 松滋市| 甘孜| 克东县| 黄冈市| 图木舒克市| 宜章县|