隨筆-23  評論-58  文章-0  trackbacks-0
          Reactor 模式的 JAVA NIO 多線程服務器

          public class MiniServer extends Thread
          {
              
          private static final Log log = LogFactory.getLog(MiniServer.class);
              
              
          private final Selector s;
              
          private final ServerSocketChannel ssc;
              
          private ExecutorService executor;
              
              
          public MiniServer(int portnumber,ExecutorService executor) throws IOException
              
          {
                  
          this.executor=executor;
                  s 
          = Selector.open();
                  ssc 
          = ServerSocketChannel.open();
                  ssc.socket().bind(
          new InetSocketAddress(portnumber));
                  ssc.configureBlocking(
          false);
                  ssc.register(s,SelectionKey.OP_ACCEPT);
              }

              
              
          public void run()
              
          {
                  
          try
                  
          {
                      
          while(s.isOpen())
                      
          {
                          
          int nKeys=s.select();
                          
          if(nKeys>0)
                          
          {
                              Iterator
          <SelectionKey> it = s.selectedKeys().iterator();
                              
          while (it.hasNext()) 
                              
          {
                                  SelectionKey key 
          = it.next();
                                  it.remove();
                                  
          if (!key.isValid() || !key.channel().isOpen())
                                      
          continue;
                                  
          if(key.isAcceptable())
                                  
          {
                                      SocketChannel sc 
          = ssc.accept();
                                      
          if (sc != null)
                                      
          {
                                          sc.configureBlocking(
          false);
                                          sc.register(s, SelectionKey.OP_READ, 
          new Reader(executor));
                                      }

                                  }

                                  
          else if(key.isReadable()||key.isWritable())
                                  
          {
                                      Reactor reactor 
          = (Reactor) key.attachment();
                                      reactor.execute(key);
                                  }

                              }

                          }

                      }

                  }

                  
          catch(IOException e)
                  
          {
                      log.info(e);
                  }

              }

          }



          public interface Reactor 
          {
              
          void execute(SelectionKey key);
          }



          public class Reader implements Reactor 
          {
              
          private static final Log log = LogFactory.getLog(Reader.class);
              
              
          private byte[] bytes=new byte[0];
              
          private ExecutorService executor;
              
              
          public Reader(ExecutorService executor)
              
          {
                  
          this.executor=executor;
              }

              
              @Override
              
          public void execute(SelectionKey key)
              
          {
                  SocketChannel sc 
          = (SocketChannel) key.channel();
                  
          try
                  
          {
                      ByteBuffer buffer
          =ByteBuffer.allocate(1024);
                      
          int len=-1;
                      
          while(sc.isConnected() && (len=sc.read(buffer))>0)
                      
          {
                          buffer.flip();
                            
          byte [] content = new byte[buffer.limit()];
                          buffer.get(content);
                          bytes
          =NutUtil.ArrayCoalition(bytes,content);
                          buffer.clear();
                      }

                      
          if(len==0)
                      
          {
                          key.interestOps(SelectionKey.OP_READ);
                          key.selector().wakeup(); 
                      }

                      
          else if(len==-1)
                      
          {
                          Callable
          <byte[]> call=new ProcessCallable(bytes);
                          Future
          <byte[]> task=executor.submit(call);
                          ByteBuffer output
          =ByteBuffer.wrap(task.get());
                          sc.register(key.selector(), SelectionKey.OP_WRITE, 
          new Writer(output));
                      }

                  }

                  
          catch(Exception e)
                  
          {
                      log.info(e);
                  }

              }

          }



          public class Writer implements Reactor 
          {
              
          private static final Log log = LogFactory.getLog(Writer.class);
              
              
          private ByteBuffer output;
              
              
          public Writer(ByteBuffer output)
              
          {
                  
          this.output=output;
              }

              
              
          public void execute(SelectionKey key)
              
          {
                  SocketChannel sc 
          = (SocketChannel) key.channel();
                  
          try
                  
          {
                      
          while(sc.isConnected() && output.hasRemaining())
                      
          {
                          
          int len=sc.write(output);
                          
          if(len<0)
                          

                              
          throw new EOFException(); 
                          }
           
                          
          if(len==0
                          

                              key.interestOps(SelectionKey.OP_WRITE); 
                              key.selector().wakeup(); 
                              
          break
                          }

                      }

                      
          if(!output.hasRemaining())
                      
          {
                          output.clear();
                          key.cancel();
                          sc.close();
                      }

                  }

                  
          catch(IOException e)
                  
          {
                      log.info(e);
                  }

              }

          }

          posted on 2011-08-29 18:35 nianzai 閱讀(3103) 評論(3)  編輯  收藏 所屬分類: NIO

          評論:
          # re: JAVA NIO 多線程服務器 1.2版 2011-08-30 13:59 | seo千里眼
          這個多線程程序挺實用哦。  回復  更多評論
            
          # re: JAVA NIO 多線程服務器 1.2版 2011-09-03 16:13 | 阿不都外力
          收藏一下!以后看。。。  回復  更多評論
            
          # re: JAVA NIO 多線程服務器 1.2版 2011-09-05 23:54 | 步步為營
          Tomcat中用NIO比較多,搭建高性能服務器時NIO挺好用的,呵呵  回復  更多評論
            
          主站蜘蛛池模板: 沁水县| 新源县| 永福县| 上虞市| 金山区| 玉门市| 岐山县| 南昌县| 奎屯市| 山西省| 南澳县| 文安县| 封开县| 嵩明县| 延长县| 玛纳斯县| 金门县| 阆中市| 安岳县| 壶关县| 黎城县| 曲松县| 贡嘎县| 通城县| 蛟河市| 天峻县| 辛集市| 湟中县| 西安市| 田东县| 武强县| 犍为县| 视频| 南召县| 千阳县| 武清区| 万载县| 绵阳市| 于田县| 汽车| 宝清县|