隨筆-23  評論-58  文章-0  trackbacks-0

          JAVA NIO 多線程服務器是 Nut (lucene + hadoop 分布式搜索運行框架) 的 Nut Search層封裝代碼


          /**
           * Handler attached to a {@link SelectionKey}; the server loop invokes it
           * when the key it is attached to is selected as readable or writable.
           */
          public interface Reactor
          {
              /**
               * Handles the I/O event that was signalled for the given key.
               *
               * @param key the selected key whose attachment is this handler
               */
              void execute(SelectionKey key);
          }


          public class Reader implements Reactor 
          {
              
          private static final Log log = LogFactory.getLog(Reader.class);
              
              
          private byte[] bytes=new byte[0];
              
          private ExecutorService executor;
              
              
          public Reader(ExecutorService executor)
              
          {
                  
          this.executor=executor;
              }

              
              @Override
              
          public void execute(SelectionKey key)
              
          {
                  SocketChannel sc 
          = (SocketChannel) key.channel();

                  
          try
                  
          {
                      ByteBuffer buffer
          =ByteBuffer.allocate(1024);
                      
          int len=-1;
                      
          while((len=sc.read(buffer))>0)
                      
          {
                          buffer.flip();
                            
          byte [] content = new byte[buffer.limit()];
                          buffer.get(content);
                          bytes
          =NutUtil.ArrayCoalition(bytes,content);
                          buffer.clear();
                      }

                      
          if(len==0)
                          key.interestOps(SelectionKey.OP_READ);
                      
          else
                      
          {
                          Callable
          <byte[]> call=new ProcessCallable(bytes);
                          Future
          <byte[]> task=executor.submit(call);
                          ByteBuffer output
          =ByteBuffer.wrap(task.get());
                          sc.register(key.selector(), SelectionKey.OP_WRITE, 
          new Writer(output));
                      }

                  }

                  
          catch(Exception e)
                  
          {
                      log.info(e);
                  }

              }

          }


          public class Writer implements Reactor 
          {
              
          private static final Log log = LogFactory.getLog(Writer.class);
              
              
          private ByteBuffer output;
              
              
          public Writer(ByteBuffer output)
              
          {
                  
          this.output=output;
              }

              
              
          public void execute(SelectionKey key)
              
          {
                  SocketChannel sc 
          = (SocketChannel) key.channel();
                  
          try
                  
          {
                      
          while(output.hasRemaining())
                      
          {
                          
          int len=sc.write(output);
                          
          if(len<0)
                          

                              
          throw new EOFException(); 
                          }
           
                          
          if(len==0
                          

                              key.interestOps(SelectionKey.OP_WRITE); 
                              key.selector().wakeup(); 
                              
          break
                          }

                      }

                      
          if(!output.hasRemaining())
                      
          {
                          output.clear();
                          key.cancel();
                          sc.close();
                      }

                  }

                  
          catch(IOException e)
                  
          {
                      log.info(e);
                  }

              }

          }


          public class MiniServer
          {
              
          private static final Log log = LogFactory.getLog(MiniServer.class);
              
              
          private final Selector s;
              
          private final ServerSocketChannel ssc;
              
          private ExecutorService executor;
              
              
          private static Map<String,Long> map=new TreeMap<String,Long>();//保存不能正確完成的SelectionKey
              private ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
              
              
          public MiniServer(int portnumber,ExecutorService executor) throws IOException
              
          {
                  scheduled.scheduleAtFixedRate(task,
          10,10,TimeUnit.MINUTES);//每10分鐘清空一次map
                  this.executor=executor;
                  s 
          = Selector.open();
                  ssc 
          = ServerSocketChannel.open();
                  ssc.socket().bind(
          new InetSocketAddress(portnumber));
                  ssc.configureBlocking(
          false);
                  ssc.register(s,SelectionKey.OP_ACCEPT);
              }

              
              
          public void execute()
              
          {
                  
          try
                  
          {
                      
          while(s.isOpen())
                      
          {
                          
          int nKeys=s.select();
                          
          if(nKeys==0)
                          
          {
                              
          for (SelectionKey key : s.keys())
                              
          {
                                  log.info(
          "channel " + key.channel() + " waiting for " + key.interestOps());
                                  
          //如果超過2分鐘就廢除
                                  if(map.containsKey(key.toString()))
                                  
          {
                                      Long t
          = map.get(key.toString());
                                      
          if((NutUtil.now()-t)>200);
                                      
          {
                                          map.remove(key.toString());
                                          s.keys().remove(key);
                                          key.cancel();
                                      }

                                  }

                                  
          else
                                  
          {
                                      map.put(key.toString(), NutUtil.now());
                                  }

                              }

                              
          continue;
                          }

                          
                          Iterator
          <SelectionKey> it = s.selectedKeys().iterator();  
                          
          while (it.hasNext()) 
                          
          {
                              SelectionKey key 
          = it.next();
                              it.remove();
                              
          if (!key.isValid() || !key.channel().isOpen())
                                  
          continue;
                              
          if(key.isAcceptable())
                              
          {
                                  SocketChannel sc 
          = ssc.accept();
                                  
          if (sc != null)
                                  
          {
                                      sc.configureBlocking(
          false);
                                      sc.register(s, SelectionKey.OP_READ, 
          new Reader(executor));
                                  }

                              }

                              
          else if(key.isReadable()||key.isWritable())
                              
          {
                                  Reactor reactor 
          = (Reactor) key.attachment();
                                  reactor.execute(key);
                              }

                          }

                      }

                  }

                  
          catch(IOException e)
                  
          {
                      log.info(e);
                  }

              }

              
              Runnable task 
          = new Runnable()
              
          {
                  
          public void run()
                  
          {
                      map.clear();
                  }

              }
          ;
          }
          posted on 2010-07-26 11:31 nianzai 閱讀(2708) 評論(2)  編輯  收藏 所屬分類: NIO

          評論:
          # re: JAVA NIO 多線程服務器 1.1版 2010-07-26 20:34 | intex充氣床
          謝謝!  回復  更多評論
            
          # re: JAVA NIO 多線程服務器 1.1版 2013-02-02 16:11 | jnan77
          ProcessCallable 這是什么包的呢?  回復  更多評論
            

          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          主站蜘蛛池模板: 伊通| 铁岭县| 巴中市| 和田市| 丹东市| 古丈县| 图木舒克市| 怀安县| 徐汇区| 临高县| 历史| 读书| 化隆| 新余市| 武汉市| 沙河市| 承德市| 泰来县| 彩票| 灵台县| 汽车| 祁阳县| 凌源市| 五台县| 屏边| 江安县| 张家港市| 屏东市| 修武县| 临泉县| 龙里县| 滨海县| 黑水县| 延边| 浮梁县| 大荔县| 隆林| 晋中市| 象山县| 周宁县| 金寨县|