posts - 1,  comments - 25,  trackbacks - 0

          徹底轉(zhuǎn)變流,第 2 部分:優(yōu)化 Java 內(nèi)部 I/O

          替換字節(jié)數(shù)組流和管道流

          developerWorks
          文檔選項(xiàng)
          將此頁(yè)作為電子郵件發(fā)送

          將此頁(yè)作為電子郵件發(fā)送


          級(jí)別: 初級(jí)

          Merlin Hughes (merlin@merlin.org), 密碼專(zhuān)家, Baltimore Technologies

          2002 年 12 月 20 日

          雖然新的 Java I/O 框架( java.nio )能解決 I/O 支持所具有的多數(shù)性能問(wèn)題,但是它并沒(méi)有滿(mǎn)足使用字節(jié)數(shù)組和管道的應(yīng)用程序內(nèi)部通信的所有性能需求。本文是分兩部分的系列文章的最后一篇,Java 密碼專(zhuān)家和作家 Merlin Hughes 開(kāi)發(fā)了一組新的流,以補(bǔ)充標(biāo)準(zhǔn)的 Java I/O 字節(jié)數(shù)組流類(lèi)和管道流類(lèi),在設(shè)計(jì)中強(qiáng)調(diào)以高性能為目標(biāo)。請(qǐng)到關(guān)于本文的 討論論壇,與作者和其他讀者分享您對(duì)本文的看法。(您也可以單擊文章頂部或底部的 討論。)

          本系列的第一篇文章中,您學(xué)習(xí)了解決從只能寫(xiě)出數(shù)據(jù)的源讀取數(shù)據(jù)的問(wèn)題的一些不同方法。在可能的解決方案中,我們研究了怎樣使用字節(jié)數(shù)組流、管道流以及直接處理該問(wèn)題的定制框架。定制方法顯然是最有效率的解決方案;但是,分析其它幾種方法有助于看清標(biāo)準(zhǔn) Java 流的一些問(wèn)題。具體地說(shuō),字節(jié)數(shù)組輸出流并不提供可提供對(duì)它的內(nèi)容進(jìn)行只讀訪(fǎng)問(wèn)的高效機(jī)制,管道流的性能通常很差。

          為了處理這些問(wèn)題,我們將在本文中實(shí)現(xiàn)功能同樣齊全的替換類(lèi),但在實(shí)現(xiàn)時(shí)更強(qiáng)調(diào)性能。讓我們先來(lái)簡(jiǎn)要地討論一下同步問(wèn)題,因?yàn)樗c I/O 流有關(guān)。

          同步問(wèn)題

          一般來(lái)說(shuō),我推薦在不是特別需要同步的情況下避免不必要地使用同步。顯然,如果多個(gè)線(xiàn)程需并發(fā)地訪(fǎng)問(wèn)一個(gè)類(lèi),那么這個(gè)類(lèi)需確保線(xiàn)程安全。但是,在許多情況下并不需要并發(fā)的訪(fǎng)問(wèn),同步成了不必要的開(kāi)銷(xiāo)。例如,對(duì)流的并發(fā)訪(fǎng)問(wèn)自然是不確定的 ― 您無(wú)法預(yù)測(cè)哪些數(shù)據(jù)被先寫(xiě)入,也無(wú)法預(yù)測(cè)哪個(gè)線(xiàn)程讀了哪些數(shù)據(jù) ― 也就是說(shuō),在多數(shù)情況下,對(duì)流的并發(fā)訪(fǎng)問(wèn)是沒(méi)用的。所以,對(duì)所有的流強(qiáng)制同步是不提供實(shí)際好處的花費(fèi)。如果某個(gè)應(yīng)用程序要求線(xiàn)程安全,那么通過(guò)應(yīng)用程序自己的同步原語(yǔ)可以強(qiáng)制線(xiàn)程安全。

          事實(shí)上,Collection 類(lèi)的 API 作出了同樣的選擇:在缺省的情況下,set、list 等等都不是線(xiàn)程安全的。如果應(yīng)用程序想使用線(xiàn)程安全的 Collection,那么它可以使用 Collections 類(lèi)來(lái)創(chuàng)建一個(gè)線(xiàn)程安全的包裝器來(lái)包裝非線(xiàn)程安全的 Collection。如果這種作法是有用的,那么應(yīng)用程序可以使用完全相同的機(jī)制來(lái)包裝流,以使它線(xiàn)程安全;例如, OutputStream out = Streams.synchronizedOutputStream (byteStream) 。請(qǐng)參閱附帶的 源代碼中的 Streams 類(lèi),這是一個(gè)實(shí)現(xiàn)的示例。

          所以,對(duì)于我所認(rèn)為的多個(gè)并發(fā)線(xiàn)程無(wú)法使用的類(lèi),我沒(méi)用同步來(lái)為這些類(lèi)提供線(xiàn)程安全。在您廣泛采用這種方式前,我推薦您研究一下 Java 語(yǔ)言規(guī)范(Java Language Specification)的 Threads and Locks那一章(請(qǐng)參閱 參考資料),以理解潛在的缺陷;具體地說(shuō),在未使用同步的情況下無(wú)法確保讀寫(xiě)的順序,所以,對(duì)不同步的只讀方法的并發(fā)訪(fǎng)問(wèn)可能導(dǎo)致意外的行為,盡管這種訪(fǎng)問(wèn)看起來(lái)是無(wú)害的。





          回頁(yè)首


          更好的字節(jié)數(shù)組輸出流

          當(dāng)您需要把未知容量的數(shù)據(jù)轉(zhuǎn)儲(chǔ)到內(nèi)存緩沖區(qū)時(shí), ByteArrayOutputStream 類(lèi)是使用效果很好的流。當(dāng)我為以后再次讀取而存儲(chǔ)一些數(shù)據(jù)時(shí),我經(jīng)常使用這個(gè)類(lèi)。但是,使用 toByteArray() 方法來(lái)取得對(duì)結(jié)果數(shù)據(jù)的讀訪(fǎng)問(wèn)是很低效的,因?yàn)樗鼘?shí)際返回的是內(nèi)部字節(jié)數(shù)組的副本。對(duì)于小容量的數(shù)據(jù),使用這種方式不會(huì)有太大問(wèn)題;然而,隨著容量增大,這種方式的效率被不必要地降低了。這個(gè)類(lèi)必須復(fù)制數(shù)據(jù),因?yàn)樗荒軓?qiáng)制對(duì)結(jié)果字節(jié)數(shù)組進(jìn)行只讀訪(fǎng)問(wèn)。如果它返回它的內(nèi)部緩沖區(qū),那么在一般的情況下,接收方無(wú)法保證該緩沖區(qū)未被同一數(shù)組的另一個(gè)接收方并發(fā)地修改。

          StringBuffer 類(lèi)已解決了類(lèi)似的問(wèn)題;它提供可寫(xiě)的字符緩沖區(qū),它還支持高效地返回能從內(nèi)部字符數(shù)組直接讀取的只讀 String 。因?yàn)?StringBuffer 類(lèi)控制著對(duì)它的內(nèi)部數(shù)組的寫(xiě)訪(fǎng)問(wèn),所以它僅在必要時(shí)才復(fù)制它的數(shù)組;也就是說(shuō),當(dāng)它導(dǎo)出了 String 且后來(lái)調(diào)用程序修改了 StringBuffer 的時(shí)候。如果沒(méi)有發(fā)生這樣的修改,那么任何不必要的復(fù)制都不會(huì)被執(zhí)行。通過(guò)支持能夠強(qiáng)制適當(dāng)?shù)脑L(fǎng)問(wèn)控制的字節(jié)數(shù)組的包裝器,新的 I/O 框架以類(lèi)似的方式解決了這個(gè)問(wèn)題。

          我們可以使用相同的通用機(jī)制為需要使用標(biāo)準(zhǔn)流 API 的應(yīng)用程序提供高效的數(shù)據(jù)緩沖和再次讀取。我們的示例給出了可替代 ByteArrayOutputStream 類(lèi)的類(lèi),它能高效地導(dǎo)出對(duì)內(nèi)部緩沖區(qū)的只讀訪(fǎng)問(wèn),方法是返回直接讀取內(nèi)部字節(jié)數(shù)組的只讀 InputStream

          我們來(lái)看一下代碼。清單 1 中的構(gòu)造函數(shù)分配了初始緩沖區(qū),以存儲(chǔ)寫(xiě)到這個(gè)流的數(shù)據(jù)。為了存儲(chǔ)更多的數(shù)據(jù),該緩沖區(qū)將按需自動(dòng)地?cái)U(kuò)展。


          清單 1. 不同步的字節(jié)數(shù)組輸出流
          package org.merlin.io;
                                  import java.io.*;
                                  /**
                                  * An unsynchronized ByteArrayOutputStream alternative that efficiently
                                  * provides read-only access to the internal byte array with no
                                  * unnecessary copying.
                                  *
                                  * @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
                                  */
                                  public class BytesOutputStream extends OutputStream {
                                  private static final int DEFAULT_INITIAL_BUFFER_SIZE = 8192;
                                  // internal buffer
                                  private byte[] buffer;
                                  private int index, capacity;
                                  // is the stream closed?
                                  private boolean closed;
                                  // is the buffer shared?
                                  private boolean shared;
                                  public BytesOutputStream () {
                                  this (DEFAULT_INITIAL_BUFFER_SIZE);
                                  }
                                  public BytesOutputStream (int initialBufferSize) {
                                  capacity = initialBufferSize;
                                  buffer = new byte[capacity];
                                  }
                                  

          清單 2 顯示的是寫(xiě)方法。這些方法按需擴(kuò)展內(nèi)部緩沖區(qū),然后把新數(shù)據(jù)復(fù)制進(jìn)來(lái)。在擴(kuò)展內(nèi)部緩沖區(qū)時(shí),我們使緩沖區(qū)的大小增加了一倍再加上存儲(chǔ)新數(shù)據(jù)所需的容量;這樣,為了存儲(chǔ)任何所需的數(shù)據(jù),緩沖區(qū)的容量成指數(shù)地增長(zhǎng)。為了提高效率,如果您知道您將寫(xiě)入的數(shù)據(jù)的預(yù)期容量,那么您應(yīng)該指定相應(yīng)的初始緩沖區(qū)的大小。 close() 方法只是設(shè)置了一個(gè)合適的標(biāo)志。


          清單 2. 寫(xiě)方法
            public void write (int datum) throws IOException {
                                  if (closed) {
                                  throw new IOException ("Stream closed");
                                  } else {
                                  if (index >= capacity) {
                                  // expand the internal buffer
                                  capacity = capacity * 2 + 1;
                                  byte[] tmp = new byte[capacity];
                                  System.arraycopy (buffer, 0, tmp, 0, index);
                                  buffer = tmp;
                                  // the new buffer is not shared
                                  shared = false;
                                  }
                                  // store the byte
                                  buffer[index ++] = (byte) datum;
                                  }
                                  }
                                  public void write (byte[] data, int offset, int length)
                                  throws IOException {
                                  if (data == null) {
                                  throw new NullPointerException ();
                                  } else if ((offset < 0) || (offset + length > data.length)
                                  || (length < 0)) {
                                  throw new IndexOutOfBoundsException ();
                                  } else if (closed) {
                                  throw new IOException ("Stream closed");
                                  } else {
                                  if (index + length > capacity) {
                                  // expand the internal buffer
                                  capacity = capacity * 2 + length;
                                  byte[] tmp = new byte[capacity];
                                  System.arraycopy (buffer, 0, tmp, 0, index);
                                  buffer = tmp;
                                  // the new buffer is not shared
                                  shared = false;
                                  }
                                  // copy in the subarray
                                  System.arraycopy (data, offset, buffer, index, length);
                                  index += length;
                                  }
                                  }
                                  public void close () {
                                  closed = true;
                                  }
                                  

          清單 3 中的字節(jié)數(shù)組抽取方法返回內(nèi)部字節(jié)數(shù)組的副本。因?yàn)槲覀儫o(wú)法防止調(diào)用程序把數(shù)據(jù)寫(xiě)到結(jié)果數(shù)組,所以我們無(wú)法安全地返回對(duì)內(nèi)部緩沖區(qū)的直接引用。


          清單 3. 轉(zhuǎn)換成字節(jié)數(shù)組
            public byte[] toByteArray () {
                                  // return a copy of the internal buffer
                                  byte[] result = new byte[index];
                                  System.arraycopy (buffer, 0, result, 0, index);
                                  return result;
                                  }
                                  

          當(dāng)方法提供對(duì)存儲(chǔ)的數(shù)據(jù)的只讀訪(fǎng)問(wèn)的時(shí)候,它們可以安全地高效地直接使用內(nèi)部字節(jié)數(shù)組。清單 4 顯示了兩個(gè)這樣的方法。 writeTo() 方法把這個(gè)流的內(nèi)容寫(xiě)到輸出流;它直接從內(nèi)部緩沖區(qū)進(jìn)行寫(xiě)操作。 toInputStream() 方法返回了可被高效地讀取數(shù)據(jù)的輸入流。它所返回的 BytesInputStream (這是 ByteArrayInputStream 的非同步替代品。)能直接從我們的內(nèi)部字節(jié)數(shù)組讀取數(shù)據(jù)。在這個(gè)方法中,我們還設(shè)置了標(biāo)志,以表示內(nèi)部緩沖區(qū)正被輸入流共享。這一點(diǎn)很重要,因?yàn)檫@樣做可以防止在內(nèi)部緩沖區(qū)正被共享時(shí)這個(gè)流被修改。


          清單 4. 只讀訪(fǎng)問(wèn)方法
            public void writeTo (OutputStream out) throws IOException {
                                  // write the internal buffer directly
                                  out.write (buffer, 0, index);
                                  }
                                  public InputStream toInputStream () {
                                  // return a stream reading from the shared internal buffer
                                  shared = true;
                                  return new BytesInputStream (buffer, 0, index);
                                  }
                                  

          可能會(huì)覆蓋共享數(shù)據(jù)的唯一的一個(gè)方法是顯示在清單 5 中的 reset() 方法,該方法清空了這個(gè)流。所以,如果 shared 等于 true 且 reset() 被調(diào)用,那么我們創(chuàng)建新的內(nèi)部緩沖區(qū),而不是重新設(shè)置寫(xiě)索引。


          清單 5. 重新設(shè)置流
            public void reset () throws IOException {
                                  if (closed) {
                                  throw new IOException ("Stream closed");
                                  } else {
                                  if (shared) {
                                  // create a new buffer if it is shared
                                  buffer = new byte[capacity];
                                  shared = false;
                                  }
                                  // reset index
                                  index = 0;
                                  }
                                  }
                                  }
                                  





          回頁(yè)首


          更好的字節(jié)數(shù)組輸入流

          ByteArrayInputStream 類(lèi)來(lái)提供對(duì)內(nèi)存中的二進(jìn)制數(shù)據(jù)基于流的讀訪(fǎng)問(wèn)是很理想的。但是,有時(shí)候,它的兩個(gè)設(shè)計(jì)特點(diǎn)使我覺(jué)得需要一個(gè)替代它的類(lèi)。第一,這個(gè)類(lèi)是同步的;我已講過(guò),對(duì)于多數(shù)應(yīng)用程序來(lái)說(shuō)沒(méi)有這個(gè)必要。第二,如果在執(zhí)行 mark() 前調(diào)用它所實(shí)現(xiàn)的 reset() 方法,那么 reset() 將忽略初始讀偏移。這兩點(diǎn)都不是缺陷;但是,它們不一定總是人們所期望的。

          清單 6 中的 BytesInputStream 類(lèi)是不同步的較為普通的字節(jié)數(shù)組輸入流類(lèi)。


          清單 6. 不同步的字節(jié)數(shù)組輸入流
          package org.merlin.io;
                                  import java.io.*;
                                  /**
                                  * An unsynchronized ByteArrayInputStream alternative.
                                  *
                                  * @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
                                  */
                                  public class BytesInputStream extends InputStream {
                                  // buffer from which to read
                                  private byte[] buffer;
                                  private int index, limit, mark;
                                  // is the stream closed?
                                  private boolean closed;
                                  public BytesInputStream (byte[] data) {
                                  this (data, 0, data.length);
                                  }
                                  public BytesInputStream (byte[] data, int offset, int length) {
                                  if (data == null) {
                                  throw new NullPointerException ();
                                  } else if ((offset < 0) || (offset + length > data.length)
                                  || (length < 0)) {
                                  throw new IndexOutOfBoundsException ();
                                  } else {
                                  buffer = data;
                                  index = offset;
                                  limit = offset + length;
                                  mark = offset;
                                  }
                                  }
                                  public int read () throws IOException {
                                  if (closed) {
                                  throw new IOException ("Stream closed");
                                  } else if (index >= limit) {
                                  return -1; // EOF
                                  } else {
                                  return buffer[index ++] & 0xff;
                                  }
                                  }
                                  public int read (byte data[], int offset, int length)
                                  throws IOException {
                                  if (data == null) {
                                  throw new NullPointerException ();
                                  } else if ((offset < 0) || (offset + length > data.length)
                                  || (length < 0)) {
                                  throw new IndexOutOfBoundsException ();
                                  } else if (closed) {
                                  throw new IOException ("Stream closed");
                                  } else if (index >= limit) {
                                  return -1; // EOF
                                  } else {
                                  // restrict length to available data
                                  if (length > limit - index)
                                  length = limit - index;
                                  // copy out the subarray
                                  System.arraycopy (buffer, index, data, offset, length);
                                  index += length;
                                  return length;
                                  }
                                  }
                                  public long skip (long amount) throws IOException {
                                  if (closed) {
                                  throw new IOException ("Stream closed");
                                  } else if (amount <= 0) {
                                  return 0;
                                  } else {
                                  // restrict amount to available data
                                  if (amount > limit - index)
                                  amount = limit - index;
                                  index += (int) amount;
                                  return amount;
                                  }
                                  }
                                  public int available () throws IOException {
                                  if (closed) {
                                  throw new IOException ("Stream closed");
                                  } else {
                                  return limit - index;
                                  }
                                  }
                                  public void close () {
                                  closed = true;
                                  }
                                  public void mark (int readLimit) {
                                  mark = index;
                                  }
                                  public void reset () throws IOException {
                                  if (closed) {
                                  throw new IOException ("Stream closed");
                                  } else {
                                  // reset index
                                  index = mark;
                                  }
                                  }
                                  public boolean markSupported () {
                                  return true;
                                  }
                                  }
                                  





          回頁(yè)首


          使用新的字節(jié)數(shù)組流

          清單 7 中的代碼演示了怎樣使用新的字節(jié)數(shù)組流來(lái)解決第一篇文章中處理的問(wèn)題(讀一些壓縮形式的數(shù)據(jù)):


          清單 7. 使用新的字節(jié)數(shù)組流
          public static InputStream newBruteForceCompress (InputStream in)
                                  throws IOException {
                                  BytesOutputStream sink = new BytesOutputStream ();
                                  OutputStream out = new GZIPOutputStream (sink);
                                  Streams.io (in, out);
                                  out.close ();
                                  return sink.toInputStream ();
                                  }
                                  





          回頁(yè)首


          更好的管道流

          雖然標(biāo)準(zhǔn)的管道流既安全又可靠,但在性能方面不能令人滿(mǎn)意。幾個(gè)因素導(dǎo)致了它的性能問(wèn)題:

          • 對(duì)于不同的使用情況,大小為 1024 字節(jié)的內(nèi)部緩沖區(qū)并不都適用;對(duì)于大容量的數(shù)據(jù),該緩沖區(qū)太小了。
          • 基于數(shù)組的操作只是反復(fù)調(diào)用低效的一個(gè)字節(jié)一個(gè)字節(jié)地復(fù)制操作。該操作本身是同步的,從而導(dǎo)致非常嚴(yán)重的鎖爭(zhēng)用。
          • 如果管道變空或變滿(mǎn)而在這種狀態(tài)改變時(shí)一個(gè)線(xiàn)程阻塞了,那么,即使僅有一個(gè)字節(jié)被讀或?qū)懀摼€(xiàn)程也被喚醒。在許多情況下,線(xiàn)程將使用這一個(gè)字節(jié)并立即再次阻塞,這將導(dǎo)致只做了很少有用的工作。

          最后一個(gè)因素是 API 提供的嚴(yán)格的約定的后果。對(duì)于最通用的可能的應(yīng)用程序中使用的流來(lái)說(shuō),這種嚴(yán)格的約定是必要的。但是,對(duì)于管道流實(shí)現(xiàn),提供一種更寬松的約定是可能的,這個(gè)約定犧牲嚴(yán)格性以換取性能的提高:

          • 僅當(dāng)緩沖區(qū)的可用數(shù)據(jù)(對(duì)阻塞的讀程序而言)或可用空間(對(duì)寫(xiě)程序而言)達(dá)到指定的某個(gè) 滯后閾值或發(fā)生異常事件(例如管道關(guān)閉)時(shí),阻塞的讀程序和寫(xiě)程序才被喚醒。這將提高性能,因?yàn)閮H當(dāng)線(xiàn)程能完成適度的工作量時(shí)它們才被喚醒。
          • 只有一個(gè)線(xiàn)程可以從管道讀取數(shù)據(jù),只有一個(gè)線(xiàn)程可以把數(shù)據(jù)寫(xiě)到管道。否則,管道無(wú)法可靠地確定讀程序線(xiàn)程或?qū)懗绦蚓€(xiàn)程何時(shí)意外死亡。

          這個(gè)約定可完全適合典型應(yīng)用程序情形中獨(dú)立的讀程序線(xiàn)程和寫(xiě)程序線(xiàn)程;需要立即喚醒的應(yīng)用程序可以使用零滯后級(jí)別。我們將在后面看到,這個(gè)約定的實(shí)現(xiàn)的操作速度比標(biāo)準(zhǔn) API 流的速度快兩個(gè)數(shù)量級(jí)(100 倍)。

          我們可以使用幾個(gè)可能的 API 中的一個(gè)來(lái)開(kāi)發(fā)這些管道流:我們可以模仿標(biāo)準(zhǔn)類(lèi),顯式地連接兩個(gè)流;我們也可以開(kāi)發(fā)一個(gè) Pipe 類(lèi)并從這個(gè)類(lèi)抽取輸出流和輸入流。我們不使用這兩種方式而是使用更簡(jiǎn)單的方式:創(chuàng)建一個(gè) PipeInputStream ,然后抽取關(guān)聯(lián)的輸出流。

          這些流的一般操作如下:

          • 我們把內(nèi)部數(shù)組用作環(huán)緩沖區(qū)(請(qǐng)看圖 1):這個(gè)數(shù)組中維護(hù)著一個(gè)讀索引和一個(gè)寫(xiě)索引;數(shù)據(jù)被寫(xiě)到寫(xiě)索引所指的位置,數(shù)據(jù)從讀索引所指的位置被讀取;當(dāng)兩個(gè)索引到達(dá)緩沖區(qū)末尾時(shí),它們回繞到緩沖區(qū)起始點(diǎn)。任一個(gè)索引不能超越另一個(gè)索引。當(dāng)寫(xiě)索引到達(dá)讀索引時(shí),管道是滿(mǎn)的,不能再寫(xiě)任何數(shù)據(jù)。當(dāng)讀索引到達(dá)寫(xiě)索引時(shí),管道是空的,不能再讀任何數(shù)據(jù)。
          • 同步被用來(lái)確保兩個(gè)協(xié)作線(xiàn)程看到管道狀態(tài)的最新值。Java 語(yǔ)言規(guī)范對(duì)內(nèi)存訪(fǎng)問(wèn)的順序的規(guī)定是很寬容的,因此,無(wú)法使用無(wú)鎖緩沖技術(shù)。

          圖 1. 環(huán)緩沖區(qū)
          環(huán)緩沖區(qū)

          在下面的代碼清單中給出的是實(shí)現(xiàn)這些管道流的代碼。清單 8 顯示了這個(gè)類(lèi)所用的構(gòu)造函數(shù)和變量。您可以從這個(gè) InputStream 中抽取相應(yīng)的 OutputStream (請(qǐng)看清單 17 中的代碼)。在構(gòu)造函數(shù)中您可以指定內(nèi)部緩沖區(qū)的大小和滯后級(jí)別;這是緩沖區(qū)容量的一部分,在相應(yīng)的讀程序線(xiàn)程或?qū)懗绦蚓€(xiàn)程被立即喚醒前必須被使用或可用。我們維護(hù)兩個(gè)變量, readerwriter ,它們與讀程序線(xiàn)程和寫(xiě)程序線(xiàn)程相對(duì)應(yīng)。我們用它們來(lái)發(fā)現(xiàn)什么時(shí)候一個(gè)線(xiàn)程已死亡而另一個(gè)線(xiàn)程仍在訪(fǎng)問(wèn)流。


          清單 8. 一個(gè)替代的管道流實(shí)現(xiàn)
          package org.merlin.io;
                                  import java.io.*;
                                  /**
                                  * An efficient connected stream pair for communicating between
                                  * the threads of an application. This provides a less-strict contract
                                  * than the standard piped streams, resulting in much-improved
                                  * performance. Also supports non-blocking operation.
                                  *
                                  * @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
                                  */
                                  public class PipeInputStream extends InputStream {
                                  // default values
                                  private static final int DEFAULT_BUFFER_SIZE = 8192;
                                  private static final float DEFAULT_HYSTERESIS = 0.75f;
                                  private static final int DEFAULT_TIMEOUT_MS = 1000;
                                  // flag indicates whether method applies to reader or writer
                                  private static final boolean READER = false, WRITER = true;
                                  // internal pipe buffer
                                  private byte[] buffer;
                                  // read/write index
                                  private int readx, writex;
                                  // pipe capacity, hysteresis level
                                  private int capacity, level;
                                  // flags
                                  private boolean eof, closed, sleeping, nonBlocking;
                                  // reader/writer thread
                                  private Thread reader, writer;
                                  // pending exception
                                  private IOException exception;
                                  // deadlock-breaking timeout
                                  private int timeout = DEFAULT_TIMEOUT_MS;
                                  public PipeInputStream () {
                                  this (DEFAULT_BUFFER_SIZE, DEFAULT_HYSTERESIS);
                                  }
                                  public PipeInputStream (int bufferSize) {
                                  this (bufferSize, DEFAULT_HYSTERESIS);
                                  }
                                  // e.g., hysteresis .75 means sleeping reader/writer is not
                                  // immediately woken until the buffer is 75% full/empty
                                  public PipeInputStream (int bufferSize, float hysteresis) {
                                  if ((hysteresis < 0.0) || (hysteresis > 1.0))
                                  throw new IllegalArgumentException ("Hysteresis: " + hysteresis);
                                  capacity = bufferSize;
                                  buffer = new byte[capacity];
                                  level = (int) (bufferSize * hysteresis);
                                  }
                                  

          清單 9 中的配置方法允許您配置流的超時(shí)值和非阻塞模式。超時(shí)值的單位是毫秒,它表示阻塞的線(xiàn)程在過(guò)了這段時(shí)間后將被自動(dòng)喚醒;這對(duì)于打破在一個(gè)線(xiàn)程死亡的情況下可能發(fā)生的死鎖是必要的。在非阻塞模式中,如果線(xiàn)程阻塞,那么 InterruptedIOException 將被拋出。


          清單 9. 管道配置
            public void setTimeout (int ms) {
                                  this.timeout = ms;
                                  }
                                  public void setNonBlocking (boolean nonBlocking) {
                                  this.nonBlocking = nonBlocking;
                                  }
                                  

          清單 10 中的讀方法都遵循相當(dāng)標(biāo)準(zhǔn)的模式:如果我們還沒(méi)有讀線(xiàn)程的引用,那么我們先取得它,然后我們驗(yàn)證輸入?yún)?shù),核對(duì)流未被關(guān)閉或沒(méi)有異常待處理,確定可以讀取多少數(shù)據(jù),最后把數(shù)據(jù)從內(nèi)部的環(huán)緩沖區(qū)復(fù)制到讀程序的緩沖區(qū)。清單 12 中的 checkedAvailable() 方法在返回前自動(dòng)地等待,直到出現(xiàn)一些可用的數(shù)據(jù)或流被關(guān)閉。


          清單 10. 讀數(shù)據(jù)
            private byte[] one = new byte[1];
                                  public int read () throws IOException {
                                  // read 1 byte
                                  int amount = read (one, 0, 1);
                                  // return EOF / the byte
                                  return (amount < 0) ? -1 : one[0] & 0xff;
                                  }
                                  public synchronized int read (byte data[], int offset, int length)
                                  throws IOException {
                                  // take a reference to the reader thread
                                  if (reader == null)
                                  reader = Thread.currentThread ();
                                  // check parameters
                                  if (data == null) {
                                  throw new NullPointerException ();
                                  } else if ((offset < 0) || (offset + length > data.length)
                                  || (length < 0)) { // check indices
                                  throw new IndexOutOfBoundsException ();
                                  } else {
                                  // throw an exception if the stream is closed
                                  closedCheck ();
                                  // throw any pending exception
                                  exceptionCheck ();
                                  if (length <= 0) {
                                  return 0;
                                  } else {
                                  // wait for some data to become available for reading
                                  int available = checkedAvailable (READER);
                                  // return -1 on EOF
                                  if (available < 0)
                                  return -1;
                                  // calculate amount of contiguous data in pipe buffer
                                  int contiguous = capacity - (readx % capacity);
                                  // calculate how much we will read this time
                                  int amount = (length > available) ? available : length;
                                  if (amount > contiguous) {
                                  // two array copies needed if data wrap around the buffer end
                                  System.arraycopy (buffer, readx % capacity, data, offset,
                                  contiguous);
                                  System.arraycopy (buffer, 0, data, offset + contiguous,
                                  amount - contiguous);
                                  } else {
                                  // otherwise, one array copy needed
                                  System.arraycopy (buffer, readx % capacity, data, offset,
                                  amount);
                                  }
                                  // update indices with amount of data read
                                  processed (READER, amount);
                                  // return amount read
                                  return amount;
                                  }
                                  }
                                  }
                                  public synchronized long skip (long amount) throws IOException {
                                  // take a reference to the reader thread
                                  if (reader == null)
                                  reader = Thread.currentThread ();
                                  // throw an exception if the stream is closed
                                  closedCheck ();
                                  // throw any pending exception
                                  exceptionCheck ();
                                  if (amount <= 0) {
                                  return 0;
                                  } else {
                                  // wait for some data to become available for skipping
                                  int available = checkedAvailable (READER);
                                  // return 0 on EOF
                                  if (available < 0)
                                  return 0;
                                  // calculate how much we will skip this time
                                  if (amount > available)
                                  amount = available;
                                  // update indices with amount of data skipped
                                  processed (READER, (int) amount);
                                  // return amount skipped
                                  return amount;
                                  }
                                  }
                                  

          當(dāng)數(shù)據(jù)從這個(gè)管道被讀取或數(shù)據(jù)被寫(xiě)到這個(gè)管道時(shí),清單 11 中的方法被調(diào)用。該方法更新有關(guān)的索引,如果管道達(dá)到它的滯后級(jí)別,該方法自動(dòng)地喚醒阻塞的線(xiàn)程。


          清單 11. 更新索引
            private void processed (boolean rw, int amount) {
                                  if (rw == READER) {
                                  // update read index with amount read
                                  readx = (readx + amount) % (capacity * 2);
                                  } else {
                                  // update write index with amount written
                                  writex = (writex + amount) % (capacity * 2);
                                  }
                                  // check whether a thread is sleeping and we have reached the
                                  // hysteresis threshold
                                  if (sleeping && (available (!rw) >= level)) {
                                  // wake sleeping thread
                                  notify ();
                                  sleeping = false;
                                  }
                                  }
                                  

          在管道有可用空間或可用數(shù)據(jù)(取決于 rw 參數(shù))前,清單 12 中的 checkedAvailable() 方法一直等待,然后把空間的大小或數(shù)據(jù)的多少返回給調(diào)用程序。在這個(gè)方法內(nèi)還核對(duì)流未被關(guān)閉、管道未被破壞等。


          清單 12. 檢查可用性
            public synchronized int available () throws IOException {
                                  // throw an exception if the stream is closed
                                  closedCheck ();
                                  // throw any pending exception
                                  exceptionCheck ();
                                  // determine how much can be read
                                  int amount = available (READER);
                                  // return 0 on EOF, otherwise the amount readable
                                  return (amount < 0) ? 0 : amount;
                                  }
                                  private int checkedAvailable (boolean rw) throws IOException {
                                  // always called from synchronized(this) method
                                  try {
                                  int available;
                                  // loop while no data can be read/written
                                  while ((available = available (rw)) == 0) {
                                  if (rw == READER) { // reader
                                  // throw any pending exception
                                  exceptionCheck ();
                                  } else { // writer
                                  // throw an exception if the stream is closed
                                  closedCheck ();
                                  }
                                  // throw an exception if the pipe is broken
                                  brokenCheck (rw);
                                  if (!nonBlocking) { // blocking mode
                                  // wake any sleeping thread
                                  if (sleeping)
                                  notify ();
                                  // sleep for timeout ms (in case of peer thread death)
                                  sleeping = true;
                                  wait (timeout);
                                  // timeout means that hysteresis may not be obeyed
                                  } else { // non-blocking mode
                                  // throw an InterruptedIOException
                                  throw new InterruptedIOException
                                  ("Pipe " + (rw ? "full" : "empty"));
                                  }
                                  }
                                  return available;
                                  } catch (InterruptedException ex) {
                                  // rethrow InterruptedException as InterruptedIOException
                                  throw new InterruptedIOException (ex.getMessage ());
                                  }
                                  }
                                  private int available (boolean rw) {
                                  // calculate amount of space used in pipe
                                  int used = (writex + capacity * 2 - readx) % (capacity * 2);
                                  if (rw == WRITER) { // writer
                                  // return amount of space available for writing
                                  return capacity - used;
                                  } else { // reader
                                  // return amount of data in pipe or -1 at EOF
                                  return (eof && (used == 0)) ? -1 : used;
                                  }
                                  }
                                  

          清單 13 中的方法關(guān)閉這個(gè)流;該方法還提供對(duì)讀程序或?qū)懗绦蜿P(guān)閉流的支持。阻塞的線(xiàn)程被自動(dòng)喚醒,該方法還檢查各種其它情況是否正常。


          清單 13. 關(guān)閉流
            public void close () throws IOException {
                                  // close the read end of this pipe
                                  close (READER);
                                  }
                                  private synchronized void close (boolean rw) throws IOException {
                                  if (rw == READER) { // reader
                                  // set closed flag
                                  closed = true;
                                  } else if (!eof) { // writer
                                  // set eof flag
                                  eof = true;
                                  // check if data remain unread
                                  if (available (READER) > 0) {
                                  // throw an exception if the reader has already closed the pipe
                                  closedCheck ();
                                  // throw an exception if the reader thread has died
                                  brokenCheck (WRITER);
                                  }
                                  }
                                  // wake any sleeping thread
                                  if (sleeping) {
                                  notify ();
                                  sleeping = false;
                                  }
                                  }
                                  

          清單 14 中的方法檢查這個(gè)流的狀態(tài)。如果有異常待處理,那么流被關(guān)閉或管道被破壞(也就是說(shuō),讀程序線(xiàn)程或?qū)懗绦蚓€(xiàn)程已死亡),異常被拋出。


          清單 14. 檢查流狀態(tài)
            private void exceptionCheck () throws IOException {
                                  // throw any pending exception
                                  if (exception != null) {
                                  IOException ex = exception;
                                  exception = null;
                                  throw ex; // could wrap ex in a local exception
                                  }
                                  }
                                  private void closedCheck () throws IOException {
                                  // throw an exception if the pipe is closed
                                  if (closed)
                                  throw new IOException ("Stream closed");
                                  }
                                  private void brokenCheck (boolean rw) throws IOException {
                                  // get a reference to the peer thread
                                  Thread thread = (rw == WRITER) ? reader : writer;
                                  // throw an exception if  the peer thread has died
                                  if ((thread != null) && !thread.isAlive ())
                                  throw new IOException ("Broken pipe");
                                  }
                                  

          當(dāng)數(shù)據(jù)被寫(xiě)入這個(gè)管道時(shí),清單 15 中的方法被調(diào)用。總的來(lái)說(shuō),它類(lèi)似于讀方法:我們先取得寫(xiě)程序線(xiàn)程的副本,然后檢查流是否被關(guān)閉,接著進(jìn)入把數(shù)據(jù)復(fù)制到管道的循環(huán)。和前面一樣,該方法使用 checkedAvailable() 方法,checkedAvailable() 自動(dòng)阻塞,直到管道中有可用的容量。


          清單 15. 寫(xiě)數(shù)據(jù)
          private synchronized void writeImpl (byte[] data, int offset, int length)
                                  throws IOException {
                                  // take a reference to the writer thread
                                  if (writer == null)
                                  writer = Thread.currentThread ();
                                  // throw an exception if the stream is closed
                                  if (eof || closed) {
                                  throw new IOException ("Stream closed");
                                  } else {
                                  int written = 0;
                                  try {
                                  // loop to write all the data
                                  do {
                                  // wait for space to become available for writing
                                  int available = checkedAvailable (WRITER);
                                  // calculate amount of contiguous space in pipe buffer
                                  int contiguous = capacity - (writex % capacity);
                                  // calculate how much we will write this time
                                  int amount = (length > available) ? available : length;
                                  if (amount > contiguous) {
                                  // two array copies needed if space wraps around the buffer end
                                  System.arraycopy (data, offset, buffer, writex % capacity,
                                  contiguous);
                                  System.arraycopy (data, offset + contiguous, buffer, 0,
                                  amount - contiguous);
                                  } else {
                                  // otherwise, one array copy needed
                                  System.arraycopy (data, offset, buffer, writex % capacity,
                                  amount);
                                  }
                                  // update indices with amount of data written
                                  processed (WRITER, amount);
                                  // update amount written by this method
                                  written += amount;
                                  } while (written < length);
                                  // data successfully written
                                  } catch (InterruptedIOException ex) {
                                  // write operation was interrupted; set the bytesTransferred
                                  // exception field to reflect the amount of data written
                                  ex.bytesTransferred = written;
                                  // rethrow exception
                                  throw ex;
                                  }
                                  }
                                  }
                                  

          如清單 16 所示,這個(gè)管道流實(shí)現(xiàn)的特點(diǎn)之一是寫(xiě)程序可設(shè)置一個(gè)被傳遞給讀程序的異常。


          清單 16. 設(shè)置異常
            private synchronized void setException (IOException ex)
                                  throws IOException {
                                  // fail if an exception is already pending
                                  if (exception != null)
                                  throw new IOException ("Exception already set: " + exception);
                                  // throw an exception if the pipe is broken
                                  brokenCheck (WRITER);
                                  // take a reference to the pending exception
                                  this.exception = ex;
                                  // wake any sleeping thread
                                  if (sleeping) {
                                  notify ();
                                  sleeping = false;
                                  }
                                  }
                                  

          清單 17 給出這個(gè)管道的有關(guān)輸出流的代碼。 getOutputStream() 方法返回 OutputStreamImpl ,OutputStreamImpl 是使用前面給出的方法來(lái)把數(shù)據(jù)寫(xiě)到內(nèi)部管道的輸出流。OutputStreamImpl 類(lèi)繼承了 OutputStreamEx ,OutputStreamEx 是允許為讀線(xiàn)程設(shè)置異常的輸出流類(lèi)的擴(kuò)展。


          清單 17. 輸出流
            public OutputStreamEx getOutputStream () {
                                  // return an OutputStreamImpl associated with this pipe
                                  return new OutputStreamImpl ();
                                  }
                                  private class OutputStreamImpl extends OutputStreamEx {
                                  private byte[] one = new byte[1];
                                  public void write (int datum) throws IOException {
                                  // write one byte using internal array
                                  one[0] = (byte) datum;
                                  write (one, 0, 1);
                                  }
                                  public void write (byte[] data, int offset, int length)
                                  throws IOException {
                                  // check parameters
                                  if (data == null) {
                                  throw new NullPointerException ();
                                  } else if ((offset < 0) || (offset + length > data.length)
                                  || (length < 0)) {
                                  throw new IndexOutOfBoundsException ();
                                  } else if (length > 0) {
                                  // call through to writeImpl()
                                  PipeInputStream.this.writeImpl (data, offset, length);
                                  }
                                  }
                                  public void close () throws IOException {
                                  // close the write end of this pipe
                                  PipeInputStream.this.close (WRITER);
                                  }
                                  public void setException (IOException ex) throws IOException {
                                  // set a pending exception
                                  PipeInputStream.this.setException (ex);
                                  }
                                  }
                                  // static OutputStream extension with setException() method
                                  public static abstract class OutputStreamEx extends OutputStream {
                                  public abstract void setException (IOException ex) throws IOException;
                                  }
                                  }
                                  





          回頁(yè)首


          使用新的管道流

          清單 18 演示了怎樣使用新的管道流來(lái)解決上一篇文章中的問(wèn)題。請(qǐng)注意,寫(xiě)程序線(xiàn)程中出現(xiàn)的任何異常均可在流中被傳遞。


          清單 18. 使用新的管道流
          public static InputStream newPipedCompress (final InputStream in)
                                  throws IOException {
                                  PipeInputStream source = new PipeInputStream ();
                                  final PipeInputStream.OutputStreamEx sink = source.getOutputStream ();
                                  new Thread () {
                                  public void run () {
                                  try {
                                  GZIPOutputStream gzip = new GZIPOutputStream (sink);
                                  Streams.io (in, gzip);
                                  gzip.close ();
                                  } catch (IOException ex) {
                                  try {
                                  sink.setException (ex);
                                  } catch (IOException ignored) {
                                  }
                                  }
                                  }
                                  }.start ();
                                  return source;
                                  }
                                  





          回頁(yè)首


          性能結(jié)果

          在下面的表中顯示的是這些新的流和標(biāo)準(zhǔn)流的性能,測(cè)試環(huán)境是運(yùn)行 Java 2 SDK,v1.4.0 的 800MHz Linux 機(jī)器。性能測(cè)試程序與我在上一篇文章中用的相同:

          管道流
          15KB:21ms;15MB:20675ms
          新的管道流
          15KB:0.68ms;15MB:158ms
          字節(jié)數(shù)組流
          15KB:0.31ms;15MB:745ms
          新的字節(jié)數(shù)組流
          15KB:0.26ms;15MB:438ms

          與上一篇文章中的性能差異只反映了我的機(jī)器中不斷變化的環(huán)境負(fù)載。您可以從這些結(jié)果中看到,在大容量數(shù)據(jù)方面,新的管道流的性能遠(yuǎn)好于蠻力解決方案;但是,新的管道流的速度仍然只有我們分析的工程解決方案的速度的一半左右。顯然,在現(xiàn)代的 Java 虛擬機(jī)中使用多個(gè)線(xiàn)程的開(kāi)銷(xiāo)遠(yuǎn)比以前小得多。





          回頁(yè)首


          結(jié)束語(yǔ)

          我們分析了兩組可替代標(biāo)準(zhǔn) Java API 的流的流: BytesOutputStreamBytesInputStream 是字節(jié)數(shù)組流的非同步替代者。因?yàn)檫@些類(lèi)的預(yù)期的用例涉及單個(gè)線(xiàn)程的訪(fǎng)問(wèn),所以不采用同步是合理的選擇。實(shí)際上,執(zhí)行時(shí)間的縮短(最多可縮短 40%)很可能與同步的消滅沒(méi)有多大關(guān)系;性能得到提高的主要原因是在提供只讀訪(fǎng)問(wèn)時(shí)避免了不必要的復(fù)制。第二個(gè)示例 PipeInputStream 可替代管道流;為了減少超過(guò) 99% 的執(zhí)行時(shí)間,這個(gè)流使用寬松的約定、改進(jìn)的緩沖區(qū)大小和基于數(shù)組的操作。在這種情況下無(wú)法使用不同步的代碼;Java 語(yǔ)言規(guī)范排除了可靠地執(zhí)行這種代碼的可能性,否則,在理論上是可以實(shí)現(xiàn)最少鎖定的管道。

          字節(jié)數(shù)組流和管道流是基于流的應(yīng)用程序內(nèi)部通信的主要選擇。雖然新的 I/O API 提供了一些其它選擇,但是許多應(yīng)用程序和 API 仍然依賴(lài)標(biāo)準(zhǔn)流,而且對(duì)于這些特殊用途來(lái)說(shuō),新的 I/O API 并不一定有更高的效率。通過(guò)適當(dāng)?shù)販p少同步的使用、有效地采用基于數(shù)組的操作以及最大程度地減少不必要的復(fù)制,性能結(jié)果得到了很大的提高,從而提供了完全適應(yīng)標(biāo)準(zhǔn)流框架的更高效的操作。在應(yīng)用程序開(kāi)發(fā)的其它領(lǐng)域中采用相同的步驟往往能取得類(lèi)似地性能提升。

          posted on 2008-05-23 12:32 Daniel 閱讀(233) 評(píng)論(0)  編輯  收藏 所屬分類(lèi): CoreJava
          <2025年8月>
          272829303112
          3456789
          10111213141516
          17181920212223
          24252627282930
          31123456

          常用鏈接

          留言簿(3)

          隨筆檔案

          文章分類(lèi)

          文章檔案

          相冊(cè)

          搜索

          •  

          最新評(píng)論

          主站蜘蛛池模板: 阿拉善盟| 华蓥市| 晋江市| 习水县| 三江| 泗洪县| 蓬溪县| 米易县| 汉沽区| 铁力市| 古蔺县| 张家口市| 东宁县| 六安市| 包头市| 阿拉善左旗| 道孚县| 泸定县| 兖州市| 休宁县| 屏边| 绿春县| 莒南县| 阳原县| 宜良县| 峡江县| 玛曲县| 南昌市| 吉隆县| 锡林郭勒盟| 西乌| 长岛县| 璧山县| 鹰潭市| 奉节县| 贵定县| 日喀则市| 哈巴河县| 益阳市| 荥阳市| 桓台县|