hengheng123456789

            BlogJava :: 首頁 :: 聯(lián)系 :: 聚合  :: 管理
            297 Posts :: 68 Stories :: 144 Comments :: 0 Trackbacks
           

          MINA Beginning

          http://mina.apache.org/

          http://mina.apache.org/documentation.html

          1.         傳統(tǒng)Socket:阻塞式通信

          java傳統(tǒng)socket技術(shù)中,每建立一個Socket連接時,須同時創(chuàng)建一個新線程對該Socket進行單獨通信(采用阻塞的方式通信)。

          這種方式具有很高的響應(yīng)速度,并且控制起來也很簡單,在連接數(shù)較少的時候非常有效,但是如果對每一個連接都產(chǎn)生一個線程無疑是對系統(tǒng)資源的一種浪費,如果連接數(shù)較多將會出現(xiàn)資源不足的情況。下面的代碼就說明了這一點。

          a)         server code:

          package Socket;

          import java.io.BufferedReader;

          import java.io.IOException;

          import java.io.InputStreamReader;

          import java.net.ServerSocket;

          import java.net.Socket;

          public class MultiUserServer extends Thread {

                 private Socket client;

                 public MultiUserServer(Socket c) {

                        this.client = c;

                 }

                 public void run() {

                        try {

                               BufferedReader in = new BufferedReader(new InputStreamReader(client

                                             .getInputStream()));

                               // Mutil User but can't parallel

                               while (true) {

                                      String str = in.readLine();

                                      System.out.println("receive message: " + str);

                                      if (str.equals("end"))

                                             break;

                               }

                               client.close();

                        } catch (IOException ex) {

                        }

                 }

                 public static void main(String[] args) throws IOException {

                        int port = 10086;

                        if (args.length > 0)

                               port = Integer.parseInt(args[0]);

                        ServerSocket server = new ServerSocket(port);

                        System.out.println("the server socket application is created!");

                        while (true) {

                               // transfer location change Single User or Multi User

                               MultiUserServer mu = new MultiUserServer(server.accept());

                               mu.start();

                        }

                 }

          }

          b)        client code:

          package Socket;

          import java.io.BufferedReader;

          import java.io.InputStreamReader;

          import java.io.PrintWriter;

          import java.net.Socket;

          public class Client {

                 static Socket server;

                 public static void main(String[] args) throws Exception {

                        String host = "192.168.0. 10";

                        int port = 10086;

                        if (args.length > 1) {

                               host = args[0];

                               port = Integer.parseInt(args[1]);

                        }

                        System.out.println("connetioning:" + host + ":" + port);

                        server = new Socket(host, port);

                        PrintWriter out = new PrintWriter(server.getOutputStream());

                        BufferedReader wt = new BufferedReader(new InputStreamReader(System.in));

                        while (true) {

                               String str = wt.readLine();

                               out.println(str);

                               out.flush();

                               if (str.equals("end")) {

                                      break;

                               }

                        }

                        server.close();

                 }

          }

          2.         nio socket: 非阻塞通訊模式

          a)         NIO 設(shè)計背后的基石:反應(yīng)器模式

          反應(yīng)器模式: 用于事件多路分離和分派的體系結(jié)構(gòu)模式。

          反應(yīng)器模式的核心功能如下:

          n         將事件多路分用

          n         將事件分派到各自相應(yīng)的事件處理程序

          b)        NIO 的非阻塞 I/O 機制是圍繞 選擇器 通道構(gòu)建的。

          選擇器(Selector) Channel 的多路復(fù)用器。 Selector 類將傳入客戶機請求多路分用并將它們分派到各自的請求處理程序。

          通道(Channel ):表示服務(wù)器和客戶機之間的一種通信機制,一個通道負責處理一類請求/事件。

          簡單的來說:

          NIO是一個基于事件的IO架構(gòu),最基本的思想就是:有事件我會通知你,你再去做與此事件相關(guān)的事情。而且NIO主線程只有一個,不像傳統(tǒng)的模型,需要多個線程以應(yīng)對客戶端請求,也減輕了JVM的工作量。

          c)        Channel注冊至Selector以后,經(jīng)典的調(diào)用方法如下:

                  while (somecondition) {

                      int n = selector.select(TIMEOUT);

                      if (n == 0)

                          continue;

                      for (Iterator iter = selector.selectedKeys().iterator(); iter

                              .hasNext();) {

                          if (key.isAcceptable())

                              doAcceptable(key);

                          if (key.isConnectable())

                              doConnectable(key);

                          if (key.isValid() && key.isReadable())

                              doReadable(key);

                          if (key.isValid() && key.isWritable())

                              doWritable(key);

                          iter.remove();

                      }

                  }

          NIO 有一個主要的類Selector,這個類似一個觀察者,只要我們把需要探知的socketchannel告訴Selector,我們接著做別的事情,當有事件發(fā)生時,他會通知我們,傳回一組SelectionKey,我們讀取這些Key,就會獲得我們剛剛注冊過的socketchannel,然后,我們從這個Channel中讀取數(shù)據(jù),放心,包準能夠讀到,接著我們可以處理這些數(shù)據(jù)。

          Selector內(nèi)部原理實際是在做一個對所注冊的channel的輪詢訪問,不斷的輪詢(目前就這一個算法),一旦輪詢到一個channel有所注冊的事情發(fā)生,比如數(shù)據(jù)來了,他就會站起來報告,交出一把鑰匙,讓我們通過這把鑰匙來讀取這個channel的內(nèi)容。

          d)        Sample01

          package NIO;

          // ==================== Program Discription =====================

          // 程序名稱:示例12-14 : SocketChannelDemo.java

          // 程序目的:學習Java NIO#SocketChannel

          // ==============================================================

          import java.net.InetSocketAddress;

          import java.net.ServerSocket;

          import java.nio.ByteBuffer;

          import java.nio.channels.SelectableChannel;

          import java.nio.channels.SelectionKey;

          import java.nio.channels.Selector;

          import java.nio.channels.ServerSocketChannel;

          import java.nio.channels.SocketChannel;

          import java.util.Iterator;

          public class SocketChannelDemo {

                 public static int PORT_NUMBER = 23;// 監(jiān)聽端口

                 static String line = "";

                 ServerSocketChannel serverChannel;

                 ServerSocket serverSocket;

                 Selector selector;

                 private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

                 public static void main(String[] args) throws Exception {

                        SocketChannelDemo server = new SocketChannelDemo();

                        server.init(args);

                        server.startWork();

                 }

                 public void init(String[] argv) throws Exception {

                        int port = PORT_NUMBER;

                        if (argv.length > 0) {

                               port = Integer.parseInt(argv[0]);

                        }

                        System.out.println("Listening on port " + port);

                        // 分配一個ServerSocketChannel

                        serverChannel = ServerSocketChannel.open();

                        // ServerSocketChannel里獲得一個對應(yīng)的Socket

                        serverSocket = serverChannel.socket();

                        // 生成一個Selector

                        selector = Selector.open();

                        // Socket綁定到端口上

                        serverSocket.bind(new InetSocketAddress(port));

                        // serverChannel為非bolck

                        serverChannel.configureBlocking(false);

                        // 通過Selector注冊ServerSocetChannel

                        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

                 }

                 public void startWork() throws Exception {

                        while (true) {

                               int n = selector.select();// 獲得IO準備就緒的channel數(shù)量

                               if (n == 0) {

                                      continue; // 沒有channel準備就緒,繼續(xù)執(zhí)行

                               }

                               // 用一個iterator返回Selectorselectedkeys

                               Iterator it = selector.selectedKeys().iterator();

                               // 處理每一個SelectionKey

                               while (it.hasNext()) {

                                      SelectionKey key = (SelectionKey) it.next();

                                      // 判斷是否有新的連接到達

                                      if (key.isAcceptable()) {

                                             // 返回SelectionKeyServerSocketChannel

                                             ServerSocketChannel server = (ServerSocketChannel) key

                                                           .channel();

                                             SocketChannel channel = server.accept();

                                             registerChannel(selector, channel, SelectionKey.OP_READ);

                                             doWork(channel);

                                      }

                                      // 判斷是否有數(shù)據(jù)在此channel里需要讀取

                                      if (key.isReadable()) {

                                             processData(key);

                                      }

                                      // 刪除 selectedkeys

                                      it.remove();

                               }

                        }

                 }

                 protected void registerChannel(Selector selector,

                               SelectableChannel channel, int ops) throws Exception {

                        if (channel == null) {

                               return;

                        }

                        channel.configureBlocking(false);

                        channel.register(selector, ops);

                 }

                 // 處理接收的數(shù)據(jù)

                 protected void processData(SelectionKey key) throws Exception {

                        SocketChannel socketChannel = (SocketChannel) key.channel();

                        int count;

                        buffer.clear(); // 清空buffer

                        // 讀取所有的數(shù)據(jù)

                        while ((count = socketChannel.read(buffer)) > 0) {

                               buffer.flip();

                               // send the data, dont assume it goes all at once

                               while (buffer.hasRemaining()) {

                                      char c = (char) buffer.get();

                                      line += c;

                                      // 如果收到回車鍵,則在返回的字符前增加[echo]$字樣,并且server端打印出字符串

                                      if (c == (char) 13) {

                                             buffer.clear();

                                             buffer.put("[echo]$".getBytes());

                                             buffer.flip();

                                             System.out.println(line); //

                                             line = "";

                                      }

                                      socketChannel.write(buffer);// Socket里寫數(shù)據(jù)

                               }

                               buffer.clear(); // 清空buffer

                        }

                        if (count < 0) {

                               // count<0,說明已經(jīng)讀取完畢

                               socketChannel.close();

                        }

                 }

                 private void doWork(SocketChannel channel) throws Exception {

                        buffer.clear();

                        buffer

                                      .put("Hello,I am working,please input some thing,and i will echo to you![echo]$"

                                                    .getBytes());

                        buffer.flip();

                        channel.write(buffer);

                 }

          }

          運行此程序,然后在控制臺輸入命令telnet localhost 23

          e)         Server code:

          public class NonBlockingServer

          {

              public Selector sel = null;

              public ServerSocketChannel server = null;

              public SocketChannel socket = null;

              public int port = 4900;

              String result = null;

              public NonBlockingServer()

              {

                        System.out.println("Inside default ctor");

              }

                 public NonBlockingServer(int port)

              {

                        System.out.println("Inside the other ctor");

                        this.port = port;

              }

              public void initializeOperations() throws IOException,UnknownHostException

              {

                        System.out.println("Inside initialization");

                        sel = Selector.open();

                        server = ServerSocketChannel.open();

                        server.configureBlocking(false);

                        InetAddress ia = InetAddress.getLocalHost();

                        InetSocketAddress isa = new InetSocketAddress(ia,port);

                        server.socket().bind(isa);

              }

                 public void startServer() throws IOException

              {

                        System.out.println("Inside startserver");

                  initializeOperations();

                        System.out.println("Abt to block on select()");

                        SelectionKey acceptKey = server.register(sel, SelectionKey.OP_ACCEPT );     

                        while (acceptKey.selector().select() > 0 )

                        {    

                    

                               Set readyKeys = sel.selectedKeys();

                               Iterator it = readyKeys.iterator();

                               while (it.hasNext()) {

                                      SelectionKey key = (SelectionKey)it.next();

                                      it.remove();

                          

                                      if (key.isAcceptable()) {

                                             System.out.println("Key is Acceptable");

                                             ServerSocketChannel ssc = (ServerSocketChannel) key.channel();

                                             socket = (SocketChannel) ssc.accept();

                                             socket.configureBlocking(false);

                                             SelectionKey another = socket.register(sel,SelectionKey.OP_READ|SelectionKey.OP_WRITE);

                                      }

                                      if (key.isReadable()) {

                                             System.out.println("Key is readable");

                                             String ret = readMessage(key);

                                             if (ret.length() > 0) {

                                                    writeMessage(socket,ret);

                                             }

                                      }

                                      if (key.isWritable()) {

                                             System.out.println("THe key is writable");

                                             String ret = readMessage(key);

                                             socket = (SocketChannel)key.channel();

                                             if (result.length() > 0 ) {

                                                    writeMessage(socket,ret);

                                             }

                                      }

                               }

                        }

              }

              public void writeMessage(SocketChannel socket,String ret)

              {

                        System.out.println("Inside the loop");

                        if (ret.equals("quit") || ret.equals("shutdown")) {

                               return;

                        }

                        try

                        {

                               String s = "This is content from server!-----------------------------------------";

                               Charset set = Charset.forName("us-ascii");

                               CharsetDecoder dec = set.newDecoder();

                               CharBuffer charBuf = dec.decode(ByteBuffer.wrap(s.getBytes()));

                               System.out.println(charBuf.toString());

                               int nBytes = socket.write(ByteBuffer.wrap((charBuf.toString()).getBytes()));

                               System.out.println("nBytes = "+nBytes);

                                      result = null;

                        }

                        catch(Exception e)

                        {

                               e.printStackTrace();

                        }

              }

              public String readMessage(SelectionKey key)

              {

                        int nBytes = 0;

                        socket = (SocketChannel)key.channel();

                  ByteBuffer buf = ByteBuffer.allocate(1024);

                        try

                        {

                      nBytes = socket.read(buf);

                               buf.flip();

                               Charset charset = Charset.forName("us-ascii");

                               CharsetDecoder decoder = charset.newDecoder();

                               CharBuffer charBuffer = decoder.decode(buf);

                               result = charBuffer.toString();

                    

                  }

                        catch(IOException e)

                        {

                               e.printStackTrace();

                        }

                        return result;

              }

              public static void main(String args[])

              {

                     NonBlockingServer nb;

                     if (args.length < 1)

                     {

                            nb = new NonBlockingServer();

                     }

                     else

                     {

                            int port = Integer.parseInt(args[0]);

                            nb = new NonBlockingServer(port);

                     }

                         

                        try

                        {

                               nb.startServer();

                               System.out.println("the nonBlocking server is started!");

                        }

                        catch (IOException e)

                        {

                               e.printStackTrace();

                               System.exit(-1);

                        }

                 }

          }

          2.2.4.2    Client code:

          public class Client {

                 public SocketChannel client = null;

                 public InetSocketAddress isa = null;

                 public RecvThread rt = null;

                 private String host;

                 private int port;

                 public Client(String host, int port) {

                        this.host = host;

                        this.port = port;

                 }

                 public void makeConnection() {

                        String proxyHost = "192.168.254.212";

                        String proxyPort = "1080";

                        System.getProperties().put("socksProxySet", "true");

                        System.getProperties().put("socksProxyHost", proxyHost);

                        System.getProperties().put("socksProxyPort", proxyPort);

                        int result = 0;

                        try {

                               client = SocketChannel.open();

                               isa = new InetSocketAddress(host, port);

                               client.connect(isa);

                               client.configureBlocking(false);

                               receiveMessage();

                        } catch (UnknownHostException e) {

                               e.printStackTrace();

                        } catch (IOException e) {

                               e.printStackTrace();

                        }

                        long begin = System.currentTimeMillis();

                        sendMessage();

                        long end = System.currentTimeMillis();

                        long userTime = end - begin;

                        System.out.println("use tiem: " + userTime);

                        try {

                               interruptThread();

                               client.close();

                               System.exit(0);

                        } catch (IOException e) {

                               e.printStackTrace();

                        }

                 }

                 public int sendMessage() {

                        System.out.println("Inside SendMessage");

                        String msg = null;

                        ByteBuffer bytebuf;

                        int nBytes = 0;

                        try {

                               msg = "It's message from client!";

                               System.out.println("msg is "+msg);

                               bytebuf = ByteBuffer.wrap(msg.getBytes());

                               for (int i = 0; i < 1000; i++) {

                                      nBytes = client.write(bytebuf);

                                      System.out.println(i + " finished");

                               }

                               interruptThread();

                               try {

                                      Thread.sleep(5000);

                               } catch (Exception e) {

                                      e.printStackTrace();

                               }

                               client.close();

                               return -1;

                        } catch (IOException e) {

                               e.printStackTrace();

                        }

                        return nBytes;

                 }

                 public void receiveMessage() {

                        rt = new RecvThread("Receive THread", client);

                        rt.start();

                 }

                 public void interruptThread() {

                        rt.val = false;

                 }

                 public static void main(String args[]) {

                        if (args.length < 2) {

                               System.err.println("You should put 2 args: host,port");

                        } else {

                               String host = args[0];

                               int port = Integer.parseInt(args[1]);

                               Client cl = new Client(host, port);

                               cl.makeConnection();

                        }

                        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));

                        String msg;

                 }

                 public class RecvThread extends Thread {

                        public SocketChannel sc = null;

                        public boolean val = true;

                        public RecvThread(String str, SocketChannel client) {

                               super(str);

                               sc = client;

                        }

                        public void run() {

                               int nBytes = 0;

                               ByteBuffer buf = ByteBuffer.allocate(2048);

                               try {

                                      while (val) {

                                             while ((nBytes = nBytes = client.read(buf)) > 0) {

                                                    buf.flip();

                                                    Charset charset = Charset.forName("us-ascii");

                                                    CharsetDecoder decoder = charset.newDecoder();

                                                    CharBuffer charBuffer = decoder.decode(buf);

                                                    String result = charBuffer.toString();

                                                    System.out.println("the server return: " + result);

                                                    buf.flip();

                                             }

                                      }

                               } catch (IOException e) {

                                      e.printStackTrace();

                               }

                        }

                 }

          }

          Reactor模式和NIO

          當前分布式計算 Web Services盛行天下,這些網(wǎng)絡(luò)服務(wù)的底層都離不開對socket的操作。他們都有一個共同的結(jié)構(gòu):

          u        Read request

          u        Decode request

          u        Process service

          u        Encode reply

          u        Send reply

          經(jīng)典的網(wǎng)絡(luò)服務(wù)的設(shè)計如下圖,在每個線程中完成對數(shù)據(jù)的處理:

          但這種模式在用戶負載增加時,性能將下降非常的快。我們需要重新尋找一個新的方案,保持數(shù)據(jù)處理的流暢,很顯然,事件觸發(fā)機制是最好的解決辦法,當有事件發(fā)生時,會觸動handler,然后開始數(shù)據(jù)的處理。

          Reactor模式類似于AWT中的Event處理:

          Reactor模式參與者

          1.Reactor 負責響應(yīng)IO事件,一旦發(fā)生,廣播發(fā)送給相應(yīng)的Handler去處理,這類似于AWTthread
          2.Handler
          是負責非堵塞行為,類似于AWT ActionListeners;同時負責將handlersevent事件綁定,類似于AWT addActionListener

          如圖:

          JavaNIOreactor模式提供了實現(xiàn)的基礎(chǔ)機制,它的Selector當發(fā)現(xiàn)某個channel有數(shù)據(jù)時,會通過SlectorKey來告知我們,在此我們實現(xiàn)事件和handler的綁定。

          我們來看看Reactor模式代碼:


          public class Reactor implements Runnable{

            final Selector selector;
            final ServerSocketChannel serverSocket;

            Reactor(int port) throws IOException {
              selector = Selector.open();
              serverSocket = ServerSocketChannel.open();
              InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),port);
              serverSocket.socket().bind(address);

              serverSocket.configureBlocking(false);
              //selector注冊該channel
               SelectionKey sk =serverSocket.register(selector,SelectionKey.OP_ACCEPT);

              logger.debug("-->Start serverSocket.register!");

              //利用skattache功能綁定Acceptor 如果有事情,觸發(fā)Acceptor
              sk.attach(new Acceptor());
              logger.debug("-->attach(new Acceptor()!");
            }


            public void run() { // normally in a new Thread
              try {
              while (!Thread.interrupted())
              {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                //Selector如果發(fā)現(xiàn)channelOP_ACCEPTREAD事件發(fā)生,下列遍歷就會進行。
                while (it.hasNext())
                  //來一個事件第一次觸發(fā)一個accepter線程
                  //以后觸發(fā)SocketReadHandler
                  dispatch((SelectionKey)(it.next()));
                  selected.clear();
                }
              }catch (IOException ex) {
                  logger.debug("reactor stop!"+ex);
              }
            }

            //運行AcceptorSocketReadHandler
            void dispatch(SelectionKey k) {
              Runnable r = (Runnable)(k.attachment());
              if (r != null){
                // r.run();

              }
            }

            class Acceptor implements Runnable { // inner
              public void run() {
              try {
                logger.debug("-->ready for accept!");
                SocketChannel c = serverSocket.accept();
                if (c != null)
                  //調(diào)用Handler來處理channel
                  new SocketReadHandler(selector, c);
                }
              catch(IOException ex) {
                logger.debug("accept stop!"+ex);
              }
              }
            }
          }

          以上代碼中巧妙使用了SocketChannelattach功能,將Hanlder和可能會發(fā)生事件的channel鏈接在一起,當發(fā)生事件時,可以立即觸發(fā)相應(yīng)鏈接的Handler

          再看看Handler代碼:

          public class SocketReadHandler implements Runnable {

            public static Logger logger = Logger.getLogger(SocketReadHandler.class);

            private Test test=new Test();

            final SocketChannel socket;
            final SelectionKey sk;

             static final int READING = 0, SENDING = 1;
            int state = READING;

            public SocketReadHandler(Selector sel, SocketChannel c)
              throws IOException {

              socket = c;

              socket.configureBlocking(false);
               sk = socket.register(sel, 0);

              //SelectionKey綁定為本Handler 下一步有事件觸發(fā)時,將調(diào)用本類的run方法。
              //參看dispatch(SelectionKey k)
              sk.attach(this);

              //同時將SelectionKey標記為可讀,以便讀取。
              sk.interestOps(SelectionKey.OP_READ);
              sel.wakeup();
            }

            public void run() {
              try{
              // test.read(socket,input);
                readRequest() ;
              }catch(Exception ex){
              logger.debug("readRequest error"+ex);
              }
            }


          /**
          *
          處理讀取data
          * @param key
          * @throws Exception
          */
          private void readRequest() throws Exception {

            ByteBuffer input = ByteBuffer.allocate(1024);
            input.clear();
            try{

              int bytesRead = socket.read(input);

              ......

              //激活線程池處理這些request
              requestHandle(new Request(socket,btt));

              .....


            }catch(Exception e) {
            }

          }

          注意在Handler里面又執(zhí)行了一次attach,這樣,覆蓋前面的Acceptor,下次該Handler又有READ事件發(fā)生時,將直接觸發(fā)Handler.從而開始了數(shù)據(jù)的讀 處理 寫 發(fā)出等流程處理。

          將數(shù)據(jù)讀出后,可以將這些數(shù)據(jù)處理線程做成一個線程池,這樣,數(shù)據(jù)讀出后,立即扔到線程池中,這樣加速處理速度:

          更進一步,我們可以使用多個Selector分別處理連接和讀事件。

          一個高性能的Java網(wǎng)絡(luò)服務(wù)機制就要形成,激動人心的集群并行計算即將實現(xiàn)。

          3.         Socket網(wǎng)絡(luò)框架 MINA

          a)         Overview

          MINA是一個網(wǎng)絡(luò)應(yīng)用框架,在不犧牲性能和可擴展性的前提下用于解決如下問題:

          n         快速開發(fā)自己的應(yīng)用。

          n         高可維護性,高可復(fù)用性:網(wǎng)絡(luò)I/O編碼,消息的編/解碼,業(yè)務(wù)邏輯互相分離。

          n         相對容易的進行單元測試。

          b)        MINA架構(gòu):

          IoSessionManager: Where real I/O occurs

          IoFilters: Filters I/O events • requests

          IoHandler: Your protocol logic

          IoSession: Represents a connection

          n         IoFilters

          IoFilterMINA的功能擴展提供了接口。它攔截所有的IO事件進行事件的預(yù)處理和河畜處理(AOP)。我們可以把它想象成Servletfilters

          IoFilter能夠?qū)崿F(xiàn)以下幾種目的:

          事件日志

          性能檢測

          數(shù)據(jù)轉(zhuǎn)換(e.g. SSL support)codec

          防火墻…等等

          n         codec: ProtocolCodecFactory

          MINA提供了方便的Protocol支持。如上說講,codecIoFilters中設(shè)置。

          通過它的EncoderDecoder,可以方便的擴展并支持各種基于Socket的網(wǎng)絡(luò)協(xié)議,比如HTTP服務(wù)器、FTP服務(wù)器、Telnet服務(wù)器等等。

          要實現(xiàn)自己的編碼/解碼器(codec)只需要實現(xiàn)interface: ProtocolCodecFactory即可.

          MINA 1.0版本,MINA已經(jīng)實現(xiàn)了幾個常用的(codec factory):

          DemuxingProtocolCodecFactory,

          NettyCodecFactory,

          ObjectSerializationCodecFactory,

          TextLineCodecFactory

          其中:

          n         TextLineCodecFactory:

          A ProtocolCodecFactory that performs encoding and decoding between a text line data and a Java

          string object. This codec is useful especially when you work with a text-based protocols such as SMTP and IMAP.

          n         ObjectSerializationCodecFactory:

          A ProtocolCodecFactory that serializes and deserializes Java objects. This codec is very useful when

          you have to prototype your application rapidly without any specific codec.

          n         DemuxingProtocolCodecFactory

          A composite ProtocolCodecFactory that consists of multiple MessageEncoders and MessageDecoders. ProtocolEncoder and ProtocolDecoder this factory returns demultiplex incoming messages and buffers to appropriate MessageEncoders and MessageDecoders.

          n         NettyCodecFactory:

          A MINA ProtocolCodecFactory that provides encoder and decoder for Netty2 Messages and MessageRecognizers.

          n         IoHandler :business logic

          MINA中,所有的業(yè)務(wù)邏輯都在實現(xiàn)了IoHandlerclass完成。

          Interface Handle:

           all protocol events fired by MINA. There are 6 event handler methods, and they are all invoked by MINA automatically.

           當事件發(fā)生時,將觸發(fā)IoHandler中的方法:

           sessionCreated:當一個session創(chuàng)建的時候調(diào)用;

           sessionOpened:在sessionCreated調(diào)用之后被調(diào)用;

          sessionClosed:當IO連接被關(guān)閉時被調(diào)用;

           sessionIdle:當在遠程實體和用戶程序之間沒有數(shù)據(jù)傳輸?shù)臅r候被調(diào)用;

          exceptionCaught:當IoAcceptor 或者IoHandler.中出現(xiàn)異常時被調(diào)用;

          messageReceived:當接受到消息時調(diào)用;

          messageSent:當發(fā)出請求時調(diào)用。

          MINA 1.0中,IoHandler的實現(xiàn)類:

          ChainedIoHandler

           DemuxingIoHandler,

          IoHandlerAdapter

           SingleSessionIoHandlerDelegate

           StreamIoHandler

          具體細節(jié)可參考javadoc

          c)        MINA的高級主題:線程模式

          MINA通過它靈活的filter機制來提供多種線程模型。

          沒有線程池過濾器被使用時MINA運行在一個單線程模式。

          如果添加了一個IoThreadPoolFilterIoAcceptor,將得到一個leader-follower模式的線程池。

          如果再添加一個ProtocolThreadPoolFilterserver將有兩個線程池:

          一個(IoThreadPoolFilter)被用于對message對象進行轉(zhuǎn)換,另外一個(ProtocolThreadPoolFilter)被用于處理業(yè)務(wù)邏輯。

          SimpleServiceRegistry加上IoThreadPoolFilterProtocolThreadPoolFilter的缺省實現(xiàn)即可適用于需要高伸縮性的應(yīng)用。如果想使用自己的線程模型,請參考SimpleServiceRegistry的源代碼,并且自己

          初始化Acceptor

          IoThreadPoolFilter threadPool = new IoThreadPoolFilter();threadPool.start();

          IoAcceptor acceptor = new SocketAcceptor();

          acceptor.getFilterChain().addLast( "threadPool", threadPool);

          ProtocolThreadPoolFilter threadPool2 = new ProtocolThreadPoolFilter();

          threadPool2.start();

          ProtocolAcceptor acceptor2 = new IoProtocolAcceptor( acceptor );

          acceptor2.getFilterChain().addLast( "threadPool", threadPool2 );

          ...

          threadPool2.stop();

          threadPool.stop();

          d)        采用MINA進行socket開發(fā),一般步驟如下:

          n         Begin:

          IoAcceptor acceptor = new SocketAcceptor(); //建立client接收器

          or client:

          SocketConnector connector = new SocketConnector(); //建立一個連接器

          n         server的屬性配置:

                  SocketAcceptorConfig cfg = new SocketAcceptorConfig();

                  cfg.setReuseAddress(true);

                  cfg.getFilterChain().addLast(

                              "codec",

                              new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) ); //對象序列化 codec factory

                  cfg.getFilterChain().addLast( "logger", new LoggingFilter() );

          n         綁定addressbusiness logic

          server:

                  acceptor.bind(

                          new InetSocketAddress( SERVER_PORT ),

                          new ServerSessionHandler( ), cfg ); // 綁定addresshandler

          client:

                  connector.connect(new InetSocketAddress( HOSTNAME, PORT ),

                                  new ClientSessionHandler(msg), cfg );

          n         實現(xiàn)自己的業(yè)務(wù)邏輯: IoHandler

          n         如有必要,實現(xiàn)自己的CODEC

          下面的代碼演示了采用ObjectSerializationCodecFactory給服務(wù)端傳送文件:

          e)         Client

          public class Client

          {

              private static final String HOSTNAME = "192.168.0.81";

              private static final int PORT = 8080;

              private static final int CONNECT_TIMEOUT = 30; // seconds

              public static void main( String[] args ) throws Throwable

              {

                  System.out.println("in nio client");

                  SocketConnector connector = new SocketConnector();       

                  // Configure the service.

                  SocketConnectorConfig cfg = new SocketConnectorConfig();

                  cfg.setConnectTimeout( CONNECT_TIMEOUT );

                    cfg.getFilterChain().addLast(

                              "codec",

                              new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) );

                  cfg.getFilterChain().addLast( "logger", new LoggingFilter() );

                  IoSession session;

                  if(args.length > 1)

                  {

                      connector.connect(new InetSocketAddress( HOSTNAME, PORT ),

                              new ClientSessionHandler(args), cfg );

                  }

                  else

                  {

                      String[] files = {"E:/music/lcl/juhuatai.mp3",

                                              "E:/music/lcl/jimosazhouleng.mp3"};

                      connector.connect(new InetSocketAddress( HOSTNAME, PORT ),

                              new ClientSessionHandler(files), cfg );

                  }

              }

          }

          f)         Clint handleclient端的業(yè)務(wù)代碼)

          public class ClientSessionHandler extends IoHandlerAdapter

              private String[] files;

              public ClientSessionHandler(String[] files)

              {

                  this.files = files;

              }

              public void sessionOpened( IoSession session )

              {

                  for (int i = 0; i < this.files.length; i++)

                  {

                      Thread sendMessageThread = new SendMessageThread("Thread" + i, session,files[i]);

                      sendMessageThread.start();

                  }

              }

              public void messageReceived( IoSession session, Object message )

              {

                  System.out.println("in messageReceived!");

              }

              public void exceptionCaught( IoSession session, Throwable cause )

              {

                  session.close();

              }

              public class SendMessageThread extends Thread

              {

                  private IoSession session;

                  private String filename;

                  public SendMessageThread(String name, IoSession session, String filename)

                  {

                      super(name);

                      this.session = session;

                      this.filename = filename;

                  }

                  public void run()

                  {

                      System.out.println("start thread: " + this.getName());

                      try {               

                          ByteBuffer buf = ByteBuffer.allocate(Constants.BUF_SIZE);

                          

                          FileChannel fc = new FileInputStream(filename).getChannel();

                          int index;

                          while ((index = NioFileUtil.readFile(fc, buf)) > 0)

                          {

                            buf.flip();

                            byte[] bs;

                            if (index == buf.capacity())

                            {

                                bs = buf.array();

                            }

                            else

                            {

                                bs = new byte[index];

                                int i = 0;

                               while (buf.hasRemaining())

                                {

                                    bs[i++] = buf.get();

                                }

                            }

                            Message msg = new Message(filename,Constants.CMD_SEND, bs);

                            session.write(msg);

                          }

                          Message msg = new Message(filename, Constants.CMD_FINISHED, null);

                          session.write(msg);        

                      } catch (Exception e) {

                          e.printStackTrace();

                      }          

                  }

              }

          }

          g)        Server

          public class Server

          {

              private static final int SERVER_PORT = 8080;

              public static void main( String[] args ) throws Throwable

              {

                  IoAcceptor acceptor = new SocketAcceptor();

                  // Prepare the service configuration.

                  SocketAcceptorConfig cfg = new SocketAcceptorConfig();

                  cfg.setReuseAddress( true );

                  cfg.getFilterChain().addLast(

                              "codec",

                              new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) );

                  cfg.getFilterChain().addLast( "logger", new LoggingFilter() );

                  acceptor.bind(

                          new InetSocketAddress( SERVER_PORT ),

                          new ServerSessionHandler( ), cfg );

                  System.out.println( "nioFileServer Listening on port " + SERVER_PORT );

              }

          }

          h)        Server handle:(Server端業(yè)務(wù)代碼)

          public class ServerSessionHandler extends IoHandlerAdapter

          {   

              public void sessionOpened( IoSession session )

              {

                  // set idle time to 60 seconds

                  System.out.println("in sessionOpened");

                  session.setIdleTime( IdleStatus.BOTH_IDLE, 60 );

                  session.setAttribute("times",new Integer(0));

              }

              public void messageReceived( IoSession session, Object message )

              {

                  System.out.println("in messageReceived");

                     Message msg = (Message) message;

                     System.out.println("the file name is: " + msg.getFileName() + ""n");

                     this.process(session, msg);

                    

              }

              private void process(IoSession session, Message message)

              {

                  String[] temparray = message.getFileName().split("[//]");

                  String filename ="d:/" + temparray[temparray.length - 1];

                  if (session.containsAttribute(message.getFileName()))

                  {

                      FileChannel channel = (FileChannel)session.getAttribute(message.getFileName());

                      if (message.getType().equals(Constants.CMD_SEND))

                      {

                          try {

                              NioFileUtil.writeFile(channel, ByteBuffer.wrap(message.getContent()));

                          } catch (Exception e) {

                              e.printStackTrace();

                          }               

                      }

                      else

                      {

                          try {

                              channel.close();

                              channel = null;

                              session.removeAttribute(message.getFileName());

                          } catch (IOException e) {

                              e.printStackTrace();

                          }

                      }

                  }

                  else

                  {

                      try {

                          FileChannel channel = new FileOutputStream(filename).getChannel();

                          NioFileUtil.writeFile(channel, ByteBuffer.wrap(message.getContent()));

                          session.setAttribute(message.getFileName(), channel);

                      } catch (Exception e) {

                          // TODO Auto-generated catch block

                          e.printStackTrace();

                      }          

                  }

              }

              public void sessionIdle( IoSession session, IdleStatus status )

              {

                  SessionLog.info( session, "Disconnecting the idle." );

                  // disconnect an idle client

                  session.close();

              }

              public void exceptionCaught( IoSession session, Throwable cause )

              {

                  // close the connection on exceptional situation

                  session.close();

              }

          }

          i)          文件操作:

          public class NioFileUtil {

              public static void writeFile(FileChannel fileChannel, ByteBuffer buf) throws Exception

              {

                  buf.clear();

                  fileChannel.write(buf);    

              }

              public static int readFile(FileChannel fileChannel,ByteBuffer buf) throws IOException

              {

                  buf.rewind();

                  int index = fileChannel.read(buf);

                  return index;

              } 

          }

          j)          常量:

          public class Constants {

              public static final String CMD_FINISHED = "FINISHED";

              public static final String CMD_SEND = "SEND";    

              public static final int BUF_SIZE = 10240;

              private Constants(){}   

          }

          Demo

          Introduction

          org.apache.mina.example.chat

          Chat server which demonstates using the text line codec and Spring integration.

          org.apache.mina.example.chat.client

          Swing based chat client.

          org.apache.mina.example.echoserver

          Echo server which demonstates low-level I/O layer and SSL support.

          org.apache.mina.example.echoserver.ssl

          SSL support classes.

          org.apache.mina.example.httpserver.codec

          A HTTP server implemented with protocol codec (needs more work).

          org.apache.mina.example.httpserver.stream

          A simplistic HTTP server which demonstates stream-based I/O support.

          org.apache.mina.example.netcat

          NetCat client (Network + Unix cat command) which demonstates low-level I/O layer.

          org.apache.mina.example.proxy

          A TCP/IP tunneling proxy example.

          org.apache.mina.example.reverser

          Reverser server which reverses all text lines demonstating high-level protocol layer.

          org.apache.mina.example.sumup

          SumUp Server and Client which sums up all ADD requests.

          org.apache.mina.example.sumup.codec

          Protocol codec implementation for SumUp protocol.

          org.apache.mina.example.sumup.message

          Protocol mmessage classes for SumUp protocol.

          org.apache.mina.example.tennis

          Two tennis players play a game which demonstates in-VM pipes.

          n         友情提示:

          下載并運行MINAdemo程序還頗非周折:

          運行MINA demo applition

          1:JDK5

          產(chǎn)生錯誤:

          Exception in thread "main" java.lang.NoClassDefFoundError: edu/emory/mathcs/backport/java/util/concurrent/Executor

                 at org.apache.mina.example.reverser.Main.main(Main.java:44)

          察看minaQA email:

          http://www.mail-archive.com/mina-dev@directory.apache.org/msg02252.html

          原來需要下載:backport-util-concurrent.jar并加入classpath

          http://dcl.mathcs.emory.edu/util/backport-util-concurrent/

          繼續(xù)運行還是報錯:

          Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory

          原來MINA采用了slf4j項目作為log,繼續(xù)下載

          slf4j-simple.jar等,并加入classpath:

          http://www.slf4j.org/download.html

          posted on 2007-09-03 15:38 哼哼 閱讀(2982) 評論(0)  編輯  收藏 所屬分類: JAVA-Web
          主站蜘蛛池模板: 大安市| 福州市| 嘉义县| 屏东市| 西藏| 临夏县| 仁寿县| 尚志市| 河源市| 辉县市| 九龙县| 昔阳县| 新野县| 崇左市| 怀柔区| 合江县| 丰原市| 兰坪| 清原| 鹤庆县| 会昌县| 丰镇市| 中超| 都兰县| 剑阁县| 河北省| 保德县| 楚雄市| 江门市| 惠安县| 双柏县| 嘉荫县| 南部县| 邢台县| 德清县| 永善县| 霍州市| 嘉荫县| 嵊州市| 安徽省| 吉水县|