badqiu

          XPer
          隨筆 - 46, 文章 - 3, 評論 - 195, 引用 - 0
          數據加載中……

          rapid-framework工具類介紹一: 異步IO類

          在一些特殊的場合,我們可能需要使用異步的IO來大幅提高性能.
          如日志信息收集.

          而rapid-framework提供的異步IO類,使用生產者/消費者的多線程同步模式及Decorator模式,如同使用正常的IO一樣,只需套多一層AsyncWriter/AsyncOutputStream,即可將普通IO轉換為異步IO來使用.打開一個異步IO后,將會在后臺開啟一個異步的線程來寫數據.

          異步的Writer使用:

          1 BufferedWriter?writer? = ? new ?BufferedWriter( new ?AsyncWriter( new ?FileWriter( " c:/debug.log " )));???
          2 writer.write( " xxxxx " );??



          異步的OutputStream使用:

          1 BufferedOutputStream?output? = ? new ?BufferedOutputStream( new ?AsyncOutputStream( new ?FileOutputStream( " c:/debug.log " )));???
          2 output.write( " foo " .getBytes());??

          ?

          在output使用完確保output被close,因為在close時,會強制異步線程將數據全部寫入最終的targetOutput. 而調用flush()方法則是空操作,不會寫數據.

          異步IO使用tip(1):

          可以將BufferedWriter/BufferedOutputStream的緩沖區(qū)加大,以減少寫入次數.

          異步IO使用tip(2):

          在close異步IO時也放在一個單獨的線程中,因為在實際應用場景中,close異步IO可能是十分耗時的操作.

          AsyncWriter源碼:

          ??1?public?class?AsyncWriter?extends?Writer?{
          ??2?
          ??3?????private?static?Log?log?=?LogFactory.getLog(AsyncWriter.class);
          ??4?????
          ??5?????private?static?final?int?DEFAULT_QUEUE_CAPACITY?=?50000;
          ??6?????private?final?static?char[]?CLOSED_SIGNEL?=?new?char[0];
          ??7?????
          ??8?????private?Writer?out;
          ??9?????private?DataProcessorThread?dataProcessor;
          ?10?????private?boolean?isClosed?=?false;
          ?11?????private?BlockingQueue<char[]>?queue?;
          ?12?????
          ?13?????private?AsyncExceptinHandler?asyncExceptinHandler?=?new?DefaultAsyncExceptinHandler();
          ?14?????private?static?long?threadSeqNumber;
          ?15?????private?static?synchronized?long?nextThreadID()?{
          ?16?????????return?++threadSeqNumber;
          ?17?????}
          ?18?????
          ?19?????private?class?DataProcessorThread?extends?Thread?{
          ?20?????????
          ?21?????????private?boolean?enabled?=?true;
          ?22?????????private?boolean?hasRuned?=?false;
          ?23?????????DataProcessorThread()?{
          ?24?????????????super("AsyncWriter.DataProcessorThread-"+nextThreadID());
          ?25?????????????setDaemon(true);
          ?26?????????}
          ?27?
          ?28?????????public?void?run()?{
          ?29?????????????hasRuned?=?true;
          ?30?????????????while?(this.enabled?||?!queue.isEmpty())?{
          ?31?????????????????
          ?32?????????????????char[]?buf;
          ?33?????????????????try?{
          ?34?????????????????????buf?=?queue.take();
          ?35?????????????????}?catch?(InterruptedException?e)?{
          ?36?//????????????????????e.printStackTrace();
          ?37?????????????????????continue;
          ?38?????????????????}
          ?39?????????????????
          ?40?????????????????if(buf?==?CLOSED_SIGNEL)?{
          ?41?????????????????????return;
          ?42?????????????????}
          ?43?????????????????
          ?44?????????????????try?{
          ?45?????????????????????out.write(buf);
          ?46?????????????????}?catch?(IOException?e)?{
          ?47??????????????????????asyncExceptinHandler.handle(e);
          ?48?????????????????}
          ?49?????????????}
          ?50?????????}
          ?51?????}
          ?52?
          ?53?????public?AsyncWriter(Writer?out)?{
          ?54?????????this(out,DEFAULT_QUEUE_CAPACITY,Thread.NORM_PRIORITY?+?1);
          ?55?????}
          ?56?????
          ?57?????public?AsyncWriter(Writer?out,int?queueCapacity)?{
          ?58?????????this(out,queueCapacity,Thread.NORM_PRIORITY?+?1);
          ?59?????}
          ?60?????
          ?61?????public?AsyncWriter(Writer?out,int?queueCapacity,int?dataProcesserThreadPriority)?{
          ?62?????????this(out,new?ArrayBlockingQueue(queueCapacity),dataProcesserThreadPriority);
          ?63?????}
          ?64?????
          ?65?????public?AsyncWriter(Writer?out,BlockingQueue?queue,int?dataProcesserThreadPriority)?{
          ?66?????????if(out?==?null)?throw?new?NullPointerException();
          ?67?????????if(queue?==?null)?throw?new?NullPointerException();
          ?68?????????
          ?69?????????this.queue?=?queue;
          ?70?????????this.dataProcessor?=?new?DataProcessorThread();
          ?71?????????if(dataProcesserThreadPriority?!=?Thread.NORM_PRIORITY)?{
          ?72?????????????this.dataProcessor.setPriority(dataProcesserThreadPriority);
          ?73?????????}
          ?74?????????this.dataProcessor.start();
          ?75?????????this.out?=?out;
          ?76?????}
          ?77?????
          ?78?????public?AsyncWriter(Writer?out,AsyncExceptinHandler?handler)?{
          ?79?????????this(out);
          ?80?????????setAsyncExceptinHandler(handler);
          ?81?????}
          ?82?
          ?83?????public?void?write(char[]?buf,?int?offset,?int?length)?throws?IOException?{
          ?84?????????synchronized?(lock)?{
          ?85?????????????if(isClosed)?throw?new?IOException("already?closed");
          ?86?????????????try?{
          ?87?????????????????queue.put(BufferCopyUtils.copyBuffer(buf,?offset,?length));
          ?88?????????????}?catch?(InterruptedException?e)?{
          ?89?????????????????throw?new?IOException("AsyncWriter?occer?error",e);
          ?90?????????????}
          ?91?????????}
          ?92?????}
          ?93?
          ?94?????public?void?close()?throws?IOException?{
          ?95?????????synchronized?(lock)?{
          ?96?????????????try?{
          ?97?????????????????isClosed?=?true;
          ?98?????????????????dataProcessor.enabled?=?false;
          ?99?????????????????if(queue.isEmpty())?{
          100?????????????????????queue.offer(CLOSED_SIGNEL);
          101?????????????????}
          102?????????????????
          103?????????????????try?{
          104?????????????????????dataProcessor.join();
          105?????????????????}?catch?(InterruptedException?e)?{
          106?????????????????????//ignore
          107?????????????????}
          108?????????????????
          109?????????????????if(!dataProcessor.hasRuned)?{
          110?????????????????????dataProcessor.run();
          111?????????????????}
          112?????????????}finally?{
          113?????????????????out.close();
          114?????????????}
          115?????????}
          116?????}
          117?????
          118?????public?void?flush()?throws?IOException?{
          119?????}
          120?
          121?????protected?void?finalize()?throws?Throwable?{
          122?????????super.finalize();
          123?????????if(!isClosed)?{
          124?????????????log.warn("AsyncWriter?not?close:"+this);
          125?????????????close();
          126?????????}
          127?????}
          128?
          129?????public?void?setAsyncExceptinHandler(AsyncExceptinHandler?asyncExceptinHandler)?{
          130?????????if(asyncExceptinHandler?==?null)?throw?new?NullPointerException();
          131?????????this.asyncExceptinHandler?=?asyncExceptinHandler;
          132?????}
          133?
          134?}

          ?

          ?

          rapid-framework網站:
          http://code.google.com/p/rapid-framework

          ?

          在線javadoc:
          http://www.rapid-framework.org.cn/rapid-javadoc-v2.0.x/

          ?

          posted on 2009-05-08 01:22 badqiu 閱讀(1501) 評論(0)  編輯  收藏


          只有注冊用戶登錄后才能發(fā)表評論。


          網站導航:
           
          主站蜘蛛池模板: 本溪市| 饶河县| 嘉黎县| 博白县| 盐池县| 界首市| 河池市| 山西省| 阿拉善盟| 陆丰市| 游戏| 自治县| 信阳市| 巩义市| 彩票| 泸州市| 香河县| 轮台县| 松溪县| 会泽县| 祁阳县| 西藏| 缙云县| 房产| 金溪县| 千阳县| 沁水县| 崇义县| 来凤县| 修文县| 皮山县| 报价| 栾城县| 泗阳县| 宁津县| 杭锦旗| 绥棱县| 保德县| 六枝特区| 胶南市| 二连浩特市|