隨筆-23  評論-58  文章-0  trackbacks-0
          Reactor 模式的 Java NIO 多線程服務器,這是比較完善的一版了。Java 的 NIO 網絡模型實在是不好用,還是使用現成的好。
          public class NIOServer implements Runnable 
          {
              
          private static final Log log = LogFactory.getLog(NIOServer.class);

              
          private ExecutorService executor=null;
              
          private final Selector sel;
              
          private final ServerSocketChannel ssc;
              
          private HandleUtil ho;
              
              
          public NIOServer(int portnumber,HandleUtil ho) throws IOException
              
          {
                  
          this.ho=ho;
                  sel 
          = Selector.open();
                  ssc 
          = ServerSocketChannel.open();
                  ssc.socket().bind(
          new InetSocketAddress(portnumber));
                  ssc.configureBlocking(
          false);
                  ssc.register(sel,SelectionKey.OP_ACCEPT,
          new Acceptor());
              }

              
              
          public NIOServer(int portnumber,HandleUtil ho,ExecutorService executor) throws IOException
              
          {
                  
          this.ho=ho;
                  
          this.executor=executor;
                  sel 
          = Selector.open();
                  ssc 
          = ServerSocketChannel.open();
                  ssc.socket().bind(
          new InetSocketAddress(portnumber));
                  ssc.configureBlocking(
          false);
                  ssc.register(sel,SelectionKey.OP_ACCEPT,
          new Acceptor());
              }

              
              @Override
              
          public void run()
              
          {
                  
          try
                  
          {
                      
          while(sel.isOpen())
                      
          {
                          
          int nKeys=sel.select(100);
                          
          if(nKeys==0)
                              Thread.sleep(
          100);
                          
          else if(nKeys>0)
                          
          {
                              Iterator
          <SelectionKey> it = sel.selectedKeys().iterator();
                              
          while (it.hasNext()) 
                              
          {
                                  SelectionKey sk 
          = it.next();
                                  it.remove();
                                  
          if(sk.isAcceptable()||sk.isReadable()||sk.isWritable())
                                  
          {
                                      Runnable r 
          = (Runnable)sk.attachment();
                                      r.run();
                                  }

                              }

                          }

                      }

                  }

                  
          catch(IOException | InterruptedException e)        { log.info(ExceptionUtil.getExceptionMessage(e));    }
              }

              
              
          class Acceptor implements Runnable 
              
          {
                  @Override
                  
          public void run() 
                  
          {
                      
          try
                      
          {
                          SocketChannel sc 
          = ssc.accept();
                          
          if (sc != null)
                          
          {
                              sc.configureBlocking(
          false);
                              sc.socket().setTcpNoDelay(
          true);
                              sc.socket().setSoLinger(
          false-1);
                              SelectionKey sk
          =sc.register(sel, SelectionKey.OP_READ);
                              sk.attach(
          new Reader(sk));
                              sel.wakeup();
                          }

                      }

                      
          catch(IOException e) { log.info(ExceptionUtil.getExceptionMessage(e)); }
                  }

              }

              
              
          class Reader implements Runnable 
              
          {
                  
          private byte[] bytes=new byte[0];
                  
          private SelectionKey sk;
                  
                  
          public Reader(SelectionKey sk)
                  
          {
                      
          this.sk=sk;
                  }

                  
                  @Override
                  
          public void run()
                  
          {
                      
          try
                      
          {
                          SocketChannel sc 
          = (SocketChannel) sk.channel();
                          Handle handle
          =null;
                          
          if(ho.getParameterTypes()==null)
                              handle
          =(Handle)HandleUtil.getObjectByClassName(ho.getClassname());
                          
          else
                              handle
          =(Handle)HandleUtil.getObjectByClassName(ho.getClassname(), ho.getParameterTypes(), ho.getParameters());
                          handle.setSocketChannel(sc);
                          ByteBuffer buffer
          =ByteBuffer.allocate(1024);
                          
          int len=-1;
                          
          while(sc.isConnected() && (len=sc.read(buffer))>0)
                          
          {
                              buffer.flip();
                                
          byte [] content = new byte[buffer.limit()];
                              buffer.get(content);
                              bytes
          =StringUtil.arrayCoalition(bytes,content);
                              buffer.clear();
                          }

                          
          if(len==0)
                          
          {
                              
          if(executor==null)
                              
          {
                                  
          byte[] bb=handle.execute(bytes);
                                  sk.interestOps(SelectionKey.OP_WRITE);
                                  sk.attach(
          new Writer(sk,ByteBuffer.wrap(bb)));
                                  sk.selector().wakeup();
                              }

                              
          else
                              
          {
                                  handle.setData(bytes);
                                  Future
          <byte[]> future=executor.submit(handle);
                                  sk.interestOps(SelectionKey.OP_WRITE);
                                  sk.attach(
          new Writer(sk,future));
                                  sk.selector().wakeup();
                              }

                          }

                          
          else if(len==-1)
                          
          {
                              sk.cancel();
                              sk.selector().selectNow();
                              sc.close();
                          }

                      }

                      
          catch(Exception e)
                      
          {
                          sk.cancel();
                          log.info(ExceptionUtil.getExceptionMessage(e));
                      }

                  }

              }

              
              
          public class Writer implements Runnable 
              
          {
                  
          private SelectionKey sk;
                  
          private ByteBuffer output;
                  
                  
          public Writer(SelectionKey sk,ByteBuffer output)
                  
          {
                      
          this.sk=sk;
                      
          this.output=output;
                  }

                  
                  
          public Writer(SelectionKey sk,Future<byte[]> future) throws InterruptedException, ExecutionException
                  
          {
                      
          this.sk=sk;
                      
          this.output=ByteBuffer.wrap(future.get());
                  }

                  
                  @Override
                  
          public void run()
                  
          {
                      SocketChannel sc 
          = (SocketChannel) sk.channel();
                      
          try
                      
          {
                          
          while(sc.isConnected() && output.hasRemaining())
                          
          {
                              
          int len=sc.write(output);
                              
          if(len<0)
                                  
          throw new EOFException();
                              
          else if(len==-1)
                              
          {
                                  sk.cancel();
                                  sk.selector().selectNow();
                                  sc.close();
                              }

                          }

                          
          if(!output.hasRemaining())
                          
          {
                              output.clear();
                              sk.interestOps(SelectionKey.OP_READ);
                              sk.attach(
          new Reader(sk));
                              sk.selector().wakeup();
                          }

                      }

                      
          catch(Exception e)
                      
          {
                          sk.cancel();
                          log.info(ExceptionUtil.getExceptionMessage(e));
                      }

                  }

              }

              
              
          public void send(SocketChannel sc,byte[] bytes) throws ClosedChannelException
              
          {
                  SelectionKey sk
          =sc.register(sel, SelectionKey.OP_WRITE);
                  sk.attach(
          new Writer(sk,ByteBuffer.wrap(bytes)));
                  sel.wakeup();
              }

          }


          posted on 2013-05-14 16:31 nianzai 閱讀(2730) 評論(1)  編輯  收藏 所屬分類: NIO

          評論:
          # re: JAVA NIO 多線程服務器 1.3版 [未登錄] 2013-09-18 15:34 | z
          Handle 這個方法裡面寫的是什麼處理呢?能否也貼出來看看?  回復  更多評論
            
          主站蜘蛛池模板: 福泉市| 永昌县| 外汇| 丰城市| 清水河县| 邯郸县| 芜湖县| 隆林| 奈曼旗| 新田县| 那曲县| 罗城| 曲麻莱县| 青冈县| 古交市| 连州市| 溆浦县| 信丰县| 湘潭县| 芦山县| 新野县| 宜章县| 莱西市| 武陟县| 荆州市| 晋州市| 仁化县| 唐海县| 甘肃省| 安吉县| 大同市| 益阳市| 澄城县| 台前县| 乌鲁木齐县| 宜兴市| 东乡| 阿荣旗| 阜新市| 瑞昌市| 荔浦县|