[IBM]構(gòu)建輕量級 Batch 框架處理 DB2 Content Manager 8.3 大量數(shù)據(jù)導(dǎo)入
本文介紹了如何使用多線程來構(gòu)建輕量級 Batch 框架,將大量的數(shù)據(jù)遷移到 IBM DB2 Content Manager 8.3 中。通過本文的學(xué)習(xí),讀者可以了解如何通過使用多線程調(diào)用 IBM DB2 Content Manager API 構(gòu)建的框架來啟動,暫停,恢復(fù),停止,放緩等操作。
在用 API 導(dǎo)入大量數(shù)據(jù)的過程中,如果沒有框架很難有效的對整個(gè)過程控制,僅僅通過日志來分析解決問題總是很浪費(fèi)時(shí)間,并且效率不太理想。
本文的內(nèi)容放在了如何使用多線程和配置文件來構(gòu)建 Batch 框架來處理大數(shù)量導(dǎo)入的問題。
隨著 IBM DB2 Content Manager(簡稱 IBM CM)產(chǎn)品的不斷成熟,越來越多的內(nèi)容管理系統(tǒng)需要遷移到 IBM CM 中來,這些需要遷移的數(shù)據(jù)通常首先把結(jié)構(gòu)化的內(nèi)容導(dǎo)到文本文件中,與之相對應(yīng)的圖像和 pdf 文件通常放在對應(yīng)的文件夾中,圖像和 pdf 對應(yīng)的文件夾路徑也通常存放在文本文件中,然后遷移程序遍歷文本文件,把對應(yīng)的 Item 遷移到 IBM CM 中。這些需要遷移的數(shù)據(jù)通常都有幾百 G,如何有效的控制遷移過程是一個(gè)很大的挑戰(zhàn),因此我們必須構(gòu)建一個(gè)輕量級的 batch 處理框架來控制整個(gè)數(shù)據(jù)的遷移周期,記錄處理過程中的錯(cuò)誤,保證數(shù)據(jù)的一致性。
同時(shí),在用 API 導(dǎo)入數(shù)據(jù)的過程中,被導(dǎo)入數(shù)據(jù)總是千邊萬化,無效的映射導(dǎo)入數(shù)據(jù)和 DB2 Content Manager 的項(xiàng),導(dǎo)致工作變得復(fù)雜,同時(shí)使的設(shè)計(jì)和代碼冗余,并且使重用,維護(hù)和擴(kuò)展履步為艱難。
為了克服所提到的挑戰(zhàn),這個(gè) batch 框架必須要有以下功能:
- 用戶出于不影響生產(chǎn)環(huán)境性能的考慮,可以暫時(shí)停止數(shù)據(jù)的遷移,或者減緩遷移處理的頻率,即框架必須具有 suspend 和 slowdown 功能。
- 用戶可以讓暫停處理的系統(tǒng)繼續(xù)處理,即框架必須具有 resume 功能。
- 用戶可以讓系統(tǒng)停止處理,修改某些配置,然后繼續(xù)處理,即框架必須有 re-start 功能。
- 處理過程中發(fā)生的錯(cuò)誤,警告系統(tǒng)必須記錄下來,用戶可以根據(jù)這些記錄來修正數(shù)據(jù)。
- 通過配置文件建立規(guī)則來解決數(shù)據(jù)千邊萬化的問題。
要使框架有交互性,我們必須有三個(gè)個(gè)線程:客戶端線程,服務(wù)端線程,工作線程。客戶端線程負(fù)責(zé)發(fā)出工作指令,服務(wù)端線程接受這些指令并調(diào)用工作線程來做實(shí)際的工作。對于客戶端和服務(wù)器交互,在沒有 web 服務(wù)器支持的情況下,我們可以采用一種古老但是很有效的做法:socket 編程。 Java socket 對象的 accept 方法會一直阻塞直到客戶端有程序輸入,當(dāng)客戶端有新的命令輸入的時(shí)候,服務(wù)器端從 socket 中讀出命令,然后執(zhí)行命令。下面是示例程序,Client.java 代表客戶端程序,Server.java 代表服務(wù)器端程序,Worker.java 代表工作程序 ,Config.java 代表系統(tǒng)中一些參數(shù)配置。
清單 1. 客戶端程序
package com.ibm.batch.sample; import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.net.Socket; import org.apache.log4j.Logger; public class Client { private Config config = null; public void setConfig(Config config) { this.config = config; } private Logger logger = Logger.getLogger(Client.class); public void sendCommand(String command) { Socket socket = null; OutputStream out = null; BufferedWriter writer = null; try { // establish the connection with server. socket = new Socket(config.getHost(), config.getSocketPort()); out = socket.getOutputStream(); writer = new BufferedWriter(new OutputStreamWriter(out)); // send the command to server writer.write(command); writer.flush(); } catch (IOException e) { logger.error(e.getMessage(), e); throw new RuntimeException(e); } } } |
清單 2. 服務(wù)器端程序
package com.ibm.batch.sample; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.ServerSocket; import java.net.Socket; import com.ibm.batch.sample.util.ResourceUtils; public class Server { private Config config = null; private boolean processing = true; private Worker worker = null; public void setConfig(Config config) { this.config = config; } public static void main(String[] args) { Server server = new Server(); // create the work thread Worker worker = server.createWorker(args); worker.start(); server.receiveAndExecuteCommand(); } private Worker createWorker(String[] args) { Worker worker = new Worker(); this.worker = worker; return worker; } /** * receive the command from client and execute the command. the method is * keeping running until client send the 'stop' command. * * @throws Exception */ public void receiveAndExecuteCommand() { ServerSocket serverSocket = buildSocketConnection(); // loop until client send 'stop' command while (processing) { Socket socket = null; try { socket = serverSocket.accept(); String commandLine = readCommandFromSocket(socket); executeCommand(commandLine); } catch (Exception e) { throw new RuntimeException(e); } finally { ResourceUtils.closeSocket(socket); } } } private void executeCommand(String commandLine) { // TODO Auto-generated method stub } /** * read the command from the socket * * @param socket * @return */ private String readCommandFromSocket(Socket socket) { InputStream in = null; BufferedReader bufferedReader = null; String commandLine = null; try { in = socket.getInputStream(); bufferedReader = new BufferedReader(new InputStreamReader(in)); commandLine = bufferedReader.readLine(); } catch (IOException e) { throw new RuntimeException(e); } finally { ResourceUtils.closeInputStream(in); ResourceUtils.closeReader(bufferedReader); } return commandLine; } /** * build the socket. * * @return */ private ServerSocket buildSocketConnection() { // prepare the socket for client to connect. ServerSocket serverSocket; try { serverSocket = new ServerSocket(config.getSocketPort()); } catch (java.net.BindException e1) { throw new RuntimeException("Socket port already in use.", e1); } catch (IOException ioe) { throw new RuntimeException(ioe); } return serverSocket; } } |
清單 3. 工作程序
package com.ibm.batch.sample; import org.apache.log4j.Logger; public class Worker extends Thread { Logger logger = Logger.getLogger(Worker.class); /** * the main method for create item function. */ public void run() { createItem(); } /** * do the real job */ private void createItem() { } } |
大數(shù)量的數(shù)據(jù)遷移一般是在周末或者晚上進(jìn)行,但是如果客戶的歷史數(shù)據(jù)太大,在周末或者晚上數(shù)據(jù)可能處理不完,為了不影響生產(chǎn)環(huán)境的性能,我們必須能夠在客戶的工作時(shí)間暫緩處理或者降低處理的頻率,把 cpu 等資源讓給客戶程序,也就是說處理線程 worker 的工作可以 suspend 或者 slowdow 。為了讓 worker 線程知道需要 suspend 當(dāng)前處理,我們可以在 worker 內(nèi)部設(shè)置一個(gè)布爾變量 isSuspend,當(dāng)程序在循環(huán)創(chuàng)建 CM item 的時(shí)候,我們每次都判斷一下這個(gè)布爾變量 isSuspend,當(dāng)其為 ture 的時(shí)候,程序就調(diào)用線程的 wait 方法中斷當(dāng)前線程的處理,wait 方法還可以接受一個(gè)以微秒為單位的時(shí)間參數(shù),當(dāng)時(shí)間到達(dá) wait 指定的時(shí)間的時(shí)候,程序繼續(xù)創(chuàng)建 CM Item 。為了多線程之間的變量可見性,我們必須把 worker 的 isSuspend 變量和 suspendTime 設(shè)置為 volatile 。同理我們設(shè)置一個(gè)布爾變量 isSlowdown 以及 slowdowTime 。示例程序如下:
清單 4. 工作程序
package com.ibm.batch.sample; import java.io.BufferedReader; import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import org.apache.log4j.Logger; import com.ibm.batch.sample.util.ResourceUtils; public class Worker extends Thread { Logger logger = Logger.getLogger(Worker.class); private volatile boolean isSlowdown = false; private volatile Double slowdownTime; private volatile boolean isSuspend; private volatile Double suspendTime; public void setSlowdown(boolean isSlowdown) { this.isSlowdown = isSlowdown; } public void setSlowdownTime(Double slowdownTime) { this.slowdownTime = slowdownTime; } public void setSuspend(boolean isSuspend) { this.isSuspend = isSuspend; } public void setSuspendTime(Double suspendTime) { this.suspendTime = suspendTime; } public boolean isSlowdown() { return isSlowdown; } public Double getSlowdownTime() { return slowdownTime; } public boolean isSuspend() { return isSuspend; } public Double getSuspendTime() { return suspendTime; } protected Object semaphore = new Object(); private Config config; public void setConfig(Config config) { this.config = config; } |
清單 5. 主方法
/** * the main method for create item function. */ public void run() { BufferedReader reader = null; try { reader = getFileReader(); String oneLine = null; while ((oneLine = reader.readLine()) != null) { if (isSlowdown()) { sleep4GivenTime(); } if (isSuspend()) { suspend4SomeTime(); } createItem(oneLine); } } catch (Exception e) { throw new RuntimeException(e); } finally { ResourceUtils.closeReader(reader); } } /** * current thread sleep for some time,the unit is minute. */ protected void sleep4GivenTime() { try { Thread.sleep((long) (slowdownTime.doubleValue() * 1000)); } catch (InterruptedException e) { // do nothing } } |
清單 6.Suspend 方法
/** * suspend working for given time. */ protected void suspend4SomeTime() { synchronized (semaphore) { try { Double suspendTime = getSuspendTime(); if (suspendTime != null) { double suspendTimeDouble = suspendTime.doubleValue() * 60 * 1000; semaphore.wait((long) suspendTimeDouble); } else { semaphore.wait(); } } catch (InterruptedException e) { // tell user that the processing started logger.info("suspend is over,system is continue processing ."); } } } /** * do the real job * * @throws Exception */ private void createItem(String oneLine) throws Exception { } private BufferedReader getFileReader() throws FileNotFoundException { String fileName = config.getFileName(); File processingFile = new File(fileName); BufferedReader reader = new BufferedReader(new FileReader( processingFile)); return reader; } } |
在程序暫停處理以后,我們可以提前終止 suspend,讓框架繼續(xù)處理,也就是框架必須有 resume 功能。我們調(diào)用 Worker.java 對象上的 notify 方法來實(shí)現(xiàn)這個(gè)功能,示例如下:
清單 7.Resume
public class Worker extends Thread { /** * resume the working. */ public void continueWorking() { cleanSuspend(); synchronized (semaphore) { semaphore.notify(); } } } |
有時(shí)候用戶因?yàn)橐恍┰颍ɡ缧薷呐渲梦募┫胪V钩绦虻膱?zhí)行,所以框架必須有 stop 的功能,但是 stop 的時(shí)候我們必須注意記錄程序處理到的行數(shù),這樣客戶再開始執(zhí)行的時(shí)候能夠從上次執(zhí)行的斷點(diǎn)繼續(xù)執(zhí)行,也就是框架具備了 re-start 功能,這是 batch 程序必須具備的一種很重要的功能,re-start 功能有多種實(shí)現(xiàn)方法,我們這里采取一種簡單的方法,在 stop 的時(shí)候,把當(dāng)前處理的記錄到一個(gè)文本文件中去,下次啟動的時(shí)候從上次最后處理的對象開始進(jìn)行處理。所以我們在 Worker.java 中增加一個(gè) keepProcessing 布爾變量,在循環(huán)創(chuàng)建 CM Item 的時(shí)候 , 我們每次都判斷一下這個(gè)值是否為 true,如果為 false 的話,我們就停止循環(huán)處理,在 Worker.java 中還要增加一個(gè) moveReaderToLastProcess 方法,把 reader 重新定向到上次處理點(diǎn)。
清單 8. 停止和重啟
public class Worker extends Thread { private volatile boolean keepProcessing; public boolean isKeepProcessing() { return keepProcessing; } public void setKeepProcessing(boolean keepProcessing) { this.keepProcessing = keepProcessing; } /** * the main method for create item function. */ public void run() { BufferedReader reader = null; try { long lastProcessedRow = config.getLastProcessedRow(); reader = moveReaderToLastProcess(lastProcessedRow); String oneLine = null; connectToCM(); while (((oneLine = reader.readLine()) != null) && isKeepProcessing()) { if (isSlowdown()) { sleep4GivenTime(); } if (isSuspend()) { suspend4SomeTime(); } createItem(oneLine); lastProcessedRow++; } logCurrentProcessingLine(lastProcessedRow); } catch (Exception e) { throw new RuntimeException(e); } finally { ResourceUtils.closeReader(reader); } } private void logCurrentProcessingLine(long lastProcessedRow) { config.setLastProcessedRow(lastProcessedRow); } /** * move current reader position to last process postion * @return * @throws IOException */ private BufferedReader moveReaderToLastProcess(long lastProcessedRow) throws IOException { // get the file reader BufferedReader reader = getFileReader(); // move the reader to the start row -1. int count = 0; while (count < lastProcessedRow-1) { reader.readLine(); count++; } return reader; } } |
剛才我們調(diào)用的 createItem 方法是直接拋出異常的,但是這樣的處理實(shí)際上是錯(cuò)誤的,因?yàn)樵?batch 處理過程中,我們不希望在處理某一個(gè) item 出錯(cuò)導(dǎo)致剩余的 item 不再處理,所以我們在 catch 里面對異常進(jìn)行分類處理,我們 catch 住非檢查異常(runtime exception),通常非檢查異常是不可以恢復(fù)的,所以我們直接拋出,讓程序結(jié)束處理。對于其余的異常,我們只是在日志中記錄下來,并不拋出。在全部處理結(jié)束以后,用戶可以檢查日志來進(jìn)行相應(yīng)的處理。示例代碼如下:
清單 9. 錯(cuò)誤處理
public class Worker extends Thread { /** * do the real job * * @throws Exception */ private void createItem(String oneLine) throws Exception { try { //create the item from one line }catch (RuntimeException e) { throw e; }catch (Exception e) { logger.error(e.getMessage(),e); } } } |
下面的內(nèi)容放在了如何使用配置文件來處理導(dǎo)入的問題。
通過調(diào)用和運(yùn)行 API 來處理數(shù)據(jù)的導(dǎo)入,我們首先定義一個(gè)基本信息的配置文件,用來制定連接的信息,其他配置文件的目錄,工作的目錄等有關(guān)導(dǎo)入需要的參數(shù)。然后定義導(dǎo)入數(shù)據(jù)和 DB2 Content Manager 的項(xiàng)的映射配置文件。配置文件定義結(jié)束后,我們就可以調(diào)用API來啟動相應(yīng)的導(dǎo)入流程,在程序運(yùn)行過程中,可以動態(tài)的更改配置,從而有效的處理導(dǎo)入的任務(wù)。
在開發(fā)過程中,您可以靈活地定義各種配置文件以便實(shí)現(xiàn)多種導(dǎo)入規(guī)則,同時(shí)在程序運(yùn)行中進(jìn)行數(shù)據(jù)校驗(yàn),以防止冗余和非法數(shù)據(jù)被錯(cuò)誤導(dǎo)入。
下面的一些配置和代碼示例,以此介紹了如何定義配置文件,然后管理 API 來完成導(dǎo)入的任務(wù)。
定義基本信息配置文件:在該文件中,須先設(shè)定 IBM DB2 Content Manager 的一些連接參數(shù), 如:
contentManagerDatabase=iCMnlsdb // 定義調(diào)用的數(shù)據(jù)庫名字 contentManagerUsername=iCMadmin // 定義用戶名 contentManagerPassword= password // 定義連接密碼 contentManagerSchema=ICMADMIN // 定義具體的 schema |
您可以在代碼中用以上參數(shù)來實(shí)現(xiàn)對 IBM DB2 Content Manager 的連接,代碼示例:
DKDatastoreICM dsICM = new DKDatastoreICM(); // 創(chuàng)建連接 dsICM.connect("iCMnlsdb", "iCMadmin", "password", "SCHEMA=ICMADMIN"); |
還需指定哪個(gè)文件夾存放映射文件,以及需導(dǎo)入的數(shù)據(jù)文件,如:
mappingFilePath=config/rapid/mapping // 映射文件路徑 dataFileFolder=config/rapid/data // 數(shù)據(jù)文件路徑 |
也可定義一些參數(shù)來增強(qiáng)該導(dǎo)入的流程控制,如:
runPhase=2 // 指定是第二階段導(dǎo)入,在導(dǎo)入時(shí)需更新已有的數(shù)據(jù) |
定義映射文件:該配置文件主要用于將用戶想要導(dǎo)入的數(shù)據(jù)映射到 IBM DB2 Content Manager 的 Item Type 中,您可自由定制該文件,使用戶遵循您定義的規(guī)范順利完成數(shù)據(jù)遷移。如:
C001.del=c01 C002.del=c01 |
該定義中 C001.del 和 C002.del 是需要導(dǎo)入的數(shù)據(jù)文件,c01 是對應(yīng)的 Item Type 名字。這種定義方法可實(shí)現(xiàn)將多個(gè)數(shù)據(jù)文件導(dǎo)入同一個(gè) Item Type 中。
具體的對應(yīng)關(guān)系如下:
position=1|name=COMPANYNAME position=2|name=COMPANYID position=3|name=INPUTVALUE position=-1|name=SPECIALVALUE|value=C1 |
這個(gè)映射關(guān)系反映了數(shù)據(jù)文件中列數(shù)和 Item Type 中 attribute 的關(guān)系,如第一列在 Item Type 中代表了名字為 COMPANYNAME 的 attribute 。您也可定義一些特殊規(guī)則,如將 position 設(shè)為負(fù)數(shù),以便反映該列是一個(gè)特殊的 attribute, 它的值是固定的。 比如將 position 設(shè)為 -1 時(shí),名為 SPECIALVALUE 的 attribute 的值總是為 C1 。
若您想實(shí)現(xiàn)將一個(gè)數(shù)據(jù)文件導(dǎo)入多個(gè) Item Type 中,可在數(shù)據(jù)文件中加入一個(gè)特殊列,在映射文件中指定該列的列數(shù),以及當(dāng)該列的值和多種 Item Type 的映射關(guān)系。如:
C003.del(position:3) |
這樣,C003.del 就不是單一的對應(yīng)一個(gè) Item Type,而是先去取第三列 INPUTVALUE 的值,再去對應(yīng)表中查找到關(guān)聯(lián)的 Item Type 。該對應(yīng)表可設(shè)成:
Value1=c01 Value2=c02 |
若第三列 INPUTDOCID 的值為 Value1 時(shí),其對應(yīng)的 Item Type 為 c01,同樣的當(dāng)值為 Value2 時(shí),會將該行數(shù)據(jù)導(dǎo)入到 c02 的 Item Type 中。
調(diào)用 API 完成操作的代碼示例:在編寫代碼過程中,需要調(diào)用 DB2 Content Manager 的 API 來完成 Item Type 以及它包含的 attribute 的創(chuàng)建。上文已給出了通過參數(shù)來連接 Content Manager 的方法,下面的示例代碼用得到的 DKDatastoreICM 來實(shí)現(xiàn)具體的操作:
清單 10. API 調(diào)用
// Create an item / DDO / Root Component DKDDO ddo = dsICM.createDDO("S_withChild", itemPropertyOrSemanticType); //createDDO(<Overall Item Type>, <Item Property / Semantic Type>); // Adding Multivalue Attributes to DDOs, multiple type can be used, //here just give some example ddo.setData(ddo.dataId(DKConstant.DK_CM_NAMESPACE_ATTR,"S_varchar"), "this is a string value"); //string ddo.setData(ddo.dataId(DKConstant.DK_CM_NAMESPACE_ATTR,"S_date"), java.sql.Date.valueOf("2001-08-12")); //date ddo.setData(ddo.dataId(DKConstant.DK_CM_NAMESPACE_ATTR,"S_double"), new Double("123")); //double |
通過本文的介紹,相信您對多線程構(gòu)建的 Batch 框架實(shí)現(xiàn)大量數(shù)據(jù)遷移的過程,和通過配置文件的管理的 API 實(shí)現(xiàn)數(shù)據(jù)導(dǎo)入的過程也有了一定的了解和學(xué)習(xí)。您可靈活地實(shí)現(xiàn)一對一,一對多,多對多等各種映射關(guān)系,您也可以利用多線程來實(shí)現(xiàn)其他的功能的開發(fā),編寫出更加富有創(chuàng)造性的軟件。
學(xué)習(xí)
- 通過 developerWorks Information Management 專區(qū):在這里可以學(xué)到更多關(guān)于 Information Management 的知識。還可以找到技術(shù)文檔、how-to 文章、培訓(xùn)、下載、產(chǎn)品信息等。
- 在 IBM DB2 Content Manager Enterprise Edition Version 8.3 publication library 可以找到更多文檔。
- 參考“IBM DB2 Content Manager 信息中心”,了解更多 Content Manager 的相關(guān)內(nèi)容。
- 通過 Information Management 專區(qū) CM 專題,了解更多有關(guān) CM 的產(chǎn)品和技術(shù)資源。
獲得產(chǎn)品和技術(shù)
- 使用可直接從 developerWorks 下載的 IBM 試用軟件 構(gòu)建您的下一個(gè) Linux 開發(fā)項(xiàng)目。
posted on 2011-12-28 17:16 段旭 閱讀(447) 評論(0) 編輯 收藏 所屬分類: FRAMEWORK 、DB問題處理