qileilove

          blog已經轉移至github,大家請訪問 http://qaseven.github.io/

          談disruptor的單線程數據庫操作

          對遠程數據庫的操作,采用disruptor能夠很好地解決死鎖問題。
            首先是定義一個抽象類,實現Runnable接口:
          /**
           * Base type for a unit of work that is handed to the ring buffer.
           *
           * <p>Subclasses supply the work by implementing {@link Runnable#run()}.
           */
          public abstract class Task implements Runnable {

              /** Creates a task; there is no state to initialise. */
              public Task() {
              }
          }
          /**
           * Mutable ring buffer slot that carries one {@link Task} from a producer to the
           * consumer.
           */
          public class TaskEvent {

              /** Factory the ring buffer uses to create its {@code TaskEvent} slots. */
              public static final EventFactory<TaskEvent> EVENT_FACTORY = new EventFactory<TaskEvent>() {
                  public TaskEvent newInstance() {
                      return new TaskEvent();
                  }
              };

              /** Task currently stored in this slot; {@code null} until {@link #setTask} is called. */
              private Task tk;

              /**
               * @return the task stored in this slot, or {@code null} if none has been set
               */
              public Task getTask() {
                  return tk;
              }

              /**
               * Stores the task to be run when this slot is consumed.
               *
               * @param tk the task to store
               */
              public void setTask(Task tk) {
                  this.tk = tk;
              }

              /**
               * Consumer-side handler: runs the task carried by each event it receives.
               *
               * <p>Declared {@code static} because it uses no state of an enclosing
               * {@code TaskEvent}; as a non-static inner class it could only be created
               * through an existing {@code TaskEvent} instance.
               */
              public static class TaskEventHandler implements EventHandler<TaskEvent> {

                  /**
                   * Runs the event's task on the calling (consumer) thread.
                   *
                   * @param event      slot holding the task to run
                   * @param sequence   sequence number of the slot (unused)
                   * @param endOfBatch whether this is the last event of the current batch (unused)
                   * @throws Exception whatever the task's {@code run()} throws, including a
                   *                   {@code NullPointerException} when the slot holds no task
                   */
                  public void onEvent(TaskEvent event, long sequence,
                          boolean endOfBatch) throws Exception {
                      event.getTask().run();
                  }
              }
          }
          import java.util.concurrent.ExecutorService;
          import java.util.concurrent.Executors;
          import java.util.concurrent.ScheduledExecutorService;
          import java.util.concurrent.TimeUnit;
          import com.zhenhai.bonecp.CustomThreadFactory;
          import com.zhenhai.disruptor.BatchEventProcessor;
          import com.zhenhai.disruptor.RingBuffer;
          import com.zhenhai.disruptor.SequenceBarrier;
          import com.zhenhai.disruptor.YieldingWaitStrategy;
          import com.zhenhai.disruptor.dsl.ProducerType;
          /**
          *     使用方法
          DisruptorHelper.initAndStart();
          Task tt=new Taska();
          DisruptorHelper.produce(tt);
          DisruptorHelper.shutdown();
          *
          *
          */
          public class DisruptorHelper {
          /**
          * ringbuffer容量,最好是2的N次方
          */
          private static final int BUFFER_SIZE = 1024 * 1;
          private static int group=2;
          private RingBuffer<TaskEvent> ringBuffer[];
          private SequenceBarrier sequenceBarrier[];
          private TaskEventHandler handler[];
          private BatchEventProcessor<TaskEvent> batchEventProcessor[];
          private  static DisruptorHelper instance;
          private static boolean inited = false;
          private static ScheduledExecutorService taskTimer=null;
          //JDK 創建一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程。
          private    ExecutorService execute[];
          //啟動監視線程
          static {
          System.out.println("init DisruptorHelper!!!!!!!!!!!!!!!!!");
          instance = new DisruptorHelper();
          instance.init();
          inited = true;
          System.out.println("init DisruptorHelper end!!!!!!!!!!!!!!!!!");
          }
          **
          * 靜態類
          * @return
          */
          private DisruptorHelper(){ }
          /**
          * 初始化
          */
          private void init(){
          execute=new ExecutorService[group];
          ringBuffer=new RingBuffer[group];
          sequenceBarrier=new SequenceBarrier[group];
          handler=new TaskEventHandler[group];
          batchEventProcessor=new BatchEventProcessor[group];
          ////////////////定時執行////////////////
          //初始化ringbuffer,存放Event
          for(int i=0;i<group;i++){
          ringBuffer[i] = RingBuffer.create(ProducerType.SINGLE, TaskEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy());
          sequenceBarrier[i] = ringBuffer[i].newBarrier();
          handler[i] = new TaskEventHandler();
          batchEventProcessor[i] = new BatchEventProcessor<TaskEvent>(ringBuffer[i], sequenceBarrier[i], handler[i]);
          ringBuffer[i].addGatingSequences(batchEventProcessor[i].getSequence());
          execute[i]= Executors.newSingleThreadExecutor();
          execute[i].submit(instance.batchEventProcessor[i]);
          }
          this.taskTimer =  Executors.newScheduledThreadPool(10, new CustomThreadFactory("DisruptorHelper-scheduler", true));
          inited = true;
          }
          /**
          * 執行定時器
          * @param tk
          */
          private void produce(int index,Task tk){
          //System.out.println("index:="+index);
          if(index<0||index>=group) {
          System.out.println("out of group index:="+index);
          return;
          }
          // if capacity less than 10%, don't use ringbuffer anymore
          System.out.println("capacity:="+ringBuffer[index].remainingCapacity());
          if(ringBuffer[index].remainingCapacity() < BUFFER_SIZE * 0.1) {
          System.out.println("disruptor:ringbuffer avaliable capacity is less than 10 %");
          // do something
          }else {
          long sequence = ringBuffer[index].next();
          //將狀態報告存入ringBuffer的該序列號中
          ringBuffer[index].get(sequence).setTask(tk);
          //通知消費者該資源可以消費
          ringBuffer[index].publish(sequence);
          }
          }
          /**
          * 獲得容器的capacity的數量
          * @param index
          * @return
          */
          private long  remainingcapacity(int index){
          //System.out.println("index:="+index);
          if(index<0||index>=group) {
          System.out.println("out of group index:="+index);
          return 0L;
          }
          long capacity= ringBuffer[index].remainingCapacity();
          return capacity;
          }
          private void shutdown0(){
          for(int i=0;i<group;i++){
          execute[i].shutdown();
          }
          }
          ////////////////////////////////下面是靜態方法提供調用////////////////////////////////////////////////////////
          /**
          * 直接消費
          * @param tk
          */
          public static void addTask(int priority,Task tk){
          instance.produce(priority,tk);
          }
          /**
          * 定時消費
          * @param tk
          * @param delay
          * @param period
          */
          public static void scheduleTask(int priority,Task tk,long delay,long period){
          Runnable timerTask = new ScheduledTask(priority, tk);
          taskTimer.scheduleAtFixedRate(timerTask, delay, period, TimeUnit.MILLISECONDS);
          }
          /**
          * 定點執行
          * @param tk
          * @param hourse
          * @param minus
          * @param sec
          * @return
          */
          public static Runnable scheduleTask(int priority,Task tk, int hourse,int minus,int sec)
          {
          Runnable timerTask = new ScheduledTask(priority, tk);
          //每天2:30分執行
          long delay = Helper.calcDelay(hourse,minus,sec);
          long period = Helper.ONE_DAY;
          System.out.println("delay:"+(delay/1000)+"secs");
          taskTimer.scheduleAtFixedRate(timerTask, delay, period, TimeUnit.MILLISECONDS);
          return timerTask;
          }
          //對定時執行的程序進行分裝
          private static class ScheduledTask implements Runnable
          {
          private int priority;
          private Task task;
          ScheduledTask(int priority, Task task)
          {
          this.priority = priority;
          this.task = task;
          }
          public void run()
          {
          try{
          instance.produce(priority,task);
          }catch(Exception e){
          System.out.println("catch exception in DisruptorHelper!");
          }
          }
          }
          public static long getRemainingCapatiye(int index){
          return instance.getRemainingCapatiye(index);
          }
          public static void shutdown(){
          if(!inited){
          throw new RuntimeException("Disruptor還沒有初始化!");
          }
          instance.shutdown0();
          }
          }

          posted on 2014-05-15 11:53 順其自然EVO 閱讀(804) 評論(0)  編輯  收藏 所屬分類: 測試學習專欄

          <2014年5月>
          27282930123
          45678910
          11121314151617
          18192021222324
          25262728293031
          1234567

          導航

          統計

          常用鏈接

          留言簿(55)

          隨筆分類

          隨筆檔案

          文章分類

          文章檔案

          搜索

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 岳阳市| 延寿县| 乃东县| 青冈县| 昌邑市| 阿城市| 尤溪县| 剑川县| 曲麻莱县| 资溪县| 泗水县| 保山市| 开远市| 万盛区| 定西市| 拉萨市| 湖南省| 年辖:市辖区| 漯河市| 云霄县| 铜陵市| 栖霞市| 伊宁县| 两当县| 宝丰县| 阳原县| 锦屏县| 新民市| 乐都县| 佛冈县| 交口县| 咸丰县| 巴彦淖尔市| 都安| 乐陵市| 桑日县| 德钦县| 元谋县| 神木县| 普格县| 禄丰县|