JAVA—咖啡館

          ——?dú)g迎訪問rogerfan的博客,常來《JAVA——咖啡館》坐坐,喝杯濃香的咖啡,彼此探討一下JAVA技術(shù),交流工作經(jīng)驗(yàn),分享JAVA帶來的快樂!本網(wǎng)站部分轉(zhuǎn)載文章,如果有版權(quán)問題請(qǐng)與我聯(lián)系。

          BlogJava 首頁 新隨筆 聯(lián)系 聚合 管理
            447 Posts :: 145 Stories :: 368 Comments :: 0 Trackbacks
          最近在做一個(gè)銀行的生產(chǎn)數(shù)據(jù)脫敏系統(tǒng),今天寫代碼時(shí)遇到了一個(gè)“瓶頸”,脫敏系統(tǒng)需要將生產(chǎn)環(huán)境上Infoxmix里的數(shù)據(jù)原封不動(dòng)的Copy到另一臺(tái) Oracle數(shù)據(jù)庫服務(wù)器上,然后對(duì)Copy后的數(shù)據(jù)作些漂白處理。為了將人為干預(yù)的因素降到最低,在系統(tǒng)設(shè)計(jì)時(shí)采用Java代碼對(duì)數(shù)據(jù)作Copy,思路 如圖:



              首 先在代碼與生產(chǎn)庫間建立一個(gè)Connection,將讀取到的數(shù)據(jù)放在ResultSet對(duì)象,然后再與開發(fā)庫建立一個(gè)Connection。從 ResultSet取出數(shù)據(jù)后通過TestConnection插入到開發(fā)庫,以此來實(shí)現(xiàn)Copy。代碼寫完后運(yùn)行程序,速度太慢了,一秒鐘只能Copy 一千條數(shù)據(jù),生產(chǎn)庫上有上億條數(shù)據(jù),按照這個(gè)速度同步完要到猴年馬月呀,用PreparedStatement批處理速度也沒有提交多少。我想能不能用多 線程處理,多個(gè)人干活總比一個(gè)人干活速度要快。
              假設(shè)生產(chǎn)庫有1萬條數(shù)據(jù),我開5個(gè)線程,每個(gè)線程分2000條數(shù)據(jù),同時(shí)向開發(fā)庫里插數(shù)據(jù),Oracle支持高并發(fā)這樣的話速度至少會(huì)提高好多倍,按照這 個(gè)思路重新進(jìn)行了編碼,批處理設(shè)置為1萬條一提交,統(tǒng)計(jì)插入數(shù)量的變量使用 java.util.concurrent.atomic.AtomicLong,程序一運(yùn)行,傳輸速度飛快CPU利用率在70%~90%,現(xiàn)在一秒鐘可 以拷貝50萬條記錄,沒過幾分鐘上億條數(shù)據(jù)一條不落地全部Copy到目標(biāo)庫。

          在查詢的時(shí)候我用了如下語句
          1. String queryStr = "SELECT * FROM xx";  
          2. ResultSet coreRs = PreparedStatement.executeQuery(queryStr);  

          實(shí)習(xí)生問如果xx表里有上千萬條記錄,你全部查詢出來放到ResultSet, 那內(nèi)存不溢出了么?Java在設(shè)計(jì)的時(shí)候已經(jīng)考慮到這個(gè)問題了,并沒有查詢出所有的數(shù)據(jù),而是只查詢了一部分?jǐn)?shù)據(jù)放到ResultSet,數(shù)據(jù)“用完”它 會(huì)自動(dòng)查詢下一批數(shù)據(jù),你可以用setFetchSize(int rows)方法設(shè)置一個(gè)建議值給ResultSet,告訴它每次從數(shù)據(jù)庫Fetch多少條數(shù)據(jù)。但我不贊成,因?yàn)镴DBC驅(qū)動(dòng)會(huì)根據(jù)實(shí)際情況自動(dòng)調(diào)整 Fetch的數(shù)量。另外性能也與網(wǎng)線的帶寬有直接的關(guān)系。
          相關(guān)代碼
            1 package com.dlbank.domain;  
            2   
            3 import java.sql.Connection;  
            4 import java.sql.PreparedStatement;  
            5 import java.sql.ResultSet;  
            6 import java.sql.Statement;  
            7 import java.util.List;  
            8 import java.util.concurrent.atomic.AtomicLong;  
            9   
           10 import org.apache.log4j.Logger;  
           11   
           12 /** 
           13  *<p>title: 數(shù)據(jù)同步類 </p>   
           14  *<p>Description: 該類用于將生產(chǎn)核心庫數(shù)據(jù)同步到開發(fā)庫</p>   
           15  *@author Tank Zhang  
           16  */  
           17 public class CoreDataSyncImpl implements CoreDataSync {  
           18       
           19     private List<String> coreTBNames; //要同步的核心庫表名  
           20     private ConnectionFactory connectionFactory;  
           21     private Logger log = Logger.getLogger(getClass());  
           22       
           23     private AtomicLong currentSynCount = new AtomicLong(0L); //當(dāng)前已同步的條數(shù)  
           24       
           25     private int syncThreadNum;  //同步的線程數(shù)  
           26   
           27     @Override  
           28     public void syncData(int businessType) throws Exception {  
           29           
           30         for (String tmpTBName : coreTBNames) {  
           31             log.info("開始同步核心庫" + tmpTBName + "表數(shù)據(jù)");  
           32             // 獲得核心庫連接  
           33             Connection coreConnection = connectionFactory.getDMSConnection(4);  
           34             Statement coreStmt = coreConnection.createStatement();  
           35             //為每個(gè)線程分配結(jié)果集  
           36             ResultSet coreRs = coreStmt.executeQuery("SELECT count(*) FROM "+tmpTBName);  
           37             coreRs.next();  
           38             //總共處理的數(shù)量  
           39             long totalNum = coreRs.getLong(1);  
           40             //每個(gè)線程處理的數(shù)量  
           41             long ownerRecordNum =(long) Math.ceil((totalNum / syncThreadNum));   
           42             log.info("共需要同步的數(shù)據(jù)量:"+totalNum);  
           43             log.info("同步線程數(shù)量:"+syncThreadNum);  
           44             log.info("每個(gè)線程可處理的數(shù)量:"+ownerRecordNum);  
           45             // 開啟五個(gè)線程向目標(biāo)庫同步數(shù)據(jù)  
           46             for(int i=0; i < syncThreadNum; i ++){  
           47                 StringBuilder sqlBuilder = new StringBuilder();  
           48                 //拼裝后SQL示例  
           49                 //Select * From dms_core_ds Where id between 1 And 657398  
           50                 //Select * From dms_core_ds Where id between 657399 And 1314796  
           51                 //Select * From dms_core_ds Where id between 1314797 And 1972194  
           52                 //Select * From dms_core_ds Where id between 1972195 And 2629592  
           53                 //Select * From dms_core_ds Where id between 2629593 And 3286990  
           54                 //..  
           55                 sqlBuilder.append("Select * From ").append(tmpTBName)  
           56                         .append(" Where id between " ).append(i * ownerRecordNum +1)  
           57                         .append( " And ")  
           58                         .append((i * ownerRecordNum + ownerRecordNum));  
           59                 Thread workThread = new Thread(  
           60                         new WorkerHandler(sqlBuilder.toString(),businessType,tmpTBName));  
           61                 workThread.setName("SyncThread-"+i);  
           62                 workThread.start();  
           63             }  
           64             while (currentSynCount.get() < totalNum);  
           65             //休眠一會(huì)兒讓數(shù)據(jù)庫有機(jī)會(huì)commit剩余的批處理(只針對(duì)JUnit單元測試,因?yàn)閱卧獪y試完成后會(huì)關(guān)閉虛擬器,使線程里的代碼沒有機(jī)會(huì)作提交操作);  
           66             //Thread.sleep(1000 * 3);  
           67             log.info( "核心庫"+tmpTBName+"表數(shù)據(jù)同步完成,共同步了" + currentSynCount.get() + "條數(shù)據(jù)");  
           68         }  
           69     }// end for loop  
           70       
           71     public void setCoreTBNames(List<String> coreTBNames) {  
           72         this.coreTBNames = coreTBNames;  
           73     }  
           74   
           75     public void setConnectionFactory(ConnectionFactory connectionFactory) {  
           76         this.connectionFactory = connectionFactory;  
           77     }  
           78       
           79     public void setSyncThreadNum(int syncThreadNum) {  
           80         this.syncThreadNum = syncThreadNum;  
           81     }  
           82       
           83     //數(shù)據(jù)同步線程  
           84     final class WorkerHandler implements Runnable {  
           85         ResultSet coreRs;  
           86         String queryStr;  
           87         int businessType;  
           88         String targetTBName;  
           89         public WorkerHandler(String queryStr,int businessType,String targetTBName) {  
           90             this.queryStr = queryStr;  
           91             this.businessType = businessType;  
           92             this.targetTBName = targetTBName;  
           93         }  
           94         @Override  
           95         public void run() {  
           96             try {  
           97                 //開始同步  
           98                 launchSyncData();  
           99             } catch(Exception e){  
          100                 log.error(e);  
          101                 e.printStackTrace();  
          102             }  
          103         }  
          104         //同步數(shù)據(jù)方法  
          105         void launchSyncData() throws Exception{  
          106             // 獲得核心庫連接  
          107             Connection coreConnection = connectionFactory.getDMSConnection(4);  
          108             Statement coreStmt = coreConnection.createStatement();  
          109             // 獲得目標(biāo)庫連接  
          110             Connection targetConn = connectionFactory.getDMSConnection(businessType);  
          111             targetConn.setAutoCommit(false);// 設(shè)置手動(dòng)提交  
          112             PreparedStatement targetPstmt = targetConn.prepareStatement("INSERT INTO " + targetTBName+" VALUES (?,?,?,?,?)");  
          113             ResultSet coreRs = coreStmt.executeQuery(queryStr);  
          114             log.info(Thread.currentThread().getName()+"'s Query SQL::"+queryStr);  
          115             int batchCounter = 0//累加的批處理數(shù)量  
          116             while (coreRs.next()) {  
          117                 targetPstmt.setString(1, coreRs.getString(2));  
          118                 targetPstmt.setString(2, coreRs.getString(3));  
          119                 targetPstmt.setString(3, coreRs.getString(4));  
          120                 targetPstmt.setString(4, coreRs.getString(5));  
          121                 targetPstmt.setString(5, coreRs.getString(6));  
          122                 targetPstmt.addBatch();  
          123                 batchCounter++;  
          124                 currentSynCount.incrementAndGet();//遞增  
          125                 if (batchCounter % 10000 == 0) { //1萬條數(shù)據(jù)一提交  
          126                     targetPstmt.executeBatch();  
          127                     targetPstmt.clearBatch();  
          128                     targetConn.commit();  
          129                 }  
          130             }  
          131             //提交剩余的批處理  
          132             targetPstmt.executeBatch();  
          133             targetPstmt.clearBatch();  
          134             targetConn.commit();  
          135             //釋放連接   
          136             connectionFactory.release(targetConn, targetPstmt,coreRs);  
          137         }  
          138     }  
          139 }  

          posted on 2010-11-26 14:11 rogerfan 閱讀(987) 評(píng)論(0)  編輯  收藏 所屬分類: 【Java知識(shí)】
          主站蜘蛛池模板: 鄂温| 玉溪市| 安国市| 连城县| 绥阳县| 上蔡县| 辽阳市| 比如县| 台安县| 黄平县| 安达市| 宁陵县| 阿图什市| 莱芜市| 镇原县| 永宁县| 武平县| 东平县| 宣城市| 额尔古纳市| 甘洛县| 山西省| 托克逊县| 武清区| 张掖市| 渝中区| 绥芬河市| 枣强县| 老河口市| 尖扎县| 长泰县| 新绛县| 双峰县| 泰兴市| 黄陵县| 灵丘县| 葫芦岛市| 新安县| 苍溪县| 凤冈县| 亚东县|