java人

          愛生活,更愛java!

            BlogJava :: 首頁 :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理 ::
            1 隨筆 :: 2 文章 :: 3 評論 :: 0 Trackbacks

                  今天無意中看到PipedInputStream這個類,不知道是干嘛用的,便google了一下,字面上理解是管道流,看一下別人是怎么說的:

                  PipedInputStream類與PipedOutputStream類用于在應(yīng)用程序中創(chuàng)建管道通信.一個PipedInputStream實例對象必須和一個PipedOutputStream實例對象進行連接而產(chǎn)生一個通信管道.PipedOutputStream可以向管道中寫入數(shù)據(jù),PipedIntputStream可以讀取PipedOutputStream向管道中寫入的數(shù)據(jù).這兩個類主要用來完成線程之間的通信.一個線程的PipedInputStream對象能夠從另外一個線程的PipedOutputStream對象中讀取數(shù)據(jù).

                  原來如此,不過這只是說說而已,具體怎么實現(xiàn)的呢,愛刨根問底的我可不會輕易放過這些疑惑,于是看了一下SUN源碼這兩個類的實現(xiàn),恕本人比較愚鈍,把玩了大半天,才稍微參透其中一些奧妙,故將心得體會在此寫上,供各位批判:)

                  首先簡單的介紹一下這兩個類的實現(xiàn)原理,PipedInputStreamPipedOutputStream的實現(xiàn)原理類似于"生產(chǎn)者-消費者"原理,PipedOutputStream是生產(chǎn)者,PipedInputStream是消費者,在PipedInputStream中有一個buffer字節(jié)數(shù)組,默認大小為1024,作為緩沖區(qū),存放"生產(chǎn)者"生產(chǎn)出來的東東.還有兩個變量,in,out,in是用來記錄"生產(chǎn)者"生產(chǎn)了多少,out是用來記錄"消費者"消費了多少,in為-1表示消費完了,in==out表示生產(chǎn)滿了.當消費者沒東西可消費的時候,也就是當in為-1的時候,消費者會一直等待,直到有東西可消費.

                  因為生產(chǎn)和消費的方法都是synchronized的(寫到這里,我去研究了一下synchronized的用法,才知道synchronized是對對象上鎖,之前一直以為只是對這個方法上鎖,別的synchronized方法仍然可以進入,哎,慚愧慚愧~~),所以肯定是生產(chǎn)者先生產(chǎn)出一定數(shù)量的東西,消費者才可以開始消費,所以在生產(chǎn)的時候發(fā)現(xiàn)in==out,那一定是滿了,同理,在消費的時候發(fā)現(xiàn)in==out,那一定是消費完了,因為生產(chǎn)的東西永遠要比消費來得早,消費者最多可以消費和生產(chǎn)的數(shù)量相等的東西,而不會超出.

                  好了,介紹完之后,看看SUN高手是怎么實現(xiàn)這些功能的.由于buffer(存放產(chǎn)品的通道)這個關(guān)鍵變量在PipedInputStream消費者這個類中,所以要想對buffer操作,只能通過PipedInputStream來操作,因此將產(chǎn)品放入通道的操作是在PipedInputStream中.

          存放產(chǎn)品的行為:

              protected synchronized void receive(int b) throws IOException {// 這里好像有些問題,因為這個方法是在PipedOutputStream類中調(diào)用的,而這個方法是protected的,下面另一個receive方法就不是protected,可能是我的源碼有些問題,也請大家?guī)臀铱纯?/span>
                  checkStateForReceive();// 檢測通道是否連接,準備好接收產(chǎn)品
                  writeSide = Thread.currentThread();// 當前線程是生產(chǎn)者
                  if (in == out)
                      awaitSpace();
          // 發(fā)現(xiàn)通道滿了,沒地方放東西啦,等吧~~
                  if (in < 0{// in<0,表示通道是空的,將生產(chǎn)和消費的位置都置成第一個位置
                      in = 0;
                      out 
          = 0;
                  }

                  buffer[in
          ++= (byte) (b & 0xFF);
                  
          if (in >= buffer.length) {// 如果生產(chǎn)位置到達了通道的末尾,為了循環(huán)利用通道,將in置成0
                      in = 0;
                  }

              }


              
          synchronized void receive(byte b[], int off, int len) throws IOException {// 看,這個方法不是protected的!
                  checkStateForReceive();
                  writeSide 
          = Thread.currentThread();
                  
          int bytesToTransfer = len;// 需要接收多少產(chǎn)品的數(shù)量
                  while (bytesToTransfer > 0{
                      
          if (in == out)
                          awaitSpace();
                      
          int nextTransferAmount = 0;// 本次實際可以接收的數(shù)量
                      if (out < in) {
                          nextTransferAmount 
          = buffer.length - in;// 如果消費的當前位置<生產(chǎn)的當前位置,則還可以再生產(chǎn)buffer.length-in這么多
                      }
           else if (in < out) {
                          
          if (in == -1{
                              in 
          = out = 0;// 如果已經(jīng)消費完,則將in,out置成0,從頭開始接收
                              nextTransferAmount = buffer.length - in;
                          }
           else {
                              nextTransferAmount 
          = out - in;// 如果消費的當前位置>生產(chǎn)的當前位置,而且還沒消費完,那么至少還可以再生產(chǎn)out-in這么多,注意,這種情況是因為通道被重復(fù)利用而產(chǎn)生的!
                          }

                      }

                      
          if (nextTransferAmount > bytesToTransfer)// 如果本次實際可以接收的數(shù)量要大于當前傳過來的數(shù)量,
                          nextTransferAmount = bytesToTransfer;// 那么本次實際只能接收當前傳過來的這么多了
                      assert (nextTransferAmount > 0);
                      System.arraycopy(b, off, buffer, in, nextTransferAmount);
          // 把本次實際接收的數(shù)量放進通道
                      bytesToTransfer -= nextTransferAmount;// 算出還剩多少需要放進通道
                      off += nextTransferAmount;
                      in 
          += nextTransferAmount;
                      
          if (in >= buffer.length) {// 到末尾了,該從頭開始了
                          in = 0;
                      }

                  }

              }

           

          消費產(chǎn)品的行為:

              public synchronized int read() throws IOException {// 消費單個產(chǎn)品
                  if (!connected) {
                      
          throw new IOException("Pipe not connected");
                  }
           else if (closedByReader) {
                      
          throw new IOException("Pipe closed");
                  }
           else if (writeSide != null && !writeSide.isAlive() && !closedByWriter
                          
          && (in < 0)) {
                      
          throw new IOException("Write end dead");
                  }


                  readSide 
          = Thread.currentThread();
                  
          int trials = 2;
                  
          while (in < 0{// in<0,表示通道是空的,等待生產(chǎn)者生產(chǎn)
                      if (closedByWriter) {
                          
          /* closed by writer, return EOF */
                          
          return -1;// 返回-1表示生產(chǎn)者已經(jīng)不再生產(chǎn)產(chǎn)品了,closedByWriter為true表示是由生產(chǎn)者將通道關(guān)閉的
                      }

                      
          if ((writeSide != null&& (!writeSide.isAlive()) && (--trials < 0)) {
                          
          throw new IOException("Pipe broken");
                      }

                      
          /* might be a writer waiting */
                      notifyAll();
                      
          try {
                          wait(
          1000);
                      }
           catch (InterruptedException ex) {
                          
          throw new java.io.InterruptedIOException();
                      }

                  }

                  
          int ret = buffer[out++& 0xFF;
                  
          if (out >= buffer.length) {
                      out 
          = 0;// 如果消費到通道的末尾了,從通道頭開始繼續(xù)循環(huán)消費
                  }

                  
          if (in == out) {
                      
          /* now empty */
                      in 
          = -1;// 消費的位置和生產(chǎn)的位置重合了,表示消費完了,需要生產(chǎn)者生產(chǎn),in置為-1
                  }

                  
          return ret;
              }


              
          public synchronized int read(byte b[], int off, int len) throws IOException {
                  
          if (b == null{
                      
          throw new NullPointerException();
                  }
           else if ((off < 0|| (off > b.length) || (len < 0)
                          
          || ((off + len) > b.length) || ((off + len) < 0)) {
                      
          throw new IndexOutOfBoundsException();
                  }
           else if (len == 0{
                      
          return 0;
                  }


                  
          /* possibly wait on the first character */
                  
          int c = read();// 利用消費單個產(chǎn)品來檢測通道是否連接,并且通道中是否有東西可消費
                  if (c < 0{
                      
          return -1;// 返回-1表示生產(chǎn)者生產(chǎn)完了,消費者也消費完了,消費者可以關(guān)閉通道了
                  }

                  b[off] 
          = (byte) c;
                  
          int rlen = 1;

                  
          // 這里沒有采用receive(byte [], int ,
                  
          // int)方法中System.arrayCopy()的方法,其實用System.arrayCopy()的方法也可以實現(xiàn)
                  /*
                   * 這是用System.arrayCopy()實現(xiàn)的方法 int bytesToConsume = len - 1; while
                   * (bytesToConsume > 0 && in >= 0) { int nextConsumeAmount = 0; if (out <
                   * in) { nextConsumeAmount = in - out; // System.arraycopy(buffer, out,
                   * b, off, nextConsumeAmount); } else if (in < out) { nextConsumeAmount =
                   * buffer.length - out; }
                   * 
                   * if (nextConsumeAmount > bytesToConsume) nextConsumeAmount =
                   * bytesToConsume; assert (nextConsumeAmount > 0);
                   * System.arraycopy(buffer, out, b, off, nextConsumeAmount);
                   * bytesToConsume -= nextConsumeAmount; off += nextConsumeAmount; out +=
                   * nextConsumeAmount; rlen += nextConsumeAmount; if (out >=
                   * buffer.length) { out = 0; } if(in == out) { in = -1; } }
                   
          */


                  
          while ((in >= 0&& (--len > 0)) {
                      b[off 
          + rlen] = buffer[out++];
                      rlen
          ++;
                      
          if (out >= buffer.length) {
                          out 
          = 0;
                      }

                      
          if (in == out) {
                          
          /* now empty */
                          in 
          = -1;// in==out,表示滿了,將in置成-1
                      }

                  }

                  
          return rlen;
              }


          雖說功能看似簡單,可是實現(xiàn)起來卻費了一番功夫,在線程的調(diào)度上還是挺麻煩的,要考慮的地方很多,不過通過深入的了解這兩個類,讓我對多線程編程有了更多的認識.我覺得要想周密的分析整個功能,得把整個流程都分析清楚,構(gòu)造好了模型再去實現(xiàn)細節(jié),只有整體構(gòu)造對了,才能把正確的實現(xiàn)局部,自己的分析能力還有待加強啊!

          posted on 2008-04-10 17:47 爪哇豬 閱讀(3948) 評論(3)  編輯  收藏

          評論

          # re: 使用PipedInputStream,PipedOutputStream心得體會 2011-10-20 10:14 王康
          不知道你指的protected關(guān)鍵字有什么問題???  回復(fù)  更多評論
            

          # r的 2012-11-05 16:31
          的  回復(fù)  更多評論
            

          # re: 使用PipedInputStream,PipedOutputStream心得體會[未登錄] 2015-04-03 12:35 aaron
          請問可以引用嗎?謝謝  回復(fù)  更多評論
            


          只有注冊用戶登錄后才能發(fā)表評論。


          網(wǎng)站導(dǎo)航:
           
          主站蜘蛛池模板: 容城县| 岱山县| 平定县| 松潘县| 巩义市| 上饶市| 中阳县| 竹溪县| 剑阁县| 安宁市| 巧家县| 柞水县| 津南区| 陈巴尔虎旗| 富蕴县| 延长县| 南涧| 兴和县| 永康市| 铜陵市| 卢龙县| 秀山| 湟源县| 韩城市| 全州县| 二手房| 腾冲县| 河津市| 申扎县| 靖宇县| 会东县| 肥城市| 青铜峡市| 宜都市| 礼泉县| 同德县| 乌苏市| 邯郸县| 新巴尔虎左旗| 晋江市| 新野县|