Microsoft 近期推出了一種用于生成集成應(yīng)用程序的新平臺(tái)——Microsoft .NET 框架。.NET 框架允許開(kāi)發(fā)人員使用任何編程語(yǔ)言迅速生成和部署 Web 服務(wù)和應(yīng)用程序。Microsoft Intermediate Language (MSIL) 和實(shí)時(shí) (JIT) 編譯器使這種不依賴(lài)語(yǔ)言的框架得以實(shí)現(xiàn)。
與 .NET 框架同時(shí)面世的還有一種新的編程語(yǔ)言 C#(讀作“C sharp”)。C# 是一種簡(jiǎn)單、新穎、面向?qū)ο蠛皖?lèi)型安全的編程語(yǔ)言。利用 .NET 框架和 C#(除 Microsoft? Visual Basic? 和 Managed C++ 之外),用戶可以編寫(xiě)功能強(qiáng)大的 Microsoft Windows? 和 Web 應(yīng)用程序及服務(wù)。本文提供了這樣的一個(gè)解決方案,它的重點(diǎn)是 .NET 框架和 C# 而不是編程語(yǔ)言。C# 語(yǔ)言的介紹可以在“ C# 簡(jiǎn)介和概述(英文)”找到。
近期的文章“MSMQ:可伸縮、高可用性的負(fù)載平衡解 決方案(英文)”介紹了一種解決方案,用于高可用性消息隊(duì)列 (MSMQ) 的可伸縮負(fù)載平衡解決方案體系結(jié)構(gòu)。此解決方案中涉及了一種將 Windows 服務(wù)用作智能消息路由器的開(kāi)發(fā)方案。這樣的解決方案以前只有 Microsoft Visual C++? 程序員才能實(shí)現(xiàn),而 .NET 框架的出現(xiàn)改變了這種情況。從下面的解決方案中,您可以看到這一點(diǎn)。
.NET 框架應(yīng)用程序
這里介紹 的解決方案是一種用來(lái)處理若干消息隊(duì)列的 Windows 服務(wù);其中每個(gè)隊(duì)列都是由多個(gè)線程進(jìn)行處理(接收和處理消息)。處理程序使用循環(huán)法技術(shù)或應(yīng)用程序特定值(消息 AppSpecific 屬性)從目的隊(duì)列列表中路由消息,并使用消息屬性來(lái)調(diào)用組件方法。(示例進(jìn)程也屬于這種情況。)在后一種情況下,組件的要求是它能夠?qū)崿F(xiàn)給定的接口 IWebMessage。要處理錯(cuò)誤,應(yīng)用程序需要將不能處理的消息發(fā)送到錯(cuò)誤隊(duì)列中。
消息應(yīng)用程序的結(jié)構(gòu)與以前的活動(dòng)模板庫(kù) (ATL) 應(yīng)用程序相似,它們之間的主要不同在于用于管理服務(wù)的代碼的封裝和 .NET 框架組件的使用。要?jiǎng)?chuàng)建 Windows 服務(wù),.NET 框架用戶僅僅需要?jiǎng)?chuàng)建一個(gè)從 ServiceBase(來(lái)自 System.ServiceControl 程序集)繼承的類(lèi)。這毫不奇怪,因?yàn)? .NET 框架是面向?qū)ο蟮摹?/font>
應(yīng)用程序結(jié)構(gòu)
應(yīng)用程序中主要的類(lèi)是 ServiceControl,它是從 ServiceBase 繼承的。因而,它必須實(shí)現(xiàn) OnStart 和 OnStop 方法,以及可選的 OnPause 和 OnContinue 方法。事實(shí)上,類(lèi)是在靜態(tài)方法 Main 內(nèi)構(gòu)造的:
using System; using System.ServiceProcess; public class ServiceControl: ServiceBase { // 創(chuàng)建服務(wù)對(duì)象的主入口點(diǎn) public static void Main() { ServiceBase.Run(new ServiceControl()); } // 定義服務(wù)參數(shù)的構(gòu)造對(duì)象 public ServiceControl() { CanPauseAndContinue = true; ServiceName = "MSDNMessageService"; AutoLog = false; } protected override void OnStart(string[] args) {...} protected override void OnStop() {...} protected override void OnPause() {...} protected override void OnContinue() {...} } |
ServiceControl 類(lèi)創(chuàng)建一系列 CWorker 對(duì)象,即,為需要處理的每個(gè)消息隊(duì)列創(chuàng)建 CWorker 類(lèi)的一個(gè)實(shí)例。根據(jù)定義中處理隊(duì)列所需的線程數(shù)目,CWorker 類(lèi)依次創(chuàng)建了一系列的 CWorkerThread 對(duì)象。CWorkerThread 類(lèi)創(chuàng)建的一個(gè)處理線程將執(zhí)行實(shí)際的服務(wù)工作。
使用 CWorker 和 CWorkerThread 類(lèi)的主要目的是確認(rèn)服務(wù)控件 Start、Stop、Pause 和 Continue 命令。因?yàn)檫@些進(jìn)程必須是無(wú)阻塞的,命令操作最終將在后臺(tái)處理線程上執(zhí)行。
CWorkerThread 是一個(gè)抽象類(lèi),被 CWorkerThreadAppSpecific 、CWorkerThreadRoundRobin 和 CWorkerThreadAssembly 繼承。這些類(lèi)以不同的方式處理消息。前兩個(gè)類(lèi)通過(guò)給另一隊(duì)列發(fā)送消息來(lái)處理消息(其不同之處在于確定接收隊(duì)列路徑的方式),最后一個(gè)類(lèi)則使用消息屬性來(lái)調(diào) 用組件方法。
.NET 框架內(nèi)部的錯(cuò)誤處理是以基類(lèi) Exception 為基礎(chǔ)的。當(dāng)系統(tǒng)引發(fā)或捕獲錯(cuò)誤時(shí),這些錯(cuò)誤必須是從 Exception 中導(dǎo)出的類(lèi)。CWorkerThreadException 類(lèi)就是這樣一種實(shí)現(xiàn),它通過(guò)附加額外屬性(用于定義服務(wù)是否應(yīng)繼續(xù)運(yùn)行)來(lái)擴(kuò)展基類(lèi)。
最后,應(yīng)用程序包含兩種結(jié)構(gòu)。這些值類(lèi)型定義了輔助進(jìn)程或線程的運(yùn)行時(shí)參數(shù),以簡(jiǎn)化 CWorker 和 CWorkerThread 對(duì)象的結(jié)構(gòu)。使用值類(lèi)型結(jié)構(gòu)(而不是引用類(lèi)型類(lèi))能夠確保這些運(yùn)行時(shí)參數(shù)維護(hù)的是數(shù)值(而不是引用)。
IWebMessage 接口
CWorkerThread 的實(shí)現(xiàn)之一是一個(gè)調(diào)用組件方法的類(lèi)。這個(gè)名為 CWorkerThreadAssembly 的類(lèi)使用 IWebMessage 接口來(lái)定義服務(wù)和組件之間的約定。
與當(dāng)前版本的 Microsoft Visual Studio? 不同,C# 接口可以在任何語(yǔ)言中顯式定義,而不需要?jiǎng)?chuàng)建和編譯 IDL 文件。C# IWebMessage 接口的定義如下:
public interface IWebMessage { WebMessageReturn Process(string sMessageLabel, string sMessageBody, int iAppSpecific); void Release(); } |
ATL 代碼中的 Process 方法是為處理消息而指定的。Process 方法的返回代碼定義為枚舉類(lèi)型 WebMessageReturn:
public enum WebMessageReturn { ReturnGood, ReturnBad, ReturnAbort } |
枚舉的定義如下:Good 表示繼續(xù)處理,Bad 表示將消息寫(xiě)入錯(cuò)誤隊(duì)列,Abort 表示終止處理。Release 方法為服務(wù)提供了輕松清除類(lèi)實(shí)例的途徑。因?yàn)閮H在垃圾回收的過(guò)程中才調(diào)用類(lèi)實(shí)例的析構(gòu)函數(shù),所以確保所有占用昂貴資源(例如數(shù)據(jù)庫(kù)連接)的類(lèi)都有一個(gè)能夠 在析構(gòu)之前被調(diào)用的方法,用來(lái)釋放這些資源,這是一種非常好的構(gòu)思。
名稱(chēng)空間
在這里先簡(jiǎn)單介紹一下名稱(chēng)空間。名稱(chēng)空間允許在內(nèi)部和外部表示中將應(yīng)用程序組織成為邏輯元素。服務(wù)內(nèi)的所有代碼都包含在 MSDNMessageService.Service 名稱(chēng)空間內(nèi)。盡管服務(wù)代碼包含在若干文件中,但是由于它們包含在同一名稱(chēng)空間中,因此用戶不需要引用其他文件。
由于 IWebMessage 接口包含在 MSDNMessageService.Interface 名稱(chēng)空間中,因此使用此接口的線程類(lèi)具有一個(gè)接口名稱(chēng)空間。
服務(wù)類(lèi)
應(yīng)用程序的目的是監(jiān)視和處理消息隊(duì)列,每一隊(duì)列在收到消息時(shí)都執(zhí)行不同的進(jìn)程。應(yīng)用程序是作為 Windows 服務(wù)來(lái)實(shí)現(xiàn)的。
ServiceBase 類(lèi)
如前所述,服務(wù)的基本結(jié)構(gòu)是從 ServiceBase 繼承的類(lèi)。重要的方法包括 OnStart、OnStop、OnPause 和 OnContinue,每一個(gè)替代方法都與一個(gè)服務(wù)控制操作直接對(duì)應(yīng)。OnStart 方法的目的是創(chuàng)建 CWorker 對(duì)象,而 CWorker 類(lèi)又創(chuàng)建 CWorkerThread 對(duì)象,然后在該對(duì)象中創(chuàng)建執(zhí)行服務(wù)工作的線程。
服務(wù)的運(yùn)行時(shí)配置(以及 CWorker 和 CWorkerThread 對(duì)象的屬性)是在基于 XML 的配置文件中維護(hù)的。它的名稱(chēng)與創(chuàng)建的 .exe 文件相同,但帶有一個(gè) .cfg 后綴。配置示例如下:
<?xml version="1.0"?> <configuration> <ProcessList> <ProcessDefinition ProcessName="Worker1" ProcessDesc="Message Worker with 2 Threads" ProcessType="AppSpecific" ProcessThreads="2" InputQueue=".\private$\test_load1" ErrorQueue=".\private$\test_error"> <OutputList> <OutputDefinition OutputName=".\private$\test_out11" /> <OutputDefinition OutputName=".\private$\test_out12" /> </OutputList> </ProcessDefinition> <ProcessDefinition ProcessName="Worker2" ProcessDesc="Assembly Worker with 1 Thread" ProcessType="Assembly" ProcessThreads="1" InputQueue=".\private$\test_load2" ErrorQueue=".\private$\test_error"> <OutputList> <OutputDefinition OutputName="C:\MSDNMessageService\MessageExample.dll" /> <OutputDefinition OutputName="MSDNMessageService.MessageSample.ExampleClass"/> </OutputList> </ProcessDefinition> </ProcessList> </configuration> |
對(duì)此信息的訪問(wèn)通過(guò)來(lái)自 System.Configuration 程序集的 ConfigManager 類(lèi)來(lái)管理。靜態(tài) Get 方法返回信息的集合,這些集合將被枚舉以獲得單個(gè)屬性。這些屬性集的設(shè)置決定了輔助對(duì)象的運(yùn)行時(shí)特征。除了這一配置文件,您還應(yīng)該創(chuàng)建定義 XML 文件結(jié)構(gòu)的圖元文件,并在其中引用位于服務(wù)器 machine.cfg 配置文件中的圖元文件:
<?xml version ="1.0"?> <MetaData xmlns="x-schema:CatMeta.xms"> <DatabaseMeta InternalName="MessageService"> <ServerWiring Interceptor="Core_XMLInterceptor"/> <Collection InternalName="Process" PublicName="ProcessList" PublicRowName="ProcessDefinition" SchemaGeneratorFlags="EMITXMLSCHEMA"> <Property InternalName="ProcessName" Type="String" MetaFlags="PRIMARYKEY" /> <Property InternalName="ProcessDesc" Type="String" /> <Property InternalName="ProcessType" Type="Int32" DefaultValue="RoundRobin" > <Enum InternalName="RoundRobin" Value="0"/> <Enum InternalName="AppSpecific" Value="1"/> <Enum InternalName="Assembly" Value="2"/> </Property> <Property InternalName="ProcessThreads" Type="Int32" DefaultValue="1" /> <Property InternalName="InputQueue" Type="String" /> <Property InternalName="ErrorQueue" Type="String" /> <Property InternalName="OutputName" Type="String" /> <QueryMeta InternalName="All" MetaFlags="ALL" /> <QueryMeta InternalName="QueryByFile" CellName="__FILE" Operator="EQUAL" /> </Collection> <Collection InternalName="Output" PublicName="OutputList" PublicRowName="OutputDefinition" SchemaGeneratorFlags="EMITXMLSCHEMA"> <Property InternalName="ProcessName" Type="String" MetaFlags="PRIMARYKEY" /> <Property InternalName="OutputName" Type="String" MetaFlags="PRIMARYKEY" /> <QueryMeta InternalName="All" MetaFlags="ALL" /> <QueryMeta InternalName="QueryByFile" CellName="__FILE" Operator="EQUAL" /> </Collection> </DatabaseMeta> <RelationMeta PrimaryTable="Process" PrimaryColumns="ProcessName" ForeignTable="Output" ForeignColumns="ProcessName" MetaFlags="USECONTAINMENT"/> </MetaData> |
private Hashtable htWorkers = new Hashtable(); IConfigCollection cWorkers = ConfigManager.Get("ProcessList", new AppDomainSelector()); foreach (IConfigItem ciWorker in cWorkers) { WorkerFormatter sfWorker = new WorkerFormatter(); sfWorker.ProcessName = (string)ciWorker["ProcessName"]; sfWorker.ProcessDesc = (string)ciWorker["ProcessDesc"]; sfWorker.NumberThreads = (int)ciWorker["ProcessThreads"]; sfWorker.InputQueue = (string)ciWorker["InputQueue"]; sfWorker.ErrorQueue = (string)ciWorker["ErrorQueue"]; // 計(jì)算并定義進(jìn)程類(lèi)型 switch ((int)ciWorker["ProcessType"]) { case 0: sfWorker.ProcessType = WorkerFormatter.SFProcessType.ProcessRoundRobin; break; case 1: sfWorker.ProcessType = WorkerFormatter.SFProcessType.ProcessAppSpecific; break; case 2: sfWorker.ProcessType = WorkerFormatter.SFProcessType.ProcessAssembly; break; default: throw new Exception("Unknown Processing Type"); } // 執(zhí)行更多的工作以讀取輸出信息 string sProcessName = (string)ciWorker["ProcessName"]; if (htWorkers.ContainsKey(sProcessName)) throw new ArgumentException("Process Name Must be Unique: " + sProcessName); htWorkers.Add(sProcessName, new CWorker(sfWorker)); } |
在這段代碼中沒(méi)有包含的主要信息是輸出數(shù)據(jù)的獲取。每一個(gè)進(jìn)程定義中都有一組相應(yīng)的輸出定義項(xiàng)。該信息是通過(guò)如下的簡(jiǎn)單查詢讀取的:
string sQuery = "SELECT * FROM OutputList WHERE ProcessName=" + sfWorker.ProcessName + " AND Selector=appdomain://"; ConfigQuery qQuery = new ConfigQuery(sQuery); IConfigCollection cOutputs = ConfigManager.Get("OutputList", qQuery); int iSize = cOutputs.Count, iLoop = 0; sfWorker.OutputName = new string[iSize]; foreach (IConfigItem ciOutput in cOutputs) sfWorker.OutputName[iLoop++] = (string)ciOutput["OutputName"]; |
CWorkerThread 和 Cworker 類(lèi)都有相應(yīng)的服務(wù)控制方法,根據(jù)服務(wù)控制操作進(jìn)行調(diào)用。由于 Hashtable 中引用了每一個(gè) CWorker 對(duì)象,因此需要枚舉 Hashtable 的內(nèi)容,以調(diào)用適當(dāng)?shù)姆?wù)控制方法:
foreach (CWorker cWorker in htWorkers.Values) cWorker.Start(); |
類(lèi)似地,實(shí)現(xiàn)的 OnPause、OnContinue 和 OnStop 方法是通過(guò)調(diào)用 CWorker 對(duì)象上的相應(yīng)方法來(lái)執(zhí)行操作的。
CWorker 類(lèi)
CWorker 類(lèi)的主要功能是創(chuàng)建和管理 CWorkerThread 對(duì)象。Start、Stop、Pause 和 Continue 方法調(diào)用相應(yīng)的 CWorkerThread 方法。實(shí)際的 CWorkerThread 對(duì)象是在Start 方法中創(chuàng)建的。與使用 Hashtable 管理輔助對(duì)象引用的 Service 類(lèi)相似,CWorker 使用 ArrayList(簡(jiǎn)單的動(dòng)態(tài)數(shù)組)來(lái)維護(hù)線程對(duì)象的列表。
在這個(gè)數(shù)組內(nèi)部,CWorker 類(lèi)創(chuàng)建了 CWorkerThread 類(lèi)的一個(gè)實(shí)現(xiàn)版本。CWorkerThread 類(lèi)(將在下面討論)是一個(gè)必須繼承的抽象類(lèi)。導(dǎo)出類(lèi)定義了消息的處理方式:
aThreads = new ArrayList(); for (int idx=0; idx<sfWorker.NumberThreads; idx++) { WorkerThreadFormatter wfThread = new WorkerThreadFormatter(); wfThread.ProcessName = sfWorker.ProcessName; wfThread.ProcessDesc = sfWorker.ProcessDesc; wfThread.ThreadNumber = idx; wfThread.InputQueue = sfWorker.InputQueue; wfThread.ErrorQueue = sfWorker.ErrorQueue; wfThread.OutputName = sfWorker.OutputName; // 定義輔助類(lèi)型,并將其插入輔助線程結(jié)構(gòu) CWorkerThread wtBase; switch (sfWorker.ProcessType) { case WorkerFormatter.SFProcessType.ProcessRoundRobin: wtBase = new CWorkerThreadRoundRobin(this, wfThread); break; case WorkerFormatter.SFProcessType.ProcessAppSpecific: wtBase = new CWorkerThreadAppSpecific(this, wfThread); break; case WorkerFormatter.SFProcessType.ProcessAssembly: wtBase = new CWorkerThreadAssembly(this, wfThread); break; default: throw new Exception("Unknown Processing Type"); } // 添加對(duì)數(shù)組的調(diào)用 aThreads.Insert(idx, wtBase); } |
一旦所有的對(duì)象都已創(chuàng)建,就可以通過(guò)調(diào)用每個(gè)線程對(duì)象的 Start 方法來(lái)啟動(dòng)它們:
foreach(CWorkerThread cThread in aThreads) cThread.Start(); |
Stop、Pause 和 Continue 方法在 foreach 循環(huán)里執(zhí)行的操作類(lèi)似。Stop 方法具有如下的垃圾收集操作:
GC.SuppressFinalize(this); |
在類(lèi)析構(gòu)函數(shù)中將調(diào)用 Stop 方法,這樣,在沒(méi)有顯式調(diào)用 Stop 方法的情況下也可以正確地終止對(duì)象。如果調(diào)用了 Stop
方法,將不需要析構(gòu)函數(shù)。SuppressFinalize 方法能夠防止調(diào)用對(duì)象的 Finalize 方法(析構(gòu)函數(shù)的實(shí)際實(shí)現(xiàn))。
CWorkerThread 抽象類(lèi)
CWorkerThread 是一個(gè)由 CWorkerThreadAppSpecifc、CWorkerThreadRoundRobin 和
CWorkerThreadAssembly 繼承的抽象類(lèi)。無(wú)論如何處理消息,隊(duì)列的大部分處理是相同的,所以 CWorkerThread
類(lèi)提供了這一功能。這個(gè)類(lèi)提供了抽象方法(必須被實(shí)際方法替代)以管理資源和處理消息。
類(lèi)的工作再一次通過(guò) Start、Stop、Pause 和 Continue 方法來(lái)實(shí)現(xiàn)。在 Start 方法中引用了輸入和錯(cuò)誤隊(duì)列。在 .NET 框架中,消息由 System.Messaging 名稱(chēng)空間處理:
// 嘗試打開(kāi)隊(duì)列,并設(shè)置默認(rèn)的讀寫(xiě)屬性 MessageQueue mqInput = new MessageQueue(sInputQueue); mqInput.MessageReadPropertyFilter.Body = true; mqInput.MessageReadPropertyFilter.AppSpecific = true; MessageQueue mqError = new MessageQueue(sErrorQueue); // 如果使用 MSMQ COM,則將格式化程序設(shè)置為 ActiveX mqInput.Formatter = new ActiveXMessageFormatter(); mqError.Formatter = new ActiveXMessageFormatter(); |
一旦定義了消息隊(duì)列引用,即會(huì)創(chuàng)建一個(gè)線程用于實(shí)際的處理函數(shù)(稱(chēng)為 ProcessMessages)。在 .NET 框架中,使用 System.Threading 名稱(chēng)空間很容易實(shí)現(xiàn)線程處理:
procMessage = new Thread(new ThreadStart(ProcessMessages)); procMessage.Start(); |
ProcessMessages 函數(shù)是基于 Boolean 值的處理循環(huán)。當(dāng)數(shù)值設(shè)為 False,處理循環(huán)將終止。因此,線程對(duì)象的 Stop 方法只設(shè)置這一 Boolean 值,然后關(guān)閉打開(kāi)的消息隊(duì)列,并加入帶有主線程的線程:
// 加入服務(wù)線程和處理線程 bRun = false; procMessage.Join(); // 關(guān)閉打開(kāi)的消息隊(duì)列 mqInput.Close(); mqError.Close(); |
Pause 方法只設(shè)置一個(gè) Boolean 值,使處理線程休眠半秒鐘:
if (bPause) Thread.Sleep(500); |
最后,每一個(gè) Start、Stop、Pause 和 Continue 方法將調(diào)用抽象的 OnStart、OnStop、OnPause 和 OnContinue 方法。這些抽象方法為實(shí)現(xiàn)的類(lèi)提供了掛鉤,以捕獲和釋放所需的資源。
ProcessMessages 循環(huán)具有如下基本結(jié)構(gòu):
1、接收 Message。
2、如果 Message 具有成功的 Receive,則調(diào)用抽象 ProcessMessage 方法。
3、如果 Receive 或 ProcessMessage 失敗,將 Message 發(fā)送至錯(cuò)誤隊(duì)列中。
Message mInput; try { // 從隊(duì)列中讀取,并等候 1 秒 mInput = mqInput.Receive(new TimeSpan(0,0,0,1)); } catch (MessageQueueException mqe) { // 將消息設(shè)置為 null mInput = null; // 查看錯(cuò)誤代碼,了解是否超時(shí) if (mqe.ErrorCode != (-1072824293) ) //0xC00E001B { // 如果未超時(shí),發(fā)出一個(gè)錯(cuò)誤并記錄錯(cuò)誤號(hào) LogError("Error: " + mqe.Message); throw mqe; } } if (mInput != null) { // 得到一個(gè)要處理的消息,調(diào)用處理消息抽象方法 try { ProcessMessage(mInput); } // 捕獲已知異常狀態(tài)的錯(cuò)誤 catch (CWorkerThreadException ex) { ProcessError(mInput, ex.Terminate); } // 捕獲未知異常,并調(diào)用 Terminate catch { ProcessError(mInput, true); } } |
ProcessError 方法將錯(cuò)誤的消息發(fā)送至錯(cuò)誤隊(duì)列。另外,它也可能引發(fā)異常來(lái)終止線程。如果ProcessMessage 方法引發(fā)了終止錯(cuò)誤或 CWorkerThreadException 類(lèi)型,它將執(zhí)行此操作。
CworkerThread 導(dǎo)出類(lèi)
任何從 CWorkerThread 中繼承的類(lèi)都必須提供 OnStart、OnStop、OnPause、OnContinue 和 ProcessMessage 方法。OnStart 和 OnStop 方法獲取并釋放處理資源。OnPause 和 OnContinue 方法允許臨時(shí)釋放和重新獲取這些資源。ProcessMessage 方法應(yīng)該處理消息,并在出現(xiàn)失敗事件時(shí)引發(fā) CWorkerThreadException 異常。
由于 CWorkerThread 構(gòu)造函數(shù)定義運(yùn)行時(shí)參數(shù),導(dǎo)出類(lèi)必須調(diào)用基類(lèi)構(gòu)造函數(shù):
public CWorkerThreadDerived(CWorker v_cParent, WorkerThreadFormatter v_wfThread) : base (v_cParent, v_wfThread) {} |
導(dǎo)出類(lèi)提供了兩種類(lèi)型的處理:將消息發(fā)送至另一隊(duì)列,或者調(diào)用組件方法。接收和發(fā)送消息的兩種實(shí)現(xiàn)使用了循環(huán)技術(shù)或應(yīng)用程序偏移(保留在消息 AppSpecific 屬性中),作為使用哪一隊(duì)列的決定因素。此方案中的配置文件應(yīng)該包括隊(duì)列路徑的列表。實(shí)現(xiàn)的 OnStart 和 OnStop 方法應(yīng)該打開(kāi)和關(guān)閉對(duì)這些隊(duì)列的引用:
iQueues = wfThread.OutputName.Length; mqOutput = new MessageQueue[iQueues]; for (int idx=0; idx<iQueues; idx++) { mqOutput[idx] = new MessageQueue(wfThread.OutputName[idx]); mqOutput[idx].Formatter = new ActiveXMessageFormatter(); } |
在這些方案中,消息的處理很簡(jiǎn)單:將消息發(fā)送必要的輸出隊(duì)列。在循環(huán)情況下,這個(gè)進(jìn)程為:
try { mqOutput[iNextQueue].Send(v_mInput); } catch (Exception ex) { // 如果錯(cuò)誤強(qiáng)制終止異常 throw new CWorkerThreadException(ex.Message, true); } // 計(jì)算下一個(gè)隊(duì)列號(hào) iNextQueue++; iNextQueue %= iQueues; |
后一種調(diào)用帶消息參數(shù)的組件的實(shí)現(xiàn)方法比較有趣。ProcessMessage 方法使用 IWebMessage 接口調(diào)入一個(gè) .NET 組件。OnStart 和 OnStop 方法獲取和釋放此組件的引用。
此方案中的配置文件應(yīng)該包含兩個(gè)項(xiàng)目:完整的類(lèi)名和類(lèi)所在文件的位置。按照 IWebMessage 接口中的定義,在組件上調(diào)用 Process 方法。
要獲取對(duì)象引用,需要使用 Activator.CreateInstance 方法。此函數(shù)需要一個(gè)程序集類(lèi)型。在這里,它是從程序集文件路徑和類(lèi)名中導(dǎo)出的。一旦獲取對(duì)象引用,它將被放入合適的接口:
private IWebMessage iwmSample; private string sFilePath, sTypeName; // 保存程序集路徑和類(lèi)型名稱(chēng) sFilePath = wfThread.OutputName[0]; sTypeName = wfThread.OutputName[1]; // 獲取對(duì)必要對(duì)象的引用 Assembly asmSample = Assembly.LoadFrom(sFilePath); Type typSample = asmSample.GetType(sTypeName); object objSample = Activator.CreateInstance(typSample); // 定義給對(duì)象的必要接口 iwmSample = (IWebMessage)objSample; 獲取對(duì)象引用后,ProcessMessage 方法將在 IWebMessage 接口上調(diào)用 Process 方法: WebMessageReturn wbrSample; try { // 定義方法調(diào)用的參數(shù) string sLabel = v_mInput.Label; string sBody = (string)v_mInput.Body; int iAppSpecific = v_mInput.AppSpecific; // 調(diào)用方法并捕捉返回代碼 wbrSample = iwmSample.Process(sLabel, sBody, iAppSpecific); } catch (InvalidCastException ex) { // 如果在消息內(nèi)容中發(fā)生錯(cuò)誤,則強(qiáng)制發(fā)出一個(gè)非終止異常 throw new CWorkerThreadException(ex.Message, false); } catch (Exception ex) { // 如果錯(cuò)誤調(diào)用程序集,則強(qiáng)制發(fā)出終止異常 throw new CWorkerThreadException(ex.Message, true); } // 如果沒(méi)有錯(cuò)誤,則檢查對(duì)象調(diào)用的返回狀態(tài) switch (wbrSample) { case WebMessageReturn.ReturnBad: throw new CWorkerThreadException ("Unable to process message: Message marked bad", false); case WebMessageReturn.ReturnAbort: throw new CWorkerThreadException ("Unable to process message: Process terminating", true); default: break; } |
提供的示例組件將消息正文寫(xiě)入數(shù)據(jù)庫(kù)表。如果捕獲到嚴(yán)重?cái)?shù)據(jù)庫(kù)錯(cuò)誤,您可能希望終止處理過(guò)程,但是在這里,僅僅將消息標(biāo)記為錯(cuò)誤的消息。
由于此示例中創(chuàng)建的類(lèi)實(shí)例可能會(huì)獲取并保留昂貴的數(shù)據(jù)庫(kù)資源,所以用 OnPause 和 OnContinue 方法釋放和重新獲取對(duì)象引用。
CworkerThread 導(dǎo)出類(lèi)
任何從 CWorkerThread 中繼承的類(lèi)都必須提供 OnStart、OnStop、OnPause、OnContinue 和 ProcessMessage 方法。OnStart 和 OnStop 方法獲取并釋放處理資源。OnPause 和 OnContinue 方法允許臨時(shí)釋放和重新獲取這些資源。ProcessMessage 方法應(yīng)該處理消息,并在出現(xiàn)失敗事件時(shí)引發(fā) CWorkerThreadException 異常。
由于 CWorkerThread 構(gòu)造函數(shù)定義運(yùn)行時(shí)參數(shù),導(dǎo)出類(lèi)必須調(diào)用基類(lèi)構(gòu)造函數(shù):
public CWorkerThreadDerived(CWorker v_cParent, WorkerThreadFormatter v_wfThread) : base (v_cParent, v_wfThread) {} |
導(dǎo)出類(lèi)提供了兩種類(lèi)型的處理:將消息發(fā)送至另一隊(duì)列,或者調(diào)用組件方法。接收和發(fā)送消息的兩種實(shí)現(xiàn)使用了循環(huán)技術(shù)或應(yīng)用程序偏移(保留在消息 AppSpecific 屬性中),作為使用哪一隊(duì)列的決定因素。此方案中的配置文件應(yīng)該包括隊(duì)列路徑的列表。實(shí)現(xiàn)的 OnStart 和 OnStop 方法應(yīng)該打開(kāi)和關(guān)閉對(duì)這些隊(duì)列的引用:
iQueues = wfThread.OutputName.Length; mqOutput = new MessageQueue[iQueues]; for (int idx=0; idx<iQueues; idx++) { mqOutput[idx] = new MessageQueue(wfThread.OutputName[idx]); mqOutput[idx].Formatter = new ActiveXMessageFormatter(); } |
在這些方案中,消息的處理很簡(jiǎn)單:將消息發(fā)送必要的輸出隊(duì)列。在循環(huán)情況下,這個(gè)進(jìn)程為:
try { mqOutput[iNextQueue].Send(v_mInput); } catch (Exception ex) { // 如果錯(cuò)誤強(qiáng)制終止異常 throw new CWorkerThreadException(ex.Message, true); } // 計(jì)算下一個(gè)隊(duì)列號(hào) iNextQueue++; iNextQueue %= iQueues; |
后一種調(diào)用帶消息參數(shù)的組件的實(shí)現(xiàn)方法比較有趣。ProcessMessage 方法使用 IWebMessage 接口調(diào)入一個(gè) .NET 組件。OnStart 和 OnStop 方法獲取和釋放此組件的引用。
此方案中的配置文件應(yīng)該包含兩個(gè)項(xiàng)目:完整的類(lèi)名和類(lèi)所在文件的位置。按照 IWebMessage 接口中的定義,在組件上調(diào)用 Process 方法。
要獲取對(duì)象引用,需要使用 Activator.CreateInstance 方法。此函數(shù)需要一個(gè)程序集類(lèi)型。在這里,它是從程序集文件路徑和類(lèi)名中導(dǎo)出的。一旦獲取對(duì)象引用,它將被放入合適的接口:
private IWebMessage iwmSample; private string sFilePath, sTypeName; // 保存程序集路徑和類(lèi)型名稱(chēng) sFilePath = wfThread.OutputName[0]; sTypeName = wfThread.OutputName[1]; // 獲取對(duì)必要對(duì)象的引用 Assembly asmSample = Assembly.LoadFrom(sFilePath); Type typSample = asmSample.GetType(sTypeName); object objSample = Activator.CreateInstance(typSample); // 定義給對(duì)象的必要接口 iwmSample = (IWebMessage)objSample; 獲取對(duì)象引用后,ProcessMessage 方法將在 IWebMessage 接口上調(diào)用 Process 方法: WebMessageReturn wbrSample; try { // 定義方法調(diào)用的參數(shù) string sLabel = v_mInput.Label; string sBody = (string)v_mInput.Body; int iAppSpecific = v_mInput.AppSpecific; // 調(diào)用方法并捕捉返回代碼 wbrSample = iwmSample.Process(sLabel, sBody, iAppSpecific); } catch (InvalidCastException ex) { // 如果在消息內(nèi)容中發(fā)生錯(cuò)誤,則強(qiáng)制發(fā)出一個(gè)非終止異常 throw new CWorkerThreadException(ex.Message, false); } catch (Exception ex) { // 如果錯(cuò)誤調(diào)用程序集,則強(qiáng)制發(fā)出終止異常 throw new CWorkerThreadException(ex.Message, true); } // 如果沒(méi)有錯(cuò)誤,則檢查對(duì)象調(diào)用的返回狀態(tài) switch (wbrSample) { case WebMessageReturn.ReturnBad: throw new CWorkerThreadException ("Unable to process message: Message marked bad", false); case WebMessageReturn.ReturnAbort: throw new CWorkerThreadException ("Unable to process message: Process terminating", true); default: break; } |
提供的示例組件將消息正文寫(xiě)入數(shù)據(jù)庫(kù)表。如果捕獲到嚴(yán)重?cái)?shù)據(jù)庫(kù)錯(cuò)誤,您可能希望終止處理過(guò)程,但是在這里,僅僅將消息標(biāo)記為錯(cuò)誤的消息。
由于此示例中創(chuàng)建的類(lèi)實(shí)例可能會(huì)獲取并保留昂貴的數(shù)據(jù)庫(kù)資源,所以用 OnPause 和 OnContinue 方法釋放和重新獲取對(duì)象引用。
苦笑枯 2007-01-19 00:13 發(fā)表評(píng)論
文章來(lái)源:http://www.aygfsteel.com/kuxiaoku/articles/94804.html