java人

          愛生活,更愛java!

            BlogJava :: 首頁 :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理 ::
            1 隨筆 :: 2 文章 :: 3 評(píng)論 :: 0 Trackbacks

                  今天無意中看到PipedInputStream這個(gè)類,不知道是干嘛用的,便google了一下,字面上理解是管道流,看一下別人是怎么說的:

                  PipedInputStream類與PipedOutputStream類用于在應(yīng)用程序中創(chuàng)建管道通信.一個(gè)PipedInputStream實(shí)例對(duì)象必須和一個(gè)PipedOutputStream實(shí)例對(duì)象進(jìn)行連接而產(chǎn)生一個(gè)通信管道.PipedOutputStream可以向管道中寫入數(shù)據(jù),PipedIntputStream可以讀取PipedOutputStream向管道中寫入的數(shù)據(jù).這兩個(gè)類主要用來完成線程之間的通信.一個(gè)線程的PipedInputStream對(duì)象能夠從另外一個(gè)線程的PipedOutputStream對(duì)象中讀取數(shù)據(jù).

                  原來如此,不過這只是說說而已,具體怎么實(shí)現(xiàn)的呢,愛刨根問底的我可不會(huì)輕易放過這些疑惑,于是看了一下SUN源碼這兩個(gè)類的實(shí)現(xiàn),恕本人比較愚鈍,把玩了大半天,才稍微參透其中一些奧妙,故將心得體會(huì)在此寫上,供各位批判:)

                  首先簡(jiǎn)單的介紹一下這兩個(gè)類的實(shí)現(xiàn)原理,PipedInputStreamPipedOutputStream的實(shí)現(xiàn)原理類似于"生產(chǎn)者-消費(fèi)者"原理,PipedOutputStream是生產(chǎn)者,PipedInputStream是消費(fèi)者,在PipedInputStream中有一個(gè)buffer字節(jié)數(shù)組,默認(rèn)大小為1024,作為緩沖區(qū),存放"生產(chǎn)者"生產(chǎn)出來的東東.還有兩個(gè)變量,in,out,in是用來記錄"生產(chǎn)者"生產(chǎn)了多少,out是用來記錄"消費(fèi)者"消費(fèi)了多少,in為-1表示消費(fèi)完了,in==out表示生產(chǎn)滿了.當(dāng)消費(fèi)者沒東西可消費(fèi)的時(shí)候,也就是當(dāng)in為-1的時(shí)候,消費(fèi)者會(huì)一直等待,直到有東西可消費(fèi).

                  因?yàn)樯a(chǎn)和消費(fèi)的方法都是synchronized的(寫到這里,我去研究了一下synchronized的用法,才知道synchronized是對(duì)對(duì)象上鎖,之前一直以為只是對(duì)這個(gè)方法上鎖,別的synchronized方法仍然可以進(jìn)入,哎,慚愧慚愧~~),所以肯定是生產(chǎn)者先生產(chǎn)出一定數(shù)量的東西,消費(fèi)者才可以開始消費(fèi),所以在生產(chǎn)的時(shí)候發(fā)現(xiàn)in==out,那一定是滿了,同理,在消費(fèi)的時(shí)候發(fā)現(xiàn)in==out,那一定是消費(fèi)完了,因?yàn)樯a(chǎn)的東西永遠(yuǎn)要比消費(fèi)來得早,消費(fèi)者最多可以消費(fèi)和生產(chǎn)的數(shù)量相等的東西,而不會(huì)超出.

                  好了,介紹完之后,看看SUN高手是怎么實(shí)現(xiàn)這些功能的.由于buffer(存放產(chǎn)品的通道)這個(gè)關(guān)鍵變量在PipedInputStream消費(fèi)者這個(gè)類中,所以要想對(duì)buffer操作,只能通過PipedInputStream來操作,因此將產(chǎn)品放入通道的操作是在PipedInputStream中.

          存放產(chǎn)品的行為:

              protected synchronized void receive(int b) throws IOException {// 這里好像有些問題,因?yàn)檫@個(gè)方法是在PipedOutputStream類中調(diào)用的,而這個(gè)方法是protected的,下面另一個(gè)receive方法就不是protected,可能是我的源碼有些問題,也請(qǐng)大家?guī)臀铱纯?/span>
                  checkStateForReceive();// 檢測(cè)通道是否連接,準(zhǔn)備好接收產(chǎn)品
                  writeSide = Thread.currentThread();// 當(dāng)前線程是生產(chǎn)者
                  if (in == out)
                      awaitSpace();
          // 發(fā)現(xiàn)通道滿了,沒地方放東西啦,等吧~~
                  if (in < 0{// in<0,表示通道是空的,將生產(chǎn)和消費(fèi)的位置都置成第一個(gè)位置
                      in = 0;
                      out 
          = 0;
                  }

                  buffer[in
          ++= (byte) (b & 0xFF);
                  
          if (in >= buffer.length) {// 如果生產(chǎn)位置到達(dá)了通道的末尾,為了循環(huán)利用通道,將in置成0
                      in = 0;
                  }

              }


              
          synchronized void receive(byte b[], int off, int len) throws IOException {// 看,這個(gè)方法不是protected的!
                  checkStateForReceive();
                  writeSide 
          = Thread.currentThread();
                  
          int bytesToTransfer = len;// 需要接收多少產(chǎn)品的數(shù)量
                  while (bytesToTransfer > 0{
                      
          if (in == out)
                          awaitSpace();
                      
          int nextTransferAmount = 0;// 本次實(shí)際可以接收的數(shù)量
                      if (out < in) {
                          nextTransferAmount 
          = buffer.length - in;// 如果消費(fèi)的當(dāng)前位置<生產(chǎn)的當(dāng)前位置,則還可以再生產(chǎn)buffer.length-in這么多
                      }
           else if (in < out) {
                          
          if (in == -1{
                              in 
          = out = 0;// 如果已經(jīng)消費(fèi)完,則將in,out置成0,從頭開始接收
                              nextTransferAmount = buffer.length - in;
                          }
           else {
                              nextTransferAmount 
          = out - in;// 如果消費(fèi)的當(dāng)前位置>生產(chǎn)的當(dāng)前位置,而且還沒消費(fèi)完,那么至少還可以再生產(chǎn)out-in這么多,注意,這種情況是因?yàn)橥ǖ辣恢貜?fù)利用而產(chǎn)生的!
                          }

                      }

                      
          if (nextTransferAmount > bytesToTransfer)// 如果本次實(shí)際可以接收的數(shù)量要大于當(dāng)前傳過來的數(shù)量,
                          nextTransferAmount = bytesToTransfer;// 那么本次實(shí)際只能接收當(dāng)前傳過來的這么多了
                      assert (nextTransferAmount > 0);
                      System.arraycopy(b, off, buffer, in, nextTransferAmount);
          // 把本次實(shí)際接收的數(shù)量放進(jìn)通道
                      bytesToTransfer -= nextTransferAmount;// 算出還剩多少需要放進(jìn)通道
                      off += nextTransferAmount;
                      in 
          += nextTransferAmount;
                      
          if (in >= buffer.length) {// 到末尾了,該從頭開始了
                          in = 0;
                      }

                  }

              }

           

          消費(fèi)產(chǎn)品的行為:

              public synchronized int read() throws IOException {// 消費(fèi)單個(gè)產(chǎn)品
                  if (!connected) {
                      
          throw new IOException("Pipe not connected");
                  }
           else if (closedByReader) {
                      
          throw new IOException("Pipe closed");
                  }
           else if (writeSide != null && !writeSide.isAlive() && !closedByWriter
                          
          && (in < 0)) {
                      
          throw new IOException("Write end dead");
                  }


                  readSide 
          = Thread.currentThread();
                  
          int trials = 2;
                  
          while (in < 0{// in<0,表示通道是空的,等待生產(chǎn)者生產(chǎn)
                      if (closedByWriter) {
                          
          /* closed by writer, return EOF */
                          
          return -1;// 返回-1表示生產(chǎn)者已經(jīng)不再生產(chǎn)產(chǎn)品了,closedByWriter為true表示是由生產(chǎn)者將通道關(guān)閉的
                      }

                      
          if ((writeSide != null&& (!writeSide.isAlive()) && (--trials < 0)) {
                          
          throw new IOException("Pipe broken");
                      }

                      
          /* might be a writer waiting */
                      notifyAll();
                      
          try {
                          wait(
          1000);
                      }
           catch (InterruptedException ex) {
                          
          throw new java.io.InterruptedIOException();
                      }

                  }

                  
          int ret = buffer[out++& 0xFF;
                  
          if (out >= buffer.length) {
                      out 
          = 0;// 如果消費(fèi)到通道的末尾了,從通道頭開始繼續(xù)循環(huán)消費(fèi)
                  }

                  
          if (in == out) {
                      
          /* now empty */
                      in 
          = -1;// 消費(fèi)的位置和生產(chǎn)的位置重合了,表示消費(fèi)完了,需要生產(chǎn)者生產(chǎn),in置為-1
                  }

                  
          return ret;
              }


              
          public synchronized int read(byte b[], int off, int len) throws IOException {
                  
          if (b == null{
                      
          throw new NullPointerException();
                  }
           else if ((off < 0|| (off > b.length) || (len < 0)
                          
          || ((off + len) > b.length) || ((off + len) < 0)) {
                      
          throw new IndexOutOfBoundsException();
                  }
           else if (len == 0{
                      
          return 0;
                  }


                  
          /* possibly wait on the first character */
                  
          int c = read();// 利用消費(fèi)單個(gè)產(chǎn)品來檢測(cè)通道是否連接,并且通道中是否有東西可消費(fèi)
                  if (c < 0{
                      
          return -1;// 返回-1表示生產(chǎn)者生產(chǎn)完了,消費(fèi)者也消費(fèi)完了,消費(fèi)者可以關(guān)閉通道了
                  }

                  b[off] 
          = (byte) c;
                  
          int rlen = 1;

                  
          // 這里沒有采用receive(byte [], int ,
                  
          // int)方法中System.arrayCopy()的方法,其實(shí)用System.arrayCopy()的方法也可以實(shí)現(xiàn)
                  /*
                   * 這是用System.arrayCopy()實(shí)現(xiàn)的方法 int bytesToConsume = len - 1; while
                   * (bytesToConsume > 0 && in >= 0) { int nextConsumeAmount = 0; if (out <
                   * in) { nextConsumeAmount = in - out; // System.arraycopy(buffer, out,
                   * b, off, nextConsumeAmount); } else if (in < out) { nextConsumeAmount =
                   * buffer.length - out; }
                   * 
                   * if (nextConsumeAmount > bytesToConsume) nextConsumeAmount =
                   * bytesToConsume; assert (nextConsumeAmount > 0);
                   * System.arraycopy(buffer, out, b, off, nextConsumeAmount);
                   * bytesToConsume -= nextConsumeAmount; off += nextConsumeAmount; out +=
                   * nextConsumeAmount; rlen += nextConsumeAmount; if (out >=
                   * buffer.length) { out = 0; } if(in == out) { in = -1; } }
                   
          */


                  
          while ((in >= 0&& (--len > 0)) {
                      b[off 
          + rlen] = buffer[out++];
                      rlen
          ++;
                      
          if (out >= buffer.length) {
                          out 
          = 0;
                      }

                      
          if (in == out) {
                          
          /* now empty */
                          in 
          = -1;// in==out,表示滿了,將in置成-1
                      }

                  }

                  
          return rlen;
              }


          雖說功能看似簡(jiǎn)單,可是實(shí)現(xiàn)起來卻費(fèi)了一番功夫,在線程的調(diào)度上還是挺麻煩的,要考慮的地方很多,不過通過深入的了解這兩個(gè)類,讓我對(duì)多線程編程有了更多的認(rèn)識(shí).我覺得要想周密的分析整個(gè)功能,得把整個(gè)流程都分析清楚,構(gòu)造好了模型再去實(shí)現(xiàn)細(xì)節(jié),只有整體構(gòu)造對(duì)了,才能把正確的實(shí)現(xiàn)局部,自己的分析能力還有待加強(qiáng)啊!

          posted on 2008-04-10 17:47 爪哇豬 閱讀(3953) 評(píng)論(3)  編輯  收藏

          評(píng)論

          # re: 使用PipedInputStream,PipedOutputStream心得體會(huì) 2011-10-20 10:14 王康
          不知道你指的protected關(guān)鍵字有什么問題???  回復(fù)  更多評(píng)論
            

          # r的 2012-11-05 16:31
          的  回復(fù)  更多評(píng)論
            

          # re: 使用PipedInputStream,PipedOutputStream心得體會(huì)[未登錄] 2015-04-03 12:35 aaron
          請(qǐng)問可以引用嗎?謝謝  回復(fù)  更多評(píng)論
            


          只有注冊(cè)用戶登錄后才能發(fā)表評(píng)論。


          網(wǎng)站導(dǎo)航:
           
          主站蜘蛛池模板: 榆中县| 得荣县| 海兴县| 武冈市| 德州市| 綦江县| 洛扎县| 托克托县| 平顶山市| 育儿| 赞皇县| 信阳市| 梓潼县| 辉县市| 安庆市| 泰和县| 临城县| 莫力| 唐河县| 临海市| 凤山市| 黎平县| 潜山县| 奎屯市| 临邑县| 和顺县| 沙洋县| 偏关县| 济源市| 鸡泽县| 舞阳县| 安西县| 辽阳县| 宁国市| 龙州县| 夹江县| 德昌县| 贵州省| 会东县| 靖西县| 沐川县|