月掛夜中央

          懶惰程序員

          常用鏈接

          統(tǒng)計

          最新評論

          springside3.*中l(wèi)og4j和java.util.concurrent的結(jié)合使用

                  在springside3.*中的showcase案例中,有一個把log4j的日志存入數(shù)據(jù)庫的演示,下面是我對這個案例的學習筆記。
          1、我們首先來看下log4j相關(guān)日志的配置:
          #Async Database Appender (Store business message)
          log4j.appender.DB
          =org.springside.examples.showcase.log.appender.QueueAppender
          log4j.appender.DB.QueueName
          =dblog

          #Demo level with Async Database appender 
          log4j.logger.DBLogExample
          =INFO,Console,DB
          log4j.additivity.DBLogExample
          =false

          其中org.springside.examples.showcase.log.appender.QueueAppender就是對ssLog4j日志的一個擴展,而日志的event(里面是日志的內(nèi)容)是存放在一個BlockingQueue中,當有多個日志需要分別存入不同的地方時,就根據(jù)QueryName來區(qū)分。
          2、接下來看一下org.springside.examples.showcase.log.appender.QueueAppender里面的內(nèi)容:
          /**
           * Copyright (c) 2005-2009 springside.org.cn
           *
           * Licensed under the Apache License, Version 2.0 (the "License");
           * 
           * $Id: QueueAppender.java 1189 2010-09-01 17:24:12Z calvinxiu $
           
          */

          package org.springside.examples.showcase.log.appender;

          import java.util.concurrent.BlockingQueue;

          import org.apache.log4j.helpers.LogLog;
          import org.apache.log4j.spi.LoggingEvent;
          import org.springside.examples.showcase.queue.QueuesHolder;

          /**
           * 輕量級的Log4j異步Appender.
           * 
           * 將所有消息放入QueueManager所管理的Blocking Queue中.
           * 
           * 
          @see QueuesHolder
           * 
           * 
          @author calvin
           
          */

          public class QueueAppender extends org.apache.log4j.AppenderSkeleton {

              
          protected String queueName;

              
          protected BlockingQueue<LoggingEvent> queue;

              
          /**
               * AppenderSkeleton回調(diào)函數(shù), 事件到達時將時間放入Queue.
               
          */

              @Override
              
          public void append(LoggingEvent event) {
                  
          if (queue == null{
                      queue 
          = QueuesHolder.getQueue(queueName);
                  }


                  
          boolean sucess = queue.offer(event);

                  
          if (sucess) {
                      LogLog.debug(
          "put event to queue success:" + new LoggingEventWrapper(event).convertToString());

                  }
           else {
                      LogLog.error(
          "Put event to queue fail:" + new LoggingEventWrapper(event).convertToString());
                  }

              }


              
          /**
               * AppenderSkeleton回調(diào)函數(shù),關(guān)閉Logger時的清理動作.
               
          */

              
          public void close() {
              }


              
          /**
               * AppenderSkeleton回調(diào)函數(shù), 設置是否需要定義Layout.
               
          */

              
          public boolean requiresLayout() {
                  
          return false;
              }


              
          /**
               * Log4j根據(jù)getter/setter從log4j.properties中注入同名參數(shù).
               
          */

              
          public String getQueueName() {
                  
          return queueName;
              }


              
          /**
               * 
          @see #getQueueName()
               
          */

              
          public void setQueueName(String queueName) {
                  
          this.queueName = queueName;
              }

          }

          這是對Log4j擴展的標準做法,繼承abstract class AppenderSkeleton,實現(xiàn)它的abstract  protected   void append(LoggingEvent event) 方法。
          而這個方法的實現(xiàn)很簡單,就是根據(jù)queueName從queuesHolder中取出一個BlockingQueue<LoggingEvent>,然后把LoggerEvent塞到這個BlockingQueue中去,關(guān)于queuesHolder,下面會講到。到這為止,log4j的活就完成了,下面的都是concurrent的事了。
          3、讓我們轉(zhuǎn)到spring的配置文件中,看看springside是如何接收下面的工作,下面是applicationContext-log.xml的片段:
              <!-- 消息Queue管理器-->
              
          <bean class="org.springside.examples.showcase.queue.QueuesHolder">
                  
          <property name="queueSize" value="1000" />
              
          </bean>

              
          <!-- 讀出Queue中日志消息寫入數(shù)據(jù)庫的任務 -->
              
          <bean id="jdbcLogWriter" class="org.springside.examples.showcase.log.appender.JdbcLogWriter">
                  
          <property name="queueName" value="dblog" />
                  
          <property name="batchSize" value="10" />
                  
          <property name="sql">
                      
          <value>
                          insert into SS_LOG(THREAD_NAME,LOGGER_NAME,LOG_TIME,LEVEL,MESSAGE)
                          values(:thread_name,:logger_name,:log_time,:level,:message)
                      
          </value>
                  
          </property>
              
          </bean>

          我們先從簡單的下手,先看QueuesHolder:
          private static ConcurrentMap<String, BlockingQueue> queueMap = new MapMaker().concurrencyLevel(32).makeMap();//消息隊列

          /**
               * 根據(jù)queueName獲得消息隊列的靜態(tài)函數(shù).
               * 如消息隊列還不存在, 會自動進行創(chuàng)建.
               
          */

              
          public static <T> BlockingQueue<T> getQueue(String queueName) {
                  BlockingQueue queue 
          = queueMap.get(queueName);

                  
          if (queue == null{
                      BlockingQueue newQueue 
          = new LinkedBlockingQueue(queueSize);

                      
          //如果之前消息隊列還不存在,放入新隊列并返回Null.否則返回之前的值.
                      queue = queueMap.putIfAbsent(queueName, newQueue);
                      
          if (queue == null{
                          queue 
          = newQueue;
                      }

                  }

                  
          return queue;
              }
          其實這個類很簡單,就是一個map,key就是上面log4j配置文件中的queueName,value就是一個BlockingQueue,這樣就可以存放多個日志queue,做不同的處理。
          4、下面這個是重頭戲,先把JdbcLogWriter的代碼全貼出來:
          /**
           * Copyright (c) 2005-2009 springside.org.cn
           *
           * Licensed under the Apache License, Version 2.0 (the "License");
           * 
           * $Id: JdbcAppenderTask.java 353 2009-08-22 09:33:28Z calvinxiu
           
          */

          package org.springside.examples.showcase.log.appender;

          import java.util.HashMap;
          import java.util.List;
          import java.util.Map;

          import javax.annotation.Resource;
          import javax.sql.DataSource;

          import org.apache.log4j.spi.LoggingEvent;
          import org.springframework.dao.DataAccessException;
          import org.springframework.dao.DataAccessResourceFailureException;
          import org.springframework.jdbc.core.namedparam.SqlParameterSource;
          import org.springframework.jdbc.core.namedparam.SqlParameterSourceUtils;
          import org.springframework.jdbc.core.simple.SimpleJdbcTemplate;
          import org.springframework.transaction.PlatformTransactionManager;
          import org.springframework.transaction.TransactionStatus;
          import org.springframework.transaction.support.TransactionCallbackWithoutResult;
          import org.springframework.transaction.support.TransactionTemplate;
          import org.springside.examples.showcase.queue.BlockingConsumer;

          import com.google.common.collect.Lists;
          import com.google.common.collect.Maps;

          /**
           * 將Queue中的log4j event寫入數(shù)據(jù)庫的消費者任務.
           * 
           * 即時阻塞的讀取Queue中的事件,達到緩存上限后使用Jdbc批量寫入模式.
           * 如需換為定時讀取模式,繼承于PeriodConsumer稍加改造即可.
           * 
           * 
          @see BlockingConsumer
           * 
           * 
          @author calvin
           
          */

          public class JdbcLogWriter extends BlockingConsumer {

              
          protected String sql;
              
          protected int batchSize = 10;

              
          protected List<LoggingEvent> eventsBuffer = Lists.newArrayList();
              
          protected SimpleJdbcTemplate jdbcTemplate;
              
          protected TransactionTemplate transactionTemplate;

              
          /**
               * 帶Named Parameter的insert sql.
               * 
               * Named Parameter的名稱見AppenderUtils中的常量定義.
               
          */

              
          public void setSql(String sql) {
                  
          this.sql = sql;
              }


              
          /**
               * 批量讀取事件數(shù)量, 默認為10.
               
          */

              
          public void setBatchSize(int batchSize) {
                  
          this.batchSize = batchSize;
              }


              
          /**
               * 根據(jù)注入的DataSource創(chuàng)建jdbcTemplate.
               
          */

              @Resource
              
          public void setDataSource(DataSource dataSource) {
                  jdbcTemplate 
          = new SimpleJdbcTemplate(dataSource);
              }


              
          /**
               * 根據(jù)注入的PlatformTransactionManager創(chuàng)建transactionTemplate.
               
          */

              @Resource
              
          public void setDefaultTransactionManager(PlatformTransactionManager defaultTransactionManager) {
                  transactionTemplate 
          = new TransactionTemplate(defaultTransactionManager);
              }


              
          /**
               * 消息處理函數(shù),將消息放入buffer,當buffer達到batchSize時執(zhí)行批量更新函數(shù).
               
          */

              @Override
              
          protected void processMessage(Object message) {
                  LoggingEvent event 
          = (LoggingEvent) message;
                  eventsBuffer.add(event);

                  
          if (logger.isDebugEnabled()) {
                      logger.debug(
          "get event: {}"new LoggingEventWrapper(event).convertToString());
                  }


                  
          //已到達BufferSize則執(zhí)行批量插入操作
                  if (eventsBuffer.size() >= batchSize) {
                      updateBatch();
                  }

              }


              
          /**
               * 將Buffer中的事件列表批量插入數(shù)據(jù)庫.
               
          */

              @SuppressWarnings(
          "unchecked")
              
          public void updateBatch() {
                  
          try {
                      
          //分析事件列表, 轉(zhuǎn)換為jdbc批處理參數(shù).
                      int i = 0;
                      Map[] paramMapArray 
          = new HashMap[eventsBuffer.size()];
                      
          for (LoggingEvent event : eventsBuffer) {
                          paramMapArray[i
          ++= parseEvent(event);
                      }

                      
          final SqlParameterSource[] batchParams = SqlParameterSourceUtils.createBatch(paramMapArray);

                      
          //執(zhí)行批量插入,如果失敗調(diào)用失敗處理函數(shù).
                      transactionTemplate.execute(new TransactionCallbackWithoutResult() {
                          @Override
                          
          protected void doInTransactionWithoutResult(TransactionStatus status) {
                              
          try {
                                  jdbcTemplate.batchUpdate(getActualSql(), batchParams);
                                  
          if (logger.isDebugEnabled()) {
                                      
          for (LoggingEvent event : eventsBuffer) {
                                          logger.debug(
          "saved event: {}"new LoggingEventWrapper(event).convertToString());
                                      }

                                  }

                              }
           catch (DataAccessException e) {
                                  status.setRollbackOnly();
                                  handleDataAccessException(e, eventsBuffer);
                              }

                          }

                      }
          );

                      
          //清除已完成的Buffer
                      eventsBuffer.clear();
                  }
           catch (Exception e) {
                      logger.error(
          "批量提交任務時發(fā)生錯誤.", e);
                  }

              }


              
          /**
               * 退出清理函數(shù),完成buffer中未完成的消息.
               
          */

              @Override
              
          protected void clean() {
                  
          if (!eventsBuffer.isEmpty()) {
                      updateBatch();
                  }

                  logger.debug(
          "cleaned task {}"this);
              }


              
          /**
               * 分析Event, 建立Parameter Map, 用于綁定sql中的Named Parameter.
               
          */

              
          protected Map<String, Object> parseEvent(LoggingEvent event) {
                  Map
          <String, Object> parameterMap = Maps.newHashMap();
                  LoggingEventWrapper eventWrapper 
          = new LoggingEventWrapper(event);

                  parameterMap.put(
          "thread_name", eventWrapper.getThreadName());
                  parameterMap.put(
          "logger_name", eventWrapper.getLoggerName());
                  parameterMap.put(
          "log_time", eventWrapper.getDate());
                  parameterMap.put(
          "level", eventWrapper.getLevel());
                  parameterMap.put(
          "message", eventWrapper.getMessage());
                  
          return parameterMap;
              }


              
          /**
               * 可被子類重載的數(shù)據(jù)訪問錯誤處理函數(shù),如將出錯的事件持久化到文件.
               
          */

              
          protected void handleDataAccessException(DataAccessException e, List<LoggingEvent> errorEventBatch) {
                  
          if (e instanceof DataAccessResourceFailureException) {
                      logger.error(
          "database connection error", e);
                  }
           else {
                      logger.error(
          "other database error", e);
                  }


                  
          for (LoggingEvent event : errorEventBatch) {
                      logger.error(
          "event insert to database error, ignore it: "
                              
          + new LoggingEventWrapper(event).convertToString(), e);
                  }

              }


              
          /**
               * 可被子類重載的sql提供函數(shù),可對sql語句進行特殊處理,如日志表的表名可帶日期后綴 LOG_2009_02_31.
               
          */

              
          protected String getActualSql() {
                  
          return sql;
              }

          }

          這個類的作用有
          1)當沒有處理的日志在1000以內(nèi)時,不停執(zhí)行日志的處理(設置在QueuesHolder中),超過1000,就報錯(見QueueAppender的append方法).
          2)每次都把一條日志放到buffer中,達到10條時開始批量的把日志入數(shù)據(jù)庫,條數(shù)和入庫的sql都寫在上面的spring配置文件中。
          可以看到,主要的方法就是processMessage。那么,這個processMessage方法是在哪里被調(diào)用的呢?
          在上面的JdbcLogWriter類的代碼中可以看到,它繼承自BlockingConsumer,我們看看BlockingConsumer里面有些什么:
          /**
           * Copyright (c) 2005-2009 springside.org.cn
           *
           * Licensed under the Apache License, Version 2.0 (the "License");
           * 
           * $Id$
           
          */

          package org.springside.examples.showcase.queue;

          /**
           * 采用即時阻塞讀取Queue中消息策略的Consumer.
           
          */

          public abstract class BlockingConsumer extends QueueConsumer {

              
          /**
               * 線程執(zhí)行函數(shù),阻塞獲取消息并調(diào)用processMessage()進行處理.
               
          */

              
          public void run() {
                  
          //循環(huán)阻塞獲取消息直到線程被中斷.
                  try {
                      
          while (!Thread.currentThread().isInterrupted()) {
                          Object message 
          = queue.take();
                          processMessage(message);
                      }

                  }
           catch (InterruptedException e) {
                      
          // Ignore.
                  }
           finally {
                      
          //退出線程前調(diào)用清理函數(shù).
                      clean();
                  }

              }


              
          /**
               * 消息處理函數(shù).
               
          */

              
          protected abstract void processMessage(Object message);

              
          /**
               * 退出清理函數(shù).
               
          */

              
          protected abstract void clean();
          }
          很明顯,BlockingConsumer肯定是繼承自Thread類或者實現(xiàn)于Runnable接口的線程類,在線程啟動的時候processMessage方法被調(diào)用。當我們需要別的需要處理日志內(nèi)容時,就可以繼承BlockingConsumer寫自己的processMessage來處理日志了。
          5、下面,讓我們看看這個線程類是怎么啟動的吧??匆幌翨lockingConsumer就知道,它其實還繼承于另外一個類QueueConsumer:
          /**
           * Copyright (c) 2005-2009 springside.org.cn
           *
           * Licensed under the Apache License, Version 2.0 (the "License");
           * 
           * $Id$
           
          */

          package org.springside.examples.showcase.queue;

          import java.io.BufferedInputStream;
          import java.io.BufferedOutputStream;
          import java.io.EOFException;
          import java.io.File;
          import java.io.FileInputStream;
          import java.io.FileOutputStream;
          import java.io.IOException;
          import java.io.ObjectInputStream;
          import java.io.ObjectOutputStream;
          import java.util.ArrayList;
          import java.util.List;
          import java.util.concurrent.BlockingQueue;
          import java.util.concurrent.ExecutorService;
          import java.util.concurrent.Executors;
          import java.util.concurrent.TimeUnit;

          import javax.annotation.PostConstruct;
          import javax.annotation.PreDestroy;

          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;
          import org.springside.modules.utils.ThreadUtils;
          import org.springside.modules.utils.ThreadUtils.CustomizableThreadFactory;

          /**
           * 單線程消費Queue中消息的任務基類.
           * 
           * 定義了QueueConsumer的啟動關(guān)閉流程.
           * 
           * TODO:支持多線程執(zhí)行.
           * 
           * 
          @see QueuesHolder
           * 
           * 
          @author calvin
           
          */

          @SuppressWarnings(
          "unchecked")
          public abstract class QueueConsumer implements Runnable {

              
          protected Logger logger = LoggerFactory.getLogger(getClass());

              
          protected String queueName;
              
          protected int shutdownTimeout = Integer.MAX_VALUE;

              
          protected boolean persistence = true;
              
          protected String persistencePath = System.getProperty("java.io.tmpdir"+ File.separator + "queue";
              
          protected Object persistenceLock = new Object(); //用于在backup與restore間等待的鎖.

              
          protected BlockingQueue queue;
              
          protected ExecutorService executor;

              
          /**
               * 任務所消費的隊列名稱.
               
          */

              
          public void setQueueName(String queueName) {
                  
          this.queueName = queueName;
              }


              
          /**
               * 停止任務時最多等待的時間, 單位為毫秒.
               
          */

              
          public void setShutdownTimeout(int shutdownTimeout) {
                  
          this.shutdownTimeout = shutdownTimeout;
              }


              
          /**
               * 在JVM關(guān)閉時是否需要持久化未完成的消息到文件.
               
          */

              
          public void setPersistence(boolean persistence) {
                  
          this.persistence = persistence;
              }


              
          /**
               * 系統(tǒng)關(guān)閉時將隊列中未處理的消息持久化到文件的目錄,默認為系統(tǒng)臨時文件夾下的queue目錄.
               
          */

              
          public void setPersistencePath(String persistencePath) {
                  
          this.persistencePath = persistencePath;
              }


              
          /**
               * 任務初始化函數(shù).
               
          */

              @PostConstruct
              
          public void start() throws IOException, ClassNotFoundException, InterruptedException {

                  queue 
          = QueuesHolder.getQueue(queueName);

                  
          if (persistence) {
                      
          synchronized (persistenceLock) {
                          restoreQueue();
                      }

                  }


                  executor 
          = Executors.newSingleThreadExecutor(new CustomizableThreadFactory("Queue Consumer-" + queueName));
                  executor.execute(
          this);
              }


              
          /**
               * 任務結(jié)束函數(shù).
               
          */

              @PreDestroy
              
          public void stop() throws IOException {
                  
          try {
                      ThreadUtils.normalShutdown(executor, shutdownTimeout, TimeUnit.MILLISECONDS);
                  }
           finally {
                      
          if (persistence) {
                          
          synchronized (persistenceLock) {
                              backupQueue();
                          }

                      }

                  }


              }


              
          /**
               * 保存隊列中的消息到文件.
               
          */

              
          protected void backupQueue() throws IOException {
                  List list 
          = new ArrayList();
                  queue.drainTo(list);

                  
          if (!list.isEmpty()) {
                      ObjectOutputStream oos 
          = null;
                      
          try {
                          File file 
          = new File(getPersistenceDir(), getPersistenceFileName());
                          oos 
          = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
                          
          for (Object message : list) {
                              oos.writeObject(message);
                          }

                          logger.info(
          "隊列{}已持久化{}個消息到{}"new Object[] { queueName, list.size(), file.getAbsolutePath() });
                      }
           finally {
                          
          if (oos != null{
                              oos.close();
                          }

                      }

                  }
           else {
                      logger.debug(
          "隊列{}為空,不需要持久化 .", queueName);
                  }

              }


              
          /**
               * 載入持久化文件中的消息到隊列.
               
          */

              
          protected void restoreQueue() throws ClassNotFoundException, IOException, InterruptedException {
                  ObjectInputStream ois 
          = null;
                  File file 
          = new File(getPersistenceDir(), getPersistenceFileName());

                  
          if (file.exists()) {
                      
          try {
                          ois 
          = new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)));
                          
          int count = 0;
                          
          while (true{
                              
          try {
                                  Object message 
          = ois.readObject();
                                  queue.put(message);
                                  count
          ++;
                              }
           catch (EOFException e) {
                                  
          break;
                              }

                          }

                          logger.info(
          "隊列{}已從{}中恢復{}個消息."new Object[] { queueName, file.getAbsolutePath(), count });
                      }
           finally {
                          
          if (ois != null{
                              ois.close();
                          }

                      }

                      file.delete();
                  }
           else {
                      logger.debug(
          "隊列{}的持久化文件{}不存在", queueName, file.getAbsolutePath());
                  }

              }


              
          /**
               * 獲取持久化文件路徑.
               * 持久化文件默認路徑為java.io.tmpdir/queue/隊列名.
               * 如果java.io.tmpdir/queue/目錄不存在,會進行創(chuàng)建.
               
          */

              
          protected File getPersistenceDir() {
                  File parentDir 
          = new File(persistencePath + File.separator);
                  
          if (!parentDir.exists()) {
                      parentDir.mkdirs();
                  }

                  
          return parentDir;
              }


              
          /**
               * 獲取持久化文件的名稱,默認為queueName,可重載.
               
          */

              
          protected String getPersistenceFileName() {
                  
          return queueName;
              }

          }

          這里終于可以確信JdbcLogWriter是一個實現(xiàn)了Runnable的線程類了。我們先略過那些保存日志到文件的方法,關(guān)注它的啟動方法start()。在start方法中,用到了concurrent包的Executors來執(zhí)行線程任務。所以整個的過程是這樣的:
          1、spring隨應用啟動,創(chuàng)建QueuesHolder靜態(tài)類用于存放多種queueName的日志queue;創(chuàng)建JdbcLogWriter開始啟動線程,不停循環(huán)處理日志。
          2、log4j隨應用啟動,并產(chǎn)生日志,把日志存到queue中(使用offer方法)。
          3、JdbcLogWriter不停的把日志從queue中移出來(使用take方法)。
          3、每當有10條日志生成,JdbcLogWriter的updateBatch方法就把日志批量入庫,這個工作在processMesage方法里面。

          這就是springside日志入庫的整個過程了,茲以為記。
          4、



          我的微博 http://t.sina.com.cn/1401900445

          posted on 2011-02-13 21:20 月掛夜中央 閱讀(2215) 評論(0)  編輯  收藏 所屬分類: java咖啡杯

          主站蜘蛛池模板: 定襄县| 蓬溪县| 桑植县| 日照市| 抚远县| 隆德县| 西藏| 韶山市| 宾川县| 望都县| 精河县| 宁化县| 深圳市| 泾川县| 平罗县| 枣强县| 太湖县| 大丰市| 南陵县| 南京市| 当涂县| 友谊县| 景宁| SHOW| 拜城县| 澄迈县| 东乡县| 清水县| 长宁区| 河西区| 平山县| 康乐县| 阳谷县| 务川| 株洲县| 翼城县| 田东县| 阿图什市| 济阳县| 祁东县| 年辖:市辖区|