java人

          愛生活,更愛java!

            BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
            1 隨筆 :: 2 文章 :: 3 評論 :: 0 Trackbacks

                  今天無意中看到PipedInputStream這個類,不知道是干嘛用的,便google了一下,字面上理解是管道流,看一下別人是怎么說的:

                  PipedInputStream類與PipedOutputStream類用于在應用程序中創建管道通信.一個PipedInputStream實例對象必須和一個PipedOutputStream實例對象進行連接而產生一個通信管道.PipedOutputStream可以向管道中寫入數據,PipedIntputStream可以讀取PipedOutputStream向管道中寫入的數據.這兩個類主要用來完成線程之間的通信.一個線程的PipedInputStream對象能夠從另外一個線程的PipedOutputStream對象中讀取數據.

                  原來如此,不過這只是說說而已,具體怎么實現的呢,愛刨根問底的我可不會輕易放過這些疑惑,于是看了一下SUN源碼這兩個類的實現,恕本人比較愚鈍,把玩了大半天,才稍微參透其中一些奧妙,故將心得體會在此寫上,供各位批判:)

                  首先簡單的介紹一下這兩個類的實現原理,PipedInputStreamPipedOutputStream的實現原理類似于"生產者-消費者"原理,PipedOutputStream是生產者,PipedInputStream是消費者,在PipedInputStream中有一個buffer字節數組,默認大小為1024,作為緩沖區,存放"生產者"生產出來的東東.還有兩個變量,in,out,in是用來記錄"生產者"生產了多少,out是用來記錄"消費者"消費了多少,in為-1表示消費完了,in==out表示生產滿了.當消費者沒東西可消費的時候,也就是當in為-1的時候,消費者會一直等待,直到有東西可消費.

                  因為生產和消費的方法都是synchronized的(寫到這里,我去研究了一下synchronized的用法,才知道synchronized是對對象上鎖,之前一直以為只是對這個方法上鎖,別的synchronized方法仍然可以進入,哎,慚愧慚愧~~),所以肯定是生產者先生產出一定數量的東西,消費者才可以開始消費,所以在生產的時候發現in==out,那一定是滿了,同理,在消費的時候發現in==out,那一定是消費完了,因為生產的東西永遠要比消費來得早,消費者最多可以消費和生產的數量相等的東西,而不會超出.

                  好了,介紹完之后,看看SUN高手是怎么實現這些功能的.由于buffer(存放產品的通道)這個關鍵變量在PipedInputStream消費者這個類中,所以要想對buffer操作,只能通過PipedInputStream來操作,因此將產品放入通道的操作是在PipedInputStream中.

          存放產品的行為:

              protected synchronized void receive(int b) throws IOException {// 這里好像有些問題,因為這個方法是在PipedOutputStream類中調用的,而這個方法是protected的,下面另一個receive方法就不是protected,可能是我的源碼有些問題,也請大家幫我看看
                  checkStateForReceive();// 檢測通道是否連接,準備好接收產品
                  writeSide = Thread.currentThread();// 當前線程是生產者
                  if (in == out)
                      awaitSpace();
          // 發現通道滿了,沒地方放東西啦,等吧~~
                  if (in < 0{// in<0,表示通道是空的,將生產和消費的位置都置成第一個位置
                      in = 0;
                      out 
          = 0;
                  }

                  buffer[in
          ++= (byte) (b & 0xFF);
                  
          if (in >= buffer.length) {// 如果生產位置到達了通道的末尾,為了循環利用通道,將in置成0
                      in = 0;
                  }

              }


              
          synchronized void receive(byte b[], int off, int len) throws IOException {// 看,這個方法不是protected的!
                  checkStateForReceive();
                  writeSide 
          = Thread.currentThread();
                  
          int bytesToTransfer = len;// 需要接收多少產品的數量
                  while (bytesToTransfer > 0{
                      
          if (in == out)
                          awaitSpace();
                      
          int nextTransferAmount = 0;// 本次實際可以接收的數量
                      if (out < in) {
                          nextTransferAmount 
          = buffer.length - in;// 如果消費的當前位置<生產的當前位置,則還可以再生產buffer.length-in這么多
                      }
           else if (in < out) {
                          
          if (in == -1{
                              in 
          = out = 0;// 如果已經消費完,則將in,out置成0,從頭開始接收
                              nextTransferAmount = buffer.length - in;
                          }
           else {
                              nextTransferAmount 
          = out - in;// 如果消費的當前位置>生產的當前位置,而且還沒消費完,那么至少還可以再生產out-in這么多,注意,這種情況是因為通道被重復利用而產生的!
                          }

                      }

                      
          if (nextTransferAmount > bytesToTransfer)// 如果本次實際可以接收的數量要大于當前傳過來的數量,
                          nextTransferAmount = bytesToTransfer;// 那么本次實際只能接收當前傳過來的這么多了
                      assert (nextTransferAmount > 0);
                      System.arraycopy(b, off, buffer, in, nextTransferAmount);
          // 把本次實際接收的數量放進通道
                      bytesToTransfer -= nextTransferAmount;// 算出還剩多少需要放進通道
                      off += nextTransferAmount;
                      in 
          += nextTransferAmount;
                      
          if (in >= buffer.length) {// 到末尾了,該從頭開始了
                          in = 0;
                      }

                  }

              }

           

          消費產品的行為:

              public synchronized int read() throws IOException {// 消費單個產品
                  if (!connected) {
                      
          throw new IOException("Pipe not connected");
                  }
           else if (closedByReader) {
                      
          throw new IOException("Pipe closed");
                  }
           else if (writeSide != null && !writeSide.isAlive() && !closedByWriter
                          
          && (in < 0)) {
                      
          throw new IOException("Write end dead");
                  }


                  readSide 
          = Thread.currentThread();
                  
          int trials = 2;
                  
          while (in < 0{// in<0,表示通道是空的,等待生產者生產
                      if (closedByWriter) {
                          
          /* closed by writer, return EOF */
                          
          return -1;// 返回-1表示生產者已經不再生產產品了,closedByWriter為true表示是由生產者將通道關閉的
                      }

                      
          if ((writeSide != null&& (!writeSide.isAlive()) && (--trials < 0)) {
                          
          throw new IOException("Pipe broken");
                      }

                      
          /* might be a writer waiting */
                      notifyAll();
                      
          try {
                          wait(
          1000);
                      }
           catch (InterruptedException ex) {
                          
          throw new java.io.InterruptedIOException();
                      }

                  }

                  
          int ret = buffer[out++& 0xFF;
                  
          if (out >= buffer.length) {
                      out 
          = 0;// 如果消費到通道的末尾了,從通道頭開始繼續循環消費
                  }

                  
          if (in == out) {
                      
          /* now empty */
                      in 
          = -1;// 消費的位置和生產的位置重合了,表示消費完了,需要生產者生產,in置為-1
                  }

                  
          return ret;
              }


              
          public synchronized int read(byte b[], int off, int len) throws IOException {
                  
          if (b == null{
                      
          throw new NullPointerException();
                  }
           else if ((off < 0|| (off > b.length) || (len < 0)
                          
          || ((off + len) > b.length) || ((off + len) < 0)) {
                      
          throw new IndexOutOfBoundsException();
                  }
           else if (len == 0{
                      
          return 0;
                  }


                  
          /* possibly wait on the first character */
                  
          int c = read();// 利用消費單個產品來檢測通道是否連接,并且通道中是否有東西可消費
                  if (c < 0{
                      
          return -1;// 返回-1表示生產者生產完了,消費者也消費完了,消費者可以關閉通道了
                  }

                  b[off] 
          = (byte) c;
                  
          int rlen = 1;

                  
          // 這里沒有采用receive(byte [], int ,
                  
          // int)方法中System.arrayCopy()的方法,其實用System.arrayCopy()的方法也可以實現
                  /*
                   * 這是用System.arrayCopy()實現的方法 int bytesToConsume = len - 1; while
                   * (bytesToConsume > 0 && in >= 0) { int nextConsumeAmount = 0; if (out <
                   * in) { nextConsumeAmount = in - out; // System.arraycopy(buffer, out,
                   * b, off, nextConsumeAmount); } else if (in < out) { nextConsumeAmount =
                   * buffer.length - out; }
                   * 
                   * if (nextConsumeAmount > bytesToConsume) nextConsumeAmount =
                   * bytesToConsume; assert (nextConsumeAmount > 0);
                   * System.arraycopy(buffer, out, b, off, nextConsumeAmount);
                   * bytesToConsume -= nextConsumeAmount; off += nextConsumeAmount; out +=
                   * nextConsumeAmount; rlen += nextConsumeAmount; if (out >=
                   * buffer.length) { out = 0; } if(in == out) { in = -1; } }
                   
          */


                  
          while ((in >= 0&& (--len > 0)) {
                      b[off 
          + rlen] = buffer[out++];
                      rlen
          ++;
                      
          if (out >= buffer.length) {
                          out 
          = 0;
                      }

                      
          if (in == out) {
                          
          /* now empty */
                          in 
          = -1;// in==out,表示滿了,將in置成-1
                      }

                  }

                  
          return rlen;
              }


          雖說功能看似簡單,可是實現起來卻費了一番功夫,在線程的調度上還是挺麻煩的,要考慮的地方很多,不過通過深入的了解這兩個類,讓我對多線程編程有了更多的認識.我覺得要想周密的分析整個功能,得把整個流程都分析清楚,構造好了模型再去實現細節,只有整體構造對了,才能把正確的實現局部,自己的分析能力還有待加強啊!

          posted on 2008-04-10 17:47 爪哇豬 閱讀(3948) 評論(3)  編輯  收藏

          評論

          # re: 使用PipedInputStream,PipedOutputStream心得體會 2011-10-20 10:14 王康
          不知道你指的protected關鍵字有什么問題???  回復  更多評論
            

          # r的 2012-11-05 16:31
          的  回復  更多評論
            

          # re: 使用PipedInputStream,PipedOutputStream心得體會[未登錄] 2015-04-03 12:35 aaron
          請問可以引用嗎?謝謝  回復  更多評論
            


          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          主站蜘蛛池模板: 平山县| 伊宁县| 庆阳市| 绥滨县| 东阳市| 济宁市| 虹口区| 建昌县| 南华县| 宜兴市| 黎川县| 华坪县| 土默特左旗| 蒲江县| 台湾省| 镇赉县| 五原县| 石林| 石嘴山市| 定州市| 尉犁县| 达州市| 桂阳县| 武穴市| 庐江县| 克拉玛依市| 大洼县| 南澳县| 哈巴河县| 察隅县| 武冈市| 宁德市| 宾阳县| 子长县| 珠海市| 红安县| 碌曲县| 辽宁省| 泉州市| 太湖县| 象州县|