狂奔 lion

          自強不息

          NIO學習之Web服務器示例

          1 根據cpu core數量確定selector數量
          2 用一個selector服務accept,其他selector按照core-1分配線程數運行
          3 accept selector作為生產者把獲得的請求放入隊列
          4 某個selector作為消費者從blocking queue中取出請求socket channel,并向自己注冊
          5 當獲得read信號時,selector建立工作任務線程worker,并提交給系統線程池
          6 worker線程排隊后在線程池中執行,把協議頭讀入緩沖區,然后解析,處理,響應,關閉連接

          import java.io.FileInputStream;
          import java.io.IOException;
          import java.net.InetSocketAddress;
          import java.nio.ByteBuffer;
          import java.nio.channels.ClosedChannelException;
          import java.nio.channels.SelectionKey;
          import java.nio.channels.Selector;
          import java.nio.channels.ServerSocketChannel;
          import java.nio.channels.SocketChannel;
          import java.util.Date;
          import java.util.Iterator;
          import java.util.Set;
          import java.util.concurrent.ExecutorService;
          import java.util.concurrent.Executors;
          import java.util.concurrent.LinkedBlockingQueue;

          public class NioServer {
              
          private static NioServer svr = new NioServer();
              
          private final int numProcessors = Runtime.getRuntime().availableProcessors();
              
          private static LinkedBlockingQueue<SocketChannel> queue = new LinkedBlockingQueue<SocketChannel>();
              
          private static ExecutorService es = Executors.newFixedThreadPool(3);
              
              
          private NioServer(){}
              
              
          public static NioServer getInstance(){return svr;}
              
              
          public void init(){
                  
          try {
                      
          for(int i=0;i<numProcessors - 1;i++){
                          Thread tk 
          = new Thread(new Talker());
                          tk.start();
                      }

                      
                      Selector acceptSelector 
          = Selector.open();

                      ServerSocketChannel ssc 
          = ServerSocketChannel.open();
                      ssc.configureBlocking(
          false);

                      InetSocketAddress isa 
          = new InetSocketAddress("127.0.0.1"8080);
                      ssc.socket().bind(isa);

                      ssc.register(acceptSelector, SelectionKey.OP_ACCEPT);

                      
          while (acceptSelector.select() > 0{
                          Set
          <SelectionKey> readyKeys = acceptSelector.selectedKeys();
                          Iterator
          <SelectionKey> i = readyKeys.iterator();

                          
          while (i.hasNext()) {
                              SelectionKey sk 
          = i.next();
                              i.remove();
                              
          if (sk.isAcceptable()) {
                                  ServerSocketChannel nextReady 
          = (ServerSocketChannel) sk
                                          .channel();
                                  SocketChannel s 
          = nextReady.accept();
                                  s.configureBlocking(
          false);
                                  queue.offer(s);
                              }

                          }

                      }

                  }
           catch (Exception e) {
                      e.printStackTrace();
                  }

              }

              
              
          private static class Talker implements Runnable{
                  
          private Selector se = null;

                  
                  
          public Talker(){
                      
          try {
                          
          this.se = Selector.open();
                      }
           catch (IOException e) {
                          e.printStackTrace();
                      }

                  }

                  
                  
          public void addChannelIfAvailable(){
                      
          try {
                          
          if(queue.isEmpty())
                              
          return;
                          SocketChannel sc 
          = queue.poll();
                          sc.register(se, SelectionKey.OP_READ);
                      }
           catch (ClosedChannelException e) {
                          e.printStackTrace();
                      }

                  }

                  
                  
          public void run() {
                      
          try {
                          
          while (true{
                              
          int skOps = se.select(20);
                              addChannelIfAvailable();
                              
          if(skOps <= 0){
                                  
          continue;
                              }

                              
                              Set
          <SelectionKey> readyKeys = se.selectedKeys();
                              Iterator
          <SelectionKey> i = readyKeys.iterator();
                              
          while (i.hasNext()) {
                                  SelectionKey sk 
          = i.next();
                                  i.remove();
                                  
                                  
          if (sk.isValid() && sk.isReadable()) {
                                      SocketChannel sc 
          = (SocketChannel) sk.channel();
                                      sc.configureBlocking(
          false);
                                      
                                      Worker worker 
          = new Worker(sc);
                                      es.execute(worker);    
                                  }

                              }

                              Thread.sleep(
          300);
                          }

                      }
           catch (IOException e) {
                          e.printStackTrace();
                      }
           catch (InterruptedException e) {
                          e.printStackTrace();
                      }

                  }

              }

              
              
          private static class Worker implements Runnable{
                  ByteBuffer bb 
          = ByteBuffer.allocateDirect(1024);
                  SocketChannel sc 
          = null;
                  
                  
          public Worker(SocketChannel sc){
                      Thread.currentThread().setName(
          this.toString());
                      
          this.sc = sc;
                  }

                  
                  
          public void run() {
                      
          try {
                          
          try{
                              sc.read(bb);
                          }
          catch(IOException e){
                              e.printStackTrace();
                              sc.finishConnect();
                              sc.close();
                              
          return;
                          }

                          bb.flip();
                          
          byte[] bs = new byte[bb.limit()];
                          bb.get(bs);
                          bb.clear();
                          
                          StringBuilder sb 
          = new StringBuilder();
                          sb.append(
          "HTTP/1.1 200 OK").append("\n").append("Date:" + new Date()).append("\n");
                          sb.append(
          "Server:NIO Server By Yi Yang\n\n");
                          bb.put(sb.toString().getBytes());
                          
                          bb.flip();
                          sc.write(bb);
                          bb.clear();
                          
                          FileInputStream is 
          = new FileInputStream("E:/test.html");
                          is.getChannel().transferTo(
          01024, sc);
                          
                          is.close();
                          sc.finishConnect();
                          sc.close();
                      }
           catch (IOException e) {
                          e.printStackTrace();
                      }

                  }

              }

              
              
          public static void main(String[] args) throws Exception {
                  NioServer server 
          = NioServer.getInstance();
                  server.init();
              }

          }
          ===============06/27/10 
          如何解析header?,以行為單位讀取,按照header敏感的關鍵字進行匹配 對于首行取得對方調用的方法GET/POST 地址 和協議版本 
          然后根據用戶的配置,和解析地址請求,獲得響應的servlet,并把通過反射+默認構造函數構造這個servlet,解析地址參數等設置到對象httpservletrequest和httpservletresponse中,然后通過反射invoke對應的get/post/put/delete等方法,并把封裝的兩個對象作為參數傳進去,同時在response的header中傳遞一個cookie作為session的依據。


           @2008 楊一. 版權所有. 保留所有權利

          posted on 2010-06-25 19:19 楊一 閱讀(1969) 評論(0)  編輯  收藏 所屬分類: Java SE

          <2010年6月>
          303112345
          6789101112
          13141516171819
          20212223242526
          27282930123
          45678910

          導航

          公告

          本人在blogjava上發表的文章及隨筆除特別聲明外均為原創或翻譯,作品受知識產權法保護并被授權遵從 知識分享協議:署名-非商業性使用-相同方式共享 歡迎轉載,請在轉載時注明作者姓名(楊一)及出處(www.aygfsteel.com/yangyi)
          /////////////////////////////////////////
          我的訪問者

          常用鏈接

          留言簿(5)

          隨筆分類(55)

          隨筆檔案(55)

          相冊

          Java

          其他技術

          生活

          最新隨筆

          搜索

          積分與排名

          最新評論

          閱讀排行榜

          評論排行榜

          自強不息


          用心 - 珍惜時間,勇于創造
          主站蜘蛛池模板: 中卫市| 磴口县| 榆林市| 阳泉市| 习水县| 牙克石市| 海丰县| 天津市| 青铜峡市| 桃园市| 定襄县| 宜兴市| 新巴尔虎右旗| 通山县| 黄骅市| 务川| 巧家县| 庆元县| 哈巴河县| 遂宁市| 辽宁省| 东安县| 南涧| 郎溪县| 梁平县| 紫阳县| 浦东新区| 汶上县| 宜章县| 大关县| 汉中市| 新巴尔虎左旗| 始兴县| 黔西县| 申扎县| 思南县| 红原县| 江北区| 淮北市| 汤原县| 绥中县|