隨筆-23  評論-58  文章-0  trackbacks-0
          Reactor 模式的 JAVA NIO 多線程服務器

          public class MiniServer extends Thread
          {
              
          private static final Log log = LogFactory.getLog(MiniServer.class);
              
              
          private final Selector s;
              
          private final ServerSocketChannel ssc;
              
          private ExecutorService executor;
              
              
          public MiniServer(int portnumber,ExecutorService executor) throws IOException
              
          {
                  
          this.executor=executor;
                  s 
          = Selector.open();
                  ssc 
          = ServerSocketChannel.open();
                  ssc.socket().bind(
          new InetSocketAddress(portnumber));
                  ssc.configureBlocking(
          false);
                  ssc.register(s,SelectionKey.OP_ACCEPT);
              }

              
              
          public void run()
              
          {
                  
          try
                  
          {
                      
          while(s.isOpen())
                      
          {
                          
          int nKeys=s.select();
                          
          if(nKeys>0)
                          
          {
                              Iterator
          <SelectionKey> it = s.selectedKeys().iterator();
                              
          while (it.hasNext()) 
                              
          {
                                  SelectionKey key 
          = it.next();
                                  it.remove();
                                  
          if (!key.isValid() || !key.channel().isOpen())
                                      
          continue;
                                  
          if(key.isAcceptable())
                                  
          {
                                      SocketChannel sc 
          = ssc.accept();
                                      
          if (sc != null)
                                      
          {
                                          sc.configureBlocking(
          false);
                                          sc.register(s, SelectionKey.OP_READ, 
          new Reader(executor));
                                      }

                                  }

                                  
          else if(key.isReadable()||key.isWritable())
                                  
          {
                                      Reactor reactor 
          = (Reactor) key.attachment();
                                      reactor.execute(key);
                                  }

                              }

                          }

                      }

                  }

                  
          catch(IOException e)
                  
          {
                      log.info(e);
                  }

              }

          }



          public interface Reactor 
          {
              
          void execute(SelectionKey key);
          }



          public class Reader implements Reactor 
          {
              
          private static final Log log = LogFactory.getLog(Reader.class);
              
              
          private byte[] bytes=new byte[0];
              
          private ExecutorService executor;
              
              
          public Reader(ExecutorService executor)
              
          {
                  
          this.executor=executor;
              }

              
              @Override
              
          public void execute(SelectionKey key)
              
          {
                  SocketChannel sc 
          = (SocketChannel) key.channel();
                  
          try
                  
          {
                      ByteBuffer buffer
          =ByteBuffer.allocate(1024);
                      
          int len=-1;
                      
          while(sc.isConnected() && (len=sc.read(buffer))>0)
                      
          {
                          buffer.flip();
                            
          byte [] content = new byte[buffer.limit()];
                          buffer.get(content);
                          bytes
          =NutUtil.ArrayCoalition(bytes,content);
                          buffer.clear();
                      }

                      
          if(len==0)
                      
          {
                          key.interestOps(SelectionKey.OP_READ);
                          key.selector().wakeup(); 
                      }

                      
          else if(len==-1)
                      
          {
                          Callable
          <byte[]> call=new ProcessCallable(bytes);
                          Future
          <byte[]> task=executor.submit(call);
                          ByteBuffer output
          =ByteBuffer.wrap(task.get());
                          sc.register(key.selector(), SelectionKey.OP_WRITE, 
          new Writer(output));
                      }

                  }

                  
          catch(Exception e)
                  
          {
                      log.info(e);
                  }

              }

          }



          public class Writer implements Reactor 
          {
              
          private static final Log log = LogFactory.getLog(Writer.class);
              
              
          private ByteBuffer output;
              
              
          public Writer(ByteBuffer output)
              
          {
                  
          this.output=output;
              }

              
              
          public void execute(SelectionKey key)
              
          {
                  SocketChannel sc 
          = (SocketChannel) key.channel();
                  
          try
                  
          {
                      
          while(sc.isConnected() && output.hasRemaining())
                      
          {
                          
          int len=sc.write(output);
                          
          if(len<0)
                          

                              
          throw new EOFException(); 
                          }
           
                          
          if(len==0
                          

                              key.interestOps(SelectionKey.OP_WRITE); 
                              key.selector().wakeup(); 
                              
          break
                          }

                      }

                      
          if(!output.hasRemaining())
                      
          {
                          output.clear();
                          key.cancel();
                          sc.close();
                      }

                  }

                  
          catch(IOException e)
                  
          {
                      log.info(e);
                  }

              }

          }

          posted on 2011-08-29 18:35 nianzai 閱讀(3103) 評論(3)  編輯  收藏 所屬分類: NIO

          評論:
          # re: JAVA NIO 多線程服務器 1.2版 2011-08-30 13:59 | seo千里眼
          這個多線程程序挺實用哦。  回復  更多評論
            
          # re: JAVA NIO 多線程服務器 1.2版 2011-09-03 16:13 | 阿不都外力
          收藏一下!以后看。。。  回復  更多評論
            
          # re: JAVA NIO 多線程服務器 1.2版 2011-09-05 23:54 | 步步為營
          Tomcat中用NIO比較多,搭建高性能服務器時NIO挺好用的,呵呵  回復  更多評論
            
          主站蜘蛛池模板: 金山区| 博野县| 曲沃县| 南川市| 那坡县| 丹寨县| 韶山市| 建水县| 湖南省| 兰州市| 连云港市| 光山县| 青龙| 克山县| 兴国县| 长海县| 上蔡县| 泗阳县| 余江县| 阳曲县| 永丰县| 保德县| 平顺县| 澄江县| 洪雅县| 固阳县| 威信县| 南汇区| 古蔺县| 衡水市| 阜康市| 青冈县| 大埔区| 寻乌县| 监利县| 拉孜县| 岚皋县| 桦甸市| 珲春市| 柳州市| 牟定县|