莊周夢蝶

          生活、程序、未來
             :: 首頁 ::  ::  :: 聚合  :: 管理

          寫一個簡單的工作流(四)資源的處理

          Posted on 2007-10-13 17:15 dennis 閱讀(704) 評論(0)  編輯  收藏 所屬分類: java 、涂鴉my open-source
              昨天晚上搞到深夜,終于將資源模塊搞定。到今天已經(jīng)完成的功能包括:
          1.四種基本路由:順序、選擇、并行、循環(huán)
          2.流程定義文件和系統(tǒng)配置文件的讀取和解析
          3.使用內(nèi)存作為流程數(shù)據(jù)和案例數(shù)據(jù)存儲的MemoryWorkFlowDAO的開發(fā)
          4.資源模塊的開發(fā)
          5.并發(fā)情況下的正確性測試等

              計劃中的功能:
          1.一個GUI的流程定義工具,這個不急,也還沒想好用什么做,web還是桌面?
          2.各個數(shù)據(jù)庫版本的WorkFlowDAO的開發(fā),將流程數(shù)據(jù)和案例數(shù)據(jù)保存在數(shù)據(jù)庫中。
          3.更多的測試和example試驗(yàn)。

              回到資源這個概念,工作流中工作項(xiàng)(work item)的由資源來驅(qū)動的,這個資源(resource)可能是用戶、角色、定時時間或者某個事件消息。在標(biāo)準(zhǔn)petri網(wǎng)中,工作項(xiàng)對應(yīng)于transition(變遷),變遷都是自動的,不需要所謂資源來驅(qū)動,顯然,這與工作流系統(tǒng)不同。具體到insect workflow(我取的名字,小巧之意),每個transition都有一個resource,用于驅(qū)動自身的firing,所有的resource都實(shí)現(xiàn)Resource接口:
          public interface Resource extends Serializable {

              
          public void start(Transition transition, Token token, Object args);
                     
              
          public ResourceType getType();

              
          public long getId();

          }
              每個資源都有一個類型,以及這個類型中獨(dú)一無二的id,start方法用于驅(qū)動transtion的firing。一般情況下,你不需要實(shí)現(xiàn)這個接口,只要繼承這個接口的抽象實(shí)現(xiàn)類AbstractResource,AbstractResource的start方法默認(rèn)實(shí)現(xiàn)是首先調(diào)用模板方法doAction(稍后解釋),然后檢查觸發(fā)條件,如果通過就直接調(diào)用transition的fire方法:
          public abstract class AbstractResource implements Resource {
                   
              
          public void start(Transition transition, Token token, Object args) {
                  doAction(transition, token, args);

                  
          if (transition.getCondition() != null
                          
          && !transition.getCondition().check(token))
                      
          throw new ConditionException(transition.getName()
                              
          + " transition沒有滿足觸發(fā)條件");
                  transition.fire(token, args);
              }
              
          public abstract void doAction(Transition transition, Token token,
                      Object args)
          ;
                 
          }

              Transtion類的fire方法有三個操作組成:從輸入庫所移走token,往輸出庫所放入token,回調(diào)handler:
              public void fire(Token token, Object args) {
                  removeTokenFromInputs(token);
                  addTokenToOutputs(token);
                  invokeHandler(token, args);
              }
              那么具體的資源顯然要實(shí)現(xiàn)AbstractResource中的doAction抽象方法,系統(tǒng)內(nèi)置了五種資源:自動資源(AutoResource)、用戶(User)、用戶組(Group)、定時器(TimerResource)和事件監(jiān)聽器(ObserverResource)。顯然,AutoResource、User和Group的doAction方法不需要做任何事情:
          public class User extends AbstractResource {
              
          protected Group group;
                      
              @Override
              
          public void doAction(Transition transition, Token token, Object arg){
              }

          }
             
              而TimerResource就需要做特殊處理了,比如我們要達(dá)到這樣的效果:節(jié)點(diǎn)1狀態(tài)已經(jīng)處于就緒,可以被觸發(fā),可我們希望在就緒后延遲半分鐘再觸發(fā),或者在晚上10點(diǎn)觸發(fā)等等。這樣的定時需求很常見,我采用了jdk5引入的ScheduledExecutorService來處理。系統(tǒng)中啟動這樣一個線程池,每個類似上面的請求都提交給這個線程池來處理,那么TimerResource就需要進(jìn)行相應(yīng)的修改:
          public abstract class TimerResource extends AbstractResource {

              
          protected int pool_size;

              
          protected static ScheduledExecutorService scheduledExecutorService;

              @Override
              
          public long getId() {
                  
          // TODO Auto-generated method stub
                  return Common.TIMER_RESOURCE_ID;
              }

              
          public TimerResource() {
                  
          this.pool_size = 5;
                  scheduledExecutorService 
          = Executors.newScheduledThreadPool(pool_size);
              }

              
          public static void shutdownPool() {
                  
          if (scheduledExecutorService != null)
                      scheduledExecutorService.shutdown();
              }

              
          public final void start(Transition transition, Token token, Object args)
                      
          throws InterruptedException {
                  
          if (transition.getCondition() != null
                          
          && !transition.getCondition().check(token))
                      
          throw new ConditionException(transition.getName()
                              
          + " transition沒有滿足觸發(fā)條件");
                  transition.removeTokenFromInputs(token);
                  doAction(transition, token, args);
              }

              
          protected class ChangeRunner implements Runnable {
                  
          private Transition transition;

                  
          private Token token;

                  
          private Object[] args;

                  
          public ChangeRunner(Transition transition, Token token, Object args) {
                      
          this.transition = transition;
                      
          this.token = token;
                      
          this.args = args;
                  }

                  
          public void run() {
                      
          if (transition.getCondition() != null
                              
          && !transition.getCondition().check(token))
                          
          throw new ConditionException(transition.getName()
                                  
          + " transition沒有滿足觸發(fā)條件");
                      transition.addTokenToOutputs(token);
                      Object real_args[] 
          = new Object[args.length - 2];
                      
          for (int i = 0; i < real_args.length; i++)
                          real_args[i] 
          = args[i + 2];
                      transition.invokeHandler(token, real_args);
                      
          try {
                          
          // 回調(diào)
                          ((WorkFlowAlgorithm) args[1]).enabledTraversing(token
                                  .getWorkFlow());
                          ((WorkFlowManager) args[
          0]).doAction(token.getId());

                      } 
          catch (InterruptedException e) {
                          Thread.currentThread().interrupt();
                      }
                  }
              }
          }
              注意到,start方法不再是直接調(diào)用transition的fire方法,而僅僅是進(jìn)行了第一步操作:移除輸入庫所的place防止重復(fù)提交。后兩步操作都延遲到了提交給線程池的任務(wù)中,也就是代碼中的ChangeRunner類中的run方法。例如TimerResource的子類DelayTimerResource用于處理延遲的觸發(fā),doAction就像這樣:
          public class DelayTimerResource extends TimerResource {
              
              @Override
              
          public void doAction(Transition transition, Token token, Object args){
                  scheduledExecutorService.schedule(
          new ChangeRunner(transition, token,
                          args), 
          this.delay, this.timeUnit);

              }
          }
              延遲的時間,時間單位這些信息都可以在流程定義文件中設(shè)置。事件監(jiān)聽器資源與此類似,ObserverResource實(shí)現(xiàn)了java.util.Observer接口,往輸出庫所放入token和回調(diào)handler兩步操作被放在了update方法中提供給Subject回調(diào)。
             


          主站蜘蛛池模板: 突泉县| 禄劝| 讷河市| 观塘区| 长乐市| 桃园市| 石楼县| 盐津县| 天长市| 平顶山市| 延津县| 昂仁县| 梓潼县| 博罗县| 确山县| 濉溪县| 阜城县| 乐山市| 深州市| 台山市| 葫芦岛市| 双江| 乾安县| 平湖市| 临洮县| 麻江县| 宽城| 叶城县| 大港区| 时尚| 松桃| 府谷县| 波密县| 南丹县| 宁明县| 阳新县| 喀喇沁旗| 弥渡县| 光山县| 乌海市| 闻喜县|