我的家園

          我的家園

          背景:

          ? ? ? 容錯重試機制,是系統的一種自我調節,是對系統魯棒性的一種考量,在很多后臺程序中都經常涉及,特別是基于 ? task的系統中,往往這類系統要處理的事情很多,一個task的完成時間比較長,涉及的環境也比較復雜,出現很多臨時錯誤的概率較大,比如IO讀取出錯,網絡臨時不可用等等,同時這種系統往往對響應時間要求不是很高,更加看重系統的穩定和魯棒性;另外對于依賴于第三方的遠程調用,或者說其他資源的獲取,也常常涉及要考慮容錯重試;讓程序在不人工干預的情況下,處理更多的場景;

          ?

          適用場景:

          ? ? 從上面的描述我們可以總結它的適用場景:

          ? ? 1、系統對響應時間不太關心;

          ? ? 2、系統對魯棒性要求較高;

          ? ? 3、系統涉及遠程調用,資源獲??;

          ?

          設計目標:

          ? ? 1、通用性:希望作為一個獨立的模塊,為需要的程序提高方便的使用;

          ? ? 2、輕量級:希望是一個輕量級的實現,不對使用的系統有太強的侵入性;

          ? ? 3、細粒度:重試以方法為單位,而不用重試整個task;

          ?

          概要設計:

          ? ? ?要保持重試模塊的獨立性,不侵入到原有的系統中,首先的面臨的問題是,需要重試的數據從何而來,我們很容易想到DB,那么整個系統應該分為兩大塊,一是client,負責將需要重試的數據放入DB,這個也就是各個應用程序要做的事情;第二是server,負責將DB中數據取出,做相應的處理;大家可以看到這是一個典型的生產者-消費者模式;

          ?

          ?


          ?

          這里區分應用程序和容錯服務器,只是概念上的,因為容錯服務器事實上必須依賴于引用程序(需要執行部分引用程序);所以在實際應用中,一般在一臺虛擬機上,如果是應用本身在多臺服務器上的話,可以通過配置項決定是否啟用容錯重試功能;

          ?

          詳細設計-UML圖:

          ?

          ?

          1、系統的UML類圖


          ?

          2、類圖說明:

          ?

          ? ?1)、從上圖可以看出系統主要可以分為三大塊的內容,第一塊是TaskExecutor 類以上部分,通過spring的TaskExecutor與下面的模塊弱依賴,這一塊主要負責從數據庫去取出需要處理的、并且應該自己(loadbalance)處理的數據;講數據封裝在一個Notify中,交給NotifyServerServices處理;

          ? ?2)、第二塊為TaskExecutor和RetryHandlerStrategy之前的部分;這一部分主要關注容錯重試處理后的工作;如:成功則刪除DB中的記錄,否則,負責判斷是否還需要重試,間隔時間等;

          ? ?3)、第三塊為每個系統自己實現的各種RetryHandlerStrategy類;他們負責真正的重試工作;這里所有的類,可以看成是一個sever,對于client端來說,是非常簡單的,因為它只需要講數據插入到數據庫,可以通過一個clientService提供一個createNotify方法,供應用調用;

          ?

          詳細設計-數據庫設計:

          ?

          1、一共需要兩張表,task、task_history;兩張字段完全一樣:

          ?

          字段 類型 描述 可空 默認值
          task_id varchar2 PK、UUID NOT
          create_time DATE 創建時間 NOT
          handle_time DATE 任務待執行的時間 NOT
          task_handler varchar2 任務處理器類型 NOT
          load_balance_num number 負責均衡值 NOT 0
          task_parameter varchar2 執行任務的參數,json格式 YES
          retry_count number 已經重試的次數 NOT 0
          retry_reason varchar2 失敗的原因 YES

          ?

          說明:

          1)、load_balance_num:當使用集群的時候可以考慮,可以通過上圖的LoadBalanceNumStrategy類來控制他的值,比如平均分布,比如按照機器的性能使用權重分布;

          2)、task_parameter:這個用來保存重試的參數,可以約定為一種格式,自己方便解析就好了,比如json、xml等;

          3)、retry_count:當系統要求我最多重試5次的時候可以使用這個參數;當5次后還是失敗,直接移動要歷史表中,人工處理;

          4)、handle_time:當然系統要求第二次重試的時候時間間隔30分鐘的時候使用;當處理失敗的時候更新這個時間;

          5)、task_handler:任務處理器類型,比如上面類圖中的RetryHandler,通過spring,得到RetryHandler的實例來做處理;

          ?

          關鍵類偽代碼:

          ?

          從上面的設計圖可以發現主要只有兩個類,即是:NotifyScheduleMainExecutor和NotifyServerServiceImpl,其他的都是一些策略類;這里偽代碼描述這個兩個類的邏輯,策略類可以自己選擇不同的實現;

          ?

          1、NotifyScheduleMainExecutor:

          ?

          ?

          if(NotifyHandlerStrategy != null){
            獲取本機待處理的handler的列表;
          }
          if(LoadBalanceNumStrategy != null){
            獲取本機待處理的loand_balance_num的列表
          }
          if(NotifyMaxNumPerJobStrategy != null){
            獲取本機每次調度的處理的最大的notify記錄數
          }
          執行輪詢語句,提取待處理的任務的列表
          for(對每一個notify){
            if(NotifyIdCacheStrategy已經包含該ID){
               說明線程已經在執行,  
             }else{
                放入cache;
                TaskExecutor.excute(new notifyExecutor(notify,notifyServerService))
             }
          }

          ? 分析一下這里的查詢sql:

          ?

          基礎的sql =  select * from notify where handle_time <=sysdate ;
          if(handlerlist 不為空)
          {
            sql+=sql+ and hander in (handlerlist)
          } 
          if(loadbalancenumlist不為空)
          {
            sql+=sql+ and load_balance_num in (loadbalancenumlist)
          } 
          if(maxnum不為空)
          {
            sql+=sql+ and rownum<=maxnum
          } 

          ?

          2、?NotifyServerServiceImpl

          ?

          處理結果=success;
          errormessage=null;
          根據notify的task_handler得到處理的handler;
          try{
              handler.invoke(notify.getparameter())返回notifyHandlerResult
              if(notifyHandlerResult == null){
                 throw exception;
              }else if(notifyHandlerResult==失?。﹞
                  處理結果=fail;
                  errormessage=原因;
              }
          }cath(){
              使用NotifyHandlerExceptionStrategy處理;返回notifyHandlerResult
              if(notifyHandlerResult == null){
                   處理結果=exception;
                  errormessage=原因;
              }else if(notifyHandlerResult==失?。﹞
                  處理結果=fail;
                  errormessage=原因;
              }
          }
          try{
              if(notifyHandlerResult=success){
                  清除DB的數據;
              }else{
                  得到已經重試的次數oldRetryCount;
                  得到上一次執行的時間oldExecuteTime;
                  根據NotifyRetryStrategy類返回重試策略的結果 notifyRetryResult;
                  if(需要重試){
                       重試次數+1;
                       計算下一次時間;
                       設置上一次失敗原因;
                       更新DB;
                  }else{
                       移動到歷史表中;
                  }    
             }
          }cath{
             
          }finally{
              cache的操作;
          }
          
          

          ?

          使用及client配置:

          ? ? 現在假設有一個應用需要使用容錯機制,需要的操作:

          1、引入二方庫;

          2、在需要容錯的方法里面調用clientService提供的createNotify方法,插入項目的數據;

          3、編寫重試處理類;必須繼承RetryHandlerStrategy接口;

          4、編寫配置文件:整個系統依賴spring,可以分為三個配置文件,一個是client,一個是server,另外是handler,下面給出一個例子:

          ?

          ?

          client.xml

          ?

          <?xml version="1.0" encoding="UTF-8"?>
          <beans>
          	<bean id="notifyClientService" class="com.*.service.impl.NotifyClientServiceImpl">
          		<!-- notify.load_balance_num字段值生成、以及調度時where條件中取值的策略實現類,可自行擴展 -->
          		<!-- 當有多臺notify服務器時才有用,用于平衡各臺server間的壓力;一般不用配置 -->
          		<property name="loadBalanceNumStrategy" ref="alternateLoadBalanceNumStrategy" />
          		<property name="notifyDao" ref="notifyDao" />
          	</bean>
          
          	<!-- 生成0、1交替的LOAD_BALANCE_NUM值,適用于2臺Notify服務器 -->
          	<bean id="alternateLoadBalanceNumStrategy" class="com.*.strategy.impl.AlternateLoadBalanceNumStrategyImpl">
          		<!-- 主機名對應的LOAD_BALANCE_NUM列表,多個用,隔開 -->
          		<property name="lbnMapByHostname">
          			<map>
          				<entry key=“dev1" value="0"/>
          				<entry key="dev2" value="1"/>
          			</map>
          		</property>
          	</bean>
          	
          	<bean id="notifyDao" class="com.*.dao.impl.NotifyDaoImpl">
          		<!-- ref可以修改為自己應用中已經配置過的sqlMapClientTemplate bean,要求內部已經嵌入datasource -->
          		<property name="sqlMapClientTemplate" ref="sqlMapTemplate" />
          		<property name="namespace" value="com.*.notify" />
          	</bean>
          </beans>
          ?

          server.xml

          ?

          ?

          <?xml version="1.0" encoding="UTF-8"?>
          <beans >
          	<!-- 該文件是Notify Server運行時需要的配置文件,加載了該文件,調度就會自動執行; -->
          	<!-- 若Client/Server都在同一個應用中,則請在部署時區別加載該文件,否則會導致多臺服務器執行相同的調度任務 -->
          	
          	<!-- 添加對Notify Client的配置支持 -->
          	<import resource="billing-spring-notify-client.xml" />
          	<!-- end for Notify Client -->
          	
          	<!-- 任務從此處開始加載 -->
          	<bean id="notifySpringScheduledExecutorFactoryBean" class="org.springframework.scheduling.concurrent.ScheduledExecutorFactoryBean">
          		<property name="scheduledExecutorTasks">
          			<list>
          				<ref bean="retryScheduledExecutorTask" />
          			</list>
          		</property>
          	</bean>
          
          	<!-- 容錯任務 -->
          	<bean id="retryScheduledExecutorTask" class="org.springframework.scheduling.concurrent.ScheduledExecutorTask">
          		<property name="runnable" ref="retryScheduledMainExecutor" />
          		<!-- 初次執行任務delay時間,單位為ms,默認值為0,代表首次加載任務時立即執行;比如1min -->
          		<property name="delay" value="5000" />
          		<!-- 間隔時間,單位為ms,默認值為0,代表任務只執行一次;比如2min -->
          		<property name="period" value="1000" />
          		<!-- 是否采用fixedRate方式進行任務調度,默認為false,即采用fixedDelay方式 -->
          		<!-- fixedRate:定時間隔執行,不管上次任務是否已執行完畢;fixedDelay:每次任務執行完畢之后delay固定的時間 -->
          		<property name="fixedRate" value="true" />
          	</bean>
          	
          	<!-- 容錯主線程 -->
          	<bean id="retryScheduledMainExecutor" class="com.*.NotifyScheduledMainExecutor">
          		<!-- 針對Notify服務端的Service,用于更新Notify重試信息等 -->
          		<property name="notifyServerService" ref="notifyServerService" />
          		<!-- notify.notifyId緩存策略實現類,可自行擴展 -->
          		<property name="notifyIdCacheStrategy" ref="defaultNotifyIdCacheStrategy" />
          		<!-- notify.load_balance_num字段值生成、以及調度時where條件中取值的策略實現類,可自行擴展 -->
          		<!-- 當有多臺notify服務器時才有用,用于平衡各臺server間的壓力;一般不用配置 -->
          		<!-- <property name="loadBalanceNumStrategy" ref="alternateLoadBalanceNumStrategy" /> -->
          		<!-- notify.handler字段值在調度時where條件中取值的策略實現類,可自行擴展 -->
          		<!-- 當有多臺notify服務器時才有用,用于表明某臺server可執行哪些handler;一般不用配置 -->
          		<property name="notifyHandlerStrategy" ref="handerNameByRetry" />
          		<!-- 當有多臺notify服務器時才有用,用于設置某臺server調度時每次讀取的Notify最大數,用于覆蓋maxNum;一般不用配置 -->
          		<!-- <property name="notifyMaxNumPerJobStrategy" ref="defaultNotifyMaxNumPerJobStrategy" /> -->
          		<!-- 用于并發的線程池 -->
          		<property name="notifyTaskExecutor" ref="syncTaskExecutor" />
          		<!-- 每次調度讀取的Notify最大記錄數,默認為1000 -->
          		<property name="maxNum" value="1000" />
          		<property name="notifyDao" ref="notifyDao" />
          	</bean>
          	
          	<!-- 同步處理 -->
          	<bean id="syncTaskExecutor" class="org.springframework.core.task.SyncTaskExecutor">
          	</bean>
          	
          	<bean id="notifyServerService" class="com.*.impl.NotifyServerServiceImpl">
          		<!-- 針對任務執行失敗后Notify如何重試的策略實現類,可自行擴展 -->
          		<property name="notifyRetryStrategy" ref="defaultNotifyRetryStrategy" />
          		<!-- 針對任務執行失敗后異常處理策略實現類,可自行擴展 -->
          		<!-- 默認不對異常進行補救,具體handler實現類中若返回NULL或拋出異常,則均按異常處理,直接將Notify記錄遷移到歷史表中,不進行重試; -->
          		<!-- <property name="notifyHandlerExceptionStrategy" ref="defaultNotifyHandlerExceptionStrategy" /> -->
          		<!-- 描述見notifyScheduledMainExecutor -->
          		<property name="notifyIdCacheStrategy" ref="defaultNotifyIdCacheStrategy" />
          		<!-- 事務模板,需保證能夠找到對應的bean -->
          		<property name="transactionTemplate" ref="transactionTemplate" />
          		<property name="notifyDao" ref="notifyDao" />
          	</bean>
          
          	<!-- 以下幾個default*的bean為系統提供的默認實現,若有需要,可自行擴展,但必須實現相應接口 -->
          	<bean id="defaultNotifyIdCacheStrategy" class="com.*.DefaultNotifyIdCacheStrategyImpl" />
          	<bean id="defaultNotifyHandlerExceptionStrategy" class="com*.impl.DefaultNotifyHandlerExceptionStrategyImpl" />
          	<!--容錯handler-->
          	<bean id="handerNameByRetry" class="com.*.asyn.HandlerNameFilter">
          		<property name="handerNames">
          			<list>
          				<value>retryHandler</value>
          			</list>
          		</property>
          	</bean>
          	
          	<bean id="defaultNotifyMaxNumPerJobStrategy" class="com.*.DefaultNotifyMaxNumPerJobStrategyImpl">
          		<!-- 主機名對應的每次調度讀取Notify記錄的最大值 -->
          		<property name="maxNumPerJobMapByHostname">
          			<map>
          				<entry key="dev1" value="500"/>
          				<entry key="dev2" value="800"/>
          			</map>
          		</property>
          	</bean>
           	<bean id="defaultNotifyRetryStrategy" class="com.*.DefaultNotifyRetryStrategyImpl">
          		<!-- 任務執行失敗之后每次重試的間隔ms數 -->
          		<property name="retryIntervals">
          			<list>
          				<!-- 依次為第一次間隔1min,第二次5min,第三次10min,第四次30min,第五次1h -->
          				<value>60000</value>
          				<value>300000</value>
          				<value>600000</value>
          				<value>1800000</value>
          				<value>3600000</value>
          			</list>
          		</property>
          	</bean>
          	<!-- end default* -->
          </beans>

          handler.xml

          ?

          <?xml version="1.0" encoding="UTF-8"?>
          <beans >
          	
          	<bean id="retryHandler" class="com.*asyn.RetryHandler" />
          </beans>
          ?

          總結:

          現在回去看系統的目標實現情況:

          1、通用性:整個模塊對應用系統的侵入性是很小了,可以打包為一個二方庫,在公司范圍的使用;對于應用來說只增加幾個配置文件,在需要重試的地方,通過通過接口,完全于模塊解耦;

          2、輕量級:很明顯,模塊只是依賴spring,

          3、細粒度:在上面的設計中,并沒有特別強調細粒度,是因為對于選擇多大粒度完全由應用自己決定,應用在自己的重試實現類和方法之間平衡,對模塊來講,沒有任何限制;

          ?

          ?






          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          主站蜘蛛池模板: 恩施市| 鄂伦春自治旗| 察隅县| 昔阳县| 巴楚县| 大冶市| 杂多县| 吉林省| 田阳县| 芦山县| 安宁市| 两当县| 库尔勒市| 白山市| 高淳县| 额尔古纳市| 万宁市| 丰城市| 越西县| 云霄县| 汕尾市| 芜湖市| 博湖县| 阳新县| 财经| 阿坝| 梁平县| 江都市| 紫云| 阳新县| 景宁| 兴业县| 葫芦岛市| 峨眉山市| 阜新市| 江永县| 玛纳斯县| 平邑县| 高州市| 萨迦县| 阜新市|