http://www-128.ibm.com/developerworks/cn/java/j-csp2/
用 JCSP 進(jìn)行并發(fā)編程

級(jí)別: 中級(jí)

Abhijit Belapurkar
高級(jí)技術(shù)架構(gòu)師, Infosys Technologies Limited
2005 年 7 月 07 日

在這篇由三部分構(gòu)成的面向 Java 程序員的通信順序進(jìn)程 (CSP)介紹的第二期中,Abhijit Belapurkar 將介紹如何使用基于 Java 的 JCSP 庫(kù)來編寫能夠確保沒有并發(fā)問題(例如爭(zhēng)用風(fēng)險(xiǎn)、 死鎖、活動(dòng)鎖、資源耗盡)的 Java 應(yīng)用程序。

CSP 是對(duì)并發(fā)對(duì)象之間的復(fù)雜交互進(jìn)行建模的范式。使用 CSP 的主要優(yōu)勢(shì)之一是:對(duì)程序每一階段所包含對(duì)象的行為進(jìn)行精確地指定和驗(yàn)證。CSP 的理論和實(shí)踐對(duì)于并發(fā)設(shè)計(jì)和編程領(lǐng)域有深遠(yuǎn)的影響。它是 occam 這樣的編程語(yǔ)言的基礎(chǔ),對(duì)其他語(yǔ)言(例如 Ada)的設(shè)計(jì)也有影響。就像在本文 第 1 部分 簡(jiǎn)要討論過的,由于適合在 Java 平臺(tái)上進(jìn)行安全、優(yōu)雅的多線程編程,所以 CSP 對(duì) Java 開發(fā)人員也是無(wú)價(jià)的。

在我的這篇由三部分組成的 Java 平臺(tái) CSP 編程介紹的第 2 部分中,我把重點(diǎn)放在 CSP 理論和實(shí)踐上,特別是它在 Java 語(yǔ)言中多線程程序設(shè)計(jì)的應(yīng)用。我將從 CSP 理論的概述開始介紹,然后介紹基于 Java 的 JCSP 庫(kù)實(shí)現(xiàn),JCSP 的核心是 CSP 。

CSP 基礎(chǔ)
CSP 的基本構(gòu)造是進(jìn)程和進(jìn)程之間各種形式的通信。CSP 中的每件事都是進(jìn)程,甚至(子)進(jìn)程網(wǎng)絡(luò)也是進(jìn)程。但是,在進(jìn)程之間沒有直接交互 —— 所有交互都通過 CSP 的同步對(duì)象(例如各級(jí)進(jìn)程訂閱的通信通道和事件邊界)實(shí)現(xiàn)的。

CSP 進(jìn)程 與典型的 Java 對(duì)象不同:封裝在進(jìn)程組件中的數(shù)據(jù) 操縱數(shù)據(jù)的算法都是私有的。也就是說,進(jìn)程沒有對(duì)外可以調(diào)用的方法(除了啟動(dòng)進(jìn)程必須調(diào)用的方法之外),算法只能在進(jìn)程自己的控制線程內(nèi)執(zhí)行。如果把這種方法與 Java 語(yǔ)言中的方法調(diào)用進(jìn)行對(duì)比,就可以立即看出 CSP 是如何消除顯式鎖定的需求的:

不要錯(cuò)過本系列的其余部分!
“適用于 Java 程序員的 CSP”是對(duì)通信順序進(jìn)程(Communicating Sequential Processes —— CSP)進(jìn)行介紹的由三部分組成的一個(gè)系列。CSP 是并發(fā)編程的一個(gè)范式,它承認(rèn)了并發(fā)編程的復(fù)雜性,卻并沒有把復(fù)雜性留給開發(fā)人員。請(qǐng)參閱本系列的其他部分:

第 2 部分:用 JCSP 進(jìn)行并發(fā)編程

第 3 部分: JCSP 的高級(jí)主題

在 Java 語(yǔ)言中,在對(duì)象上調(diào)用的方法總是在調(diào)用者的線程中運(yùn)行。但也有一個(gè)特殊的控制線程是通過系統(tǒng)中的多個(gè)對(duì)象進(jìn)行工作的。對(duì)于大部分情況來說,對(duì)象沒有自己的生命 —— 它們只是在運(yùn)行線程調(diào)用它們的方法時(shí)才存在。因此,不同的執(zhí)行線程可以在同一時(shí)間試圖調(diào)用同一對(duì)象的同一方法,就像 第 1 部分所討論的那樣。顯然,這種情況在 CSP 中永遠(yuǎn)不會(huì)發(fā)生。

通信通道和進(jìn)程網(wǎng)絡(luò)
進(jìn)程間通信最簡(jiǎn)單的機(jī)制就是通過通道讀寫數(shù)據(jù)。CSP 中基本的通道構(gòu)造是同步的(synchronous)點(diǎn)對(duì)點(diǎn)的(point-to-point);也就是說,它不包含內(nèi)部緩沖,并且把一個(gè)進(jìn)程連接到另外一個(gè)進(jìn)程。從這個(gè)基本通道開始,有可能構(gòu)建多個(gè)閱讀器/寫入器通道(即一對(duì)多、多對(duì)一和多對(duì)多)。

CSP 中的進(jìn)程構(gòu)成了復(fù)雜系統(tǒng)的基本構(gòu)造塊 —— 一個(gè)進(jìn)程可以同一個(gè)或多個(gè)其他進(jìn)程連接起來(全都設(shè)置成并行的),從而構(gòu)成一個(gè)進(jìn)程網(wǎng)絡(luò)。可以把這個(gè)網(wǎng)絡(luò)本身想像成一個(gè)進(jìn)程,這個(gè)進(jìn)程還可以遞歸地與其他進(jìn)程、它們自己的網(wǎng)絡(luò)或者其他類似東西組合在一起,形成一個(gè)為了最好地解決手上問題而設(shè)計(jì)的復(fù)雜排列的金字塔。

如果單獨(dú)考慮,那么進(jìn)程僅僅是一個(gè)獨(dú)立的串行程序,它只與外部 I/O 設(shè)備交互。這個(gè)程序本身并不需要考慮在 I/O 通道另一端的進(jìn)程是否存在或?qū)Ψ降男再|(zhì)。

CSP 理論已經(jīng)在許多基于 Java 的框架中實(shí)現(xiàn)了,包括面向 Java 的通信順序進(jìn)程(Communicating Sequential Processes for Java,JCSP) 庫(kù)。

關(guān)于 CSP 的更多內(nèi)容
本文提供了對(duì) CSP 復(fù)雜主題的一般性介紹。如果對(duì)于深入理論底層的數(shù)學(xué)機(jī)制有興趣,那么請(qǐng)參閱 C.A.R. Hoare 的原文章以及他針對(duì)這一主題撰寫的書。要想獲得 CSP 理論的最新發(fā)展(這些年已經(jīng)做了更新),請(qǐng)參閱 Bill Roscoe 撰寫的書。要想獲得廣泛的參考來源,請(qǐng)參考牛津大學(xué)計(jì)算機(jī)實(shí)驗(yàn)室和 WoTUG 主頁(yè)的 CSP 歸檔。還請(qǐng)參閱 參考資料,以獲取所有這些參考和更多內(nèi)容的鏈接。

JCSP 庫(kù)
JCSP 庫(kù)由英國(guó)坎特伯雷市肯特大學(xué)的 Peter Welch 教授和 Paul Austin 開發(fā)(請(qǐng)參閱 參考資料)。對(duì)于本文余下的大部分內(nèi)容來說,我會(huì)把重點(diǎn)放在 CSP 概念在 JCSP 中的實(shí)現(xiàn)方式上。因?yàn)?Java 語(yǔ)言沒有提供對(duì) CSP 構(gòu)造的自帶支持,所以 JCSP 庫(kù)內(nèi)部使用 Java 語(yǔ)言 實(shí)際 支持的、自帶的并發(fā)構(gòu)造,例如 synchronizedwaitnotify。為了幫助您正確地理解 JCSP 的工作方式,我將從這些 Java 構(gòu)造的角度對(duì) JCSP 庫(kù)中某些類的內(nèi)部實(shí)現(xiàn)進(jìn)行了解釋。

注意,后續(xù)章節(jié)中的示例基于或來自 JCSP 庫(kù)的 Javadoc 文檔,或者基于可以在 JCSP 主頁(yè)上得到的演示文稿。

JCSP 中的進(jìn)程
在 JCSP 中,進(jìn)程實(shí)際上就是實(shí)現(xiàn)了 CSProcess 接口的類。清單 1 顯示了這個(gè)接口:

清單 1. CSProcess 接口

package jcsp.lang;

public interface CSProcess
{
    public void run();
}

注意,CSProcess 接口看起來就像 Java 語(yǔ)言的 Runnable 接口,而且它也充當(dāng)著類似的角色。雖然 JCSP 目前是用標(biāo)準(zhǔn) Java API 實(shí)現(xiàn)的,但是并不需要這樣,而且在未來可能真的不需要這樣。出于這個(gè)原因,在 JCSP 中沒有直接使用 Runnable 接口。

驗(yàn)證 JCSP 程序
Peter Welch 教授和其他人構(gòu)建了一個(gè)正式的 CSP 模型,從而可以用 CSP 術(shù)語(yǔ)對(duì)多線程 Java 程序進(jìn)行分析,并驗(yàn)證程序是否會(huì)造成導(dǎo)致爭(zhēng)用風(fēng)險(xiǎn)、死鎖和資源耗盡的 bug。因?yàn)?JCSP 庫(kù)使用模型底部的監(jiān)視器機(jī)制 (即 synchronized()wait()notify()notifyAll()) ,所以基于 JCSP 的應(yīng)用程序可以用各種軟件工程工具進(jìn)行驗(yàn)證,其中包括一些商業(yè)化支持的工具。請(qǐng)參閱 參考資料,學(xué)習(xí)關(guān)于 FDR2 的內(nèi)容,這是一個(gè)針對(duì)基于 CSP 的程序的模型檢測(cè)工具。

JCSP 定義了兩個(gè)接口用于從通道讀取對(duì)象和向通道寫入對(duì)象。從通道讀取對(duì)象的接口叫作 ChannelInput ,它只有一個(gè)方法,叫作 read()。如果進(jìn)程調(diào)用一個(gè)實(shí)現(xiàn) ChannelInput 接口的對(duì)象的這個(gè)方法,那么進(jìn)程會(huì)阻塞,直到在通道另一端的進(jìn)程實(shí)際向通道寫入了一個(gè)對(duì)象。 一旦在通道上有對(duì)象可用,對(duì)象就被返回給調(diào)用進(jìn)程。類似地,ChannelOutput 接口也只有一個(gè)方法,叫作 write(Object o)。如果進(jìn)程調(diào)用 一個(gè)實(shí)現(xiàn) ChannelOutput 接口的對(duì)象的這個(gè)方法,進(jìn)程也會(huì)阻塞,直到通道接受對(duì)象。正如前面提到過的,最簡(jiǎn)單的通道類型沒有緩沖,所以它在另一端(讀取)的進(jìn)程調(diào)用 read() 之前不會(huì)接受對(duì)象。

從現(xiàn)在開始,我將使用代碼示例來演示這些和其他 JCSP 構(gòu)造如何工作。在清單 2 中,可以看到一個(gè)非常簡(jiǎn)單的進(jìn)程,它輸出 1 到 100 之間的所有偶數(shù):

清單 2. 生成 1 到 100 之間偶數(shù)的進(jìn)程

import jcsp.lang.*;

public class SendEvenIntsProcess implements CSProcess 
{
    private ChannelOutput out;

    public SendEvenIntsProcess(ChannelOutput out)
    {
      this.out = out;
    }

    public void run()
    {
      for (int i = 2; i <= 100; i = i + 2)
      {
        out.write (new Integer (i));
      }
    }
}

與每一個(gè)寫進(jìn)程對(duì)應(yīng),必須有一個(gè)讀進(jìn)程。如果不存在這樣的進(jìn)程,則會(huì)造成 SendEvenIntsProcessChannelOutput 對(duì)象的 out 進(jìn)行第一次寫操作之后立即無(wú)限期阻塞。清單 3 演示了一個(gè)簡(jiǎn)單的讀進(jìn)程,該進(jìn)程與清單 2 介紹的寫進(jìn)程對(duì)應(yīng):

清單 3. 對(duì)應(yīng)的消費(fèi)者進(jìn)程

import jcsp.lang.*;

public class ReadEvenIntsProcess implements CSProcess
{
    private ChannelInput in;
    public ReadEvenIntsProcess(ChannelInput in)
    {
      this.in = in;
    }

    public void run()
    {
      while (true)
      {
        Integer d = (Integer)in.read();
        System.out.println("Read: " + d.intValue());
      }
    }
}

JCSP 中的通道
到目前為止,我只有兩個(gè)獨(dú)立的進(jìn)程。下一步就是使用一個(gè)用作共享同步機(jī)制的公共通道把它們聯(lián)系在一起,然后從中剔除一個(gè)進(jìn)程。channel 接口是 JCSP 的 ChannelInputChannelOutput 接口的子接口,是讀取和寫入對(duì)象的公共接口。這個(gè)接口有許多可能的實(shí)現(xiàn),就像下面描述的一樣:

  • One2OneChannel,顧名思義,實(shí)現(xiàn)了“單一寫入器/單一閱讀器”類型的通道。

  • One2AnyChannel 實(shí)現(xiàn)了“單一寫入器/多閱讀器”對(duì)象通道。(注意,這不是廣播機(jī)制,實(shí)際上,為了從通道讀取對(duì)象,多個(gè)閱讀器要進(jìn)行相互競(jìng)爭(zhēng);在指定時(shí)間只有一個(gè)閱讀器能使用通道和寫入器進(jìn)行溝通。)

  • Any2OneChannel 實(shí)現(xiàn)了 “多寫入器/單一閱讀器”對(duì)象通道。同上面的情況一樣,寫入進(jìn)程彼此競(jìng)爭(zhēng)使用通道。在指定時(shí)間,只有閱讀器和眾多寫入器中的一個(gè)在實(shí)際使用通道。

  • Any2AnyChannel 實(shí)現(xiàn)了“多寫入器/多閱讀器”對(duì)象通道。讀取進(jìn)程彼此競(jìng)爭(zhēng)使用的通道,寫入進(jìn)程也一樣。在指定時(shí)間只有一個(gè)閱讀器和一個(gè)寫入器在實(shí)際使用通道。

在清單 3 的示例中,我只有一個(gè)寫入器進(jìn)程和一個(gè)閱讀器進(jìn)程,所以 One2OneChannel 類就足夠了。驅(qū)動(dòng)器程序的示例代碼如清單 4 所示:

清單 4. 驅(qū)動(dòng)器程序

import jcsp.lang.*;

public class DriverProgram
{
    public static void main(String[] args)
    {
      One2OneChannel chan = new One2OneChannel();
      new Parallel
      (
        new CSProcess[]
	    {
	      new SendEvenIntsProcess (chan),
	      new ReadEvenIntsProcess (chan)
	    }
      ).run ();
    }
}

正如代碼表示的,我首先實(shí)例化一個(gè)新的 One2OneChannel 對(duì)象,然后把它傳遞給 SendEvenIntsProcessReadEventIntsProcess 進(jìn)程的構(gòu)造函數(shù)。這樣做是因?yàn)?One2OneChannel 同時(shí)實(shí)現(xiàn)了兩個(gè)接口 —— ChannelInputChannelOutput

通道內(nèi)部
因?yàn)橥ǖ涝?JCSP 中是重要的概念,所以在進(jìn)行下一步之前,要確定您確實(shí)理解了它們的工作方式。正如我在前面提到的,通道在默認(rèn)情況下是非緩沖的,但是也可以把它們變成緩沖的。實(shí)現(xiàn)方式是:通道本身并不處理緩沖特性,而是把這個(gè)責(zé)任委托給其他類,其他類必須實(shí)現(xiàn)叫作 ChannelDataStore 的接口。JCSP 為這個(gè)接口提供了多個(gè)內(nèi)置實(shí)現(xiàn),其中包括以下幾個(gè)實(shí)現(xiàn):

  • ZeroBuffer,對(duì)應(yīng)默認(rèn)的非緩沖特性。

  • Buffer,為與之相關(guān)聯(lián)的通道提供了一個(gè)阻塞的先進(jìn)先出的緩沖語(yǔ)義。

  • InfiniteBuffer,也提供先進(jìn)先出語(yǔ)義,但是如果緩沖為空,那么可以將閱讀器阻塞。寫入器永遠(yuǎn)不會(huì)阻塞,因?yàn)榫彌_可以無(wú)限擴(kuò)展,或者至少到了底層內(nèi)存系統(tǒng)設(shè)置的限制為止。

通道實(shí)戰(zhàn)
考慮一個(gè)實(shí)際使用的通道示例。當(dāng)我創(chuàng)建了如清單 4 所示的 One2OneChannel 實(shí)例時(shí),我把它內(nèi)部的 ChannelDatasource 設(shè)置成 ZeroBuffer 的一個(gè)新實(shí)例。ZeroBuffer 只能保存一個(gè)對(duì)象(或整數(shù))。它有一個(gè)內(nèi)部狀態(tài)變量,該變量的起始值為 EMPTY,只要放進(jìn)一個(gè)對(duì)象,該變量的值就變成 FULL 了。

當(dāng) SendEvenIntsProcess 進(jìn)程在它的 out 通道上進(jìn)行 write 操作時(shí),會(huì)發(fā)生什么呢?One2OneChannel 類的 write() 方法是一個(gè) synchronized() 方法。因此,發(fā)送方進(jìn)程運(yùn)行所在的線程(很快就會(huì)看到發(fā)送方進(jìn)程和閱讀器進(jìn)程運(yùn)行在獨(dú)立的線程中)就會(huì)得到與這個(gè)通道實(shí)例相關(guān)聯(lián)的監(jiān)視器鎖,并繼續(xù)處理方法。在該方法中,業(yè)務(wù)的第一個(gè)順序就是調(diào)用內(nèi)部持有的 ZeroBuffer 實(shí)例的 put 方法,把對(duì)象(或者在這個(gè)示例中是整數(shù))寫到 ZeroBuffer 實(shí)例。這樣就把緩沖的狀態(tài)變成 FULL。這時(shí),調(diào)用線程調(diào)用 wait,造成線程進(jìn)入監(jiān)視器的 等候集,后面進(jìn)行的操作是釋放監(jiān)視器鎖和阻塞線程。

稍后,閱讀器線程調(diào)用通道上的 read 操作(這也是一個(gè)同步的方法,所以閱讀器線程在繼續(xù)處理之前必須得到監(jiān)視器鎖)。因?yàn)閮?nèi)部緩沖的狀態(tài)是 FULL,所以可用數(shù)據(jù)將被返回,并發(fā)出一個(gè) notify()notify() 喚醒發(fā)送方線程,然后發(fā)送方線程退出監(jiān)視器等候集,并重新申請(qǐng)監(jiān)視器鎖。

在反過來的場(chǎng)景中,如果閱讀器線程調(diào)用通道上的 read 方法時(shí),通道的內(nèi)部緩沖狀態(tài)是 EMPTY,那么閱讀器線程就不得不 wait,在這種情況下,發(fā)送方線程要在把數(shù)據(jù)對(duì)象寫入內(nèi)部緩沖之后通知閱讀器線程。

Parallel 構(gòu)造
清單 4 中,您可能已經(jīng)注意到驅(qū)動(dòng)器程序引入了一個(gè)新類,叫作 ParallelParallel 類是由 JCSP 以預(yù)定義 CSProcess 的形式提供的,它接受一組獨(dú)立的 CSProcess 實(shí)例,并“平行地”運(yùn)行它們 (除了最后一個(gè)之外,所有進(jìn)程都在獨(dú)立的線程中運(yùn)行;最后一個(gè)進(jìn)程由 Parallel 對(duì)象在自己的控制線程中運(yùn)行)。 Parallel 進(jìn)程的 run 方法只有在所有的部件進(jìn)程終止的時(shí)候才終止。所以 Parallel 進(jìn)程是一種把多個(gè)獨(dú)立進(jìn)程組織起來的機(jī)制,它用通道(在驅(qū)動(dòng)器程序中示例中)作為“線”把進(jìn)程連在一起。

了解 Parallel 構(gòu)造的另一個(gè)途徑是說:它可以把小的、簡(jiǎn)單的組件組合成更高層次的進(jìn)程。實(shí)際上,Parallel 允許通過迭代把前面迭代中創(chuàng)建的組件與新的組件連接起來,創(chuàng)建出任意復(fù)雜程度的完全連接的進(jìn)程網(wǎng)絡(luò)。生成的進(jìn)程網(wǎng)絡(luò)可以像一個(gè) CSProcess 對(duì)象一樣公開和使用。

Parallel 示例
JCSP 庫(kù)提供了一組即插即用的組件,不過僅僅是出于教育的目的,正好適合我的目的:進(jìn)入其中幾個(gè)的內(nèi)部實(shí)現(xiàn),可以很好的表現(xiàn)如何在 JCSP 中組合成網(wǎng)絡(luò)化的并發(fā)進(jìn)程。我用下面的示例進(jìn)程來表現(xiàn) JCSP 中 Parallel 構(gòu)造的內(nèi)部工作方式:

  • PlusInt 在兩個(gè)輸入流中都接受整數(shù),把整數(shù)加在一起,然后把結(jié)果輸出到輸出流。

  • Delta2Int 平行地把到達(dá)它的輸入流的每個(gè)整數(shù)廣播到它的兩個(gè)輸出通道。

  • PrefixInt 在它的整數(shù)輸入流之前加上一個(gè)(用戶配置的)整數(shù)。(也就是說,在這個(gè)進(jìn)程的輸出通道上有整數(shù)可用之前,第一個(gè)輸出是預(yù)先配置的整數(shù)。后面的輸出才是從輸入流得到的整數(shù)。)

  • IntegrateInt 是一個(gè)用 Parallel 構(gòu)造組合了前三個(gè)進(jìn)程的進(jìn)程。它的功能是輸出來自它的輸入通道的整數(shù)的中間匯總值。

IntegrateInt 類的 run 方法如清單 5 所示:

清單 5. IntegrateInt 進(jìn)程

import jcsp.lang.*;

public class IntegrateInt implements CSProcess 
{
  private final ChannelInputInt in;
  private final ChannelOutputInt out;

  public IntegrateInt (ChannelInputInt in, ChannelOutputInt out)
  {
    this.in = in;
    this.out = out;
  }

  public void run()
  {
      One2OneChannelInt a = new One2OneChannelInt ();
      One2OneChannelInt b = new One2OneChannelInt ();
      One2OneChannelInt c = new One2OneChannelInt ();

      new Parallel 
      (
        new CSProcess[]
        {
          new PlusInt (in, c, a),
          new Delta2Int (a, out, b),
          new PrefixInt (0, b, c)
        }
      ).run ();
  }
}

注意,與 請(qǐng)單 4 中使用的通道相比,這個(gè)示例中使用了不同種類的通道。 IntegrateInt 類使用 ChannelInputIntChannelOutputInt 通道,顧名思義,可以用它們傳遞 int 類型的整數(shù)。相比之下,清單 4 中的驅(qū)動(dòng)器程序使用了 ChannelInputChannelOutput,它們是 對(duì)象 通道,可以用來在通道中從發(fā)送方給接收方發(fā)送任意對(duì)象。出于這個(gè)原因,在清單 4 中傳遞 int 值之前,我不得不把 int 值包裝成 Integer 對(duì)象。

在清單 5 中,還需要注意觀察什么呢?實(shí)際上,PrefixInt 進(jìn)程的第一個(gè)輸出是 0,它是通過 PlusInt 進(jìn)程添加到輸入通道到達(dá)的第一個(gè)整數(shù)上的。這個(gè)結(jié)果被寫入通道 a,它構(gòu)成了 Delta2Int 進(jìn)程的輸入通道。Delta2Int 進(jìn)程把整數(shù)結(jié)果寫到 out (進(jìn)程的整體輸出通道)并把它發(fā)送到 PrefixInt 進(jìn)程。然后 PrefixInt 進(jìn)程把整數(shù)作為輸入發(fā)送給 PlusInt 進(jìn)程,并添加到流中的第二個(gè)整數(shù),如此類推。

IntegrateInt 進(jìn)程組成的圖示如圖 1 所示:

圖 1. IntegrateInt 進(jìn)程
IntegrateInt 進(jìn)程

網(wǎng)絡(luò)中的網(wǎng)絡(luò)
IntegrateInt 進(jìn)程就是這樣由三個(gè)小進(jìn)程組成,它本身可以當(dāng)作一個(gè)復(fù)合進(jìn)程來用。JCSP 庫(kù)提供了一個(gè)叫作 SquaresInt 的進(jìn)程,顧名思義,它生成一個(gè)整數(shù)流,整數(shù)流是自然數(shù) (1、2、3、4,等等)的平方。這個(gè)進(jìn)程的代碼如清單 6 所示:

清單 6. SquaresInt 進(jìn)程

public class SquaresInt implements CSProcess 
{
  private final ChannelOutputInt out;

  public SquaresInt (ChannelOutputInt out)
  {
    this.out = out;
  }

  public void run()
  {
      One2OneChannelInt a = new One2OneChannelInt ();
      One2OneChannelInt b = new One2OneChannelInt ();

      new Parallel 
      (
        new CSProcess[]
        {
          new NumbersInt (a),
          new IntegrateInt (a, b),
          new PairsInt (b, out)
        }
      ).run ();
  }
}

我可以肯定您已經(jīng)注意到清單 6 顯示的兩個(gè)新進(jìn)程。NumbersInt 是一個(gè)內(nèi)置進(jìn)程,它只是在其輸出通道中輸出從 0 開始的自然數(shù)。PairsInt 進(jìn)程則把連續(xù)的一對(duì)輸入值相加并輸出結(jié)果。這兩個(gè)新進(jìn)程和 IntegrateInt 一起構(gòu)成了 SquaresInt 進(jìn)程,如圖 2 中的圖表所示:

圖 2. SquaresInt 進(jìn)程
SquaresInt 進(jìn)程

SquaresInt 的工作方式
在進(jìn)入下一部分之前,先來考慮 SquaresInt 進(jìn)程的內(nèi)部工作方式。在下面可以看到 SquaresInt 內(nèi)部每個(gè)通道上的交通流向:


Channel "a":	[0, 1, 2, 3, 4, 5, 6, 7, 8, ...ad infinitum]
Channel "b":	[0, 1, 3, 6, 10, 15, 21, 28, 36, ...ad infinitum]
Channel "out":	[1, 4, 9, 16, 25, 36, 49, 64, 81 ...ad infinitum]

您有沒有看這樣的模式:寫入通道 a 的整數(shù)造成它們也被寫入通道 b,因此也寫到通道 out?在第一次“滴答”當(dāng)中,NumbersInt 進(jìn)程把整數(shù) 0 寫入通道 aIntegrateInt 進(jìn)程也把整數(shù) 0 (是當(dāng)前匯總的值)寫入通道 bPairsInt 進(jìn)程在這次滴答中什么都不產(chǎn)生,因?yàn)樗枰幚韮蓚€(gè)輸入。在第二次滴答中,NumbersInt 進(jìn)程在它的輸出通道上寫入整數(shù) 1。這造成 IntegrateInt 進(jìn)程把匯總值修改成 0+1=1,所以把整數(shù) 1 寫入通道 b

這時(shí), PairsInt 有了兩個(gè)整數(shù)輸入可以處理 —— 整數(shù) 0 來自前一次滴答,整數(shù) 1 來自當(dāng)前滴答。它把它們加在一起,并把輸出 0+1=1 寫到通道 out。請(qǐng)注意 1 是 1 的平方,所以我們現(xiàn)在可能是在正確的軌道上。繼續(xù)把示例前進(jìn)到下一個(gè)(第三個(gè))滴答,NumbersInt 進(jìn)程把把整數(shù) 2 寫入通道 a。這使 IntegrateInt 進(jìn)程把匯總值更新為 1 (前一個(gè)匯總值) + 2 (新值) = 3 并把這個(gè)整數(shù)寫入通道 b

PairsInt 進(jìn)程看到最后兩個(gè)整數(shù)是什么?它們是 1 (在前一次滴答期間) 和 3 (在當(dāng)前滴答期間)。所以,進(jìn)程把這兩個(gè)整數(shù)加在一起,并把 1+3=4 寫入通道 out。您會(huì)注意到 4 是 2 的平方,這意味著 SquaresInt 工作起來就像它應(yīng)當(dāng)工作的那樣。實(shí)際上,應(yīng)當(dāng)繼續(xù)運(yùn)行這個(gè)程序到任意數(shù)量的滴答,這樣就可以驗(yàn)證寫入通道 out 的整數(shù)總是在序列中的下一個(gè)整數(shù)的平方。我在下一節(jié)精確地這一操作。

數(shù)學(xué)問題
就在您納悶的時(shí)候,我想解釋一下生成平方值的數(shù)學(xué)基礎(chǔ)。假設(shè)在 NumbersInt 進(jìn)程已經(jīng)把整數(shù)輸出到某個(gè) n-1 的時(shí)候,您偷看到了箱子內(nèi)部。IntegrateInt 進(jìn)程最后生成(而且通過共享通道 b 放到 PairsInt 進(jìn)程)的中間匯總會(huì)是 [1+2+3+...+(n-1)] = (n-1)(n-2)/2

在下一次滴答期間,NumbersInt 會(huì)輸出 n,這造成 IntegrateInt 進(jìn)程的中間匯總增長(zhǎng)為 (1+2+3+...+n) = n(n-1)/2。然后這個(gè)匯總會(huì)通過共享通道 b 傳給 PairsInt 進(jìn)程。 PairsInt 會(huì)把這兩個(gè)數(shù)加在一起,生成 [(n-1)(n-2)/2 + n(n-1)/2] = [(n-2) + n](n-1)/2 = (2n-2)(n-1)/2 = (n-1)exp2

接下來,NumbersInt 進(jìn)程會(huì)產(chǎn)生(n+1)。與之對(duì)應(yīng),IntegrateInt 進(jìn)程會(huì)把 n(n+1)/2 送到 PairsInt 進(jìn)程。然后 PairsInt 會(huì)生成 [n(n-1)/2 + n(n+1)/2] = nexp2。針對(duì)所有的 n 對(duì)這進(jìn)行通用化,就會(huì)按照期望的那樣產(chǎn)生全部平方。

JCSP 中的確定性
以上示例演示了 CSP 的復(fù)合語(yǔ)言 —— 即如何用 Parallel 構(gòu)造把細(xì)致的無(wú)狀態(tài)的組件組成分層的網(wǎng)絡(luò)。所有這類相互通信的平行進(jìn)程的分層網(wǎng)絡(luò)的賣點(diǎn)就是:它們是完全確定的。在這個(gè)上下文環(huán)境中 確定 意味著什么呢?它意味著這類分層網(wǎng)絡(luò)的輸出只取決于提供給它的輸入,而不用考慮網(wǎng)絡(luò)運(yùn)行的運(yùn)行時(shí)環(huán)境(JVM)的特性。也就是說,進(jìn)程網(wǎng)絡(luò)獨(dú)立于 JVM 的調(diào)度策略,也獨(dú)立于它所分布的多處理器。(我在這里假設(shè)的是個(gè)單一節(jié)點(diǎn),但是,沒有什么固有的東西會(huì)防礙把這個(gè)討論引入物理上分布在多個(gè)節(jié)點(diǎn)上而在進(jìn)程之間通過線路進(jìn)行通信的進(jìn)程網(wǎng)絡(luò)上。)

確定性會(huì)是工具包中的強(qiáng)大工具,因?yàn)樗梢宰屇逦赝茢喑龀绦虻男袨椋槐負(fù)?dān)心運(yùn)行時(shí)環(huán)境對(duì)它可能產(chǎn)生的影響。同時(shí),確定性不是并發(fā)性編程惟一可能的技術(shù)或必需的技術(shù)。因?yàn)橄乱粋€(gè)(也是最后一個(gè))實(shí)例將顯示,非確定性在 JSP 中也是同樣強(qiáng)大的實(shí)用概念。

JCSP 中的非確定性
非確定是許多真實(shí)的應(yīng)用程序的因子,在這些應(yīng)用程序,可見的輸出是某個(gè)功能或者事件發(fā)生的順序。換句話說,當(dāng)結(jié)果取決于設(shè)計(jì)的調(diào)度,而 不是 取決于事件時(shí),就是非確定性在并發(fā)應(yīng)用程序中發(fā)揮作用的地方了。您將會(huì)看到,JCSP 顯式地處理這類問題。

例如,假設(shè)一個(gè)進(jìn)程對(duì)于 下面要做什么 有許多備選項(xiàng),每個(gè)備選項(xiàng)都有一個(gè)與之關(guān)聯(lián)的 警衛(wèi)(guard),警衛(wèi)必須處于“就緒(ready)”狀態(tài),這樣才能讓備選項(xiàng)得以考慮。進(jìn)程可以從可用的備選項(xiàng)(也就是就緒的)中選擇一個(gè)選項(xiàng);選擇本身可能基于不同的策略,可能是任意選擇、最高優(yōu)先級(jí)選擇或者公平選擇。

事件選擇策略
在 JCSP 的特定上下文中,提供了一個(gè)叫作 Guard 的抽象類,競(jìng)爭(zhēng)進(jìn)程選擇的事件必須繼續(xù)它。進(jìn)程本身使用另一個(gè)預(yù)先提供的類,叫作 Alternative,這些警衛(wèi)對(duì)象必須以對(duì)象數(shù)組的形式傳遞給它的構(gòu)造函數(shù)。Alternative 類為三種事件選擇策略提供了方法。

Alternative 類的 select() 方法對(duì)應(yīng)著 任意選擇 策略。select() 方法調(diào)用一直受阻塞,直到一個(gè)或多個(gè)警衛(wèi)就緒為止(請(qǐng)記住,所有競(jìng)爭(zhēng)的警衛(wèi)對(duì)于 Alternative 類來說都是已知的)。其中一個(gè)就緒的警衛(wèi)被隨機(jī)選中,它的索引(在傳遞進(jìn)去的警衛(wèi)數(shù)組中)也被返回。

priSelect() 方法對(duì)應(yīng)著 最高優(yōu)先級(jí) 策略。也就是說,如果不止一個(gè)警衛(wèi)就緒,則返回索引值最低的那個(gè);這里面的假設(shè)是:在數(shù)組中傳遞給 Alternative 構(gòu)造函數(shù)的警衛(wèi)已經(jīng)按照優(yōu)先級(jí)順序進(jìn)行降序排序了。

最后,方法 fairSelect 是在多個(gè)就緒警衛(wèi)中進(jìn)行 公平 選擇:在這個(gè)方法的連續(xù)調(diào)用中,在其他就緒而且可用的警衛(wèi)沒被選中之前,不會(huì)有某個(gè)就緒的警衛(wèi)被選中兩次。所以,如果警衛(wèi)的總數(shù)是 n,那么在最壞的情況下,就緒的警衛(wèi)沒獲得選中的次數(shù)不會(huì)連續(xù)超過 n 次。

如果進(jìn)程不關(guān)心如何選擇多個(gè)就緒警衛(wèi),那么任意選擇策略最合適;如果進(jìn)程想保證沒有資源耗盡或者最差服務(wù)次數(shù),例如在實(shí)時(shí)系統(tǒng)中,那么任意選擇就不太適用了。在前面的情況下,推薦使用 fairSelect 方法,而在后面的情況下,用 priSelect() 方法最好。

警衛(wèi)類型
大體來說,JCSP 提供了三類警衛(wèi):

  • 通道警衛(wèi) 總是對(duì)應(yīng)著進(jìn)程等候從中讀取數(shù)據(jù)的通道。也就是說,只有在通道另一端的進(jìn)程已經(jīng)輸出數(shù)據(jù),而該數(shù)據(jù)還沒有被進(jìn)程輸入的時(shí)候,警衛(wèi)才就緒。

  • 計(jì)時(shí)器警衛(wèi) 總是和設(shè)置(絕對(duì))超時(shí)對(duì)應(yīng)。也就是說,如果超時(shí),則計(jì)時(shí)器警衛(wèi)就會(huì)就緒。

  • 跳過警衛(wèi) 總是就緒。

JCSP 中的通道警衛(wèi) 可以是以下類型:AltingChannelInput/AltingChannelInputInt,只要在對(duì)應(yīng)的通道中有了對(duì)象或整數(shù)數(shù)據(jù),則這兩個(gè)通道將就緒;或者 AltingChannelAccept,如果在通道中出現(xiàn)不可接受的“CALL”(這一點(diǎn)后面有更多介紹),則通道就會(huì)就緒。這些都是抽象類,它們擁有 One2OneAny2One 類型通道形式的具體實(shí)現(xiàn)。JCSP 中的計(jì)時(shí)器 警衛(wèi)屬于 CSTimer 類型,而 跳過警衛(wèi) 則是以 Skip 類的形式提供的。

運(yùn)作中的警衛(wèi)
我用一個(gè)簡(jiǎn)單的示例,演示如何用 JCSP 警衛(wèi)實(shí)現(xiàn)并發(fā)應(yīng)用程序中的非確定性,借此總結(jié)對(duì) JCSP 的介紹。假設(shè)您必須開發(fā)一個(gè)乘法(或者 倍增) 設(shè)計(jì),讀取的整數(shù)在輸出通道以固定速率到達(dá),可以用某個(gè)乘數(shù)乘以它們,然后把它們寫入其輸出通道。設(shè)備可以用一個(gè)初始乘數(shù)開始,但是這個(gè)乘數(shù)每 5 秒鐘自動(dòng)加倍。

這個(gè)故事中介紹的方法是這樣的:系統(tǒng)中存在著第二個(gè)控制器進(jìn)程,它能通過專用通道向設(shè)備發(fā)送 suspend operation 信號(hào)。這使設(shè)備中止自身,并把乘數(shù)的當(dāng)前值通過第二個(gè)通道發(fā)送給控制器。

在中止的時(shí)候,設(shè)備只應(yīng)當(dāng)允許全部進(jìn)入的整數(shù)不經(jīng)變化地通過它的輸出通道。控制器進(jìn)程 —— 可能在用設(shè)備發(fā)送給它的乘數(shù)做了某些計(jì)算中的一種 —— 通過專用通道把一個(gè)新乘數(shù)發(fā)送回設(shè)備。(請(qǐng)注意:只要設(shè)備處于 中止 狀態(tài),就會(huì)被迫接受這個(gè)乘數(shù)。)

更新過的乘數(shù)插入到設(shè)備,并充當(dāng)設(shè)備的喚醒信號(hào)。設(shè)備現(xiàn)在繼續(xù)執(zhí)行它的放大操作,用新更新的乘數(shù)乘上輸入的整數(shù)。計(jì)時(shí)器這時(shí)也重置,所以新的乘數(shù)也在 5 秒之后被設(shè)置成加倍數(shù)值,如此類推。

圖 3 中的圖表說明了這個(gè)放大設(shè)備:

圖 3. 放大設(shè)備
ScaleInt 進(jìn)程

ScaleInt 進(jìn)程
放大設(shè)備的源代碼在清單 7 中顯示。這個(gè)示例中的非確定性是因?yàn)椋?I xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">output 的值基于 ininject 流的值(同時(shí)還基于這些值到達(dá)的順序)。

清單 7. ScaleInt 進(jìn)程

import jcsp.lang.*;
import jcsp.plugNplay.ints.*;

public class ScaleInt implements CSProcess
{
  private int s;
  private final ChannelOutputInt out, factor;
  private final AltingChannelInputInt in, suspend, inject;

  public ScaleInt (int s, AltingChannelInputInt suspend, AltingChannelInputInt in, 
    ChannelOutputInt factor, AltingChannelInputInt inject, ChannelOutputInt out)
  {
    this.s = s;
	this.in = in;
	this.out = out;
	this.suspend = suspend;
	this.factor = factor;
	this.inject = inject;
  }

  public void run()
  {
	final long second = 1000;               // Java timings are in millisecs
	final long doubleInterval = 5*second;
	final CSTimer timer = new CSTimer ();

	final Alternative normalAlt = new Alternative (new Guard[] {suspend, timer, in});
	
	final int NORMAL_SUSPEND=0, NORMAL_TIMER=1, NORMAL_IN = 2;

	final Alternative suspendedAlt = new Alternative (new Guard[] {inject, in});
	
	final int SUSPENDED_INJECT=0, SUSPENDED_IN = 1;
	
	long timeout = timer.read () + doubleInterval;
	timer.setAlarm (timeout);

	while (true)
	{
	  switch (normalAlt.priSelect ())
	  {
		case NORMAL_SUSPEND:
		  suspend.read ();              // don't care what's sent
		  factor.write (s);             // reply with the crucial information
		  boolean suspended = true;
		  while (suspended)
		  {
		    switch (suspendedAlt.priSelect ())
			{
			  case SUSPENDED_INJECT:    // this is the resume signal as well
			    s = inject.read ();     // get the new scaling factor
				suspended = false;      // and resume normal operations
				timeout = timer.read () + doubleInterval;
				timer.setAlarm (timeout);
				break;
			  case SUSPENDED_IN:
			    out.write (in.read ());
				break;
			}
		  }
		  break;
		case NORMAL_TIMER:
		  timeout = timer.read () + doubleInterval;
		  timer.setAlarm (timeout);
		  s = s*2;
		  break;
		case NORMAL_IN:
		  out.write (s * in.read ());
		  break;
	  }
    }
  }
}

import jcsp.lang.*;
import jcsp.plugNplay.ints.*;

public class Controller implements CSProcess
{
  private long interval;
  private final ChannelOutputInt suspend, inject;
  private final ChannelInputInt factor;

  public Controller (long interval, ChannelOutputInt suspend, ChannelOutputInt inject, 
    ChannelInputInt factor)
  { 
    this.interval = interval;
    this.suspend = suspend;
    this.inject = inject;
    this.factor = factor;
  }

  public void run ()
  {
	int currFactor = 0;
	final CSTimer tim = new CSTimer ();
	long timeout = tim.read ();
	while (true)
	{
	  timeout += interval;
	  tim.after (timeout);        // blocks until timeout reached
	  suspend.write (0);          // suspend signal (value irrelevant)
	  currFactor = factor.read ();			
	  currFactor ++;              // compute new factor
	  inject.write (currFactor);  // inject new factor
	}
  }
}

import jcsp.lang.*;
import jcsp.plugNplay.ints.*;

public class DriverProgram
{
  public static void main(String args[])
  {
	try
	{
	  final One2OneChannelInt temp = new One2OneChannelInt ();
	  final One2OneChannelInt in = new One2OneChannelInt ();
	  final One2OneChannelInt suspend = new One2OneChannelInt ();
	  final One2OneChannelInt factor = new One2OneChannelInt ();
	  final One2OneChannelInt inject = new One2OneChannelInt ();
	  final One2OneChannelInt out = new One2OneChannelInt ();
		
	  new Parallel
	  (
		new CSProcess[]
		{
		  new NumbersInt (temp),
		  new FixedDelayInt (1000, temp, in),
		  new ScaleInt (2, suspend, in, factor, inject, out),
		  new Controller (6000, suspend, inject, factor),
		  new PrinterInt (out, "--> ", "\n")
		}
	  ).run ();
	}
	catch (Exception e)
	{
		e.printStackTrace();
	}
  }
}

上面的類 ScaleInt 對(duì)應(yīng)著放大設(shè)備。正如前面提到的,這個(gè)類必須實(shí)現(xiàn) CSProcess 接口。因?yàn)樯厦娴拇a演示了許多概念,所以我將逐個(gè)討論它的不同方面。

兩個(gè)備選項(xiàng)
ScaleInt 類中,我們感興趣的第一個(gè)方法是 run()。在 run() 方法中,要做的第一件事是創(chuàng)建 Alternative 類的兩個(gè)實(shí)例,每個(gè)都有一組不同的 Guard 對(duì)象。

第一個(gè) Alternative 實(shí)例由變量 normalAlt 表示,它是為設(shè)備正常操作的時(shí)候使用的。與之關(guān)聯(lián)的警衛(wèi)列表如下所示:

  • suspendOne2OneChannelInt 的實(shí)例。正如前面提到過的,One2OneChannelInt 實(shí)現(xiàn)了單一閱讀器/寫入器整數(shù)通道,通道是零緩沖、完全同步的。這是控制器進(jìn)程向設(shè)備發(fā)送中止信號(hào)的通道。

  • timerCSTimer 的實(shí)例,它被設(shè)置成每 5 秒觸發(fā)一次,每次觸發(fā)時(shí),設(shè)備會(huì)把乘數(shù)的當(dāng)前值加倍。

  • inOne2OneChannelInt 的實(shí)例,設(shè)備通過它接收輸入的整數(shù)。

第二個(gè) Alternative 實(shí)例由 suspendedAlt 表示,它是供設(shè)備在已經(jīng)被 Controller 中止的情況下使用的。與之關(guān)聯(lián)的警衛(wèi)如下如示:

  • injectOne2OneChannelInt 的實(shí)例,由控制器進(jìn)程使用,用來向設(shè)備發(fā)送新的乘數(shù)(也充當(dāng)喚醒信號(hào))。

  • in 是前面已經(jīng)看到的 One2OneChannelInt 相同的實(shí)例;設(shè)備通過這個(gè)通道接收輸入整數(shù)。

兩個(gè) Alternative 實(shí)例被用在不同的情況下等候警衛(wèi)就緒,列表的順序是隱式的優(yōu)先級(jí)順序。例如,如果 normalAltsuspendtimer 警衛(wèi)恰好同時(shí)就緒,那么和 suspend 警衛(wèi)對(duì)應(yīng)的事件首先被處理。

警衛(wèi)就緒
下一個(gè)我們感興趣的是在每個(gè)警衛(wèi)就緒的時(shí)候,發(fā)生了什么。我首先研究 normalSelect,假設(shè)設(shè)備操作正常(也就是說,還沒有被中止):

  • 如果控制器向設(shè)備發(fā)送了 suspend 信號(hào),那么這個(gè)事件以最高優(yōu)先級(jí)得到處理。作為響應(yīng),設(shè)備把乘數(shù)的當(dāng)前值通過叫作 factor 的通道發(fā)送給控制器。然后將叫作 suspended 的內(nèi)部標(biāo)志設(shè)置為 true,然后進(jìn)入循環(huán),等候別人發(fā)送信號(hào),以繼續(xù)其操作。在循環(huán)內(nèi)部,設(shè)備調(diào)用第二個(gè) Alternative 實(shí)例上的 priSelect() 方法 (suspendedAlt)。

    這個(gè) Alternative 實(shí)例包含兩個(gè)警衛(wèi):第一個(gè)表示控制器向設(shè)備發(fā)送乘數(shù)的事件,第二個(gè)表示整數(shù)到達(dá)設(shè)備的輸入通道。在前一種情況下,設(shè)備用從 inject 通道讀取的值來更新乘數(shù)(保存在變量 s 中),并將 suspended 標(biāo)志設(shè)置回 false (這樣就保證了在下一次迭代時(shí)可以退出內(nèi)部循環(huán)),用當(dāng)前計(jì)時(shí)器的值作為基值重新設(shè)置鬧鐘。在后一種情況下,設(shè)備只是從它的輸入通道讀取整數(shù),并把整數(shù)寫入輸出通道(也就是說,在設(shè)備中止時(shí),不許使用乘數(shù)的要求)。

  • 具有下一個(gè)優(yōu)先級(jí)得到處理的事件是鬧鐘到期事件。這造成設(shè)備把當(dāng)前乘數(shù)加倍,用當(dāng)前計(jì)時(shí)器的值作為基值重新設(shè)置鬧鐘,然后返回,繼續(xù)等候下一個(gè)事件。

  • 第三個(gè)可能是事件是從設(shè)備的輸入通道接收整數(shù)的事件。與之對(duì)應(yīng)的是,設(shè)備讀取整數(shù),用當(dāng)前乘數(shù) s 乘上它,并將結(jié)果寫入設(shè)備的輸出通道。

Controller 類
下一個(gè)要考慮的類是 Controller 類。請(qǐng)記住,控制器類的任務(wù)是周期性地(大概是基于復(fù)雜的計(jì)算)向設(shè)備進(jìn)程插入乘數(shù)值。在這個(gè)示例中,周期基礎(chǔ)只是一個(gè)計(jì)時(shí)器,該計(jì)時(shí)器按照規(guī)律的、配置好的間隔到期。每次到期時(shí),控制器就在 suspend 上寫一個(gè) 0(也就是說,它將中止設(shè)備),并在叫作 factor 的輸入通道上讀取當(dāng)前的乘數(shù)。

這時(shí),控制器只是把這個(gè)值加一,然后通過一對(duì)一通道 (叫作 inject,專門用于為這個(gè)目的) 將它插回設(shè)備。這就通知設(shè)備繼續(xù)工作的方式,這時(shí)計(jì)時(shí)器被重新設(shè)置成在適當(dāng)間隔后到期。

DriverProgram 類
最后剩下的類是驅(qū)動(dòng)器類 DriverProgram。這個(gè)類創(chuàng)建適當(dāng)?shù)耐ǖ篮?CSProcess 實(shí)例數(shù)組。它用 JCSP 提供的類 NumbersInt 生成一系列自然數(shù),通過temp 通道傳遞給另一個(gè)叫作 FixedDelayInt 的內(nèi)置類。顧名思義,FixedDelayInt 將來自其輸入通道的值在固定延遲(在示例代碼中,該延遲是 1 秒)之后發(fā)送到它的輸出通道。

這個(gè)自然數(shù)的流每隔一秒就被發(fā)送到 ScaleInt 進(jìn)程的 in 通道。ScaleInt 進(jìn)程的 out 通道的輸出傳遞給 JCSP 提供的 PrinterInt 進(jìn)程,然后該進(jìn)程再接著把整數(shù)值輸出到 System.out

第 2 部分的結(jié)束語(yǔ)
在這個(gè)由三部分組成的介紹適用于 Java 程序員的 CSP 的系列文章的第 2 部分中,我解釋并演示了并發(fā)編程中的 CSP 理論。然后是對(duì) CSP 構(gòu)造的概述,其中介紹了最流行的基于 Java 的 CSP 庫(kù) —— JCSP。由于 Java 語(yǔ)言沒有對(duì) CSP 構(gòu)造提供自帶的支持,所以 JCSP 庫(kù)內(nèi)部采 Java 支持 的自帶構(gòu)造,例如 synchronized()wait()notify()。為了幫助您正確地理解 JCSP 是如何工作的,我從 Java 構(gòu)造的角度解釋了一些 JCSP 類庫(kù)的內(nèi)部實(shí)現(xiàn),然后在幾個(gè)實(shí)際示例中演示了它們的用法。

這里所進(jìn)行的討論可以作為本系列最后一篇文章的絕好基礎(chǔ)。在最后一篇文章中,我將解釋 CSP 和 AOP 的相似性,并簡(jiǎn)要地對(duì) CSP 解決并發(fā)性的方法和新的 java.util.concurrent 包解決并發(fā)性的方法進(jìn)行比較,還將介紹許多用 JCSP 進(jìn)行高級(jí)同步 的技術(shù)。

致謝
非常感謝 Peter Welch 教授在我編寫這個(gè)文章系列期間給予的鼓勵(lì)。他在百忙之中抽出時(shí)間非常細(xì)致地審閱了草稿,并提供了許多寶貴的提高系列質(zhì)量和準(zhǔn)確性的建議。文章中如果還存在錯(cuò)誤的話,那都是由于我的原因!我在文章中使用的示例基于或來自 JCSP 庫(kù)的 javadoc 中提供的示例,以及 JCSP Web 站點(diǎn)上提供的 Powerpoint 演示文稿。這兩個(gè)來源提供了需要探索的大量信息。

參考資料

  • 您可以參閱本文在 developerWorks 全球站點(diǎn)上的 英文原文

  • Brian Goetz 編寫的由三部分組成的 “Threading lightly”是解決 Java 平臺(tái)上同步問題的巧妙而系統(tǒng)的方法。(developerWorks,2001 年 7 月)

  • Allen Holub 撰寫的“如果我是國(guó)王:關(guān)于解決 Java編程語(yǔ)言線程問題的建議” (developerWorks,2000 年 10 月)是一篇啟蒙性的、至今仍有意義的、關(guān)于 Java 平臺(tái)多線程編程錯(cuò)誤的概述。

  • C.A.R. Hoare 開創(chuàng)性的論文“Communicating Sequential Processes”把通信順序進(jìn)程的并行組成作為一種基本的編程結(jié)構(gòu)化方法提了出來(Communications of the ACM Archive,1978)。

  • 可以免費(fèi)獲得 PDF 格式的 C.A.R. Hoare 撰寫的 關(guān)于 CSP 的書籍

  • Bill Roscoe 撰寫的 Theory and Practice of Concurrency (Prentice Hall, 1997) 是最關(guān)于并發(fā)性和 CS 主題的最新書籍。

  • 牛津大學(xué)計(jì)算機(jī)實(shí)驗(yàn)室負(fù)責(zé)的 CSP Archive 是學(xué)習(xí)更多關(guān)于 CSP 內(nèi)容的好地方,除此之外,還有 WoTUG homepage

  • Peter Welch 教授和 Jeremy Martin 合著的“Formal Analysis of Concurrent Java Systems” (IOS Press, 2000) 是在 Java 語(yǔ)言中實(shí)踐 CSP 的良好起點(diǎn)。

  • JCSP homepage 由英國(guó)坎特伯雷市肯特大學(xué)負(fù)責(zé)。

  • FDR2 (故障偏差求精,F(xiàn)ailures-Divergence Refinement) 是面向基于 CSP 的程序的幾個(gè)商業(yè)化模型檢測(cè)工具之一。

  • CSP 的實(shí)現(xiàn)可用于 Java 語(yǔ)言之外的其他語(yǔ)言:C++CSP 是針對(duì) C++ 的實(shí)現(xiàn),而 J#.Net 是針對(duì) .Net 的實(shí)現(xiàn)。

  • Occam-pi 是一個(gè)語(yǔ)言平臺(tái),它期望用 pi-calculus 的移動(dòng)特性擴(kuò)展 occam 語(yǔ)言的 CSP 想法。請(qǐng)從 occam-pi homepage 學(xué)習(xí)這個(gè)尖端的研究。

  • 在學(xué)習(xí) occam 時(shí),您可能還想調(diào)查 occam 編程器的各種擴(kuò)展

  • 在 developerWorks Java 技術(shù)專區(qū) 可以找到 Java 編程各方面的文章。

  • 請(qǐng)參閱 Developer Bookstore,以獲得技術(shù)書籍的完整清單,其中包括數(shù)百本 Java 相關(guān)主題的書籍。

  • 還請(qǐng)參閱 Java 技術(shù)專區(qū)教程頁(yè),以獲得 developerWorks 上免費(fèi)的、以 Java 為重點(diǎn)的教程。

關(guān)于作者
Abhijit Belapurkar 擁有位于印度德里市的印度理工學(xué)院(IIT)計(jì)算機(jī)科學(xué)的理工學(xué)士學(xué)位。在過去的 11 年中,他一直工作在分布式應(yīng)用程序的架構(gòu)和信息安全領(lǐng)域,他在使用 Java 平臺(tái)構(gòu)建 n 層應(yīng)用程序方面也已經(jīng)有大約 6 年的工作經(jīng)驗(yàn)。他目前作為高級(jí)技術(shù)架構(gòu)師在 J2EE 領(lǐng)域工作,服務(wù)于印度班加羅爾的 Infosys 科技有限公司。