這里L(fēng)Z再做最后一次沖刺,如果能找到線索,就繼續(xù),找不到,就找華生。
今天問了Leader,To be Cruel
因?yàn)閜opulateMap作用域是private,所以它只能在本地被調(diào)用。
我們看看都有誰(shuí)調(diào)用了它!
1, public void setProcessMap(Map map) {
processMap.clear();
populateMap(map, processMap);
}
2, public void setDiscardMap(Map map) {
discardMap.clear();
populateMap(map, discardMap);
}
從前面大家知道這個(gè)populateMap是將它的第一個(gè)參數(shù)里面值傳給它第二個(gè)參數(shù)里面的Map里面。
我們的線索就是我們的數(shù)據(jù)傳到了哪里,我們就找哪里。
現(xiàn)在傳到了Map里,那我們就找Map看誰(shuí)和它有聯(lián)系。
有三個(gè)方法和processMap有關(guān)系:
1,public List getProcessDestinations(IMessageProcessor processor) {
List l = (List) processMap.get(processor);
return l != null ? l : Collections.EMPTY_LIST;
}
注意它返回的是一個(gè)list,它將傳進(jìn)來(lái)的參數(shù)作為key,然后返回一個(gè)MessageProcessor的List。
2, protected IMessageProcessor getIfAlreadyAutoboxed(Object processor){
Object boxed = null;
Iterator ite = processMap.keySet().iterator();
while(ite.hasNext()){
Object key = ite.next();
if(! (key instanceof Node)) continue;
Node node = (Node) key;
if(node.getProcessor().equals(processor)){
boxed = node;
break;
}
}
return (IMessageProcessor) boxed;
}
作用是得到如果已經(jīng)封裝過了。
得到processMap的key集合,然后迭代,如果key是Node(第一次出場(chǎng))的實(shí)例,就繼續(xù)下一次迭代,否則
將key(強(qiáng)制)轉(zhuǎn)化為Node。如果node.getProcessor()==processor,那就把node賦值給boxed。中斷迭代
。返回的是一個(gè)IMessageProcessor。
3, public Object clone() throws CloneNotSupportedException {
RoutingMap obj = (RoutingMap) super.clone();
obj.processors = Collections.unmodifiableSet(processors);
obj.processMap = Collections.unmodifiableMap(processMap);
obj.discardMap = Collections.unmodifiableMap(discardMap);
obj.exceptionMap = Collections.unmodifiableMap(exceptionMap);
return obj;
}
這個(gè)先不詳細(xì)描述了
我們可以試著把重點(diǎn)放在第一個(gè)方法上,選中第一個(gè)方法,CTRL+ALT+H
發(fā)現(xiàn)出現(xiàn)了一個(gè)新的view,call hierarchy.
一共有4個(gè)調(diào)用。
先看第一個(gè)方法 在新類TopologyAnalyser中
private void buildTopology(IRoutingMap map) {
topology=new HashMap();
Collection processors=map.getMessageProcessors();
Iterator it=processors.iterator();
//Get the direct connections
while (it.hasNext()) {
IMessageProcessor processor=(IMessageProcessor)it.next();
link(processor,map.getProcessDestinations(processor));
link(processor,map.getDiscardDestinations(processor));
}
//Now get the indirect ones
it=processors.iterator();
}
調(diào)用了RoutingMap里面的
public Collection getMessageProcessors() {
return Collections.unmodifiableCollection(processors);
}
這里面的
Collections.unmodifiableCollection返回指定 collection 的不可修改視圖,此方法允許模塊為用戶提
供對(duì)內(nèi)部 collection 的“只讀”訪問。在返回的 collection 上執(zhí)行的查詢操作將“讀完”指定的
collection。試圖修改返回的 collection(不管是直接修改還是通過其迭代器進(jìn)行修改)將導(dǎo)致拋出
UnsupportedOperationException
processors值是xml傳來(lái)的所有的IMessageProcessor。
迭代出所有的IMessageProcessor,
根據(jù)提供的IMessageProcessor key得到value List<IMessageProcessor>。
link(processor,map.getProcessDestinations(processor));將源處理器和目標(biāo)處理器連接起來(lái)。
查找link()的源
private void link(IMessageProcessor src,List destinations) {
TopologyInfo srcTi=getTopologyInfo(src);
srcTi.addOutputs(destinations);
Iterator it=destinations.iterator();
while(it.hasNext()) {
IMessageProcessor destination=(IMessageProcessor)it.next();
TopologyInfo dstTi=getTopologyInfo(destination);
dstTi.addInput(src);
}
}
TopololyInfo 是類TopologyAnalyser中一個(gè)內(nèi)部類,作用就是得到輸入和輸出,并將輸入和輸出
的IMessageProcessor放入list中。
class TopologyInfo {
private List inputs=new ArrayList();
private List outputs=new ArrayList();
public List getInputs() {
return inputs;
}
public List getOutputs(){return outputs;}
public void addInput(IMessageProcessor input) {
inputs.add(input);
}
public void addOutput(IMessageProcessor output) {outputs.add(output);}
//public void addInputs(Collection inputs) {this.inputs.addAll(inputs);}
public void addOutputs(Collection outputs) {this.outputs.addAll(outputs);}
}
我們?cè)倩仡^看link()的源,將目的處理器給了srcTi的outputs List.源處理器給了inputs List。
在這里源和目的終于連接上了。
那么這個(gè)類TopologyAnalyser,在link()中要初始化,
首先調(diào)用
private TopologyInfo getTopologyInfo(IMessageProcessor processor) {
TopologyInfo topologyInfo;
if (!topology.containsKey(processor)) {
topologyInfo=new TopologyInfo();
topology.put(processor, topologyInfo);
}
else {
topologyInfo=(TopologyInfo)topology.get(processor);
}
return topologyInfo;
}
如果在Map topology中包含源IMessageProcessor,則將topology中的目標(biāo)IMessageProcessor放
入topologyInfo中,如果不包含IMessageProcessor,則創(chuàng)建一個(gè)新的TopologyInfo實(shí)例,然后將空
的topologyInfo和源ImessageProcessor一起放入topology Map。返回topologyInfo.
好了,老子不廢話了,真金不怕賣錢。
Router bean下有property 名叫processors,所以應(yīng)該也有個(gè)set方法,不然沒有辦法設(shè)置值。我們找到
org.openadaptor.core.router.Router類。
不負(fù)眾望
public void setProcessors(List processorList){
// create process map from list of processors
if ((processorList==null) || (processorList.isEmpty())) {
throw new RuntimeException("Null or empty processorList is not permitted");
}
Object[] processors=processorList.toArray(new Object[processorList.size()]);
Map processMap = new HashMap();
for (int i=1;i<processors.length;i++){
processMap.put(processors[i-1],processors[i]);
}
setProcessMap(processMap);
}
果然有,大家通過簡(jiǎn)單閱讀代碼可以看到這就是個(gè)將List設(shè)置到Map的過程,那么Map有什么用呢?大家再
仔細(xì)看看,不要著急,對(duì)了,看到了吧,那個(gè)for(;;),它是將一個(gè)處理器接著一個(gè)處理器塞
到HashMap里面,前面的是順序用個(gè)鮮明的矩陣圖來(lái)形容吧
1 2
2 3
3 4
4 5
發(fā)現(xiàn)什么規(guī)律?右邊的減左邊的等于1,- -,你真不是蓋的。
大家想想HashMap的結(jié)構(gòu)是什么樣的?<key,value>!
對(duì)!
當(dāng)它循著這個(gè)Map的key找的時(shí)候會(huì)找到value,然后又以這個(gè)value為key接著找下一個(gè)value,循環(huán)直到這
個(gè)Map的最后一個(gè)值。這樣就會(huì)把每個(gè)processor都執(zhí)行一遍。
那么有人會(huì)問- -!為什么說會(huì)這樣執(zhí)行,你怎么知道啊?
好的,來(lái)看
setProcessors()里最后一行是什么?
setProcessMap()
好,繼續(xù)找它,對(duì),還在Router里
public void setProcessMap(Map map) {
if (processMapConfigured) {
throw new RuntimeException("Only one of processMap and processors properties may be
configured");
}
processMapConfigured=true;
routingMap.setProcessMap(map);
}
對(duì),繼續(xù)
這里設(shè)置了processMapConfigured=true;這是為什么呢?LZ下次為你揭開這個(gè)神秘的擦腳布。
先把注意力放在routingMap里,它的本類是RoutingMap
好,找到
RoutingMap,找到setProcessMap()
public void setProcessMap(Map map) {
processMap.clear();
populateMap(map, processMap);
}
注意populateMap(map,processMap)在執(zhí)行前,processMap是空的
在這里我們發(fā)現(xiàn)了processMap的定義,Sets the processMap which defines how to route output from
one adaptor component to anothers.
是一個(gè)定義怎樣將一個(gè)適配器組件的輸出轉(zhuǎn)發(fā)到另外一個(gè)的集合
首先它把processMap清空了,然后populateMap(map,processMap)
哇擦,天長(zhǎng)地久有時(shí)盡 程序綿綿無(wú)絕期
找到populateMap(map,processMap)
private void populateMap(Map map, Map checkedMap) {
map = autoboxer.autobox(map);
for (Iterator iter = map.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Map.Entry) iter.next();
verifyEntryKeyIsIMessageProcessor(entry);
IMessageProcessor fromProcessor = (IMessageProcessor) entry.getKey();
List processorList = autoboxIMessageProcessorList(entry.getValue());
processors.add(fromProcessor);
processors.addAll(processorList);
checkedMap.put(fromProcessor, processorList);
}
}
大家仔細(xì)閱讀代碼發(fā)現(xiàn)這就是個(gè)將map中的數(shù)據(jù)先verifyEntryKeyIsIMessageProcessor(entry);
確定map的key是否是MessageProcessor,不是就報(bào)錯(cuò)退出了,是就將他強(qiáng)制轉(zhuǎn)換為IMessageProcessor,
然后自動(dòng)封裝,這里真j8操淡,TMD前面設(shè)置到Map里面的時(shí)候就可以了,現(xiàn)在還要設(shè)置到set中,還來(lái)個(gè)
什么自動(dòng)封箱,封你媽啊,不過他們也有道理,如果list里面嵌套了另外一個(gè)list就把第二
個(gè)list的MessageProcessor給漏掉了。現(xiàn)在checkMap里面已經(jīng)有了所有的MessageProcessor,LZ就要看看
他們要干什嗎?
OpenAdaptor是為達(dá)到快速構(gòu)建數(shù)據(jù)集成功能的適配器框架,用Java實(shí)現(xiàn),抽象數(shù)據(jù)遷移的過程,用3類組
件完成數(shù)據(jù)遷移,通過對(duì)組件的連接和配置達(dá)到快速構(gòu)建、高度復(fù)用的目的,快速地滿足企業(yè)數(shù)據(jù)集成的
要求,具有標(biāo)準(zhǔn)化、快速開發(fā)、靈活、易于定制,高度復(fù)用特點(diǎn)。
OpenAdaptor原理
在2個(gè)不同的業(yè)務(wù)數(shù)據(jù)庫(kù)之間進(jìn)行數(shù)據(jù)遷移的時(shí)候,經(jīng)歷三個(gè)過程:數(shù)據(jù)抽象,數(shù)據(jù)轉(zhuǎn)換,數(shù)據(jù)加
載。OpenAdaptor框架將3個(gè)階段抽象為3類不同的組件:sources,pipes,sinks,通過這3類內(nèi)置的組件
分別完成各階段工作。
在數(shù)據(jù)抽取階段,對(duì)應(yīng)sources組件,完成與源端的協(xié)議連接,讀取數(shù)據(jù)并將數(shù)據(jù)轉(zhuǎn)化為DataObject數(shù)組
,根據(jù)需要對(duì)數(shù)據(jù)加密/壓縮;
數(shù)據(jù)轉(zhuǎn)化階段,對(duì)應(yīng)pipes組件,完成數(shù)據(jù)過濾、數(shù)據(jù)轉(zhuǎn)化、異常處理等作業(yè);
數(shù)據(jù)加載對(duì)應(yīng)sinks組件,完成建立與目的端的協(xié)議連接,將數(shù)據(jù)轉(zhuǎn)化為目的端的數(shù)據(jù)格式,數(shù)據(jù)解壓/解
密,將數(shù)據(jù)寫入目的端。
source類型有3種:
callback類型是指數(shù)據(jù)到來(lái)時(shí)間觸發(fā)source讀取數(shù)據(jù);
listen類型表示source監(jiān)聽數(shù)據(jù),例如source從socket讀數(shù)據(jù)。
poll類型表示每隔一段時(shí)間久讀取數(shù)據(jù)。
在run()方法中不同類型的source分別調(diào)用runCallback()、runListen()、runPoll。
sourceProcess()方法對(duì)讀取到的DataObject[]進(jìn)行處理,并調(diào)用processMessage(),該方法會(huì)循環(huán)
調(diào)用管道線里下游組件的processMessage()。
AbstractWriter將DataObject[]轉(zhuǎn)化為字符串的一個(gè)Writer類,如果sink制定了一個(gè)DoStringWriter,則
用改writer的實(shí)例根據(jù)DataObject[]產(chǎn)生字符串,否則產(chǎn)生默認(rèn)的XML格式字符串。
OpenAdaptor采用的是Spring的技術(shù),所以建議你在學(xué)習(xí)OpenAdaptor前,先學(xué)習(xí)Spring,至少要了
解Spring的AOP和IOC,知道Spring的工作原理,你就明白了一半OpenAdaptor的調(diào)用過程。
受限我們先看一個(gè)OpeanAdaptor的一個(gè)XML文件,為什么它要使用XML文件,而不是txt或者doc呢?這是因
為,XML文件是用來(lái)存儲(chǔ)數(shù)據(jù)的,重在數(shù)據(jù)本身。而且她使用標(biāo)簽來(lái)存儲(chǔ)數(shù)據(jù),每個(gè)標(biāo)簽必須同時(shí)有結(jié)束
可開始標(biāo)簽。并且標(biāo)簽必須按照合適的順序進(jìn)行嵌套,這就使得它可以用來(lái)表示一些有層次的數(shù)據(jù)類型,
例如java中的類。還有一個(gè)重要原因是,Spring的特性(Service層,DAO層獨(dú)立出現(xiàn),在編碼時(shí)既沒有實(shí)
例化對(duì)象,也沒有設(shè)置依賴關(guān)系,而把它交給Spring,由Spring在運(yùn)行階段實(shí)例化、組裝對(duì)象)決定它必
須要使用到XML文件,在XML文件中配置各個(gè)組件。
首先,我們看一下OpenAdaptor的XML文件中的內(nèi)容:
先看<beans>,yes,你很聰明,那是一個(gè)標(biāo)準(zhǔn)的Spring XML文件開頭,好了這下你更確信了,所
謂OpenAdaptor不過是建立在Spring基礎(chǔ)上的一個(gè)有特定功能的Java功能組件包。
好了,我們接著看,大家發(fā)現(xiàn)了么?LZ是個(gè)天才。
下面第一個(gè)組件式Adaptor 注意有很多Adaptor,這個(gè)是org.openadaptor.core.adaptor.Adaptor,那么我
們可以發(fā)現(xiàn)(當(dāng)然LZ是在看了很多個(gè)XML文件后發(fā)現(xiàn)的),每個(gè)XML文件都有這個(gè)Adaptor,那么這
個(gè)Adaptor究竟是個(gè)神馬東東?
<bean id="Adaptor" class="org.openadaptor.core.adaptor.Adaptor">
<property name="messageProcessor" ref="Router" />
</bean>
1。LZ發(fā)現(xiàn)這個(gè)Adaptor里面有個(gè)property屬性,那么我們就想吧,什么里面有屬性這一說,大家不知道
學(xué)習(xí)Java有多深入,但是我可以告訴大家,屬性絕對(duì)是個(gè)抽象的東西,因?yàn)榫唧w的不叫屬性,叫 值
。(⊙o⊙)哦!到這里你發(fā)現(xiàn)原來(lái)是這啊,那不就是個(gè)類嗎?類里面有屬性,當(dāng)類生成具體對(duì)象 的
時(shí)候,可以給屬性賦值,對(duì)頭,LZ說。
那么既然是屬性,肯定有設(shè)置它的方法啊?樓主決定給這么問的童鞋獎(jiǎng)勵(lì)一個(gè)臭豆腐,屬性如果不
能設(shè)置,就好比你只是說我有臭豆腐,可是沒法吃。LZ郁悶。
好,我們就打開這個(gè)類,看看誰(shuí)掉淚。注意這個(gè)臭豆腐名叫messageProcessor。
當(dāng)當(dāng)當(dāng)當(dāng)
public void setMessageProcessor(final IMessageProcessor processor) {
if (this.processor != null) {
throw new RuntimeException("message processor has already been set");
}
this.processor = processor;
/* if the processor has metrics treat them as 'global' Adaptor ones */
if(processor instanceof IRecordableComponent){
this.metrics = ((IRecordableComponent) processor).getMetrics();
String enabled = metrics.isMetricsEnabled() ? "on" : "off";
log.info("Metrics are " + enabled);
}
}
瓦擦
這不是我編的。真的有這個(gè)嘞。好,如果你翻開看這個(gè)setMessageProcessor,行了,別吐。所
以我們不看丫的。那么我們知道就行了,它初始化的第一
個(gè)Bean是org.openadaptor.core.adaptor.Adaptor,它的一個(gè)屬性是messageProcessor,它……等等
,有值,是的有值。值是多少?Router!Router?好的,Router是什么?我靠,Ruter是另外一 個(gè)Bean
,傻狗答道。
2。Router
介于傻狗同學(xué)的成就,我們今天請(qǐng)他作為特邀嘉賓,你好,傻狗!你好,樓豬!- -,啊,那個(gè)你向大家
介紹一下自己吧!(⊙_⊙),大家好,我是Router。大家 ⊙﹏⊙b汗
- <bean id="Router" class="org.openadaptor.core.router.Router">
- <property name="processors">
- <list>
<ref bean="Servlet" />
<ref bean="TickerFilter" />
<ref bean="XMLConverter" />
<ref bean="Validator" />
<ref bean="Writer" />
</list>
</property>
<property name="exceptionProcessor" ref="ExceptionHandler" />
</bean>
哇,大家一陣狂吐!
待續(xù)
OpenAdaptor是Dresdner Kleinwort Wasserstein 開發(fā)的一款open source的產(chǎn)品,主要面向不同系統(tǒng)中的數(shù)據(jù)傳遞。 它可以很富方便地將一個(gè)系統(tǒng)中的數(shù)據(jù)輸出(Source),經(jīng)過一定的轉(zhuǎn)化(Pipe),輸出到其他系統(tǒng)(Sink)。
Source 數(shù)據(jù)獲取對(duì)象,是一個(gè)接口,可以根據(jù)數(shù)據(jù)獲取來(lái)源不同,實(shí)現(xiàn)很多子類,比如:FileSource、JMSSource等。它本身并沒有定義任何方法,但是擴(kuò)展了線程,所以它的核心方法應(yīng)該就是線程的run方法。
Pipe 對(duì)source獲取的數(shù)據(jù)進(jìn)行處理,比如過濾、重新組織等,比如:FilterPipe。最上層是一個(gè)抽象類,核心方法是processMessage()
Sink 數(shù)據(jù)發(fā)布,將整合后的數(shù)據(jù),根據(jù)配置文件,發(fā)布出去,比如:FileSink、JMSSink等。定義了一個(gè)接口,核心方法是processMessage()
Controller 整個(gè)消息傳遞的控制類,是一個(gè)線程,負(fù)責(zé)對(duì)上面所說的三個(gè)對(duì)象的調(diào)用,完成一次消息的傳遞,每一次消息的傳遞都會(huì)新建一個(gè)線程實(shí)例。定義了一個(gè)接口。
DataObject 傳遞消息數(shù)據(jù)的對(duì)象,定義了一個(gè)接口。
Message 對(duì)DataObject對(duì)象集合的封裝,用來(lái)傳遞消息。
LDAP是輕量目錄訪問協(xié)議,英文全稱是Lightweight Directory Access Protocol,一般都簡(jiǎn)稱為L(zhǎng)DAP。它是基于X.500標(biāo)準(zhǔn)的,但是簡(jiǎn)單多了并且可以根據(jù)需要定制。與X.500不同,LDAP支持TCP/IP,這對(duì)訪問Internet是必須的。LDAP的核心規(guī)范在RFC中都有定義,所有與LDAP相關(guān)的RFC都可以在LDAPman RFC網(wǎng)頁(yè)中找到。
詳細(xì)簡(jiǎn)單說來(lái),LDAP是一個(gè)得到關(guān)于人或者資源的集中、靜態(tài)數(shù)據(jù)的快速方式、LDAP是一個(gè)用來(lái)發(fā)布目錄信息到許多不同資源的協(xié)議。通常它都作為一個(gè)集中的地址本使用,不過根據(jù)組織者的需要,它可以做得更加強(qiáng)大。詳細(xì)信息參照http://baike.baidu.com/view/159263.htm
下面是幾個(gè)例子
eg1:csv to database
<?xml version="1.0" encoding="UTF-8"?>
<!--
$Id: step07.xml 1285 2008-03-05 14:31:22Z higginse $
$HeadURL: https://openadaptor3.openadaptor.org/svn/openadaptor3/tags/3.4.5/example/tutorial/step07.xml $
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
<description>
Adaptor for csv to database.
</description>
<bean id="Adaptor" class="org.openadaptor.core.adaptor.Adaptor">
<property name="messageProcessor" ref="Router"/>
</bean>
<bean id="Router" class="org.openadaptor.core.router.Router">
<property name="processMap">
<map>
<entry key-ref="ReaderA" value-ref="MapConverter"/>
<entry key-ref="MapConverter" value-ref="Writer"/>
</map>
</property>
</bean>
<bean id="ReaderA" class="org.openadaptor.auxil.connector.iostream.reader.FileReadConnector">
<property name="filename" value="input/inputA.csv" />
</bean>
<bean id="MapConverter" class="org.openadaptor.auxil.convertor.delimited.DelimitedStringToOrderedMapConvertor"> <description>將讀取csv文件的數(shù)據(jù)轉(zhuǎn)換成map類型;以下對(duì)應(yīng)的行屬性值,是和數(shù)據(jù)庫(kù)中表trade1中相對(duì)應(yīng)的。 以下三個(gè)屬性name,number,text分別和數(shù)據(jù)庫(kù)中的字段一致,且和下面的outputColumns的數(shù)值一致,反之,報(bào)錯(cuò)! </description>
<property name="fieldNames">
<list>
<value>SIDE</value>
<value>STOCK</value>
<value>PRICE</value>
</list>
</property>
</bean>
<bean id="JdbcConnection" class="org.openadaptor.auxil.connector.jdbc.JDBCConnection">
<property name="driver" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@192.168.1.121:1521:orcl"/>
<property name="username" value="ti1_user"/>
<property name="password" value="ti1_user"/>
</bean>
<bean id="Writer" class="org.openadaptor.auxil.connector.jdbc.writer.JDBCWriteConnector">
<property name="jdbcConnection" ref="JdbcConnection" />
<property name="writer">
<bean class="org.openadaptor.auxil.connector.jdbc.writer.map.MapTableWriter">
<property name="tableName" value="TRADE" />
<property name="outputColumns">
<description>以下對(duì)應(yīng)的行屬性值,是和數(shù)據(jù)庫(kù)中表trade1中相對(duì)應(yīng)的。以下三個(gè)屬性name,number,text分別和數(shù)據(jù)庫(kù)中的字段一致,且和上面的map的key值一致,反之,報(bào)錯(cuò)!
</description>
<list>
<value>SIDE</value>
<value>STOCK</value>
<value>PRICE</value>
</list>
</property>
</bean>
</property>
</bean>
</beans>
eg2:database to database
<?xml version="1.0" encoding="UTF-8"?>
<!--
$Id: step09.xml 1508 2008-06-02 14:06:48Z cawthorng $
$HeadURL: https://openadaptor3.openadaptor.org/svn/openadaptor3/tags/3.4.5/example/tutorial/step09.xml $
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
<description>
Adaptor for database1 to database2
</description>
<bean id="Adaptor" class="org.openadaptor.core.adaptor.Adaptor">
<property name="messageProcessor" ref="Router"/>
</bean>
<bean id="Router" class="org.openadaptor.core.router.Router">
<property name="processors">
<list>
<ref bean="readerConnector"/>
<ref bean="writerConnector"/>
</list>
</property>
</bean>
<bean id="sourceJdbcConnection" class="org.openadaptor.auxil.connector.jdbc.JDBCConnection">
<property name="driver" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@192.168.1.121:1521:orcl"/>
<property name="username" value="ti1_user"/>
<property name="password" value="ti1_user"/>
</bean>
<bean id="targetJdbcConnection" class="org.openadaptor.auxil.connector.jdbc.JDBCConnection">
<property name="driver" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@192.168.1.121:1521:orcl"/>
<property name="username" value="ti1_user"/>
<property name="password" value="ti1_user"/>
</bean>
<bean id="readerConnector" class="org.openadaptor.auxil.connector.jdbc.reader.JDBCReadConnector">
<description>Reader which polls database using configured SQL.</description>
<property name="jdbcConnection" ref="sourceJdbcConnection"/>
<property name="sql">
<value>
SELECT side as BUYSELL,stock as TICKER,price as PRICE
FROM TRADE
</value>
</property>
</bean>
<bean id="writerConnector" class="org.openadaptor.auxil.connector.jdbc.writer.JDBCWriteConnector">
<property name="jdbcConnection" ref="targetJdbcConnection" />
<property name="writer">
<bean class="org.openadaptor.auxil.connector.jdbc.writer.map.MapTableWriter">
<property name="tableName" value="TRADE2" />
</bean>
</property>
</bean>
<bean id="writer" class="org.openadaptor.auxil.connector.jdbc.writer.map.MapTableWriter">
<property name="tableName" value="trade2"/> <!-- -->
<property name="outputColumns">
<list>
<value>side</value><!--?? -->
<value>price</value>
<value>stock</value>
</list>
</property>
</bean>
<bean id="ExceptionHandler" class="org.openadaptor.auxil.connector.iostream.writer.FileWriteConnector"> <property name="filename" value="output/print.txt"/>
</bean>
</beans>
eg3:xml to csv
<?xml version="1.0" encoding="UTF-8"?>
<!--
$Id: step07.xml 1285 2008-03-05 14:31:22Z higginse $
$HeadURL: https://openadaptor3.openadaptor.org/svn/openadaptor3/tags/3.4.5/example/tutorial/step07.xml $
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
<description>
Adaptor for xml to csv
</description>
<bean id="Adaptor" class="org.openadaptor.core.adaptor.Adaptor">
<property name="messageProcessor" ref="Router">
</property>
</bean>
<bean id="Router" class="org.openadaptor.core.router.Router">
<property name="processors">
<list>
<ref bean="Reader"/>
<ref bean="XmlToMap"/>
<ref bean="FormatterScript"/>
<ref bean="maptostring"/>
<ref bean="Writer"/>
</list>
</property>
</bean>
<bean id="Reader" class="org.openadaptor.auxil.connector.iostream.reader.FileReadConnector">
<property name="filename" value="input/trades.xml"/>
<property name="dataReader">
<bean class="org.openadaptor.auxil.connector.iostream.reader.string.StringReader"/>
</property>
</bean>
<bean id="XmlToMap" class="org.openadaptor.auxil.convertor.xml.XmlToOrderedMapConvertor"/>
<bean id="maptostring" class="org.openadaptor.auxil.convertor.delimited.OrderedMapToDelimitedStringConvertor"/>
<bean id="FormatterScript" class="org.openadaptor.auxil.processor.script.ScriptProcessor">
<description>Convert Dom4j Document inton org.w3c.dom.Document</description>
<property name="script">
<value>
var trade=oa_data.get('Trade'); oa_data.put('SIDE',trade.get('buySell')); oa_data.put('STOCK',trade.get('ticker'));
oa_data.put('PRICE',trade.get('price'));
oa_data.remove('Trade'); //No longer needed
</value>
</property>
</bean>
<bean id="Writer" class="org.openadaptor.auxil.connector.iostream.writer.FileWriteConnector">
<property name="filename" value="output/dengjianli.csv"/>
</bean>
</beans>
eg4:xml to database
<?xml version="1.0" encoding="UTF-8"?>
<!--
$Id: step07.xml 1285 2008-03-05 14:31:22Z higginse $
$HeadURL: https://openadaptor3.openadaptor.org/svn/openadaptor3/tags/3.4.5/example/tutorial/step07.xml $
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
<description>
Adaptor for xml to database
</description>
<bean id="Adaptor" class="org.openadaptor.core.adaptor.Adaptor">
<property name="messageProcessor" ref="Router">
</property>
</bean>
<bean id="Router" class="org.openadaptor.core.router.Router">
<property name="processors">
<list>
<ref bean="Reader" />
<ref bean="xmltomap" />
<ref bean="FormatterScript" />
<ref bean="writer" />
</list>
</property>
</bean>
<bean id="Reader" class="org.openadaptor.auxil.connector.iostream.reader.FileReadConnector">
<description>Read entire contents of input file</description>
<property name="filename" value="input/trades.xml" />
<property name="dataReader">
<bean class="org.openadaptor.auxil.connector.iostream.reader.string.StringReader" />
</property>
</bean>
<bean id="xmltomap" class="org.openadaptor.auxil.convertor.xml.XmlToOrderedMapConvertor" />
<bean id="maptostring" class="org.openadaptor.auxil.convertor.delimited.OrderedMapToDelimitedStringConvertor" />
<bean id="FormatterScript" class="org.openadaptor.auxil.processor.script.ScriptProcessor">
<property name="script">
<value>
var trade=oa_data.get('Trade');
oa_data.put('SIDE',trade.get('buySell'));
oa_data.put('STOCK',trade.get('ticker'));
oa_data.put('PRICE',trade.get('price'));
oa_data.remove('Trade');
</value>
</property>
</bean>
<bean id="JdbcConnection" class="org.openadaptor.auxil.connector.jdbc.JDBCConnection">
<property name="driver" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@192.168.1.121:1521:orcl"/>
<property name="username" value="ti1_user"/>
<property name="password" value="ti1_user"/>
</bean>
<bean id="writer" class="org.openadaptor.auxil.connector.jdbc.writer.JDBCWriteConnector">
<property name="jdbcConnection" ref="JdbcConnection" />
<property name="writer">
<bean class="org.openadaptor.auxil.connector.jdbc.writer.map.MapTableWriter">
<property name="tableName" value="TRADE" />
</bean>
</property>
</bean>
</beans>
eg5:txt to database
<?xml version="1.0" encoding="UTF-8"?>
<!--
$Id: step07.xml 1285 2008-03-05 14:31:22Z higginse $
$HeadURL: https://openadaptor3.openadaptor.org/svn/openadaptor3/tags/3.4.5/example/tutorial/step07.xml $
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
<description>
Adaptor for xml to database
</description>
<bean id="Adaptor" class="org.openadaptor.core.adaptor.Adaptor">
<property name="messageProcessor" ref="Router">
</property>
</bean>
<bean id="Router" class="org.openadaptor.core.router.Router">
<property name="processors">
<list>
<ref bean="FileReader" />
<ref bean="writerConnector" />
</list>
</property>
<property name="exceptionProcessor" ref="ExceptionHandler" />
</bean>
<bean id="FileReader" class="org.openadaptor.core.node.ReadNode">
<property name="connector">
<ref bean="readerConnector" />
</property>
<property name="processor">
<ref bean="mapconverter" />
</property>
</bean>
<bean id="readerConnector" class="org.openadaptor.auxil.connector.iostream.reader.FileReadConnector">
<property name="filename" value="input/data.txt" />
<property name="dataReader">
<bean class="org.openadaptor.auxil.connector.iostream.reader.string.LineReader" />
</property>
</bean>
<bean id="mapconverter" class="org.openadaptor.auxil.convertor.delimited.DelimitedStringToOrderedMapConvertor"> <property name="delimiter" value="," />
<property name="fieldNames">
<list>
<value>SIDE</value>
<value>STOCK</value>
<value>PRICE</value>
</list>
</property>
</bean>
<bean id="targetJdbcConnection" class="org.openadaptor.auxil.connector.jdbc.JDBCConnection">
<property name="driver" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@192.168.1.121:1521:orcl"/>
<property name="username" value="ti1_user"/>
<property name="password" value="ti1_user"/>
</bean>
<bean id="writerConnector" class="org.openadaptor.auxil.connector.jdbc.writer.JDBCWriteConnector">
<property name="jdbcConnection" ref="targetJdbcConnection" />
<property name="writer">
<ref bean="writer" />
</property>
</bean>
<bean id="writer" class="org.openadaptor.auxil.connector.jdbc.writer.map.MapTableWriter">
<property name="tableName" value="TRADE1" />
</bean>
<bean id="ExceptionHandler" class="org.openadaptor.auxil.connector.iostream.writer.FileWriteConnector">
<property name="filename" value="output/puberror.txt" />
</bean>
</beans>
以上例子 都可以選擇同一個(gè)用戶 在同一個(gè)用戶下創(chuàng)建3個(gè)結(jié)構(gòu)相同的表 sql如下
create table TRADE
(
SIDE VARCHAR2(50),
STOCK VARCHAR2(50),
PRICE NUMBER(20)
)
tablespace TI_DATA
pctfree 10
initrans 1
maxtrans 255
storage
(
initial 64K
minextents 1
maxextents unlimited
);
在運(yùn)行以上實(shí)例的時(shí)候請(qǐng)參照如下目錄下的 說明
D:\openadaptor-3.4.5-bin\openadaptor-3.4.5\example\tutorial\index.htm只需創(chuàng)建需要的xml文件并以 index.htm里面說的創(chuàng)建好環(huán)境后使用 java org.openadaptor.spring.SpringAdaptor -config step01.xml 在命令行模式下運(yùn)行,注意以上都與隊(duì)列無(wú)關(guān),所以如果要使用隊(duì)列的話,提供下面一個(gè)實(shí)例,
隊(duì)列牽涉到發(fā)送端和接收端,所以需要配置兩個(gè)xml文件,開兩個(gè)cmd窗口運(yùn)行,值得注意的是要在setclasspath.bat中添加一些需要的jar包,并且在運(yùn)行前,在命令行下運(yùn)行 setclasspath.bat文件,我的是這樣的
set CLASSPATH=.
set CLASSPATH=%CLASSPATH%;..\..\lib\openadaptor.jar
set CLASSPATH=%CLASSPATH%;..\..\lib\openadaptor-spring.jar
set CLASSPATH=%CLASSPATH%;..\..\lib\openadaptor-depends.jar
set CLASSPATH=%CLASSPATH%;..\..\lib\3rdparty\hsqldb.jar
set CLASSPATH=%CLASSPATH%;..\..\lib\3rdparty\jbossall-client.jar
set CLASSPATH=%CLASSPATH%;..\..\lib\3rdparty\ojdbc14.jar
set CLASSPATH=%CLASSPATH%;..\..\lib\3rdparty\sqljdbc.jar
@echo on
使用兩個(gè)xml文件從 database to database
,xml文件名自己取
<?xml version="1.0" encoding="UTF-8"?>
<!--
$Id: step07.xml 1285 2008-03-05 14:31:22Z higginse $
$HeadURL: https://openadaptor3.openadaptor.org/svn/openadaptor3/tags/3.4.5/example/tutorial/step07.xml $
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
<description>
Adaptor for step 7 of the tutorial.
</description>
<bean id="Adaptor" class="org.openadaptor.core.adaptor.Adaptor">
<property name="messageProcessor" ref="Router"/>
</bean>
<bean id="Router" class="org.openadaptor.core.router.Router">
<property name="processors">
<list>
<ref bean="Reader"/>
<ref bean="Writer"/>
</list>
</property>
</bean>
<bean id="JndiConnection" class="org.openadaptor.auxil.connector.jndi.JNDIConnection">
<property name="initialContextFactory" value="org.jnp.interfaces.NamingContextFactory"/>
<property name="providerUrl" value="jnp://localhost:1099"/>
</bean>
<bean id="JmsConnection" class="org.openadaptor.auxil.connector.jms.JMSConnection">
<property name="jndiConnection" ref="JndiConnection"/>
<property name="connectionFactoryName" value="ConnectionFactory"/>
</bean>
<bean id="JdbcConnection" class="org.openadaptor.auxil.connector.jdbc.JDBCConnection">
<property name="driver" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@192.168.1.121:1521:orcl"/>
<property name="username" value="ti1_user"/>
<property name="password" value="ti1_user"/>
</bean>
<!--<bean id="Reader" class="org.openadaptor.auxil.connector.jms.JMSReadConnector">
<property name="destinationName" value="queue/testQueue"/>
<property name="jmsConnection" ref="JmsConnection"/>
<property name="transacted" value="true"/>
</bean>-->
<bean id="Reader" class="org.openadaptor.auxil.connector.jdbc.reader.JDBCReadConnector">
<description>Reader which polls database using configured SQL.</description>
<property name="jdbcConnection" ref="JdbcConnection"/>
<!-- batch size of 0 or less means process all rows in one message batch. -->
<!-- batch size of one means process one row per message and so on -->
<property name="batchSize" value="0"/>
<property name="resultSetConverter">
<bean class="org.openadaptor.auxil.connector.jdbc.reader.xml.ResultSetToXMLConverter"/>
</property>
<property name="sql">
<value>
SELECT side as BUYSELL,stock as TICKER,price as PRICE
FROM TRADE
</value>
</property>
</bean>
<bean id="Writer" class="org.openadaptor.auxil.connector.jms.JMSWriteConnector">
<property name="destinationName" value="queue/testQueue"/>
<property name="jmsConnection" ref="JmsConnection"/>
</bean>
<!--<bean id="Writer" class="org.openadaptor.auxil.connector.iostream.writer.FileWriteConnector"/>-->
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<!--
$Id: step09.xml 1508 2008-06-02 14:06:48Z cawthorng $
$HeadURL: https://openadaptor3.openadaptor.org/svn/openadaptor3/tags/3.4.5/example/tutorial/step09.xml $
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
<description>
Adaptor for step 9 of the tutorial.
</description>
<bean id="Adaptor" class="org.openadaptor.core.adaptor.Adaptor">
<property name="messageProcessor" ref="Router"/>
</bean>
<bean id="Router" class="org.openadaptor.core.router.Router">
<property name="processors">
<list>
<ref bean="Reader"/>
<ref bean="XmlToMap"/>
<ref bean="FormatterScript"/>
<ref bean="Writer"/>
</list>
</property>
</bean>
<bean id="JndiConnection" class="org.openadaptor.auxil.connector.jndi.JNDIConnection">
<property name="initialContextFactory" value="org.jnp.interfaces.NamingContextFactory"/>
<property name="providerUrl" value="jnp://localhost:1099"/>
</bean>
<bean id="JmsConnection" class="org.openadaptor.auxil.connector.jms.JMSConnection">
<property name="jndiConnection" ref="JndiConnection"/>
<property name="connectionFactoryName" value="ConnectionFactory"/>
</bean>
<bean id="Reader" class="org.openadaptor.auxil.connector.jms.JMSReadConnector">
<property name="destinationName" value="queue/testQueue"/>
<property name="jmsConnection" ref="JmsConnection"/>
<property name="transacted" value="true"/>
</bean>
<bean id="XmlToMap" class="org.openadaptor.auxil.convertor.xml.XmlToOrderedMapConvertor"/>
<bean id="FormatterScript" class="org.openadaptor.auxil.processor.script.ScriptProcessor">
<property name="script">
<value>
<![CDATA[
var trade=oa_data.get('row');
oa_data.put('SIDE',trade.get('BUYSELL'));
oa_data.put('STOCK',trade.get('TICKER'));
oa_data.put('PRICE',trade.get('PRICE'));
oa_data.remove('row'); //No longer needed
]]>
</value>
</property>
</bean>
<bean id="JdbcConnection" class="org.openadaptor.auxil.connector.jdbc.JDBCConnection">
<property name="driver" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@192.168.1.121:1521:orcl"/>
<property name="username" value="ti2_user"/>
<property name="password" value="ti2_user"/>
</bean>
<bean id="Writer" class="org.openadaptor.auxil.connector.jdbc.writer.JDBCWriteConnector">
<property name="jdbcConnection" ref="JdbcConnection" />
<property name="writer">
<bean class="org.openadaptor.auxil.connector.jdbc.writer.map.MapTableWriter">
<property name="tableName" value="TRADE" />
</bean>
</property>
</bean>
</beans>
有問題歡迎討論