用NIO實現的一個Chat Demo [轉]
發現網上找到的許多NIO的用例在跑起來後都有許多問題,最常見的就是沒有對interest event進行合理的register和unregister,導致程序一直在looping,又或者當客戶端或服務器端連接斷開時有死循環的跡象。忍不住自己做了一個demo,我想可以作為一個NIO應用的模板去修改,只要把doRead,doWrite之類的用基於線程的Handler去處理,那就基本可以滿足需求了。
這個Demo的目的是在Client和Server間建立類似QQ聊天那樣的功能,讓客戶端和服務器端都支持用戶輸入和異步消息顯示(因為服務器端要支持用戶的console輸入,所以不要用多個客戶端進行連接,否則可能會出現難以預測的問題)。
代碼中用紅色顯示的地方是我認為需要注意的地方,說老實話NIO雖然很強大,但完全用Non-Blocking來編程,有許多需要小心的地方,一不小心還可能造成死循環。就像ReentrantLock之于Synchronized,如果基本的IO能滿足需求,就不必強求應用NIO。
注意:OP_WRITE應該是在有數據等待寫入的時候才添加到SelectionKey的interestOps裡面去,而且在寫入完成後一定要去除,否則selector.select()方法就不會阻塞,從而造成死循環。
MyNioServer.java
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

/**
 * Single-threaded, selector-driven chat server demo.
 *
 * <p>One thread (the caller of {@link #startup()}) owns the selector and performs all
 * socket I/O. For every accepted client a console thread reads lines from
 * {@code System.in}, queues them on the client's key and asks the selector to write them.
 *
 * <p>Thread-safety: the per-key outbound queue and the key's interest set are only
 * touched while holding the queue's monitor, so "queue is empty, drop OP_WRITE" on the
 * selector thread can never race with "message queued, add OP_WRITE" on a console thread.
 *
 * <p>Because every accepted client gets its own console thread reading the same
 * {@code System.in}, the demo is only meaningful with a single connected client.
 */
public class MyNioServer {
    private static final int BUFFERSIZE = 1024 * 10;
    private static final String CHARSET = "GBK";

    /** Charset used for both encoding outgoing and decoding incoming messages. */
    private final Charset charset = Charset.forName(CHARSET);
    private final Selector sel;

    /**
     * Opens a non-blocking listening socket on the local host address and registers it
     * for accept events.
     *
     * @param port TCP port to listen on
     * @throws IOException if the channel or selector cannot be opened or bound
     */
    public MyNioServer(int port) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        Selector selector = null;
        boolean ok = false;
        try {
            ssc.configureBlocking(false);
            ssc.socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port));
            selector = Selector.open();
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            ok = true;
        } finally {
            if (!ok) {
                // Do not leak the channel/selector when bind or register fails.
                if (selector != null) {
                    try {
                        selector.close();
                    } catch (IOException ignored) {
                        // best-effort cleanup; the original failure is what matters
                    }
                }
                try {
                    ssc.close();
                } catch (IOException ignored) {
                    // best-effort cleanup; the original failure is what matters
                }
            }
        }
        sel = selector;
    }

    /**
     * Runs the select loop on the calling thread until that thread is interrupted or the
     * selector fails, then releases all channels via {@link #shutdown()}.
     */
    public void startup() {
        System.out.println("Server start...");
        try {
            while (!Thread.interrupted()) {
                int keysCount = sel.select();
                System.out.println("Catched " + keysCount + " SelectionKeys");
                if (keysCount < 1) {
                    continue;
                }
                Set<SelectionKey> set = sel.selectedKeys();
                Iterator<SelectionKey> it = set.iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    // Every readiness test is guarded by isValid(): a handler may have
                    // closed the channel, and querying a cancelled key throws.
                    if (key.isValid() && key.isAcceptable()) {
                        System.out.println("Key isAcceptable");
                        doAccept(key);
                    }
                    if (key.isValid() && key.isReadable()) {
                        System.out.println("Key isReadable");
                        doRead(key);
                    }
                    if (key.isValid() && key.isWritable()) {
                        System.out.println("Key isWritable");
                        doWrite(key);
                    }
                }
                set.clear();
            }
            System.err.println("Program is interrupted.");
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("Server stop...");
        shutdown();
    }

    /** Closes every registered channel and then the selector itself. */
    public void shutdown() {
        Set<SelectionKey> keys = sel.keys();
        for (SelectionKey key : keys) {
            try {
                key.channel().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try {
            sel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Returns the outbound message queue attached to a client key. */
    @SuppressWarnings("unchecked") // the attachment is always set by doAccept
    private static LinkedList<ByteBuffer> pendingWrites(SelectionKey key) {
        return (LinkedList<ByteBuffer>) key.attachment();
    }

    /**
     * Accepts a pending connection, registers it for reads with an empty outbound queue
     * and starts the console thread that feeds that queue.
     */
    private void doAccept(SelectionKey key) {
        SocketChannel sc = null;
        try {
            sc = ((ServerSocketChannel) key.channel()).accept();
            if (sc == null) {
                // Non-blocking accept() returns null when no connection is pending.
                return;
            }
            sc.configureBlocking(false);
            // Attach the queue at registration time so the key is never seen without it.
            SelectionKey newkey =
                    sc.register(sel, SelectionKey.OP_READ, new LinkedList<ByteBuffer>());
            Thread console = new Thread(new UserInteractive(newkey));
            // Daemon: a thread blocked on System.in must not keep the JVM alive after
            // the select loop has stopped.
            console.setDaemon(true);
            console.start();
        } catch (IOException e) {
            e.printStackTrace();
            System.err.println("Failed to accept new client.");
            if (sc != null) {
                try {
                    sc.close();
                } catch (IOException ignored) {
                    // best-effort cleanup of a half-initialised connection
                }
            }
        }
        System.out.println("end doAccept");
    }

    // TODO buffersize performance testing
    /**
     * Drains everything currently readable from the client, prints it, and closes the
     * connection on end-of-stream or I/O failure.
     */
    private void doRead(SelectionKey key) {
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
        // Raw bytes are collected first and decoded once, so a multi-byte character
        // split across two read() calls is not corrupted.
        ByteArrayOutputStream received = new ByteArrayOutputStream();
        try {
            int count;
            while ((count = sc.read(bb)) > 0) {
                received.write(bb.array(), bb.arrayOffset(), bb.position());
                // clear(), not flip(): restore the full capacity for the next read.
                bb.clear();
            }
            if (received.size() > 0) {
                System.out.println("message received from client:"
                        + new String(received.toByteArray(), charset));
            }
            // if client disconnected, read returns -1
            if (count == -1) {
                System.out.println("client disconnected");
                disconnect(key);
            }
        } catch (IOException e) {
            disconnect(key);
            e.printStackTrace();
        }
        System.out.println("end doRead");
    }

    /**
     * Writes as many queued buffers as the socket accepts without blocking. OP_WRITE is
     * dropped only once the queue is empty; a partially written buffer stays at the head
     * of the queue and is resumed on the next writable event.
     */
    private void doWrite(SelectionKey key) {
        SocketChannel sc = (SocketChannel) key.channel();
        LinkedList<ByteBuffer> outseq = pendingWrites(key);
        synchronized (outseq) {
            try {
                ByteBuffer bb;
                while ((bb = outseq.peek()) != null) {
                    sc.write(bb);
                    if (bb.hasRemaining()) {
                        // Socket send buffer is full: keep OP_WRITE set and let the
                        // selector call back instead of spinning here.
                        return;
                    }
                    outseq.poll();
                }
            } catch (IOException e) {
                disconnect(key);
                e.printStackTrace();
                // The key is cancelled now; touching its interest set would throw.
                return;
            }
            System.out.println("after all buffers wrote, unregister OP_WRITE from interestOps");
            key.interestOps(SelectionKey.OP_READ);
        }
        System.out.println("end doWrote");
    }

    /** Closes the key's channel, which also cancels the key. */
    private void disconnect(SelectionKey key) {
        try {
            key.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads console lines and queues each one for delivery to the client behind
     * {@code key}. Ends when stdin is closed, stdin fails, or the key is cancelled.
     */
    private class UserInteractive implements Runnable {
        private final SelectionKey key;

        public UserInteractive(SelectionKey key) {
            this.key = key;
        }

        @Override
        public void run() {
            System.out.println("UserInteractive thread start...");
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            LinkedList<ByteBuffer> outseq = pendingWrites(key);
            while (true) {
                try {
                    String inputLine = br.readLine();
                    if (inputLine == null) {
                        // stdin reached end-of-stream; nothing more to send
                        return;
                    }
                    // Encode with the same charset the peer decodes with.
                    ByteBuffer bb = ByteBuffer.wrap(inputLine.getBytes(charset));
                    synchronized (outseq) {
                        outseq.offer(bb);
                        System.out
                                .println("after input, register OP_WRITE to interestOps and wakeup selector");
                        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    }
                    key.selector().wakeup();
                } catch (CancelledKeyException e) {
                    // The client went away; this console thread has no one to talk to.
                    return;
                } catch (IOException e) {
                    e.printStackTrace();
                    return;
                }
            }
        }
    }

    /**
     * Starts the server on port 10001.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            MyNioServer server = new MyNioServer(10001);
            server.startup();
        } catch (Exception e) {
            e.printStackTrace();
            System.err.println("Exception caught, program exiting…");
        }
    }
}
MyNioClient.java
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.SelectionKey;
import java.text.MessageFormat;
import java.util.LinkedList;
import java.util.Set;
import java.util.Iterator;

/**
 * Single-threaded, selector-driven chat client demo.
 *
 * <p>The caller of {@link #startup()} owns the selector and performs all socket I/O. Once
 * the connection is established a console thread reads lines from {@code System.in},
 * queues them on the channel's key and asks the selector to write them. Typing
 * {@code quit} closes the connection and terminates the process.
 *
 * <p>Thread-safety: the outbound queue and the key's interest set are only touched while
 * holding the queue's monitor, so dropping OP_WRITE on the selector thread cannot race
 * with adding it on the console thread.
 */
public class MyNioClient {
    private static final int BUFFERSIZE = 1024 * 10;
    private static final String CHARSET = "GBK";

    /** Charset used for both encoding outgoing and decoding incoming messages. */
    private final Charset charset = Charset.forName(CHARSET);
    private final Selector sel;
    private final SelectionKey channelKey;
    /** True when connect() completed synchronously, so no OP_CONNECT event will arrive. */
    private final boolean connectedImmediately;
    /** Cleared by the selector thread when the server closes the connection. */
    private volatile boolean running = true;

    /**
     * Starts a non-blocking connection to the local host address on the given port.
     *
     * @param port TCP port of the server
     * @throws IOException if the channel or selector cannot be opened or the connection
     *     attempt fails immediately
     */
    public MyNioClient(int port) throws IOException {
        SocketChannel sc = SocketChannel.open();
        Selector selector = null;
        SelectionKey key = null;
        boolean connected = false;
        boolean ok = false;
        try {
            // Must be non-blocking before connect() for OP_CONNECT to be delivered.
            sc.configureBlocking(false);
            // A local connection may be established right away; then connect() returns
            // true and no OP_CONNECT event is ever reported.
            connected = sc.connect(new InetSocketAddress(InetAddress.getLocalHost(), port));
            selector = Selector.open();
            key = sc.register(selector,
                    connected ? SelectionKey.OP_READ : SelectionKey.OP_CONNECT);
            ok = true;
        } finally {
            if (!ok) {
                // Do not leak the channel/selector when setup fails half-way.
                if (selector != null) {
                    try {
                        selector.close();
                    } catch (IOException ignored) {
                        // best-effort cleanup; the original failure is what matters
                    }
                }
                try {
                    sc.close();
                } catch (IOException ignored) {
                    // best-effort cleanup; the original failure is what matters
                }
            }
        }
        sel = selector;
        channelKey = key;
        connectedImmediately = connected;
    }

    /**
     * Runs the select loop on the calling thread until the server disconnects, the thread
     * is interrupted or the selector fails, then releases resources via
     * {@link #shutdown()}.
     */
    public void startup() {
        System.out.println("Client start...");
        if (connectedImmediately) {
            onConnected(channelKey);
        }
        try {
            while (running) {
                if (Thread.interrupted()) {
                    System.err.println("Program is interrupted.");
                    break;
                }
                int keysCount = sel.select();
                System.out.println("Catched " + keysCount + " SelectionKeys");
                if (keysCount < 1) {
                    continue;
                }
                Set<SelectionKey> selectedKeys = sel.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    // Every readiness test is guarded by isValid(): a handler may have
                    // closed the channel, and querying a cancelled key throws.
                    if (key.isValid() && key.isConnectable()) {
                        System.out.println("Key isConnectable");
                        doConnect(key);
                    }
                    if (key.isValid() && key.isReadable()) {
                        System.out.println("Key isReadable");
                        doRead(key);
                    }
                    if (key.isValid() && key.isWritable()) {
                        System.out.println("Key isWritable");
                        doWrite(key);
                    }
                }
                selectedKeys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("Client stop...");
        shutdown();
    }

    /** Closes every registered channel and then the selector itself. */
    public void shutdown() {
        Set<SelectionKey> keys = sel.keys();
        for (SelectionKey key : keys) {
            try {
                key.channel().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try {
            sel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Debug helper: prints the interest/ready sets and readiness flags of a key. */
    private void printKeyInfo(SelectionKey key) {
        String keyStr = MessageFormat.format(
                "IntOps:{0},ReadyOps:{1},isVal:{2},isAcc:{3},isCnn:{4},isRead:{5},isWrite:{6}",
                key.interestOps(), key.readyOps(), key.isValid(), key.isAcceptable(),
                key.isConnectable(), key.isReadable(), key.isWritable());
        System.out.println(keyStr);
    }

    /** Returns the outbound message queue attached to the channel's key. */
    @SuppressWarnings("unchecked") // the attachment is always set by onConnected
    private static LinkedList<ByteBuffer> pendingWrites(SelectionKey key) {
        return (LinkedList<ByteBuffer>) key.attachment();
    }

    /**
     * Completes a pending non-blocking connect. Terminates the process with status 1 if
     * the connection attempt failed.
     */
    private void doConnect(SelectionKey key) {
        try {
            if (!((SocketChannel) key.channel()).finishConnect()) {
                // Still in progress: leave OP_CONNECT set and wait for the next event.
                return;
            }
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        System.out.println("unregister OP_CONNECT from interestOps");
        onConnected(key);
    }

    /**
     * Switches an established connection to read mode, attaches its outbound queue and
     * starts the console thread that feeds the queue.
     */
    private void onConnected(SelectionKey key) {
        // Attach the queue before the console thread (its only other user) exists.
        key.attach(new LinkedList<ByteBuffer>());
        key.interestOps(SelectionKey.OP_READ);
        Thread console = new Thread(new UserInteractive(key));
        // Daemon: a thread blocked on System.in must not keep the JVM alive after the
        // select loop has stopped.
        console.setDaemon(true);
        console.start();
    }

    /**
     * Drains everything currently readable from the server and prints it. On
     * end-of-stream the connection is closed and the select loop is asked to stop; on an
     * I/O failure the process terminates with status 1.
     */
    private void doRead(SelectionKey key) {
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
        // Raw bytes are collected first and decoded once, so a multi-byte character
        // split across two read() calls is not corrupted.
        ByteArrayOutputStream received = new ByteArrayOutputStream();
        try {
            int count;
            while ((count = sc.read(bb)) > 0) {
                received.write(bb.array(), bb.arrayOffset(), bb.position());
                // clear(), not flip(): restore the full capacity for the next read.
                bb.clear();
            }
            if (received.size() > 0) {
                System.out.println("message received from server:"
                        + new String(received.toByteArray(), charset));
            }
            if (count == -1) {
                // Without this the closed socket stays readable forever and the select
                // loop spins at full speed.
                System.out.println("server disconnected");
                disconnect(key);
                running = false;
            }
        } catch (IOException e) {
            e.printStackTrace();
            disconnect(key);
            System.exit(1);
        }
        System.out.println("now end readMessage");
    }

    /**
     * Writes as many queued buffers as the socket accepts without blocking. OP_WRITE is
     * dropped only once the queue is empty; a partially written buffer stays at the head
     * of the queue and is resumed on the next writable event.
     */
    private void doWrite(SelectionKey key) {
        SocketChannel sc = (SocketChannel) key.channel();
        LinkedList<ByteBuffer> outseq = pendingWrites(key);
        synchronized (outseq) {
            try {
                ByteBuffer bb;
                while ((bb = outseq.peek()) != null) {
                    sc.write(bb);
                    if (bb.hasRemaining()) {
                        // Socket send buffer is full: keep OP_WRITE set and let the
                        // selector call back instead of spinning here.
                        return;
                    }
                    outseq.poll();
                }
            } catch (IOException e) {
                disconnect(key);
                e.printStackTrace();
                // The key is cancelled now; touching its interest set would throw.
                return;
            }
            System.out.println("after all buffers wrote, unregister OP_WRITE from interestOps");
            key.interestOps(SelectionKey.OP_READ);
        }
        System.out.println("end doWrote");
    }

    /** Closes the key's channel, which also cancels the key. */
    private void disconnect(SelectionKey key) {
        try {
            key.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads console lines and queues each one for delivery to the server. The line
     * {@code quit} closes the connection and exits the process. Ends when stdin is
     * closed, stdin fails, or the key is cancelled.
     */
    private class UserInteractive implements Runnable {
        private final SelectionKey key;

        public UserInteractive(SelectionKey key) {
            this.key = key;
        }

        @Override
        public void run() {
            LinkedList<ByteBuffer> outseq = pendingWrites(key);
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                try {
                    String inputLine = br.readLine();
                    if (inputLine == null) {
                        // stdin reached end-of-stream; nothing more to send
                        return;
                    }
                    if ("quit".equalsIgnoreCase(inputLine)) {
                        try {
                            key.channel().close();
                        } finally {
                            // Exit even if close() fails; status kept from the original demo.
                            System.exit(1);
                        }
                        return;
                    }
                    // Encode with the same charset the peer decodes with.
                    ByteBuffer bb = ByteBuffer.wrap(inputLine.getBytes(charset));
                    synchronized (outseq) {
                        outseq.offer(bb);
                        System.out
                                .println("after input, register OP_WRITE to interestOps and wakeup selector");
                        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    }
                    sel.wakeup();
                } catch (CancelledKeyException e) {
                    // The connection is gone; there is no one left to send to.
                    return;
                } catch (IOException e) {
                    e.printStackTrace();
                    return;
                }
            }
        }
    }

    /**
     * Starts the client against port 10001.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            MyNioClient client = new MyNioClient(10001);
            client.startup();
        } catch (Exception e) {
            e.printStackTrace();
            System.err.println("Exception caught, program exiting...");
        }
    }
}
posted on 2010-05-29 12:38 都市淘沙者 閱讀(1353) 評論(1) 編輯 收藏 所屬分類: 多線程并發編程