Dict.CN 在線詞典, 英語學習, 在線翻譯

          都市淘沙者

          荔枝FM Everyone can be host

          統計

          留言簿(23)

          積分與排名

          優秀學習網站

          友情連接

          閱讀排行榜

          評論排行榜

          用NIO實現的一個Chat Demo [轉]

          發現網上找到的許多NIO的用例在跑起來后都有許多問題,最常見的就是沒有對interest event進行合理的registry和unregistry,導致程序一直在loopling,又或者當客戶端或服務器端連接斷開時有死循環的跡象。忍不住自己做了一個demo,我想可以作為一個NIO應用的模板去修改,只要把doRead,doWrite之類的用基于線程的Handler去處理,那就基本可以滿足需求了。
          這個Demo的目的是在Client和Server間建立類似QQ聊天那樣的功能,讓客戶端和服務器端都支持用戶輸入和異步消息顯示(因為服務器端要支持用戶的console輸入,所以不要用多個客戶端進行連接,否則可能會出現難以預測的問題)。
          代碼中用紅色顯示的地方是我認為需要注意的地方,說老實話NIO雖然很強大,但完全用Non-Blocking來編程,有許多需要小心的地方,一不小心還可能造成死循環。就像ReentrantLock之于Synchronized,如果基本的IO能滿足需求,就不必強求應用NIO。
          注意:OP_WRITE應該是在寫入準備就緒的時候才添加到SelectionKey里面去,而且在寫入完成后一定要去除,否則selector.select()方法就不會被blocking而造成死循環。

          MyNioServer.java

          import java.io.BufferedReader;
          import java.io.IOException;
          import java.io.InputStreamReader;
          import java.util.Iterator;
          import java.util.LinkedList;
          import java.util.Set;
          import java.net.InetAddress;
          import java.net.InetSocketAddress;
          import java.nio.ByteBuffer;
          import java.nio.charset.Charset;
          import java.nio.channels.ServerSocketChannel;
          import java.nio.channels.Selector;
          import java.nio.channels.SelectionKey;
          import java.nio.channels.SocketChannel;

          public class MyNioServer {

              private int BUFFERSIZE = 1024*10;
              private String CHARSET = "GBK";
              private Selector sel;

              public MyNioServer(int port) throws IOException {
                  ServerSocketChannel ssc = ServerSocketChannel.open();
                  ssc.configureBlocking(false);
                  ssc.socket().bind(
                          new InetSocketAddress(InetAddress.getLocalHost(), port));
                  sel = Selector.open();
                  ssc.register(sel, SelectionKey.OP_ACCEPT);
              }

              public void startup() {
                  System.out.println("Server start...");
                  try {
                      while (!Thread.interrupted()) {
                          int keysCount = sel.select();
                          System.out.println("Catched " + keysCount + " SelectionKeys");
                          if (keysCount < 1) {
                              continue;
                          }
                          Set<SelectionKey> set = sel.selectedKeys();
                          Iterator<SelectionKey> it = set.iterator();
                          while (it.hasNext()) {
                              SelectionKey key = it.next();
                              if (key.isAcceptable()) {
                                  System.out.println("Key isAcceptable");
                                  doAccept(key);
                              }
                              if (key.isValid() && key.isReadable()) {
                                  System.out.println("Key isReadable");
                                  doRead(key);
                              }
                              if (key.isValid() && key.isWritable()) {
                                  System.out.println("Key isWritable");
                                  doWrite(key);
                              }
                          }
                          set.clear();
                      }
                      System.err.println("Program is interrupted.");
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
                  System.out.println("Server stop...");
                  shutdown();
              }
             
              public void shutdown(){
                  Set<SelectionKey> keys = sel.keys();
                  for(SelectionKey key:keys){
                      try {
                          key.channel().close();
                      } catch (IOException e) {
                          e.printStackTrace();
                      }
                  }
                  try {
                      sel.close();
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }

              private void doAccept(SelectionKey key) {
                  try {
                      SocketChannel sc = ((ServerSocketChannel) key.channel()).accept();
                      sc.configureBlocking(false);
                      SelectionKey newkey = sc.register(sel, SelectionKey.OP_READ);
                      newkey.attach(new LinkedList<ByteBuffer>());
                      new Thread(new UserInteractive(newkey)).start();
                  } catch (IOException e) {
                      e.printStackTrace();
                      System.err.println("Failed to accept new client.");
                  }
                  System.out.println("end doAccept");
              }

              // TODO buffersize performance testing
              private void doRead(SelectionKey key) {
                  try {
                      SocketChannel sc = (SocketChannel) key.channel();
                      ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
                      StringBuffer sb = new StringBuffer();
                      int count = 0;
                      while ( (count = sc.read(bb)) > 0) {
                          bb.flip();
                          sb.append(Charset.forName(CHARSET).decode(bb));
                          bb.flip();
                      }
                      //if client disconnected, read return -1
                      if(count == -1){
                          System.out.println("client disconnected");
                          disconnect(key);   
                      } else {
                          System.out.println("message received from client:" + sb.toString());
                      }
                  } catch (IOException e) {
                      disconnect(key);
                      e.printStackTrace();
                  }
                  System.out.println("end doRead");
              }

              private void doWrite(SelectionKey key) {
                  SocketChannel sc = (SocketChannel) key.channel();
                  LinkedList<ByteBuffer> outseq = (LinkedList<ByteBuffer>) key
                          .attachment();
                  ByteBuffer bb = outseq.poll();
                  if(bb == null){
                      return;
                  }
                  try {
                      while(bb.hasRemaining()){
                          sc.write(bb);
                      }           
                  } catch (IOException e) {
                      disconnect(key);
                      e.printStackTrace();
                  }
                  if (outseq.size() == 0) {
                      System.out.println("after all buffers wrote, unregister OP_WRITE from interestOps");
                      key.interestOps(SelectionKey.OP_READ);
                  }
                  System.out.println("end doWrote");
              }

              private void disconnect(SelectionKey key) {
                  try {
                      key.channel().close();
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }

              //TODO find out how to shutdown
              private class UserInteractive implements Runnable {

                  SelectionKey key;

                  public UserInteractive(SelectionKey key) {
                      this.key = key;
                  }

                  public void run() {
                      System.out.println("UserInteractive thread start...");
                      BufferedReader br = new BufferedReader(new InputStreamReader(
                              System.in));
                      while (true) {
                          try {
                              String inputLine = br.readLine();
                              ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
                              bb = ByteBuffer.wrap(inputLine.getBytes());
                              ((LinkedList<ByteBuffer>) key.attachment()).offer(bb);
                              System.out
                                      .println("after input, register OP_WRITE to interestOps and wakeup selector");
                              key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                              key.selector().wakeup();
                          } catch (IOException e) {
                              e.printStackTrace();
                          }
                      }
                  }
              }

              /**
               * @param args
               */
              public static void main(String[] args) {
                  try {
                      MyNioServer server = new MyNioServer(10001);
                      server.startup();
                  } catch (Exception e) {
                      e.printStackTrace();
                      System.err.println("Exception caught, program exiting…");
                  }
              }
          }


          MyNioClient.java

          import java.io.BufferedReader;
          import java.io.IOException;
          import java.io.InputStreamReader;
          import java.net.InetAddress;
          import java.net.InetSocketAddress;
          import java.nio.ByteBuffer;
          import java.nio.CharBuffer;
          import java.nio.charset.Charset;
          import java.nio.channels.Selector;
          import java.nio.channels.SocketChannel;
          import java.nio.channels.SelectionKey;
          import java.text.MessageFormat;
          import java.util.LinkedList;
          import java.util.Set;
          import java.util.Iterator;

          public class MyNioClient {

              private int BUFFERSIZE = 1024*10;
              private String CHARSET = "GBK";
              private Selector sel;

              public MyNioClient(int port) throws IOException {
                  SocketChannel sc = SocketChannel.open();
                  sc.configureBlocking(false);    // this operation need to be executed before socket.connnect, for OP_CONNECT event
                  sc.connect(new InetSocketAddress(InetAddress.getLocalHost(), port));
                  sel = Selector.open();
                  sc.register(sel, SelectionKey.OP_CONNECT |SelectionKey.OP_READ);
              }

              public void startup() {
                  System.out.println("Client start...");
                  try {
                      while (!Thread.interrupted()) {
                          int keysCount = sel.select();
                          System.out.println("Catched " + keysCount + " SelectionKeys");
                          if (keysCount < 1) {
                              continue;
                          }               
                          Set<SelectionKey> selectedKeys = sel.selectedKeys();
                          Iterator<SelectionKey> it = selectedKeys.iterator();
                          while (it.hasNext()) {
                              SelectionKey key = it.next();
                              //printKeyInfo(key);
                              if (key.isConnectable()) {
                                  System.out.println("Key isConnectable");
                                  doConnect(key);
                              } else if (key.isValid() && key.isReadable()) {
                                  System.out.println("Key isReadable");
                                  doRead(key);
                              } else if (key.isValid() && key.isWritable()) {
                                  System.out.println("Key isWritable");
                                  doWrite(key);
                              }
                          }
                          selectedKeys.clear();
                      }
                      System.err.println("Program is interrupted.");
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
                  System.out.println("Client stop...");
                  shutdown();
              }
             
              public void shutdown(){
                  Set<SelectionKey> keys = sel.keys();
                  for(SelectionKey key:keys){
                      try {
                          key.channel().close();
                      } catch (IOException e) {
                          e.printStackTrace();
                      }
                  }
                  try {
                      sel.close();
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }

              private void printKeyInfo(SelectionKey key) {
                  String keyStr = MessageFormat
                          .format(
                                  "IntOps:{0},ReadyOps:{1},isVal:{2},isAcc:{3},isCnn:{4},isRead:{5},isWrite:{6}",
                                  key.interestOps(), key.readyOps(), key.isValid(), key
                                          .isAcceptable(), key.isConnectable(), key
                                          .isReadable(), key.isWritable());
                  System.out.println(keyStr);
              }

              private void doConnect(SelectionKey key) {
                  try {
                      boolean flag = ((SocketChannel) key.channel()).finishConnect();
                  } catch (IOException e) {
                      e.printStackTrace();
                      System.exit(1);
                  }
                  System.out.println("unregister OP_CONNECT from interestOps");
                  key.interestOps(SelectionKey.OP_READ);
                  key.attach(new LinkedList<ByteBuffer>());
                  new Thread(new UserInteractive(key)).start();
              }

              private void doRead(SelectionKey key) {
                  try {
                      SocketChannel sc = (SocketChannel) key.channel();
                      ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
                      StringBuffer sb = new StringBuffer();
                      while (sc.read(bb) > 0) {
                          bb.flip();
                          sb.append(Charset.forName(CHARSET).decode(bb));
                          bb.flip();
                      }
                      System.out.println("message received from server:" + sb.toString());
                  } catch (IOException e) {
                      e.printStackTrace();
                      disconnect(key);
                      System.exit(1);
                  }
                  System.out.println("now end readMessage");
              }

              private void doWrite(SelectionKey key) {
                  SocketChannel sc = (SocketChannel) key.channel();
                  LinkedList<ByteBuffer> outseq = (LinkedList<ByteBuffer>) key
                          .attachment();
                  ByteBuffer bb = outseq.poll();
                  if(bb == null){
                      return;
                  }
                  try {
                      while(bb.hasRemaining()){
                          sc.write(bb);
                      }           
                  } catch (IOException e) {
                      disconnect(key);
                      e.printStackTrace();
                  }
                  if (outseq.size() == 0) {
                      System.out.println("after all buffers wrote, unregister OP_WRITE from interestOps");
                      key.interestOps(SelectionKey.OP_READ);
                  }
                  System.out.println("end doWrote");
              }

              private void disconnect(SelectionKey key) {
                  try {
                      key.channel().close();
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }

              private class UserInteractive implements Runnable {

                  SelectionKey key;

                  public UserInteractive(SelectionKey key) {
                      this.key = key;
                  }

                  public void run() {
                      LinkedList<ByteBuffer> outseq = (LinkedList<ByteBuffer>) key
                              .attachment();
                       BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
                      while (true) {
                          try {
                              String inputLine = br.readLine();
                              if ("quit".equalsIgnoreCase(inputLine)) {
                                  key.channel().close();
                                  System.exit(1);
                                  break;
                              }
                              ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
                              bb = ByteBuffer.wrap(inputLine.getBytes());
                              outseq.offer(bb);
                              System.out
                              .println("after input, register OP_WRITE to interestOps and wakeup selector");
                              key.interestOps(SelectionKey.OP_READ
                                      | SelectionKey.OP_WRITE);
                              sel.wakeup();
                          } catch (IOException e) {
                              e.printStackTrace();
                          }
                      }
                  }
              }

              /**
               * @param args
               */
              public static void main(String[] args) {
                  try {
                      MyNioClient client = new MyNioClient(10001);
                      client.startup();
                  } catch (Exception e) {
                      e.printStackTrace();
                      System.err.println("Exception caught, program exiting...");
                  }
              }

          }

          posted on 2010-05-29 12:38 都市淘沙者 閱讀(1353) 評論(1)  編輯  收藏 所屬分類: 多線程并發編程

          評論

          # re: 用NIO實現的一個Chat Demo [轉][未登錄] 2015-04-08 23:36 harry

          請教一個問題,為什么UserInteractive里面 SelctionKey.wakeup以后,就成了寫就緒模式呢(key.isWritable()是true)  回復  更多評論   

          主站蜘蛛池模板: 静乐县| 正阳县| 厦门市| 陆河县| 木兰县| 孟州市| 扶沟县| 宕昌县| 新竹县| 肃北| 辽中县| 吉木乃县| 吴堡县| 福建省| 财经| 舞钢市| 绥德县| 明溪县| 甘洛县| 黑山县| 宜春市| 黄大仙区| 深圳市| 昔阳县| 柳江县| 平昌县| 台湾省| 宁蒗| 京山县| 九龙坡区| 吉林市| 龙川县| 凤庆县| 青海省| 奉新县| 长治市| 荃湾区| 浦北县| 长宁区| 青海省| 蓬溪县|