上善若水
          In general the OO style is to use a lot of little objects with a lot of little methods that give us a lot of plug points for overriding and variation. To do is to be -Nietzsche, To bei is to do -Kant, Do be do be do -Sinatra
          posts - 146,comments - 147,trackbacks - 0

          概述

          在Jetty中,使用Connector來抽象Jetty服務器對某個端口的監聽。在Connector啟動時,它會啟動acceptors個Acceptor線程用于監聽在Connector中配置的端口。對于客戶端的每次連接,Connector都會創建相應的EndPoint來表示該連接,一般在創建EndPoint的同時會同時創建Connection,這里EndPoint用于和Socket打交道,而Connection用于在從Socket中讀取到數據后的處理邏輯以及生成響應數據的處理邏輯。

          不同的Connector會創建不同的EndPoint和Connection實例。如SocketConnector創建ConnectorEndPoint和HttpConnection,SslSocketConnector創建SslConnectorEndPoint和HttpConnection,SelectChannelConnector創建SelectChannelEndPoint和SelectChannelHttpConnection,SslSelectChannelConnector創建SslSelectChannelEndPoint和SelectChannelHttpConnection,BlockingChannelConnector創建BlockingChannelEndPoint和HttpConnection等。

          EndPoint接口定義

          Jetty中EndPoint接口定義如下:
          public interface EndPoint {
              // EndPoint是對一次客戶端到服務器連接的抽象,每一個新的連接都會創建一個新的EndPoint,并且在這個EndPoint中包含這次連接的Socket。由于EndPoint包含底層的連接Socket,因而它主要用于處理從Socket中讀取數據和向Socket中寫入數據,即對應EndPoint接口中的fill和flush方法。

              
          // 從Socket中讀取數據,并寫入Buffer中直到數據讀取完成或putIndex到Buffer的capacity。返回總共讀取的字節數。在實現中,StreamEndPoint使用Buffer直接從Socket的InputStream中讀取數據,而ChannelEndPoint則向Channel讀取數據到Buffer。
              int fill(Buffer buffer) throws IOException;

              // 將Buffer中的數據(從getIndex到putIndex的數據)寫入到Socket中,同時清除緩存(調用Buffer的clear方法)。在實現中,StreamEndPoint使用Buffer直接向Socket的OutputStream寫入數據,而ChanelEndPoint則將Buffer中的數據寫入Channel中。
              int flush(Buffer buffer) throws IOException;
              
              // 類似上面的flush,它會將傳入的header、buffer、trailer按順序寫入Socket中(OutputStream或者Channel)。返回總共寫入的字節數。
              int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException;

              // 當在處理HTTP/1.0請求時或當前Request中的KeepAlive的值為false時,在處理完成當前請求后,需要調用shutdownOutput()方法,關閉當前連接;或在處理當前請求時出現比較嚴重的錯誤、Socket超時時。在調用完shutdownOutput()方法后,isOutputShutdown()方法返回true。
              void shutdownOutput() throws IOException;
              boolean isOutputShutdown();

              // 當Server無法從當前連接(Socket)中讀取數據時(即read返回-1)時,調用shutdownInput()方法以關閉當前連接,此時isInputShutdown()返回true。
              void shutdownInput() throws IOException;
              boolean isInputShutdown();

              // 當Socket超時或在讀寫Socket過程中出現任何IO錯誤時,Server會直接調用close()方法以關閉當前連接。
              void close() throws IOException;
              // 當前Connection是否已經打開,對ChannelEndPoint來說表示Channel.isOpen()返回true,對SocketEndPoint來說,表示Socket沒有被關閉。
              public boolean isOpen();

              // 對StreamEndPoint來說,它的讀寫是阻塞式的,但是對ChannelEndPoint來說,如果它內部的channel是SelectableChannel,那么這個Channel的讀寫可以配置成非阻塞的(通過SelectableChannel.isBlocking()方法判斷)。因而對SelectChannelEndPoint需要使用blockReadable()方法來阻塞直到超時。返回true表示阻塞讀取失敗,此時HttpParser會關閉這個EndPoint,并拋出異常。blockWritable()方法類似blockReadable()用于SelectChannelEndPoint以等待有數據寫入到Channel中,如果返回false,表示在指定的時間內沒有數據可寫入Channel中(即超時),此時會關閉該EndPoint,并拋出異常。
              public boolean isBlocking();
              public boolean blockReadable(long millisecs) throws IOException;
              public boolean blockWritable(long millisecs) throws IOException;

              // 對SslSelectChannelEndPoint,它是Buffered,因而它的isBuffered()方法返回true,而isBufferingInput()和isBufferingOutput()根據內部的_inNIOBuffer和_outNIOBuffer字段的hasContent()方法判斷是否返回true或false。對其他類型的EndPoint來說,這三個方法都返回false。而flush()方法則將_outNIOBuffer中緩存的數據寫入Channel中。
              public boolean isBufferred();
              public boolean isBufferingInput();
              public boolean isBufferingOutput();
              public void flush() throws IOException;

              // EndPoint還定義了一些和EndPoint相關鏈的信息和狀態:

              // 返回該EndPoint內部使用的傳輸工具,如ChannelEndPoint內部使用Channel,而SocketEndPoint內部使用Socket。該方法用于對內部傳輸工具的配置。
              public Object getTransport();
              // 用于配置Socket的SO_TIMEOUT的時間,即等待客戶連接的超時時間。
              public int getMaxIdleTime();
              public void setMaxIdleTime(int timeMs) throws IOException;

              // 返回當前EndPoint所在服務器的IP地址、主機名、端口號以及客戶端的IP地址、主機名、端口號。
              public String getLocalAddr();
              public String getLocalHost();
              public int getLocalPort();
              public String getRemoteAddr();
              public String getRemoteHost();
              public int getRemotePort();
          }

          EndPoint類圖


          EndPoint接口定義概述

          EndPoint最主要的方法從底層傳輸鏈路中讀取數據并填入Buffer中的fill方法,以及將Buffer中的數據寫入底層傳輸鏈路的flush方法;讀數據對應Input,寫數據對應Output,可以單獨的關閉Input或Output,并提供方法判斷Input或Output是否已經被關閉;可以用close方法關閉EndPoint,也可以通過isOpen方法判斷是否這個EndPoint是否已經被關閉;可以以阻塞的方式讀寫EndPoint,并判斷當前EndPoint是否處于阻塞狀態(主要用于SelectChannelEndPoint中);對SslSelectChannelEndPoint來說,它在讀寫時都可能內部緩存數據,因而EndPoint中定義了一些方法用于判斷當前EndPoint是否有輸入/輸出換成,以及使用flush將緩存中的數據寫入到底層鏈路中;對底層Socket,EndPoint還可以配置其最長的空閑時間;最后EndPoint還提供一些方法用于獲取本地和遠程的地址、主機名、端口號,以及獲取底層傳輸類,如Socket、Channel等。

          StreamEndPoint實現

          StreamEndPoint采用古老的Stream方法從Socket中讀寫數據,它包含InputStream和OutputStream,分別表示讀寫數據流;它永遠是阻塞式讀寫,因而isBlocking、blockReadable、blockWritable永遠返回true;它也不會在內部緩存讀寫數據,因而isBufferingInput、isBufferingOutput、isBufferred永遠返回false,而flush方法直接調用OutputStream的flush方法;對fill實現,直接使用傳入的Buffer從InputStream中讀取數據;對flush實現,直接將Buffer中的數據寫入到OutputStream中;close方法同時關閉InputStream和OutputStream,并將成員變量置為null;對StreamEndPoint本身,沒有本地或遠程的地址、主機名、端口號信息。

          SocketEndPoint是StreamEndPoint的子類,它從Socket中獲取InputStream和OutputStream,以及本地和遠程的地址、主機名、端口號;而isInputShutdown、isOutputShutdown、shutdownInput、shutdownOutput等方法直接調用Socket中相應的方法;getTransport直接返回Socket實例;setMaxIdleTime方法同時設置Socket的SO_TIMEOUT值;當空閑超時,只關閉Input。

          ConnectorEndPoint繼承自SocketEndPoint,它是SocketConnector的內部類,每一個客戶端的連接請求創建一個ConnectorEndPoint實例,在創建ConnectorEndPoint的同時,會在內部創建一個HttpConnection實例;它還實現了ConnectedEndPoint,因而可以從外部設置Connection實例;在讀數據時,如果遇到EOF,表示連接已經斷開,因而關閉當前EndPoint;在關閉EndPoint時,cancel當前Connection中Request實例的AsyncContinuation。ConnectorEndPoint還實現了Runnable接口,在其run方法的實現中,它首先更新處理的Connection的引用計數,然后保存當前Connection實例,在SocketConnector已經啟動,并且ConnectorEndPoint未被關閉的狀態下循環調用Connection的handle方法,在每個循環開始前檢查當前Connector是否處于Low Resources狀態(如線程池的可用線程已經不多),此時更新EndPoint的MaxIdleTime為當前Connector的LowResourcesMaxIdleTime的值,以減少一些連接的空閑等待時間;對任何Exception,關閉當前EndPoint;最后更新Connector中的一些統計信息,將當前Connection從Connector的當前正在處理的connections集合中移除,如果此時Socket還未關閉,讀取Socket中的數據直到數據讀完或超過MaxIdleTime,此時如果Socket還未關閉,則關閉當前Socket。而在Connector創建ConnectorEndPoint時,會調用其dispatch方法,將其自身仍給相應的線程池處理,以在某個時間在另一個線程中調用其run方法。

          SslConnectorEndPoint繼承自ConnectorEndPoint,它在關閉Input和Output時會同時關閉整個EndPoint,而在執行真正的處理邏輯前有一個handle shake的過程。

          ChannelEndPoint實現

          ChannelEndPoint采用NIO實現,從Channel中讀寫數據。在創建ChannelEndPoint時傳入ByteChannel,如果傳入的ByteChannel是SocketChannel,則同時紀錄Socket實例,以及獲取本地、遠程的地址信息,并設置MaxIdleTime值為SO_TIMEOUT值。如果該ByteChannel是SelectableChannel類型(ServerSocketChannel、SocketChannel、DiagramChannel、SinkChannel、SourceChannel),并且其isBlocking()方法返回false,表示該Channel是非阻塞式的讀寫,否則這個Channel是阻塞式的讀寫,但是默認情況下,blockReadable、blockWritable直接返回true,表示阻塞式的讀寫。對非SslSelectChannelEndPoint的EndPoint不會在內部緩存數據,因而isBufferred、isBufferingOutput、isBufferingInput直接返回false,而flush方法為空實現;對SocketChannel,在設置MaxIdleTime時,同時將該值設置到底層Socket的SO_TIMEOUT的值中;getTransport直接返回底層channel實例;shutdownInput、shutdownOutput、isInputShutdown、isOutputShutdown使用Socket實現;fill實現只支持NIOBuffer,它使用Channel將數據寫入內部的ByteBuffer中;flush實現使用Channel將ByteBuffer中的數據寫入到Channel中,或使用GatheringByteChannel將多個ByteBuffer同時寫入到Channel中。

          BlockingChannelEndPoint類是BlockingChannelConnector的內部類,它繼承自ChannelEndPoint,并實現了ConnectedEndPoint和Runnable接口。在創建BlcokingChannelEndPoint時,同樣也會創建HttpConnection實例;每次調用fill、flush方法時,都會更新_idleTimestamp的值為當前時間戳(該值也會在每一次Connection開始重新被處理時更新),在BlockingChannelConnector啟動時會生成一個Task,它沒400毫秒遍歷一次所有正在處理的EndPoint,如果發現有EndPoint已經超時(checkIdleTimestamp()方法,即空閑時間超過MaxIdleTime),則調用其idleExpired()方法,將該EndPoint關閉;BlockingChannelConnector在接到一個連接后,先會設置SocketChannel的blockingChannel為true,然后使用這個SocketChannel創建一個BlockingChannelEndPoint,并調用其dispatch()方法,將它丟到一個線程池中,在BlockingChannelEndPoint的run方法實現中,首先更新一些統計數據,紀錄當前正在處理的EndPoint;只要當前EndPoint還處于打開狀態,先更新_idleTimestamp為當前時間戳,然后如果當前ThreadPool處于LowOnThread狀態,將timeout時間更新為LowResourcesMaxIdleTime,而后調用Connection的handle方法;對任何Exception,直接關閉EndPoint;在最后退出時,如果EndPoint還未關閉,讀取EndPoint的數據,直到超時,并強制關閉EndPoint。

          SelectChannelEndPoint類在SelectChannelConnector中被使用,它繼承自ChannelEndPoint,并實現了ConnectedEndPoint和AsyncEndPoint接口,SelectChannelConnector采用NIO中多路復用的機制,因而實現會比較復雜一些。在創建Connector時,首先創建ConnectorSelectorManager實例(_manager),在SelectChannelConnector啟動時,設置_manager的SelectSets(acceptors)、MaxIdleTime、LowResourcesConnections、LowResourcesIdleTime,然后啟動_manager,并且啟動acceptors個線程,只要SelectChannelConnector處于Running狀態,就不斷的調用_manager.doSelect()方法。ConnectorSelectorManager在啟動時會創建_selectSets個SelectSet;而doSelect方法會調用根據傳入的索引號對應的SelectSet的doSelect()方法。當客戶端的連接到來后,SelectChannelConnector首先會配置SocketChannel的configureBlocking為false,然后將該SocketChannel注冊到_manager中,在注冊過程中,根據當前的SelectSet索引值找到相應的SelectSet(之后索引自增),然后調用SelectSet的addChange(傳入SocketChannel)和wakeup方法。因而這里最重要的就是SelectSet的實現,它是SelectorManager中的一個內部類。
          SelectSet類的實現中,它內部有一個Selector,一個ConcurrentLinkedQueue的changes隊列,以及SelectChannelEndPoint到SelectSet的集合(它用于調用SelectChannelEndPoint中的checkIdleTimestamp()方法以檢查并關閉處于Idle Timeout的SelectChannelEndPoint)。SelectSet使用addChange()方法添加需要改變狀態的對象,這些對象有EndPoint、ChannelAndAttachment、SocketChannel、Runnable。在doSelect()方法中,首先檢查changes隊列中是否有對象,如果有SelectChannelEndPoint對象,則調用其doUpdateKey()方法;如果是SocketChannel對象,則注冊OP_READ操作到Selector中,創建新的SelectChannelEndPoint,attach新創建的SelectChannelEndPoint到SelectionKey中,調用SelectChannelEndPoint的schedule()方法。對ChannelAndAttachment對象,如果其Channel是SocketChannel,并且處于Connected狀態,則類似對SocketChannel對象的處理,否則,注冊OP_CONNECT操作到Selector;如果是Runnable對象,則dispatch該Runnable對象。然后調用Selector的selectNow()方法,如果沒有任何可用的事件,則計算出等待時間,然后帶等待時間的調用Selector的select()方法;遍歷所有Selected Keys,對Invalid的SelectionKey,直接調用其attach的SelectChannelEndPoint的doUpdateKey()方法,否則對類型是SelectChannelEndPoint的attachment調用其schedule()方法,對connectable的SelectionKey創建新的SelectChannelEndPoint并調用schedule()方法,否則創建新的SelectChannelEndPoint并對readable的SelectionKey調用其schedule()方法。
          SelectChannelEndPoint采用NIO的非阻塞讀寫方式,而NIO基于Channel的非阻塞操作是基于注冊的操作集(OP_READ, OP_WRITE, O_CONNECT, OP_ACCEPT)以從Selector中選出已經可用的SelectionKey(包含對應的Channel、interestOps、readable、writable、attachment等),之后可以使用對應的Channel以及根據SelectionKey中對應的已經可用的操作執行相應的操作(如讀寫),因而SelectChannelEndPoint的其中一個任務是要實時的更新當前它感興趣的操作集,并重新像Selector中注冊。 SelectChannelEndPoint使用updateKey()方法跟新感興趣操作集合,并且它只關注OP_READ和OP_WRITE操作,在實現時,OP_READ只需要在Socket的輸入沒有關閉,且還沒有dispatch或當前處于readBlocked狀態下才需要關注;OP_WRITE只需要在Socket的輸出沒有關閉,且writable為false(當需要向Channel中寫數據,但是還沒有寫完的情況下)或當前處于writeBlocked狀態下才需要關注;如果和當前已注冊的操作集相同,則不需要重新注冊,否者將自身通過SelectSet的addChange()方法添加到SelectSet中,在SelectSet的doSelect()方法中會最終調用SelectChannelEndPoint中的doUpdateKey()方法,該方法的實現:1. 當Channel處于Open狀態,存在感興趣的操作,SelectionKey為null或invalid,如果Channel已經注冊了,重新調用updateKey()方法(感覺這里一般不會被調用到,如果被調用到了,則可能出現死循環),否則將Channel重新向Selector中重新注冊interestOps的操作集(如果出錯,則canncel SelectionKey,并且從SelectSet中銷毀當前EndPoint)。2. 當Channel處于Open狀態,存在感興趣的操作,SelectionKey存在且valid,則直接使用interestOps更新SelectionKey的感興趣集(調用SelectionKey的interestOps()方法)。3. 當Channel處于Open狀態,不存在感興趣的操作,清空SelectionKey的interestOps,或清理SelectionKey引用。4. 如果Channel處于關閉狀態,則canncel SelectionKey,并從SelectSet中銷毀當前EndPoint。
          對阻塞讀寫(readBlocked、writeBlocked),在blockReadable()、blockWritable()方法中,會設置readBlocked、writeBlocked為true,調用updateKey()方法,然后計算等待時間并進入等待(調用wait方法),如果因為超時而退出等待,則返回false,否則返回true(在返回時設置readBlocked、writeBlocked為false);當調用SelectChannelEndPoint的schedule()方法時,它會更新readBlocked、writeBlocked、interestOps的值(同時使用該值更新SelectionKey中的狀態),并調用notifyAll()方法喚醒blockReadable()、blockWritable()方法:1. 如果SelectionKey為null或invalid,readBlocked、writeBlocked設置為false,調用notifyAll(),并返回;2. 如果readBlocked或writeBlocked為true,使用SelectionKey的readable、writable更新readBlocked和writeBlocked的值,調用notifyAll(),如果已經dispatched,清除所有interestOps,并返回;3. 如果還沒有dispatched,直接清除所有interestOps,并返回;4. 如果注冊了OP_WRITE,并且已經可寫,則清除OP_WRITE操作,設置writable為true;5. 如果還沒有dispatched,則調用dispatch()方法。在dispatch()方法中,它設置dispatched為true,并將handler扔給ThreadPool(在handler調用Connection的handle()方法,由于SelectChannelEndPoint的生命周期是在SelectManager維護,并且dispatch()方法可能被多次調用,因而沒有在handler的handle()方法中判斷EndPoint的close狀態,并循環的調用Connection的handle()方法,而是在每次handle()方法結束后退出當前線程,在下次schedule()時會使用重新將handler扔給ThreadPool以支持AsyncContinuation的實現,并且AsyncEndPoint接口的定義也是用于AsyncContinuation的實現,這個將在以后的博客中詳述)。在flush()方法中,如果沒有任何數據能寫入Channel時,設置writable為false(從而在updateKey()方法中能將OP_WRITE注冊到SelectionKey的interestOps中),并在沒有dispatch的情況下調用updateKey。最后清理Selector中的selectedKeys,expire所有timeout中注冊的Task(使用scheduleTimeout()方法注冊),依次調用檢查EndPoints中是否已經TimeOut。
          在SelectChannelEndPoint的構建中,它使用SocketChannel、SelectSet、SelectionKey構建,內部從SelectSet中獲取SelectorManager,并使用SelectorManager 創建Connection實例,初始化_dispatched、_redispatched為false,_open、_writable為true,_readBlocked、_writeBlocked為false,_interestOps為0,最后更新_idleTimestamp為當前時間。當一個客戶端連接到來后,SelectChannelConnector會向SelectorManager(SelectSet)中注冊一個SocketChannel,當后臺線程調用SelectSet中的doSelect()方法時,它使用該SocketChannel,向該SelectSet中的Selector注冊OP_READ得到一個SelectionKey,并使用這個SocketChannel、當前SelectSet、以及這個SelectionKey創建一個SelectChannelEndPoint,而后調用SelectChannelEndPoint的schedule()方法。

          SslSelectChannelEndPoint類采用Buffer的形式先將數據讀寫到內部緩存中,然后使用SSLEngine來wrap或unwrap(encode/decode)數據。這里不再詳述。
          posted on 2014-03-29 14:34 DLevin 閱讀(3277) 評論(1)  編輯  收藏 所屬分類: Jetty

          FeedBack:
          # re: 深入Jetty源碼之EndPoint
          2014-04-08 16:53 | 真相帝
          繼續支持!很崇拜你們這些技術達人。。  回復  更多評論
            
          主站蜘蛛池模板: 旌德县| 偏关县| 屏边| 西充县| 仪陇县| 白沙| 油尖旺区| 息烽县| 阜城县| 博客| 监利县| 苏州市| 罗山县| 班玛县| 房产| 库伦旗| 托克逊县| 松江区| 东阳市| 杨浦区| 闸北区| 铁岭市| 河北区| 拉孜县| 同仁县| 沁阳市| 常山县| 谢通门县| 行唐县| 临泽县| 汕尾市| 新绛县| 罗江县| 桃园县| 邵武市| 那曲县| 吉林省| 罗山县| 荆门市| 皋兰县| 绥中县|