jinfeng_wang

          G-G-S,D-D-U!

          BlogJava 首頁 新隨筆 聯系 聚合 管理
            400 Posts :: 0 Stories :: 296 Comments :: 0 Trackbacks

          ZooKeeper.class

           /**
            * Creates a node and blocks until the server has replied.
            *
            * The client path is validated, converted to a server path with
            * prependChroot(), and sent in a CreateRequest through
            * cnxn.submitRequest(). The path returned by the server is converted
            * back by cutting off cnxn.chrootPath.length() leading characters
            * when a chroot path is set.
            *
            * @param path       client-side path of the node to create
            * @param data       payload stored in the request
            * @param acl        ACL list stored in the request
            * @param createMode supplies the request flags and whether the path
            *                   is validated as a sequential path
            * @return the path reported by the server, relative to the chroot
            * @throws KeeperException      InvalidACLException when acl is a
            *                              non-null empty list; otherwise the
            *                              exception built from a non-zero reply
            *                              error code and the client path
            * @throws InterruptedException declared because submitRequest()
            *                              can throw it while waiting
            */
           1 public String create(final String path, byte data[], List<ACL> acl,
           2             CreateMode createMode)
           3         throws KeeperException, InterruptedException
           4     {
           5         final String clientPath = path;
           6         PathUtils.validatePath(clientPath, createMode.isSequential());
           7 
           8         final String serverPath = prependChroot(clientPath);
           9 
          10         RequestHeader h = new RequestHeader();
          11         h.setType(ZooDefs.OpCode.create);
          12         CreateRequest request = new CreateRequest();
          13         CreateResponse response = new CreateResponse();
          14         request.setData(data);
          15         request.setFlags(createMode.toFlag());
          16         request.setPath(serverPath);
                     // Only an empty, non-null list is rejected here; a null acl
                     // falls through and is stored in the request unchanged.
          17         if (acl != null && acl.size() == 0) {
          18             throw new KeeperException.InvalidACLException();
          19         }
          20         request.setAcl(acl);
          21         ReplyHeader r = cnxn.submitRequest(h, request, response, null);
          22         if (r.getErr() != 0) {
          23             throw KeeperException.create(KeeperException.Code.get(r.getErr()),
          24                     clientPath);
          25         }
          26         if (cnxn.chrootPath == null) {
          27             return response.getPath();
          28         } else {
          29             return response.getPath().substring(cnxn.chrootPath.length());
          30         }
          31     }


          ClientCnxn.class, 放到隊列中,循環等到packet標識位finished。

           1 public ReplyHeader submitRequest(RequestHeader h, Record request,
           2             Record response, WatchRegistration watchRegistration)
           3             throws InterruptedException {
           4         ReplyHeader r = new ReplyHeader();
           5         Packet packet = queuePacket(h, r, request, response, nullnullnull,
           6                     null, watchRegistration);
           7         synchronized (packet) {
           8             while (!packet.finished) {
           9                 packet.wait();
          10             }
          11         }
          12         return r;
          13     }


           /**
            * Builds a Packet for the request and either queues it for sending
            * or hands it to conLossPacket().
            *
            * While holding the outgoingQueue lock the packet is created and its
            * cb, ctx, clientPath and serverPath fields are filled in. If the
            * state is not alive, or the closing flag is set, the packet goes to
            * conLossPacket() and is not added to the queue. Otherwise it is
            * appended to outgoingQueue; a closeSession request sets the closing
            * flag first. After the lock is released, wakeupCnxn() is called on
            * the client connection socket.
            *
            * NOTE(review): conLossPacket() is not shown here; presumably it
            * completes the packet with a connection-loss error - confirm.
            *
            * @return the packet that was created, whether or not it was queued
            */
           1 Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
           2             Record response, AsyncCallback cb, String clientPath,
           3             String serverPath, Object ctx, WatchRegistration watchRegistration)
           4     {
           5         Packet packet = null;
           6 
           7         // Note that we do not generate the Xid for the packet yet. It is
           8         // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
           9         // where the packet is actually sent.
          10         synchronized (outgoingQueue) {
          11             packet = new Packet(h, r, request, response, watchRegistration);
          12             packet.cb = cb;
          13             packet.ctx = ctx;
          14             packet.clientPath = clientPath;
          15             packet.serverPath = serverPath;
          16             if (!state.isAlive() || closing) {
          17                 conLossPacket(packet);
          18             } else {
          19                 // If the client is asking to close the session then
          20                 // mark as closing
          21                 if (h.getType() == OpCode.closeSession) {
          22                     closing = true;
          23                 }
          24                 outgoingQueue.add(packet);
          25             }
          26         }
          27         sendThread.getClientCnxnSocket().wakeupCnxn();
          28         return packet;
          29     }


          ClientCnxnSocketNIO.class (the NIO implementation of ClientCnxnSocket)

           /**
            * Runs one pass of the selector loop: waits for ready keys, then
            * dispatches connect completion or read/write IO for each of them.
            *
            * For a key with OP_CONNECT ready, a successful finishConnect() is
            * followed by updateLastSendAndHeard() and
            * sendThread.primeConnection(). For a key with OP_READ or OP_WRITE
            * ready, doIO() is called. After all keys are handled, and only when
            * the send thread reports a connected state, write interest is
            * enabled if findSendablePacket() returns a packet. The selected-key
            * set is cleared before returning.
            *
            * @param waitTimeOut   timeout passed to selector.select()
            * @param pendingQueue  passed through to doIO()
            * @param outgoingQueue queue checked for sendable packets; also
            *                      passed through to doIO()
            * @param cnxn          owning connection, used to query SASL
            *                      authentication progress
            */
           1 @Override
           2     void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
           3                      ClientCnxn cnxn)
           4             throws IOException, InterruptedException {
           5         selector.select(waitTimeOut);
           6         Set<SelectionKey> selected;
           7         synchronized (this) {
           8             selected = selector.selectedKeys();
           9         }
          10         // Everything below and until we get back to the select is
          11         // non blocking, so time is effectively a constant. That is
          12         // Why we just have to do this once, here
          13         updateNow();
          14         for (SelectionKey k : selected) {
          15             SocketChannel sc = ((SocketChannel) k.channel());
          16             if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
          17                 if (sc.finishConnect()) {
          18                     updateLastSendAndHeard();
          19                     sendThread.primeConnection();
          20                 }
          21             } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
          22                 doIO(pendingQueue, outgoingQueue, cnxn);
          23             }
          24         }
          25         if (sendThread.getZkState().isConnected()) {
          26             synchronized(outgoingQueue) {
          27                 if (findSendablePacket(outgoingQueue,
          28                         cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
          29                     enableWrite();
          30                 }
          31             }
          32         }
          33         selected.clear();
          34     }



           /**
            * Performs the readable and writable work for the socket behind
            * sockKey.
            *
            * Read side: data is read into incomingBuffer, and a negative read
            * count raises EndOfStreamException. Once the buffer is full it is
            * flipped and handled in one of three ways:
            *   - if it is lenBuffer, recvCount is incremented and readLength()
            *     is called;
            *   - else if the connection is not yet initialized,
            *     readConnectResult() runs, read interest is enabled, write
            *     interest is enabled when a sendable packet exists, and
            *     initialized is set to true;
            *   - otherwise the buffer is passed to sendThread.readResponse().
            * In the last two cases incomingBuffer is reset to the cleared
            * lenBuffer and updateLastHeard() is called.
            *
            * Write side (under the outgoingQueue lock): the packet returned by
            * findSendablePacket() is written. Its byte buffer is created on the
            * first attempt, and an xid is assigned first unless the request is
            * a ping or auth request or has no header. When the buffer has been
            * written completely, sentCount is incremented and the packet is
            * removed from outgoingQueue; it is added to pendingQueue unless it
            * is a ping or auth request or has no header. Write interest is then
            * disabled or re-enabled according to the queue state, as explained
            * by the comments in the body.
            *
            * @throws IOException if the channel of sockKey is null, or if the
            *                     read returns a negative count
            *                     (EndOfStreamException)
            */
           1 void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
           2       throws InterruptedException, IOException {
           3         SocketChannel sock = (SocketChannel) sockKey.channel();
           4         if (sock == null) {
           5             throw new IOException("Socket is null!");
           6         }
           7         if (sockKey.isReadable()) {
           8             int rc = sock.read(incomingBuffer);
           9             if (rc < 0) {
          10                 throw new EndOfStreamException(
          11                         "Unable to read additional data from server sessionid 0x"
          12                                 + Long.toHexString(sessionId)
          13                                 + ", likely server has closed socket");
          14             }
          15             if (!incomingBuffer.hasRemaining()) {
          16                 incomingBuffer.flip();
          17                 if (incomingBuffer == lenBuffer) {
          18                     recvCount++;
          19                     readLength();
          20                 } else if (!initialized) {
          21                     readConnectResult();
          22                     enableRead();
          23                     if (findSendablePacket(outgoingQueue,
          24                             cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
          25                         // Since SASL authentication has completed (if client is configured to do so),
          26                         // outgoing packets waiting in the outgoingQueue can now be sent.
          27                         enableWrite();
          28                     }
          29                     lenBuffer.clear();
          30                     incomingBuffer = lenBuffer;
          31                     updateLastHeard();
          32                     initialized = true;
          33                 } else {
          34                     sendThread.readResponse(incomingBuffer);
          35                     lenBuffer.clear();
          36                     incomingBuffer = lenBuffer;
          37                     updateLastHeard();
          38                 }
          39             }
          40         }
          41         if (sockKey.isWritable()) {
          42             synchronized(outgoingQueue) {
          43                 Packet p = findSendablePacket(outgoingQueue,
          44                         cnxn.sendThread.clientTunneledAuthenticationInProgress());
          45 
          46                 if (p != null) {
          47                     updateLastSend();
          48                     // If we already started writing p, p.bb will already exist
          49                     if (p.bb == null) {
          50                         if ((p.requestHeader != null) &&
          51                                 (p.requestHeader.getType() != OpCode.ping) &&
          52                                 (p.requestHeader.getType() != OpCode.auth)) {
          53                             p.requestHeader.setXid(cnxn.getXid());
          54                         }
          55                         p.createBB();
          56                     }
          57                     sock.write(p.bb);
          58                     if (!p.bb.hasRemaining()) {
          59                         sentCount++;
          60                         outgoingQueue.removeFirstOccurrence(p);
          61                         if (p.requestHeader != null
          62                                 && p.requestHeader.getType() != OpCode.ping
          63                                 && p.requestHeader.getType() != OpCode.auth) {
          64                             synchronized (pendingQueue) {
          65                                 pendingQueue.add(p);
          66                             }
          67                         }
          68                     }
          69                 }
          70                 if (outgoingQueue.isEmpty()) {
          71                     // No more packets to send: turn off write interest flag.
          72                     // Will be turned on later by a later call to enableWrite(),
          73                     // from within ZooKeeperSaslClient (if client is configured
          74                     // to attempt SASL authentication), or in either doIO() or
          75                     // in doTransport() if not.
          76                     disableWrite();
          77                 } else if (!initialized && p != null && !p.bb.hasRemaining()) {
          78                     // On initial connection, write the complete connect request
          79                     // packet, but then disable further writes until after
          80                     // receiving a successful connection response.  If the
          81                     // session is expired, then the server sends the expiration
          82                     // response and immediately closes its end of the socket.  If
          83                     // the client is simultaneously writing on its end, then the
          84                     // TCP stack may choose to abort with RST, in which case the
          85                     // client would never receive the session expired event.  See
          86                     // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
          87                     disableWrite();
          88                 } else {
          89                     // Just in case
          90                     enableWrite();
          91                 }
          92             }
          93         }
          94     }



          SendThread.class

            /**
             * Decodes one server message from incomingBuffer and dispatches it
             * according to the xid in its reply header.
             *
             *   xid -2  ping reply: logged at debug level, nothing else is done.
             *   xid -4  auth reply: on AUTHFAILED the state becomes AUTH_FAILED
             *           and an AuthFailed event is queued on the event thread.
             *   xid -1  notification: a WatcherEvent is deserialized, its path
             *           is converted from server path to client path when a
             *           chroot is set, and the event is queued on the event
             *           thread.
             *
             * Any other xid is handled in one of two ways. If SASL
             * authentication is in progress, the payload is read as a
             * GetSASLRequest and its token is handed to zooKeeperSaslClient.
             * Otherwise the message is treated as the reply to the packet at
             * the head of pendingQueue: the xid, error code and zxid are copied
             * into the packet's reply header, lastZxid is updated when the zxid
             * is positive, and the response record is deserialized only when
             * the error code is 0 and the packet has a response object.
             * finishPacket() runs in a finally block, so it is also called when
             * the xid check fails.
             *
             * NOTE(review): finishPacket() is not shown here; presumably it
             * sets packet.finished and wakes the thread waiting in
             * submitRequest() - confirm.
             *
             * @param incomingBuffer buffer holding the message to decode
             * @throws IOException if pendingQueue is empty when a reply
             *                     arrives, or if the reply xid differs from the
             *                     xid of the packet at the head of the queue
             *                     (that packet's error is set to CONNECTIONLOSS
             *                     first)
             */
            1 void readResponse(ByteBuffer incomingBuffer) throws IOException {
            2             ByteBufferInputStream bbis = new ByteBufferInputStream(
            3                     incomingBuffer);
            4             BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            5             ReplyHeader replyHdr = new ReplyHeader();
            6 
            7             replyHdr.deserialize(bbia, "header");
            8             if (replyHdr.getXid() == -2) {
            9                 // -2 is the xid for pings
           10                 if (LOG.isDebugEnabled()) {
           11                     LOG.debug("Got ping response for sessionid: 0x"
           12                             + Long.toHexString(sessionId)
           13                             + " after "
           14                             + ((System.nanoTime() - lastPingSentNs) / 1000000)
           15                             + "ms");
           16                 }
           17                 return;
           18             }
           19             if (replyHdr.getXid() == -4) {
           20                 // -4 is the xid for AuthPacket               
           21                 if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
           22                     state = States.AUTH_FAILED;                    
           23                     eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
           24                             Watcher.Event.KeeperState.AuthFailed, null) );                                        
           25                 }
           26                 if (LOG.isDebugEnabled()) {
           27                     LOG.debug("Got auth sessionid:0x"
           28                             + Long.toHexString(sessionId));
           29                 }
           30                 return;
           31             }
           32             if (replyHdr.getXid() == -1) {
           33                 // -1 means notification
           34                 if (LOG.isDebugEnabled()) {
           35                     LOG.debug("Got notification sessionid:0x"
           36                         + Long.toHexString(sessionId));
           37                 }
           38                 WatcherEvent event = new WatcherEvent();
           39                 event.deserialize(bbia, "response");
           40 
           41                 // convert from a server path to a client path
           42                 if (chrootPath != null) {
           43                     String serverPath = event.getPath();
           44                     if(serverPath.compareTo(chrootPath)==0)
           45                         event.setPath("/");
           46                     else if (serverPath.length() > chrootPath.length())
           47                         event.setPath(serverPath.substring(chrootPath.length()));
           48                     else {
           49                         LOG.warn("Got server path " + event.getPath()
           50                                 + " which is too short for chroot path "
           51                                 + chrootPath);
           52                     }
           53                 }
           54 
           55                 WatchedEvent we = new WatchedEvent(event);
           56                 if (LOG.isDebugEnabled()) {
           57                     LOG.debug("Got " + we + " for sessionid 0x"
           58                             + Long.toHexString(sessionId));
           59                 }
           60 
           61                 eventThread.queueEvent( we );
           62                 return;
           63             }
           64 
           65             // If SASL authentication is currently in progress, construct and
           66             // send a response packet immediately, rather than queuing a
           67             // response as with other packets.
           68             if (clientTunneledAuthenticationInProgress()) {
           69                 GetSASLRequest request = new GetSASLRequest();
           70                 request.deserialize(bbia,"token");
           71                 zooKeeperSaslClient.respondToServer(request.getToken(),
           72                   ClientCnxn.this);
           73                 return;
           74             }
           75 
           76             Packet packet;
           77             synchronized (pendingQueue) {
           78                 if (pendingQueue.size() == 0) {
           79                     throw new IOException("Nothing in the queue, but got "
           80                             + replyHdr.getXid());
           81                 }
           82                 packet = pendingQueue.remove();
           83             }
           84             /*
           85              * Since requests are processed in order, we better get a response
           86              * to the first request!
           87              */
           88             try {
           89                 if (packet.requestHeader.getXid() != replyHdr.getXid()) {
           90                     packet.replyHeader.setErr(
           91                             KeeperException.Code.CONNECTIONLOSS.intValue());
           92                     throw new IOException("Xid out of order. Got Xid "
           93                             + replyHdr.getXid() + " with err " +
           94                             + replyHdr.getErr() +
           95                             " expected Xid "
           96                             + packet.requestHeader.getXid()
           97                             + " for a packet with details: "
           98                             + packet );
           99                 }
          100 
          101                 packet.replyHeader.setXid(replyHdr.getXid());
          102                 packet.replyHeader.setErr(replyHdr.getErr());
          103                 packet.replyHeader.setZxid(replyHdr.getZxid());
          104                 if (replyHdr.getZxid() > 0) {
          105                     lastZxid = replyHdr.getZxid();
          106                 }
          107                 if (packet.response != null && replyHdr.getErr() == 0) {
          108                     packet.response.deserialize(bbia, "response");
          109                 }
          110 
          111                 if (LOG.isDebugEnabled()) {
          112                     LOG.debug("Reading reply sessionid:0x"
          113                             + Long.toHexString(sessionId) + ", packet:: " + packet);
          114                 }
          115             } finally {
          116                 finishPacket(packet);
          117             }
          118         }


          posted on 2016-12-27 13:51 jinfeng_wang 閱讀(505) 評論(0)  編輯  收藏 所屬分類: 2016-zookeeper
          主站蜘蛛池模板: 马山县| 治县。| 运城市| 寿光市| 驻马店市| 鸡东县| 金昌市| 浦北县| 庄河市| 郎溪县| 白朗县| 桑植县| 汨罗市| 富源县| 台江县| 南康市| 青田县| 阳谷县| 建德市| 买车| 万荣县| 邹平县| 长顺县| 天峻县| 化德县| 嘉义县| 临泽县| 孙吴县| 平远县| 晋州市| 原平市| 昆山市| 民县| 鄢陵县| 米林县| 东方市| 洛隆县| 汉寿县| 武平县| 北票市| 阿图什市|