I've recently been building a production-data masking system for a bank, and today I hit a bottleneck while writing the code. The masking system has to copy the data in the production Informix database, exactly as it is, to a separate Oracle database server, and then scrub ("bleach") the copied data. To keep manual intervention to a minimum, the design uses Java code to perform the copy. The approach is shown in the figure:

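First, open a Connection between the code and the production database and read the data into a ResultSet. Then open a second Connection to the development database. Each row taken from the ResultSet is inserted into the development database through TestConnection, and that is the copy. When I finished the code and ran it, it was far too slow: only about 1,000 rows per second. Production holds over a hundred million rows, so a full sync at that rate would take forever, and batching with PreparedStatement did not speed it up much either. So I wondered whether multithreading would help, since several people working always get more done than one.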
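The batched copy loop described above can be sketched without a database. Each row is added to a JDBC batch, the batch is flushed every N rows, and whatever is left is flushed at the end. In this minimal sketch, `BatchSink` is a hypothetical interface that stands in for two JDBC steps. Its `add` method plays the role of `PreparedStatement.addBatch()`, and its `flush` method plays the role of `executeBatch()` + `clearBatch()` + `Connection.commit()`. `BatchCopier` and `RecordingSink` are also illustrative names and are not part of the author's code.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for PreparedStatement.addBatch/executeBatch plus Connection.commit. */
interface BatchSink<T> {
    void add(T row);   // like setXxx(...) followed by addBatch()
    void flush();      // like executeBatch() + clearBatch() + commit()
}

final class BatchCopier {
    /** Copies every row into the sink, flushing every batchSize rows and once more at the end. */
    static <T> long copy(Iterator<T> rows, BatchSink<T> sink, int batchSize) {
        long copied = 0;
        int pending = 0; // rows added since the last flush
        while (rows.hasNext()) {
            sink.add(rows.next());
            copied++;
            if (++pending == batchSize) {
                sink.flush();
                pending = 0;
            }
        }
        if (pending > 0) {
            sink.flush(); // the final partial batch; skipping it loses up to batchSize - 1 rows
        }
        return copied;
    }
}

/** Demo sink that records the size of each flushed batch. */
final class RecordingSink<T> implements BatchSink<T> {
    final List<Integer> flushedBatchSizes = new ArrayList<>();
    private int current = 0;

    public void add(T row) { current++; }

    public void flush() {
        flushedBatchSizes.add(current);
        current = 0;
    }
}
```

With 25 rows and a batch size of 10, `RecordingSink` records flushed batches of 10, 10 and 5 rows. The trailing partial batch is the one most easily forgotten.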
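Suppose the production database has 10,000 rows. I start 5 threads, give each thread 2,000 rows, and let them insert into the development database at the same time. Since Oracle handles high concurrency, this should make the copy several times faster at least. I rewrote the code along these lines. It commits a batch every 10,000 rows and counts inserted rows in a java.util.concurrent.atomic.AtomicLong. Once the program ran, the transfer was very fast, with CPU utilization at 70% to 90%. It now copies 500,000 rows per second, and within a few minutes over a hundred million rows reached the target database without a single one missing.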
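Dividing the table among threads, as described above, means cutting one id range into contiguous sub-ranges, one per thread. This is a minimal sketch that assumes a numeric key column named `id`, and `IdRanges.split` is a hypothetical helper. It rounds the per-thread size up with integer arithmetic, because `Math.ceil(a / b)` on two `long`s receives an already-truncated quotient. It also splits MIN(id)..MAX(id) rather than 1..COUNT(*), since the latter only lines up when the ids are dense and start at 1.

```java
import java.util.ArrayList;
import java.util.List;

final class IdRanges {
    /** Splits [minId, maxId] into at most `parts` contiguous, non-overlapping {lo, hi} ranges. */
    static List<long[]> split(long minId, long maxId, int parts) {
        if (parts <= 0) {
            throw new IllegalArgumentException("parts must be positive");
        }
        List<long[]> ranges = new ArrayList<>();
        if (minId > maxId) {
            return ranges; // empty table: nothing to split
        }
        long span = maxId - minId + 1;
        long size = (span + parts - 1) / parts; // ceiling division without floating point
        for (long lo = minId; lo <= maxId; lo += size) {
            long hi = Math.min(lo + size - 1, maxId); // last range stops exactly at maxId
            ranges.add(new long[] {lo, hi});
        }
        return ranges;
    }
}
```

For example, `split(1, 10_000, 5)` yields 1 to 2000, 2001 to 4000, and so on up to 8001 to 10000. That matches the 5 threads × 2,000 rows above. Each pair becomes one `SELECT * FROM xx WHERE id BETWEEN lo AND hi` for one worker.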
For the query I used the following statements:
- String queryStr = "SELECT * FROM xx";
- Statement coreStmt = coreConnection.createStatement();
- ResultSet coreRs = coreStmt.executeQuery(queryStr);
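An intern asked: if table xx holds tens of millions of rows and you query all of them into a ResultSet, won't you run out of memory? JDBC anticipates this. Most drivers do not pull every row into the ResultSet at once. Instead they fetch a batch of rows, and once that batch is "used up" they automatically fetch the next one. This is driver behavior rather than a guarantee: MySQL Connector/J, for example, buffers the entire result in memory by default. You can call setFetchSize(int rows) to give the ResultSet a suggested number of rows to fetch from the database per round trip. I don't recommend it, though, because the value is only a hint and the JDBC driver already chooses a fetch size on its own. Performance also depends directly on network bandwidth.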
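Here is a minimal sketch of reading a large table through a forward-only, read-only cursor while passing a fetch-size hint. It uses only standard `java.sql` calls: `Connection.createStatement(int, int)`, `Statement.setFetchSize` and `Statement.executeQuery`. `ForwardOnlyReader` and its `RowHandler` callback are hypothetical names. According to the JDBC javadoc, `setFetchSize` is only a hint, and a value of `0` means the hint is ignored. That matches the author's preference for leaving it alone.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

final class ForwardOnlyReader {
    /** Hypothetical callback, invoked once per row while the cursor is positioned on it. */
    interface RowHandler {
        void handle(ResultSet row) throws SQLException;
    }

    /**
     * Runs sql with a forward-only, read-only cursor and passes each row to handler.
     * fetchSize is a hint only; the driver may ignore it, and 0 keeps the driver default.
     * Returns the number of rows read.
     */
    static long readAll(Connection conn, String sql, int fetchSize, RowHandler handler)
            throws SQLException {
        try (Statement stmt = conn.createStatement(
                ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
            stmt.setFetchSize(fetchSize); // rows per round trip, as a suggestion
            long rows = 0;
            try (ResultSet rs = stmt.executeQuery(sql)) {
                while (rs.next()) {
                    handler.handle(rs);
                    rows++;
                }
            }
            return rows;
        }
    }
}
```

Memory stays bounded by one fetch only if the driver actually streams results rather than buffering the whole result set.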
Related code:
package com.dlbank.domain;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

/**
 *<p>Title: data synchronization class</p>
 *<p>Description: copies data from the production core database to the development database</p>
 *@author Tank Zhang
 */
public class CoreDataSyncImpl implements CoreDataSync {

    private List<String> coreTBNames; // names of the core tables to copy
    private ConnectionFactory connectionFactory;
    private Logger log = Logger.getLogger(getClass());

    private AtomicLong currentSynCount = new AtomicLong(0L); // rows copied so far for the current table

    private int syncThreadNum; // number of copy threads

    @Override
    public void syncData(int businessType) throws Exception {

        for (String tmpTBName : coreTBNames) {
            log.info("Start copying core table " + tmpTBName);
            // Count per table, so one table's total does not carry over into the next
            currentSynCount.set(0L);
            // Connection to the core (source) database
            Connection coreConnection = connectionFactory.getDMSConnection(4);
            Statement coreStmt = coreConnection.createStatement();
            // Split on the real id bounds, so gaps in id cannot leave rows outside every range
            ResultSet coreRs = coreStmt.executeQuery(
                    "SELECT MIN(id), MAX(id), COUNT(*) FROM " + tmpTBName);
            coreRs.next();
            long minId = coreRs.getLong(1);
            long maxId = coreRs.getLong(2);
            long totalNum = coreRs.getLong(3); // total number of rows to copy
            coreRs.close();
            coreStmt.close();
            coreConnection.close();
            if (totalNum == 0) {
                log.info("Core table " + tmpTBName + " is empty, skipping");
                continue;
            }
            // ids per thread, rounded up in integer arithmetic
            // (Math.ceil on a long quotient would already be truncated)
            long ownerRecordNum = (maxId - minId + 1 + syncThreadNum - 1) / syncThreadNum;
            log.info("Rows to copy: " + totalNum);
            log.info("Copy threads: " + syncThreadNum);
            log.info("ids per thread: " + ownerRecordNum);
            // Start up to syncThreadNum threads that write to the target database
            List<Thread> workers = new ArrayList<Thread>();
            for (int i = 0; i < syncThreadNum; i++) {
                long lo = minId + i * ownerRecordNum;
                if (lo > maxId) {
                    break; // fewer ids than threads
                }
                long hi = Math.min(lo + ownerRecordNum - 1, maxId);
                StringBuilder sqlBuilder = new StringBuilder();
                // Example of the generated SQL:
                //Select * From dms_core_ds Where id between 1 And 657398
                //Select * From dms_core_ds Where id between 657399 And 1314796
                //Select * From dms_core_ds Where id between 1314797 And 1972194
                //Select * From dms_core_ds Where id between 1972195 And 2629592
                //Select * From dms_core_ds Where id between 2629593 And 3286990
                //..
                sqlBuilder.append("Select * From ").append(tmpTBName)
                        .append(" Where id between ").append(lo)
                        .append(" And ")
                        .append(hi);
                Thread workThread = new Thread(
                        new WorkerHandler(sqlBuilder.toString(), businessType, tmpTBName));
                workThread.setName("SyncThread-" + i);
                workThread.start();
                workers.add(workThread);
            }
            // join() blocks without burning a CPU core, returns only after each worker's
            // final commit, and still returns if a worker fails
            for (Thread workThread : workers) {
                workThread.join();
            }
            log.info("Core table " + tmpTBName + " copied, " + currentSynCount.get() + " rows in total");
        } // end for loop
    }

    public void setCoreTBNames(List<String> coreTBNames) {
        this.coreTBNames = coreTBNames;
    }

    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public void setSyncThreadNum(int syncThreadNum) {
        this.syncThreadNum = syncThreadNum;
    }

    // Copy thread: copies the rows selected by one range query
    final class WorkerHandler implements Runnable {
        String queryStr;
        int businessType;
        String targetTBName;

        public WorkerHandler(String queryStr, int businessType, String targetTBName) {
            this.queryStr = queryStr;
            this.businessType = businessType;
            this.targetTBName = targetTBName;
        }

        @Override
        public void run() {
            try {
                // start copying
                launchSyncData();
            } catch (Exception e) {
                log.error(Thread.currentThread().getName() + " failed on: " + queryStr, e);
            }
        }

        // Copies one range of rows
        void launchSyncData() throws Exception {
            // Connection to the core (source) database
            Connection coreConnection = connectionFactory.getDMSConnection(4);
            Statement coreStmt = coreConnection.createStatement();
            // Connection to the target database
            Connection targetConn = connectionFactory.getDMSConnection(businessType);
            targetConn.setAutoCommit(false); // commit manually
            PreparedStatement targetPstmt = targetConn.prepareStatement("INSERT INTO " + targetTBName + " VALUES (?,?,?,?,?)");
            ResultSet coreRs = null;
            try {
                coreRs = coreStmt.executeQuery(queryStr);
                log.info(Thread.currentThread().getName() + "'s Query SQL::" + queryStr);
                int batchCounter = 0; // rows batched so far by this worker
                while (coreRs.next()) {
                    // source columns 2..6 map to the five target columns (column 1 is not copied)
                    targetPstmt.setString(1, coreRs.getString(2));
                    targetPstmt.setString(2, coreRs.getString(3));
                    targetPstmt.setString(3, coreRs.getString(4));
                    targetPstmt.setString(4, coreRs.getString(5));
                    targetPstmt.setString(5, coreRs.getString(6));
                    targetPstmt.addBatch();
                    batchCounter++;
                    currentSynCount.incrementAndGet(); // update the shared counter
                    if (batchCounter % 10000 == 0) { // commit every 10,000 rows
                        targetPstmt.executeBatch();
                        targetPstmt.clearBatch();
                        targetConn.commit();
                    }
                }
                // flush and commit the remaining partial batch
                targetPstmt.executeBatch();
                targetPstmt.clearBatch();
                targetConn.commit();
            } finally {
                // release the target connection, the insert statement and the source result set
                connectionFactory.release(targetConn, targetPstmt, coreRs);
                // the source statement and connection must be closed as well
                coreStmt.close();
                coreConnection.close();
            }
        }
    }
}
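Waiting for worker threads by spinning on a shared counter keeps a CPU core busy. It also never returns if a worker dies before the expected total is reached. A hedged alternative replaces the raw `Thread`s with a `java.util.concurrent.ExecutorService`. It submits one task per id range, then calls `Future.get()` on each one, which blocks without spinning and rethrows a worker's exception. `ParallelCopy` and `RangeTask` are hypothetical names. In the class above, `RangeTask.copy` would correspond to `WorkerHandler.launchSyncData()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

final class ParallelCopy {
    /** Hypothetical unit of work: copies rows with id in [lo, hi], counting each one in counter. */
    interface RangeTask {
        void copy(long lo, long hi, AtomicLong counter) throws Exception;
    }

    /**
     * Runs one task per range on a fixed-size pool and waits for all of them.
     * Future.get() blocks without spinning and rethrows a task's exception
     * wrapped in ExecutionException, so a failed range surfaces instead of hanging.
     */
    static long runAll(List<long[]> ranges, int threads, RangeTask task)
            throws InterruptedException, ExecutionException {
        AtomicLong copied = new AtomicLong(); // fresh counter per call, e.g. per table
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (long[] r : ranges) {
                futures.add(pool.submit(() -> {
                    task.copy(r[0], r[1], copied);
                    return null; // submitted as a Callable so checked exceptions propagate
                }));
            }
            for (Future<?> f : futures) {
                f.get(); // wait for completion, rethrowing any failure
            }
        } finally {
            pool.shutdown(); // no new tasks; already-submitted ones still finish
        }
        return copied.get();
    }
}
```

A fresh `AtomicLong` per call gives each table its own count. If one range fails, `runAll` throws as soon as it reaches that future. The other submitted tasks keep running in the background until they finish, because the pool is stopped with `shutdown()` rather than `shutdownNow()`.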