E81086713E446D36F62B2AA2A3502B5EB155

          Java雜家

          雜七雜八。。。一家之言

          BlogJava 首頁 新隨筆 聯系 聚合 管理
            40 Posts :: 1 Stories :: 174 Comments :: 0 Trackbacks
          前一篇博客,我簡單提了下怎么為NIO2增加TransmitFile支持,文件傳送吞吐量是一個性能關注點,此外,并發連接數也是重要的關注點。

          不過JDK7中又一次做了簡單的實現:不支持同時投遞多個AcceptEx請求,只支持一次一個,返回后再投遞。這樣,客戶端連接的接受速度必然大打折扣。不知道為什么sun會做這樣的實現。WSASend()/WSAReceive()一次只允許一個還是可以理解的,畢竟簡化了編程,不用考慮封包亂序問題,也降低了內存耗盡的風險;AcceptEx卻沒有這樣的理由。

          于是再一次為了性能,我增加了同時投遞多個的支持。

          另外,在JDK7的默認實現中,AcceptEx返回后,為了設置遠程和本地InetSocketAddress也采用了效率很低的方法:4次通過JNI調用getsockname,其中2次為了取sockaddr,2次為了取port。這些操作本人改用GetAcceptExSockaddrs一次完成,進一步提高效率。


          先看Java部分的代碼,框架跟JDK7的一樣,細節處理不一樣:

          /**
           * 
           
          */
          package sun.nio.ch;

          import java.io.IOException;
          import java.lang.reflect.Field;
          import java.lang.reflect.Method;
          import java.net.InetAddress;
          import java.net.InetSocketAddress;
          import java.nio.channels.AcceptPendingException;
          import java.nio.channels.AsynchronousCloseException;
          import java.nio.channels.AsynchronousServerSocketChannel;
          import java.nio.channels.AsynchronousSocketChannel;
          import java.nio.channels.ClosedChannelException;
          import java.nio.channels.CompletionHandler;
          import java.nio.channels.NotYetBoundException;
          import java.nio.channels.ShutdownChannelGroupException;
          import java.security.AccessControlContext;
          import java.security.AccessController;
          import java.security.PrivilegedAction;
          import java.util.Queue;
          import java.util.concurrent.ConcurrentLinkedQueue;
          import java.util.concurrent.Future;
          import java.util.concurrent.atomic.AtomicBoolean;
          import java.util.concurrent.atomic.AtomicInteger;

          import sun.misc.Unsafe;

          /**
           * This class enable multiple 'AcceptEx' post on the completion port, hence improve the concurrent connection number.
           * 
          @author Yvon
           *
           
          */
          public class WindowsMultiAcceptSupport {

              WindowsAsynchronousServerSocketChannelImpl schannel;

              
          private static final Unsafe unsafe = Unsafe.getUnsafe();

              
          // 2 * (sizeof(SOCKET_ADDRESS) + 16)
              private static final int ONE_DATA_BUFFER_SIZE = 88;

              
          private long handle;
              
          private Iocp iocp;

              
          // typically there will be zero, or one I/O operations pending. In rare
              
          // cases there may be more. These rare cases arise when a sequence of accept
              
          // operations complete immediately and handled by the initiating thread.
              
          // The corresponding OVERLAPPED cannot be reused/released until the completion
              
          // event has been posted.
              private PendingIoCache ioCache;

              
          private Queue<Long> dataBuffers;
              
          // the data buffer to receive the local/remote socket address
              
          //        private final long dataBuffer;

              
          private AtomicInteger pendingAccept;
              
          private int maxPending;

              Method updateAcceptContextM;
              Method acceptM;

              WindowsMultiAcceptSupport() {
                  
          //dummy for JNI code.
              }

              
          public void close() throws IOException {

                  schannel.close();

                  
          for (int i = 0; i < maxPending + 1; i++)//assert there is maxPending+1 buffer in the queue
                  {
                      
          long addr = dataBuffers.poll();
                      
          // release  resources
                      unsafe.freeMemory(addr);
                  }

              }

              
          /**
               * 
               
          */
              
          public WindowsMultiAcceptSupport(AsynchronousServerSocketChannel ch, int maxPost) {
                  
          if (maxPost <= 0 || maxPost > 1024)
                      
          throw new IllegalStateException("maxPost can't less than 1 and greater than 1024");
                  
          this.schannel = (WindowsAsynchronousServerSocketChannelImpl) ch;
                  maxPending 
          = maxPost;
                  dataBuffers 
          = new ConcurrentLinkedQueue<Long>();
                  
          for (int i = 0; i < maxPending + 1; i++) {
                      dataBuffers.add(unsafe.allocateMemory(ONE_DATA_BUFFER_SIZE));
                  }

                  pendingAccept 
          = new AtomicInteger(0);
                  
          try {
                      Field f 
          = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("handle");
                      f.setAccessible(
          true);
                      handle 
          = f.getLong(schannel);


                      f 
          = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("iocp");
                      f.setAccessible(
          true);
                      iocp 
          = (Iocp) f.get(schannel);

                      f 
          = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("ioCache");
                      f.setAccessible(
          true);
                      ioCache 
          = (PendingIoCache) f.get(schannel);

                      f 
          = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("accepting");
                      f.setAccessible(
          true);
                      AtomicBoolean accepting 
          = (AtomicBoolean) f.get(schannel);

                      accepting.set(
          true);//disable accepting by origin channel.

                  } 
          catch (Exception e) {
                      e.printStackTrace();
                  }

              }

              @SuppressWarnings(
          "unchecked")
              
          public final <A> void accept(A attachment,
                  CompletionHandler
          <AsynchronousSocketChannel, ? super A> handler) {
                  
          if (handler == null)
                      
          throw new NullPointerException("'handler' is null");
                  implAccept(attachment, (CompletionHandler
          <AsynchronousSocketChannel, Object>) handler);
              }

              
          /**
               * Task to initiate accept operation and to handle result.
               
          */
              
          private class AcceptTask implements Runnable, Iocp.ResultHandler {

                  
          private final WindowsAsynchronousSocketChannelImpl channel;
                  
          private final AccessControlContext acc;
                  
          private final PendingFuture<AsynchronousSocketChannel, Object> result;
                  
          private final long dataBuffer;

                  AcceptTask(WindowsAsynchronousSocketChannelImpl channel, AccessControlContext acc,
                      
          long dataBuffer, PendingFuture<AsynchronousSocketChannel, Object> result) {
                      
          this.channel = channel;
                      
          this.acc = acc;
                      
          this.result = result;
                      
          this.dataBuffer = dataBuffer;
                  }

                  
          void enableAccept() {
                      pendingAccept.decrementAndGet();
                      dataBuffers.add(dataBuffer);
                  }

                  
          void closeChildChannel() {
                      
          try {
                          channel.close();
                      } 
          catch (IOException ignore) {
                      }
                  }

                  
          // caller must have acquired read lock for the listener and child channel.
                  void finishAccept() throws IOException {
                      
          /**
                       * JDK7 use 4 calls to getsockname  to setup
                       * local& remote address, this is very inefficient.
                       * 
                       * I change this to use GetAcceptExSockaddrs
                       
          */

                      InetAddress[] socks 
          = new InetAddress[2];
                      
          int[] ports = new int[2];
                      updateAcceptContext(handle, channel.handle(), socks, ports, dataBuffer);
                      InetSocketAddress local 
          = new InetSocketAddress(socks[0], ports[0]);
                      
          final InetSocketAddress remote = new InetSocketAddress(socks[1], ports[1]);
                      channel.setConnected(local, remote);

                      
          // permission check (in context of initiating thread)
                      if (acc != null) {
                          AccessController.doPrivileged(
          new PrivilegedAction<Void>() {

                              
          public Void run() {
                                  SecurityManager sm 
          = System.getSecurityManager();
                                  sm.checkAccept(remote.getAddress().getHostAddress(), remote.getPort());

                                  
          return null;
                              }
                          }, acc);
                      }
                  }

                  
          /**
                   * Initiates the accept operation.
                   
          */
                  @Override
                  
          public void run() {
                      
          long overlapped = 0L;

                      
          try {
                          
          // begin usage of listener socket
                          schannel.begin();
                          
          try {
                              
          // begin usage of child socket (as it is registered with
                              
          // completion port and so may be closed in the event that
                              
          // the group is forcefully closed).
                              channel.begin();

                              
          synchronized (result) {
                                  overlapped 
          = ioCache.add(result);

                                
                                  
          int n = accept0(handle, channel.handle(), overlapped, dataBuffer);//Be careful for the buffer address
                                  if (n == IOStatus.UNAVAILABLE) {
                                      
          return;
                                  }

                                  
          // connection accepted immediately
                                  finishAccept();

                                  
          // allow another accept before the result is set
                                  enableAccept();
                                  result.setResult(channel);
                              }
                          } 
          finally {
                              
          // end usage on child socket
                              channel.end();
                          }
                      } 
          catch (Throwable x) {
                          
          // failed to initiate accept so release resources
                          if (overlapped != 0L)
                              ioCache.remove(overlapped);
                          closeChildChannel();
                          
          if (x instanceof ClosedChannelException)
                              x 
          = new AsynchronousCloseException();
                          
          if (!(x instanceof IOException) && !(x instanceof SecurityException))
                              x 
          = new IOException(x);
                          enableAccept();
                          result.setFailure(x);
                      } 
          finally {
                          
          // end of usage of listener socket
                          schannel.end();
                      }

                      
          // accept completed immediately but may not have executed on
                      
          // initiating thread in which case the operation may have been
                      
          // cancelled.
                      if (result.isCancelled()) {
                          closeChildChannel();
                      }

                      
          // invoke completion handler
                      Invoker.invokeIndirectly(result);
                  }

                  
          /**
                   * Executed when the I/O has completed
                   
          */
                  @Override
                  
          public void completed(int bytesTransferred, boolean canInvokeDirect) {
                      
          try {
                          
          // connection accept after group has shutdown
                          if (iocp.isShutdown()) {
                              
          throw new IOException(new ShutdownChannelGroupException());
                          }

                          
          // finish the accept
                          try {
                              schannel.begin();
                              
          try {
                                  channel.begin();
                                  finishAccept();
                              } 
          finally {
                                  channel.end();
                              }
                          } 
          finally {
                              schannel.end();
                          }

                          
          // allow another accept before the result is set
                          enableAccept();
                          result.setResult(channel);
                      } 
          catch (Throwable x) {
                          enableAccept();
                          closeChildChannel();
                          
          if (x instanceof ClosedChannelException)
                              x 
          = new AsynchronousCloseException();
                          
          if (!(x instanceof IOException) && !(x instanceof SecurityException))
                              x 
          = new IOException(x);
                          result.setFailure(x);
                      }

                      
          // if an async cancel has already cancelled the operation then
                      
          // close the new channel so as to free resources
                      if (result.isCancelled()) {
                          closeChildChannel();
                      }

                      
          // invoke handler (but not directly)
                      Invoker.invokeIndirectly(result);
                  }

                  @Override
                  
          public void failed(int error, IOException x) {
                      enableAccept();
                      closeChildChannel();

                      
          // release waiters
                      if (schannel.isOpen()) {
                          result.setFailure(x);
                      } 
          else {
                          result.setFailure(
          new AsynchronousCloseException());
                      }
                      Invoker.invokeIndirectly(result);
                  }
              }

              Future
          <AsynchronousSocketChannel> implAccept(Object attachment,
                  
          final CompletionHandler<AsynchronousSocketChannel, Object> handler) {
                  
          if (!schannel.isOpen()) {
                      Throwable exc 
          = new ClosedChannelException();
                      
          if (handler == null)
                          
          return CompletedFuture.withFailure(exc);
                      Invoker.invokeIndirectly(schannel, handler, attachment, 
          null, exc);
                      
          return null;
                  }
                  
          if (schannel.isAcceptKilled())
                      
          throw new RuntimeException("Accept not allowed due to cancellation");

                  
          // ensure channel is bound to local address
                  if (schannel.localAddress == null)
                      
          throw new NotYetBoundException();

                  
          // create the socket that will be accepted. The creation of the socket
                  
          // is enclosed by a begin/end for the listener socket to ensure that
                  
          // we check that the listener is open and also to prevent the I/O
                  
          // port from being closed as the new socket is registered.
                  WindowsAsynchronousSocketChannelImpl ch = null;
                  IOException ioe 
          = null;
                  
          try {
                      schannel.begin();
                      ch 
          = new WindowsAsynchronousSocketChannelImpl(iocp, false);
                  } 
          catch (IOException x) {
                      ioe 
          = x;
                  } 
          finally {
                      schannel.end();
                  }
                  
          if (ioe != null) {
                      
          if (handler == null)
                          
          return CompletedFuture.withFailure(ioe);
                      Invoker.invokeIndirectly(
          this.schannel, handler, attachment, null, ioe);
                      
          return null;
                  }

                  
          // need calling context when there is security manager as
                  
          // permission check may be done in a different thread without
                  
          // any application call frames on the stack
                  AccessControlContext acc =
                      (System.getSecurityManager() 
          == null? null : AccessController.getContext();

                  PendingFuture
          <AsynchronousSocketChannel, Object> result =
                      
          new PendingFuture<AsynchronousSocketChannel, Object>(schannel, handler, attachment);

                  
          // check and set flag to prevent concurrent accepting
                  if (pendingAccept.get() >= maxPending)
                      
          throw new AcceptPendingException();
                  pendingAccept.incrementAndGet();
                  AcceptTask task 
          = new AcceptTask(ch, acc, dataBuffers.poll(), result);
                  result.setContext(task);

                  
          // initiate I/O
                  if (Iocp.supportsThreadAgnosticIo()) {
                      task.run();
                  } 
          else {
                      Invoker.invokeOnThreadInThreadPool(
          this.schannel, task);
                  }
                  
          return result;
              }

              
          //    //reimplements for performance
              static native void updateAcceptContext(long listenSocket, long acceptSocket,
                  InetAddress[] addresses, 
          int[] ports, long dataBuffer) throws IOException;

              
          static native int accept0(long handle, long handle2, long overlapped, long dataBuffer);

          }


          對應的CPP代碼如下:


          /*
           * Class:     sun_nio_ch_WindowsMultiAcceptSupport
           * Method:    updateAcceptContext
           * Signature: (JJ[Ljava/net/InetAddress;[IJ)V
           
          */
          JNIEXPORT 
          void JNICALL Java_sun_nio_ch_WindowsMultiAcceptSupport_updateAcceptContext
          (JNIEnv 
          *env , jclass clazz, jlong listenSocket, jlong acceptSocket, jobjectArray sockArray,jintArray portArray,jlong buf)
          {
              SOCKET s1 
          = (SOCKET)jlong_to_ptr(listenSocket);
              SOCKET s2 
          = (SOCKET)jlong_to_ptr(acceptSocket);
              PVOID outputBuffer 
          = (PVOID)jlong_to_ptr(buf);
              INT iLocalAddrLen
          =0;
              INT iRemoteAddrLen
          =0;
              SOCKETADDRESS
          * lpLocalAddr;
              SOCKETADDRESS
          * lpRemoteAddr;
              jobject localAddr;
              jobject remoteAddr;
              jint ports[
          2]={0};

              

              setsockopt(s2, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (
          char *)&s1, sizeof(s1));

              (lpGetAcceptExSockaddrs)(outputBuffer,
                  
          0,
                  
          sizeof(SOCKETADDRESS)+16,
                  
          sizeof(SOCKETADDRESS)+16,
                  (LPSOCKADDR
          *)&lpLocalAddr,
                  
          &iLocalAddrLen,
                  (LPSOCKADDR
          *)&lpRemoteAddr,
                  
          &iRemoteAddrLen);

              localAddr
          =lpNET_SockaddrToInetAddress(env,(struct sockaddr *)lpLocalAddr,(int *)ports);
              remoteAddr
          =lpNET_SockaddrToInetAddress(env,(struct sockaddr *)lpRemoteAddr,(int *)(ports+1));

              env
          ->SetObjectArrayElement(sockArray,0,localAddr);
              env
          ->SetObjectArrayElement(sockArray,1,remoteAddr);
              env
          ->SetIntArrayRegion(portArray,0,2,ports);

          }

          /*
           * Class:     sun_nio_ch_WindowsMultiAcceptSupport
           * Method:    accept0
           * Signature: (JJJJ)I
           
          */
          jint JNICALL Java_sun_nio_ch_WindowsMultiAcceptSupport_accept0
            (JNIEnv 
          *env, jclass clazz, jlong listenSocket, jlong acceptSocket, jlong ov, jlong buf)
          {

              BOOL res;
              SOCKET s1 
          = (SOCKET)jlong_to_ptr(listenSocket);
              SOCKET s2 
          = (SOCKET)jlong_to_ptr(acceptSocket);
              PVOID outputBuffer 
          = (PVOID)jlong_to_ptr(buf);

              DWORD nread 
          = 0;
              OVERLAPPED
          * lpOverlapped = (OVERLAPPED*)jlong_to_ptr(ov);
              ZeroMemory((PVOID)lpOverlapped, 
          sizeof(OVERLAPPED));

              

              
          //why use SOCKETADDRESS?
              
          //because client may use IPv6 to connect to server.
              res = (lpAcceptEx)(s1,
                  s2,
                  outputBuffer,
                  
          0,
                  
          sizeof(SOCKETADDRESS)+16,
                  
          sizeof(SOCKETADDRESS)+16,
                  
          &nread,
                  lpOverlapped);

              
              
          if (res == 0) {
                  
          int error = WSAGetLastError();
                  
                  
          if (error == ERROR_IO_PENDING) {
                      
                      
          return NIO2_IOS_UNAVAILABLE;
                  }
              
              
                  
          return NIO2_THROWN;
              }



              
              
          return 0;

          }

          這里用到的lpNET_SockaddrToInetAddress是JDK7中NET.DLL暴露的方法,從DLL里加載。相應代碼如下:

          *
           
          * Class:     com_yovn_jabhttpd_utilities_SunPackageFixer
           
          * Method:    initFds
           
          * Signature: ()V
           
          */
          JNIEXPORT 
          void JNICALL Java_com_yovn_jabhttpd_utilities_SunPackageFixer_initFds
            (JNIEnv 
          *env, jclass clazz)
          {


              GUID GuidAcceptEx 
          = WSAID_ACCEPTEX;
              GUID GuidTransmitFile 
          = WSAID_TRANSMITFILE;
              GUID GuidGetAcceptExSockAddrs 
          = WSAID_GETACCEPTEXSOCKADDRS;
              SOCKET s;
              
          int rv;
              DWORD dwBytes;
              HMODULE hModule;


              s 
          = socket(AF_INET, SOCK_STREAM, 0);
              
          if (s == INVALID_SOCKET) {
                  JNU_ThrowByName(env,
          "java/io/IOException""socket failed");
                  
          return;
              }
              rv 
          = WSAIoctl(s,
                  SIO_GET_EXTENSION_FUNCTION_POINTER,
                  (LPVOID)
          &GuidAcceptEx,
                  
          sizeof(GuidAcceptEx),
                  
          &lpAcceptEx,
                  
          sizeof(lpAcceptEx),
                  
          &dwBytes,
                  NULL,
                  NULL);
              
          if (rv != 0)
              {
                  JNU_ThrowByName(env, 
          "java/io/IOException","WSAIoctl failed on get AcceptEx ");
                  
          goto _ret;
              }
              rv 
          = WSAIoctl(s,
                  SIO_GET_EXTENSION_FUNCTION_POINTER,
                  (LPVOID)
          &GuidTransmitFile,
                  
          sizeof(GuidTransmitFile),
                  
          &lpTransmitFile,
                  
          sizeof(lpTransmitFile),
                  
          &dwBytes,
                  NULL,
                  NULL);
              
          if (rv != 0)
              {
                  JNU_ThrowByName(env, 
          "java/io/IOException","WSAIoctl failed on get TransmitFile");
                  
          goto _ret;
              }
              rv 
          = WSAIoctl(s,
                  SIO_GET_EXTENSION_FUNCTION_POINTER,
                  (LPVOID)
          &GuidGetAcceptExSockAddrs,
                  
          sizeof(GuidGetAcceptExSockAddrs),
                  
          &lpGetAcceptExSockaddrs,
                  
          sizeof(lpGetAcceptExSockaddrs),
                  
          &dwBytes,
                  NULL,
                  NULL);
              
          if (rv != 0)
              {
                  JNU_ThrowByName(env, 
          "java/io/IOException","WSAIoctl failed on get GetAcceptExSockaddrs");
                  
          goto _ret;
              }

              hModule
          =LoadLibrary("net.dll");
              
          if(hModule==NULL)
              {
                  JNU_ThrowByName(env, 
          "java/io/IOException","can't load java net.dll");
                  
          goto _ret;
              }


              lpNET_SockaddrToInetAddress
          =(NET_SockaddrToInetAddress_t)GetProcAddress(hModule,"_NET_SockaddrToInetAddress@12");

              
          if(lpNET_SockaddrToInetAddress==NULL)
              {
                  JNU_ThrowByName(env, 
          "java/io/IOException","can't resolve _NET_SockaddrToInetAddress function ");
                  
                  
              }

          _ret:
              closesocket(s);
              
          return;


          }

          細心的同學可能會發現,在創建socket之前沒有初始化WinSock庫,因為在這段代碼前,我初始化了一個InetSocketAddress對象,這樣JVM會加載NET.DLL并初始化WinSock庫了。

          OK,現在,你可以在支持類上同時發起多個AcceptEx請求了。

          PS:基于這個我簡單測試了下我的服務器,同時開5000個線程,每個下載3M多點的文件,一分鐘內能夠全部正確完成。
          服務器正在開發中,有興趣的請加入:http://code.google.com/p/jabhttpd


          posted on 2009-12-04 17:57 DoubleH 閱讀(3898) 評論(6)  編輯  收藏

          Feedback

          # re: 基于JDK7 NIO2的高性能web服務器實踐之二[未登錄] 2011-04-23 20:54 java
          很想看看牛人的實現,但報403,說我沒有權限,可否傳一份給我.郵箱wahel30615571@hotmail.com  回復  更多評論
            

          # re: 基于JDK7 NIO2的高性能web服務器實踐之二[未登錄] 2011-11-16 16:37 隨意
          哥們,代碼能發到zhouchn8@163.com一份嗎,謝謝  回復  更多評論
            

          # re: 基于JDK7 NIO2的高性能web服務器實踐之二[未登錄] 2011-12-19 10:47 VV
          老師 代碼給一份吧,,,, vanlin (AT) 139 dot com  回復  更多評論
            

          # re: 基于JDK7 NIO2的高性能web服務器實踐之二 2012-02-05 14:39 夢想在飛
          兄弟:給我一份,
          544286609@qq.com 謝謝  回復  更多評論
            

          # re: 基于JDK7 NIO2的高性能web服務器實踐之二 2012-03-12 21:14 為夢想奔跑
          老師,能否發份代碼asshp1790@126.com,謝謝  回復  更多評論
            

          # re: 基于JDK7 NIO2的高性能web服務器實踐之二 2012-05-08 17:05 xingye
          老師,你好,可以給我一份代碼嗎?liangxingye@163.com,謝謝  回復  更多評論
            


          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          主站蜘蛛池模板: 沂南县| 龙胜| 东城区| 卢湾区| 垣曲县| 中西区| 织金县| 呼图壁县| 商水县| 博白县| 巴彦淖尔市| 郧西县| 双峰县| 施甸县| 会泽县| 敦煌市| 堆龙德庆县| 新源县| 鲁甸县| 宾阳县| 隆回县| 叙永县| 呼玛县| 河间市| 富民县| 台前县| 开鲁县| 宁城县| 山阳县| 龙里县| 阿拉善右旗| 阿拉善左旗| 泸水县| 阳城县| 铜梁县| 嵩明县| 南木林县| 甘南县| 全椒县| 合山市| 东乡族自治县|