badqiu

          XPer
          隨筆 - 46, 文章 - 3, 評(píng)論 - 195, 引用 - 0
          數(shù)據(jù)加載中……

          rapid-framework工具類介紹一: 異步IO類

          在一些特殊的場(chǎng)合,我們可能需要使用異步的IO來(lái)大幅提高性能.
          如日志信息收集.

          而rapid-framework提供的異步IO類,使用生產(chǎn)者/消費(fèi)者的多線程同步模式及Decorator模式,如同使用正常的IO一樣,只需套多一層AsyncWriter/AsyncOutputStream,即可將普通IO轉(zhuǎn)換為異步IO來(lái)使用.打開(kāi)一個(gè)異步IO后,將會(huì)在后臺(tái)開(kāi)啟一個(gè)異步的線程來(lái)寫(xiě)數(shù)據(jù).

          異步的Writer使用:

          1 BufferedWriter?writer? = ? new ?BufferedWriter( new ?AsyncWriter( new ?FileWriter( " c:/debug.log " )));???
          2 writer.write( " xxxxx " );??



          異步的OutputStream使用:

          1 BufferedOutputStream?output? = ? new ?BufferedOutputStream( new ?AsyncOutputStream( new ?FileOutputStream( " c:/debug.log " )));???
          2 output.write( " foo " .getBytes());??

          ?

          在output使用完確保output被close,因?yàn)樵赾lose時(shí),會(huì)強(qiáng)制異步線程將數(shù)據(jù)全部寫(xiě)入最終的targetOutput. 而調(diào)用flush()方法則是空操作,不會(huì)寫(xiě)數(shù)據(jù).

          異步IO使用tip(1):

          可以將BufferedWriter/BufferedOutputStream的緩沖區(qū)加大,以減少寫(xiě)入次數(shù).

          異步IO使用tip(2):

          在close異步IO時(shí)也放在一個(gè)單獨(dú)的線程中,因?yàn)樵趯?shí)際應(yīng)用場(chǎng)景中,close異步IO可能是十分耗時(shí)的操作.

          AsyncWriter源碼:

          ??1?public?class?AsyncWriter?extends?Writer?{
          ??2?
          ??3?????private?static?Log?log?=?LogFactory.getLog(AsyncWriter.class);
          ??4?????
          ??5?????private?static?final?int?DEFAULT_QUEUE_CAPACITY?=?50000;
          ??6?????private?final?static?char[]?CLOSED_SIGNEL?=?new?char[0];
          ??7?????
          ??8?????private?Writer?out;
          ??9?????private?DataProcessorThread?dataProcessor;
          ?10?????private?boolean?isClosed?=?false;
          ?11?????private?BlockingQueue<char[]>?queue?;
          ?12?????
          ?13?????private?AsyncExceptinHandler?asyncExceptinHandler?=?new?DefaultAsyncExceptinHandler();
          ?14?????private?static?long?threadSeqNumber;
          ?15?????private?static?synchronized?long?nextThreadID()?{
          ?16?????????return?++threadSeqNumber;
          ?17?????}
          ?18?????
          ?19?????private?class?DataProcessorThread?extends?Thread?{
          ?20?????????
          ?21?????????private?boolean?enabled?=?true;
          ?22?????????private?boolean?hasRuned?=?false;
          ?23?????????DataProcessorThread()?{
          ?24?????????????super("AsyncWriter.DataProcessorThread-"+nextThreadID());
          ?25?????????????setDaemon(true);
          ?26?????????}
          ?27?
          ?28?????????public?void?run()?{
          ?29?????????????hasRuned?=?true;
          ?30?????????????while?(this.enabled?||?!queue.isEmpty())?{
          ?31?????????????????
          ?32?????????????????char[]?buf;
          ?33?????????????????try?{
          ?34?????????????????????buf?=?queue.take();
          ?35?????????????????}?catch?(InterruptedException?e)?{
          ?36?//????????????????????e.printStackTrace();
          ?37?????????????????????continue;
          ?38?????????????????}
          ?39?????????????????
          ?40?????????????????if(buf?==?CLOSED_SIGNEL)?{
          ?41?????????????????????return;
          ?42?????????????????}
          ?43?????????????????
          ?44?????????????????try?{
          ?45?????????????????????out.write(buf);
          ?46?????????????????}?catch?(IOException?e)?{
          ?47??????????????????????asyncExceptinHandler.handle(e);
          ?48?????????????????}
          ?49?????????????}
          ?50?????????}
          ?51?????}
          ?52?
          ?53?????public?AsyncWriter(Writer?out)?{
          ?54?????????this(out,DEFAULT_QUEUE_CAPACITY,Thread.NORM_PRIORITY?+?1);
          ?55?????}
          ?56?????
          ?57?????public?AsyncWriter(Writer?out,int?queueCapacity)?{
          ?58?????????this(out,queueCapacity,Thread.NORM_PRIORITY?+?1);
          ?59?????}
          ?60?????
          ?61?????public?AsyncWriter(Writer?out,int?queueCapacity,int?dataProcesserThreadPriority)?{
          ?62?????????this(out,new?ArrayBlockingQueue(queueCapacity),dataProcesserThreadPriority);
          ?63?????}
          ?64?????
          ?65?????public?AsyncWriter(Writer?out,BlockingQueue?queue,int?dataProcesserThreadPriority)?{
          ?66?????????if(out?==?null)?throw?new?NullPointerException();
          ?67?????????if(queue?==?null)?throw?new?NullPointerException();
          ?68?????????
          ?69?????????this.queue?=?queue;
          ?70?????????this.dataProcessor?=?new?DataProcessorThread();
          ?71?????????if(dataProcesserThreadPriority?!=?Thread.NORM_PRIORITY)?{
          ?72?????????????this.dataProcessor.setPriority(dataProcesserThreadPriority);
          ?73?????????}
          ?74?????????this.dataProcessor.start();
          ?75?????????this.out?=?out;
          ?76?????}
          ?77?????
          ?78?????public?AsyncWriter(Writer?out,AsyncExceptinHandler?handler)?{
          ?79?????????this(out);
          ?80?????????setAsyncExceptinHandler(handler);
          ?81?????}
          ?82?
          ?83?????public?void?write(char[]?buf,?int?offset,?int?length)?throws?IOException?{
          ?84?????????synchronized?(lock)?{
          ?85?????????????if(isClosed)?throw?new?IOException("already?closed");
          ?86?????????????try?{
          ?87?????????????????queue.put(BufferCopyUtils.copyBuffer(buf,?offset,?length));
          ?88?????????????}?catch?(InterruptedException?e)?{
          ?89?????????????????throw?new?IOException("AsyncWriter?occer?error",e);
          ?90?????????????}
          ?91?????????}
          ?92?????}
          ?93?
          ?94?????public?void?close()?throws?IOException?{
          ?95?????????synchronized?(lock)?{
          ?96?????????????try?{
          ?97?????????????????isClosed?=?true;
          ?98?????????????????dataProcessor.enabled?=?false;
          ?99?????????????????if(queue.isEmpty())?{
          100?????????????????????queue.offer(CLOSED_SIGNEL);
          101?????????????????}
          102?????????????????
          103?????????????????try?{
          104?????????????????????dataProcessor.join();
          105?????????????????}?catch?(InterruptedException?e)?{
          106?????????????????????//ignore
          107?????????????????}
          108?????????????????
          109?????????????????if(!dataProcessor.hasRuned)?{
          110?????????????????????dataProcessor.run();
          111?????????????????}
          112?????????????}finally?{
          113?????????????????out.close();
          114?????????????}
          115?????????}
          116?????}
          117?????
          118?????public?void?flush()?throws?IOException?{
          119?????}
          120?
          121?????protected?void?finalize()?throws?Throwable?{
          122?????????super.finalize();
          123?????????if(!isClosed)?{
          124?????????????log.warn("AsyncWriter?not?close:"+this);
          125?????????????close();
          126?????????}
          127?????}
          128?
          129?????public?void?setAsyncExceptinHandler(AsyncExceptinHandler?asyncExceptinHandler)?{
          130?????????if(asyncExceptinHandler?==?null)?throw?new?NullPointerException();
          131?????????this.asyncExceptinHandler?=?asyncExceptinHandler;
          132?????}
          133?
          134?}

          ?

          ?

          rapid-framework網(wǎng)站:
          http://code.google.com/p/rapid-framework

          ?

          在線javadoc:
          http://www.rapid-framework.org.cn/rapid-javadoc-v2.0.x/

          ?

          posted on 2009-05-08 01:22 badqiu 閱讀(1503) 評(píng)論(0)  編輯  收藏


          只有注冊(cè)用戶登錄后才能發(fā)表評(píng)論。


          網(wǎng)站導(dǎo)航:
           
          主站蜘蛛池模板: 阿坝| 班戈县| 白水县| 和硕县| 中方县| 同江市| 清涧县| 福海县| 辛集市| 尚志市| 赤城县| 河东区| 航空| 永清县| 铁岭县| 平谷区| 保德县| 奇台县| 安多县| 平南县| 鸡西市| 德惠市| 甘肃省| 郎溪县| 垣曲县| 苍梧县| 哈尔滨市| 石楼县| 钟山县| 额敏县| 繁昌县| 长顺县| 昌图县| 分宜县| 长宁县| 静海县| 辉县市| 尼木县| 彰武县| 东乡县| 连城县|