Vincent

          Vicent's blog
          隨筆 - 74, 文章 - 0, 評(píng)論 - 5, 引用 - 0
          數(shù)據(jù)加載中……

          Java 技術(shù): 使您輕松地進(jìn)行多線程應(yīng)用程序編程

          產(chǎn)者-消費(fèi)者方案是多線程應(yīng)用程序開(kāi)發(fā)中最常用的構(gòu)造之一 ― 因此困難也在于此。因?yàn)樵谝粋€(gè)應(yīng)用程序中可以多次重復(fù)生產(chǎn)者-消費(fèi)者行為,其代碼也可以如此。軟件開(kāi)發(fā)人員 Ze'ev Bubis 和 Saffi Hartal 創(chuàng)建了 Consumer 類,該類通過(guò)在一些多線程應(yīng)用程序中促進(jìn)代碼重用以及簡(jiǎn)化代碼調(diào)試和維護(hù)來(lái)解決這個(gè)問(wèn)題。請(qǐng)通過(guò)單擊本文頂部或底部的 討論來(lái)參與本文的 論壇,與作者和其他讀者分享您的想法。

          多線程應(yīng)用程序通常利用生產(chǎn)者-消費(fèi)者編程方案,其中由生產(chǎn)者線程創(chuàng)建重復(fù)性作業(yè),將其傳遞給作業(yè)隊(duì)列,然后由消費(fèi)者線程處理作業(yè)。雖然這種編程方法很有用,但是它通常導(dǎo)致重復(fù)的代碼,這對(duì)于調(diào)試和維護(hù)可能是真正的問(wèn)題。

          為了解決這個(gè)問(wèn)題并促進(jìn)代碼重用,我們創(chuàng)建了 Consumer 類。 Consumer 類包含所有用于作業(yè)隊(duì)列和消費(fèi)者線程的代碼,以及使這兩者能夠結(jié)合在一起的邏輯。這使我們可以專注于業(yè)務(wù)邏輯 ― 關(guān)于應(yīng)該如何處理作業(yè)的細(xì)節(jié) ― 而不是專注于編寫(xiě)大量冗余的代碼。同時(shí),它還使得調(diào)試多線程應(yīng)用程序的任務(wù)變得更為容易。

          在本文中,我們將簡(jiǎn)單觀察一下多線程應(yīng)用程序開(kāi)發(fā)中公共線程用法,同時(shí),解釋一下生產(chǎn)者-消費(fèi)者編程方案,并研究一個(gè)實(shí)際的示例來(lái)向您演示 Consumer 類是如何工作的。請(qǐng)注意,對(duì)于多線程應(yīng)用程序開(kāi)發(fā)或消費(fèi)者-生產(chǎn)者方案,本文不作深入介紹;有關(guān)那些主題,請(qǐng)參閱 參考資料獲取文章的清單。

          多線程基礎(chǔ)知識(shí)

          多線程是一種使應(yīng)用程序能同時(shí)處理多個(gè)操作的編程技術(shù)。通常有兩種不同類型的多線程操作使用多個(gè)線程:

          • 適時(shí)事件,當(dāng)作業(yè)必須在特定的時(shí)間或在特定的間隔內(nèi)調(diào)度執(zhí)行時(shí)
          • 后臺(tái)處理,當(dāng)后臺(tái)事件必須與當(dāng)前執(zhí)行流并行處理或執(zhí)行時(shí)

          適時(shí)事件的示例包括程序提醒、超時(shí)事件以及諸如輪詢和刷新之類的重復(fù)性操作。后臺(tái)處理的示例包括等待發(fā)送的包或等待處理的已接收的消息。





          回頁(yè)首


          生產(chǎn)者-消費(fèi)者關(guān)系

          生產(chǎn)者-消費(fèi)者方案很適合于后臺(tái)處理類別的情況。這些情況通常圍繞一個(gè)作業(yè)“生產(chǎn)者”方和一個(gè)作業(yè)“消費(fèi)者”方。當(dāng)然,關(guān)于作業(yè)并行執(zhí)行還有其它考慮事項(xiàng)。在大多數(shù)情況下,對(duì)于使用同一資源的作業(yè),應(yīng)以“先來(lái)先服務(wù)”的方式按順序處理,這可以通過(guò)使用單線程的消費(fèi)者輕松實(shí)現(xiàn)。通過(guò)使用這種方法,我們使用單個(gè)線程來(lái)訪問(wèn)單個(gè)資源,而不是用多個(gè)線程來(lái)訪問(wèn)單個(gè)資源。

          要啟用標(biāo)準(zhǔn)消費(fèi)者,當(dāng)作業(yè)到來(lái)時(shí)創(chuàng)建一個(gè)作業(yè)隊(duì)列來(lái)存儲(chǔ)所有作業(yè)。生產(chǎn)者線程通過(guò)將新對(duì)象添加到消費(fèi)者隊(duì)列來(lái)交付這個(gè)要處理的新對(duì)象。然后消費(fèi)者線程從隊(duì)列取出每個(gè)對(duì)象,并依次處理。當(dāng)隊(duì)列為空時(shí),消費(fèi)者進(jìn)入休眠。當(dāng)新的對(duì)象添加到空隊(duì)列時(shí),消費(fèi)者會(huì)醒來(lái)并處理該對(duì)象。因?yàn)榇蠖鄶?shù)應(yīng)用程序喜歡順序處理方式,所以消費(fèi)者通常是單線程的。





          回頁(yè)首


          問(wèn)題:代碼重復(fù)

          因?yàn)樯a(chǎn)者-消費(fèi)者方案很常用,所以在構(gòu)建應(yīng)用程序時(shí)它可能會(huì)出現(xiàn)幾次,這導(dǎo)致了代碼重復(fù)。我們認(rèn)識(shí)到,這顯示了在應(yīng)用程序開(kāi)發(fā)過(guò)程期間多次使用了生產(chǎn)者-消費(fèi)者方案的問(wèn)題。

          當(dāng)?shù)谝淮涡枰a(chǎn)者-消費(fèi)者行為時(shí),通過(guò)編寫(xiě)一個(gè)采用一個(gè)線程和一個(gè)隊(duì)列的類來(lái)實(shí)現(xiàn)該行為。當(dāng)?shù)诙涡枰@種行為時(shí),我們著手從頭開(kāi)始實(shí)現(xiàn)它,但是接著認(rèn)識(shí)到以前已經(jīng)做過(guò)這件事了。我們復(fù)制了代碼并修改了處理對(duì)象的方式。當(dāng)?shù)谌卧谠搼?yīng)用程序中實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者行為時(shí),很明顯我們復(fù)制了太多代碼。我們決定,需要一個(gè)適用的 Consumer 類,它將處理我們所有的生產(chǎn)者-消費(fèi)者方案。





          回頁(yè)首


          我們的解決方案:Consumer 類

          我們創(chuàng)建 Consumer 類的目的是:在我們的應(yīng)用程序中,消除這種代碼重復(fù) ― 為每個(gè)生產(chǎn)者-消費(fèi)者實(shí)例編寫(xiě)一個(gè)新作業(yè)隊(duì)列和消費(fèi)者線程來(lái)解決這個(gè)問(wèn)題。有了適當(dāng)?shù)?Consumer 類,我們所必須做的只是編寫(xiě)專門(mén)用于作業(yè)處理(業(yè)務(wù)邏輯)的代碼。這使得我們的代碼更清晰、更易于維護(hù)以及更改起來(lái)更靈活。

          我們對(duì) Consumer 類有如下需求:

          • 重用:我們希望這個(gè)類包括所有東西。一個(gè)線程、一個(gè)隊(duì)列以及使這兩者結(jié)合在一起的所有邏輯。這將使我們只須編寫(xiě)隊(duì)列中“消費(fèi)”特定作業(yè)的代碼。(因而,例如,程序員使用 Consumer 類時(shí),將重載 onConsume(ObjectjobToBeConsumed) 方法。)
          • 隊(duì)列選項(xiàng):我們希望能夠設(shè)置將由 Consumer 對(duì)象使用的隊(duì)列實(shí)現(xiàn)。但是,這意味著我們必須確保隊(duì)列是線程安全的或使用一個(gè)不會(huì)與消費(fèi)操作沖突的單線程生產(chǎn)者。無(wú)論使用哪種方法,都必須將隊(duì)列設(shè)計(jì)成允許不同的進(jìn)程能訪問(wèn)其方法。
          • Consumer 線程優(yōu)先級(jí):我們希望能夠設(shè)置 Consumer 線程運(yùn)行的優(yōu)先級(jí)。
          • Consumer 線程命名:線程擁有一個(gè)有意義的名稱會(huì)比較方便,當(dāng)然這的確有助于調(diào)試。例如,如果您向 Java 虛擬機(jī)發(fā)送了一個(gè)信號(hào),它將生成一個(gè)完整的線程轉(zhuǎn)儲(chǔ) ― 所有線程及其相應(yīng)堆棧跟蹤的快照。要在 Windows 平臺(tái)上生成這個(gè)線程轉(zhuǎn)儲(chǔ),您必須在 Java 程序運(yùn)行的窗口中按下鍵序列 <ctrl><break> ,或者單擊窗口上的“關(guān)閉”按鈕。有關(guān)如何使用完整的線程轉(zhuǎn)儲(chǔ)來(lái)診斷 Java 軟件問(wèn)題的更多信息,請(qǐng)參閱 參考資料




          回頁(yè)首


          類代碼

          getThread() 方法中,我們使用“惰性創(chuàng)建”來(lái)創(chuàng)建 Consumer 的線程,如清單 1 所示:


          清單 1. 創(chuàng)建 Consumer 的線程
               /**
                 * Lazy creation of the Consumer's thread.
                 *
                 * @return  the Consumer's thread
                 */
                private Thread getThread()
                {
                   if (_thread==null)
                   {
                      _thread = new Thread()
                      {
                         public void run()
                         {
                            Consumer.this.run();
                         }
                      };
                   }
                   return _thread;
          

          該線程的 run() 方法運(yùn)行 Consumerrun() 方法,它是主消費(fèi)者循環(huán),如清單 2 所示:


          清單 2. run() 方法是主 Consumer 循環(huán)
               /**
                 *  Main Consumer's thread method.
                 */
                private void run()
                {
                   while (!_isTerminated)
                   {
                      // job handling loop
                while (true)
                      {
                         Object o;
                         synchronized (_queue)
                         {
                            if (_queue.isEmpty())
                    break;
                            o = _queue.remove();
                         }
                         if (o == null)
                    break;
                         onConsume(o);
                      }
          
                      // if we are not terminated and the queue is still empty
                      // then wait until new jobs arrive.
          
                      synchronized(_waitForJobsMonitor)
                      {
                         if (_isTerminated)
                    break;
                         if(_queue.isEmpty())
                         {
                  try
                            {
                               _waitForJobsMonitor.wait();
                            }
                            catch (InterruptedException ex)
                            {
                            }
                         }
                      }
                   }
          }// run()
          

          基本上, Consumer 的線程一直運(yùn)行,直到隊(duì)列中不再有等待的作業(yè)為止。然后它進(jìn)入休眠,只在第一次調(diào)用 add(Object) 時(shí)醒來(lái),該方法向隊(duì)列添加一個(gè)新作業(yè)并“踢”醒該線程。

          使用 wait()notify() 機(jī)制來(lái)完成“睡眠”和“踢”。實(shí)際的消費(fèi)者工作由 OnConsume(Object) 方法處理,如清單 3 所示:


          清單 3. 喚醒和通知 Consumer
               /**
                * Add an object to the Consumer.
                * This is the entry point for the producer.
                * After the item is added, the Consumer's thread
                * will be notified.
                *
                * @param  the object to be 'consumed' by this consumer
                */
                public void add(Object o)
                {
                   _queue.add(o);
                   kickThread();
                }
          
                /**
                 * Wake up the thread (without adding new stuff to consume)
                 *
                 */
                public void kickThread()
                {
                   if (!this._thread.isInterrupted())
                   {
                      synchronized(_waitForJobsMonitor)
                      {
                         _waitForJobsMonitor.notify();
                      }
                   }
                }
          





          回頁(yè)首


          示例:MessagesProcessor

          為了向您展示 Consumer 類是如何工作的,我們將使用一個(gè)簡(jiǎn)單示例。 MessagesProcessor 類以異步方式處理進(jìn)入的消息(也就是說(shuō),不干擾調(diào)用線程)。其工作是在每個(gè)消息到來(lái)時(shí)打印它。 MessagesProcessor 具有一個(gè)處理到來(lái)的消息作業(yè)的內(nèi)部 Consumer 。當(dāng)新作業(yè)進(jìn)入空隊(duì)列時(shí), Consumer 調(diào)用 processMessage(String) 方法來(lái)處理它,如清單 4 所示:


          清單 4. MessagesProcessor 類
                class MessagesProcessor
                {
                   String _name;
                   // anonymous inner class that supplies the consumer
                   // capabilities for the MessagesProcessor
                   private Consumer _consumer = new Consumer()
                   {
                      // that method is called on each event retrieved
                      protected void onConsume(Object o)
                      {
                         if (!(o instanceof String))
                         {
                            System.out.println("illegal use, ignoring");
                            return;
                         }
                         MessagesProcesser.this.processMessage((String)o);
                      }
                   }.setName("MessagesProcessor").init();
          
                   public void gotMessageEvent(String s)
                   {
                      _consumer.add(s);
                   }
                   private void processMessage(String s)
                   {
                      System.out.println(_name+" processed message: "+s);
                   }
          
                   private void terminate()
                   {
                     _consumer.terminateWait();
                     _name = null;
                   }
          
                   MessagesProcessor()
                   {
                      _name = "Example Consumer";
                   }
                }
          

          正如您可以從上面的代碼中所看到的,定制 Consumer 相當(dāng)簡(jiǎn)單。我們使用了一個(gè)匿名內(nèi)部類來(lái)繼承 Consumer 類,并重載抽象方法 onConsume() 。因此,在我們的示例中,只需調(diào)用 processMessage





          回頁(yè)首


          Consumer 類的高級(jí)特性

          除了開(kāi)始時(shí)提出的基本需求以外,我們還為 Consumer 類提供了一些我們覺(jué)得有用的高級(jí)特性。

          事件通知

          • onThreadTerminate():只在終止 Consumer 前調(diào)用該方法。我們出于調(diào)試目的覆蓋了這個(gè)方法。
          • goingToRest():只在 Consumer 線程進(jìn)入休眠前調(diào)用該方法(也就是說(shuō),只在調(diào)用 _waitForJobsMonitor.wait() 之前調(diào)用)。只在需要消費(fèi)者在進(jìn)入休眠之前處理一批已處理工作的復(fù)雜情況中,可能需要這種通知。

          終止

          • terminate():Consumer 線程的異步終止。
          • terminateWait():設(shè)置調(diào)用線程一直等待,直到消費(fèi)者線程實(shí)際終止為止。

          在我們的示例中,如果使用 terminate() 而不是 terminateWait() ,那么將會(huì)出現(xiàn)問(wèn)題,因?yàn)樵趯?_name 設(shè)置成空值之后調(diào)用 onConsume() 方法。這將導(dǎo)致執(zhí)行 processMessage 的線程拋出一個(gè) NullPointerException





          回頁(yè)首


          結(jié)束語(yǔ):Consumer 類的好處

          可在 參考資料一節(jié)下載 Consumer 類的源代碼。請(qǐng)自由使用源代碼,并按照您的需要擴(kuò)展它。我們發(fā)現(xiàn)將這個(gè)類用于多線程應(yīng)用程序開(kāi)發(fā)有許多好處:

          • 代碼重用/重復(fù)代碼的消除:如果您有 Consumer 類,就不必為您應(yīng)用程序中的每個(gè)實(shí)例編寫(xiě)一個(gè)新的消費(fèi)者。如果在應(yīng)用程序開(kāi)發(fā)中頻繁使用生產(chǎn)者-消費(fèi)者方案,這可以很大程度地節(jié)省時(shí)間。另外,請(qǐng)牢記重復(fù)代碼是滋生錯(cuò)誤的沃土。它還使基本代碼的維護(hù)更為困難。
          • 更少錯(cuò)誤:使用驗(yàn)證過(guò)的代碼是一種防止錯(cuò)誤的好實(shí)踐,尤其是處理多線程應(yīng)用程序時(shí)。因?yàn)?Consumer 類已經(jīng)被調(diào)試過(guò),所以它更安全。消費(fèi)者還通過(guò)在線程和資源之間擔(dān)任安全中介來(lái)防止與線程相關(guān)的錯(cuò)誤。消費(fèi)者可以代表其它線程以順序的方式訪問(wèn)資源。
          • 漂亮、清晰的代碼:使用 Consumer 類有助于我們編寫(xiě)出更簡(jiǎn)單的代碼,這樣的代碼更容易理解和維護(hù)。如果我們不使用 Consumer 類,就必須編寫(xiě)代碼來(lái)處理兩種不同的功能:消費(fèi)邏輯(隊(duì)列和線程管理、同步等)和指定消費(fèi)者的用法或功能的代碼。

          posted on 2006-08-24 17:53 Binary 閱讀(251) 評(píng)論(0)  編輯  收藏 所屬分類: j2se

          主站蜘蛛池模板: 平乐县| 阳朔县| 聂荣县| 蒲城县| 广水市| 军事| 白城市| 师宗县| 县级市| 湟源县| 南投县| 赤水市| 明溪县| 伊宁县| 新郑市| 喀喇沁旗| 日喀则市| 弋阳县| 元阳县| 嘉禾县| 灵台县| 牙克石市| 合山市| 堆龙德庆县| 西峡县| 马龙县| 体育| 容城县| 全南县| 潮安县| 石渠县| 策勒县| 宁强县| 广西| 渭南市| 丘北县| 大足县| 孝昌县| 淮安市| 馆陶县| 曲水县|