走自己的路

          路漫漫其修遠兮,吾將上下而求索

            BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
            50 隨筆 :: 4 文章 :: 118 評論 :: 0 Trackbacks
           

          主要介紹如何周期性盡量實時地從RDBMS爬數據然后建索引,不涉及AOPORM Frameworklistener方式。

          先決條件:

          1. Lucene索引是從無到有的,一開始所有數據都是存儲在RDBMSOracle)中。
          2. 數據表有一列是updateTime或稱為lastModifiedTime用來存儲最后一次更新時間,并建有db索引
          3. 主表必須要有主鍵,這個主鍵也用來唯一確定一個Lucene document

          該策略大致可以分為以下幾個部分:

          1.索引結構

          2.初始化索引

          3.增量索引

          4.補償操作

          5.刪除檢測

          6.備份

          7.注意點

          索引結構:

          我們是做全文搜索的,所以db中的用戶需要全文搜索的字段值都會拼接成一個string存儲在lucene的一個字段中稱為content。其他的需要存儲的字段或者字段分析后的token是有意義單詞的我們稱為metadata,每個metadatadocument的一個field

          從索引結構看來,我們有一個主表,不同的主表OID的記錄對應不同的lucene document。主表依賴的所有子表中的數據如果是需要做全文搜索的就需要append在一起然后用Ngram進行分詞處理。有的數據是需要存儲的或有特殊用途的,比如訪問控制,就不分詞或者用特殊的analyzer分詞。

          簡單的介紹一下,具體會在以后的文章中談到。

          初始化索引

          初始化索引,是索引從無到有的一個過程。也是系統第一次初始化時做的事情。初始化時我們會首先獲取當前時間,然后將updateTime早于當前時間的所有數據取出來。最后把當前時間存儲到timeTrace.properties文件中。作為增量索引的起始依據。

          取什么?

          用戶預先在配置文件配置一個主SQL,通過這個sql我們取出需做全文索引的contentmetadata。對于某些特定的metadata不希望被build到全文索引中的,可以單獨配置sql

          如何取,多線程分批取。我們會有一個線程池,每個分批都是作為該線程池的一個task被提交執行。

          ·         查詢出當前時間之前的記錄總數n

          ·         按照rownum分批。假設分批的大小是2000,則需要將rownum0, 200040006000…..max(rownum)的所有OID都一次拿回來。

          Select oid from x where rownum=0 or rownum=2000,………….or rownum=n

          如果直接根據rownum分批取,會出現幻影讀的問題,因為rownum每次查詢都會發生變化,如果有新數據插入,改用當時snapshotOID去取,避免這個問題,因為OID是不變的。有人會說,這樣可能查到新插入的數據,當然是這樣的,但是查到新加入的不會有影響,新加入的也是遲早需要爬的,但是用rownum還會丟失數據。

          ·         根據拿回來的斷點的OID,分批取。比如令先前獲取的rownum2000oid10231,拿回來的4000oid14215

          前兩個分批的sql就是:

          Select * from x where oid >= 0 and oid < 10231;

          Select * from x where oid >=10231 and oid < 14215;

          具體的sqloid是哪張表的哪列都是在配置文件中預先配置的。

          ·         當然每個task還要負責將從db中爬出的數據建索引,建完索引后提交對索引的改動

          ·         所有task都跑完之后,主線程將一開始獲取的當前時間更新到timeTrace.properties文件中。

          當中可能會出現問題,如果出現問題,就需要重新初始化,初始化之前會清除所有已建好的部分臟索引。因為當前時間沒有更新到timeTrace.properties文件中。我們測試下來百萬級的數據這個過程大概需要10分鐘。

          增量索引

          在初始化索引成功后,當時的時間已經被更新到timeTrace.properties。增量索引是從這個時間點開始定期地被觸發執行,可以使用quartz來管理這個timing job。增量索引稱為incrementalIndexService,增量索引服務的不同任務調度之間需要同步執行,用quartzstateful job可以實現,或者使用內存,文件或DB鎖。

          ·         第一步,從timeTrace文件中獲取已爬數據的截至時間Tlast,獲取當前時間Tcurrent

          ·         Select count(*) from X where updateTime > Tlast and updateTime <= Tcurrent, 結果記為n,如果n小于分批的大小就直接爬出這段的索引數據。如果n大于分批大小,就需要將這個n個結果分批.

          ·         這次我們按照時間段分批,如果n/2000 = 3但有余數, 那就說明要分四批拿,將這個時間段Tcurrent-Tlast平均分為四段。每個線程處理其中的某段。

          ·         某線程將它負責的某段數據拿回來之后,首先判斷這個OID是否在index已經存在,如果存在就說明在這個時間段里這條記錄是被用戶update過的,index也做相應的update。如果這個OIDindex中不存在,則說明這條記錄是新加入到db中的,index也做add操作。做完之后提交。

          ·         當所有分批都完成之后,更新timeTrace文件,把時間更新為Tcurrent

          ·         一旦有分批出現問題失敗,整個時間段就認為是不成功的,需要重新爬一遍。

          一般增量服務我們設置的間隔都小于1分鐘,因為需要拿出最實時的數據,而且每次獲取數據的結束時間都是當前時間。保證數據的實時性。

          補償操作

          補償操作在整個爬蟲策略中是最復雜的一個環節。采用增量索引看似天衣無縫,其實還是有風險的。因為記錄到dbupdateTime往往都是有延遲的,一般情況下是java端的時間或是記錄寫入DB的時間,都早于commit時間,但一般數據庫的隔離級別都是read committed。只有在數據被提交后才可能被增量服務看到。這樣的話3點跑的增量服務,先前的結束時間是258分。這時它需要獲取2.583.00之間的數據,但是此時有可能java端正有一條記錄生成,它的updateTime 2.59,但是它一直沒有commit,因為transaction的超時時間是10分鐘。悲劇了發生了,這條數據將永遠不會被爬出來,除非遙遠的將來有人再次更新它。因為這個時間段已經被爬過了,按照增量服務,它是永遠不會再爬timeTrace文件中記錄的時間之前的數據的。

          此時,補償服務隆重登場。它存在的價值就是把所有可能被遺漏的數據都查出來。關鍵點就是要找出在補償服務運行時,哪個時間段的數據是可能被遺漏的而哪個時間段的數據又是永遠不會被丟失的。那個永遠不會丟失的時間段就沒必要再去管它。我們關心的只是可能被遺漏的那段時間段的數據。

          我們來看個例子:

          ·         增量服務每1分鐘跑一次, 周期記為P(N) = 1

          ·         Transaction Timeout時間3分鐘,記為TO

          ·         補償服務每x分鐘跑一次,P(C)  = x >= P(N)

          下圖的第一條時間軸T(N)是增量服務的,第二條是補償服務的T(C)


           

          對于補償服務我們需要確定每次的開始時間Ts,結束時間Te,周期P(C), 算法,初始化值,意外情況,優化等方面。

           

          結束時間Te

          補償服務的目的是對增量索引進行補償,所以它所補償的時間區間一定是增量服務已經處理過的。所以它的結束時間一定是timeTrace文件中最后一次增量服務記錄的時間我們記為Last(N). Last(N)之后的數據或者正在被增量服務處理或者沒有被增量服務處理,如果補償服務去涉及這些數據,那肯定全是要補償的,但是增量服務也會去處理,一個是會重復處理,一旦需要補償,我們會把這條記錄的所有數據都從DB端取過來,建索引,重復的代價也是很大的。為了避免這個代價: Te < Last(N).

           

          如果Te = Last(N), 我們就會拿到最新的需要補償的數據,補償服務的延遲就最小。Te < Last(N), 每次都沒有不嘗到最新的數據,遺漏數據被檢測到就會有延遲,只有等下一次補償服務觸發時才能被檢測出。

           

          開始時間Ts

          上圖中,transaction timeout之前的那個時間段,如果有數據生成,T(N) = 1, 要么在圖上標出的[T(N) =1T(N) = 4]之間被提交,要么transaction超時該數據也不會寫入到DB中。所以Last(N) – TO之前的數據在這一次補償服務的時候已經是完全可見的,肯定都會被補償的。對于下一次來說這塊也是不需要再被補償的。完全可見區(針對下一次補償操作)必須滿足下面兩個條件:

          ·         被增量服務處理過并且更新已經完全對本次補償服務可見

          ·         已經被補償服務處理過

           

          則下一次補償操作就不會再關心Last(N)-TO之前的數據了。我們把上一次補償服務記為Last(C), 而此次的增量服務記為Last(C Last(N)).  則下一次補償服務的開始時間<=Last(N)-TO.因為大于這個時間的所有數據都是需要被補償的。換個表達方式,此次的補償服務的開始時間是由上一次補償服務計算的得到的,為Last(C Last(N)) – TO.  Ts <= Last(C Last(N)) – TO

           

          同時我們需要注意的是,Last(N) – TO Last(N)每次補償服務是必須要檢測的,不然就會有遺漏,因為我們假設了不可見區,前提條件就是每次都會檢測這個區域。所以結束時間: Last(N) – TO < Te < Last(N).

           

          開始時間是由上一次補償服務計算得到的,那這個值就需要保存下來。保存在文件中可以避免系統down掉后丟失。我們也會將這個時間值保存到timeTrace.propertiescompensation屬性上。

           

           

          算法

          補償服務的算法主要目的就是比較出遺漏的數據。為了比較有無遺漏,我們需要把db中的數據和增量服務已經爬過的數據進行比較才知道。我們會在內存中存放增量服務已經爬過的數據的oidupdateTime,在內存中存放是為了提高性能。每次補償服務運行時,也會把完全可見區從內存中清除。

           

          ·         增量服務每次執行后就會將爬出的數據的OIDupdateTime保存在內存中,內存中有一棵二叉排序樹維護OIDupdateTimepair。排序的keyOID。二叉排序樹可以用TreeSet實現。

          ·         補償服務運行時,先創建兩個List一個用來存放需要update數據的oid,記為updateList,另一個用來存放add數據的oid,記為addList

          ·         接著從DB中取出TsTe之間所有數據的OIDupdateTime,也是根據OID排序的。

          ·         計算下一次補償服務的開始時間,Next(Ts) = Last(N) –TO;

          ·         現在就是要比較兩個有序集合。樹的訪問者應該寫成:

          if(OID(C) == OID(N))

               if(updateTime(C) > updateTime (N))

               {

                            更新OID(N)updateTime;

                             updateList.add(OID(N));

                  }

                  else{

                 addList.add(OID(N))

                 }

                  //清除完全可見區,下一次補償服務開始時間之前的數據

                     if(updateTime(N) < Next(Ts)) {

                     tressSetIterator.remove();

                    }

                 

          ·         updateListOID對應的所有數據從db中獲取并update到索引中

          ·         addListOID對應的所有的數據從db中獲取并add到索引中

          ·         Commit索引,并將Next(Ts)記錄到updateTrace文件中

           

          意外情況

          補償服務很久沒有被調度

          一般不會出現,因為我們會將增量服務和補償服務的線程優先級設為相同的。應該會被分時處理。如果很久沒有不會被調度,正確性是可以保證的,因為開始時間都是記錄在文件中的,如果一直沒有跑,只是一下子補償的時間段很長,并不會丟失補償的時間段。但是不排除內存溢出的風險,因為存儲在內存中的treeset在這種情況下會很大。在treeset很大時,我們可以檢測,如果超過一定的節點數,就可以將treeset序列化到一個internal索引中,下次取出來時也是有序的。甚至可以分塊取出比較。

           

          Server突然shutdown

          Server shutdown突然shutdown,線程被interrupt掉,沒有執行完,內存中的樹也沒了。這時就需要每次啟動時,這個時間段內的所有數據都會認為是需要add到索引的,這樣就會出問題。所以需要提前檢測,每次系統啟動時,補償服務需要把這段時間內的所有記錄的OIDupdateTimedb中獲取,直接和索引中的進行比較,比較效率要低一些。但也不會出現數據丟失的情況。

           

           

          初始化值

          系統初始化時,補償服務初始化Ts(被掃描數據的起始時間)是,初始化索引的當前時間-TO   而補償服務本身的開始時間是在增量服務開始之后。之后多少可以調。

           

          優化

          優化的重點放在了以下幾個方向。

          ·         DB壓力

          ·         補償延遲

          ·         消耗內存的大小

          ·         比較次數

           

          以上選項之間有的都是矛盾的,比如說補償延遲要小,則補償服務的P(C)就要小,則查詢DB的次數就增加,對DB壓力就增大。

          所以針對不同的使用情況,比如DB資源,延遲的可接受程度,應用服務器資源等,我們可能需要采用不同的策略,這就要我們的補償策略可調。

          為了可調,我們不僅使一些參數可以配置,而且引入了分級補償服務的方案。在分級方案中,如果分n級,則n-1級的TO輸入值推薦和P(C)是相同的,但也是可調的。

           

          舉個例子:一個三級補償服務,

          第一級:為了使補償的延遲最小,極端情況下我們可以采用和增量服務相同的周期假設為1分鐘,此時TO的輸入值也是周期值。此級的啟動時間也是初始化當前時間記為Tinitial+P(N).

           

          第二級: 業務場景中絕大多數事務都是在3分鐘內完成的,如果TO3分鐘,基本上絕大多數事務都可以及時的補償到。此級的啟動時間是Tinitial+3

           

          第三級:也是最后一級,在App Server中配置的真正的TO10分鐘,為了保證正確性,TO的輸入值一定要是10分鐘,因為只需要保證正確性所以它的頻率也不需要太頻繁周期也設為10分鐘。從前文中可知Last(N) – TO < Te < Last(N). 此級沒有必要多實時,所以Te就取最小值=Last(N)-TO.

           

          我們將這個三級策略和一級策略進行比較,我們假設一級策略的周期為2分鐘。假設整個時間段是10分鐘。

          比較項

          一級(2)

          三級(1, 3, 10)

          DB訪問次數

          5

          14

          延遲

          2

          <2

          內存

          11分鐘數據的OID updateTime

          17

          訪問DB的數據量

          12×5=60

          10+6×10/3+2×10=50

          比較次數

          60

          50


          在這個分級策略中級數n, 每一級的P(c), TO, Te都是可調的,但需要注意最后一級的TO是不可調的必須等于真正的transaction timeout時間,Te的取值范圍是[Last(N)-TO, Last(N)]

           

          調優的依據是我們會記錄每次補償操作的歷史記錄,比如每次補償成功的個數,補償運行的開始,結束時間等。

                        

          刪除檢測

          增量索引服務只是負責updateadd的檢測,它并不判定索引中document對應的記錄在DB中是否已經被刪除,索引中會積累很多在DB中已經被清除的數據。這些document也要及時地從索引中刪除。所以會有一個定期的刪除檢測服務,檢測出那些在索引中有,而在DB中已經被物理刪除的記錄。

           

          刪除檢測服務的步驟:

          l           從索引中分批取出所有OID,根據OID排序

          l           用每個分批的最小值和最大值到DB中取出此OIDDB中存在(沒有被刪除)的所有OID,也根據OID排序

          Select OID from X where oid>12001 and oid<24100 order by oid;

          l           將索引中查處的OID      有序集合和DB中獲得的OID有序集合進行對比,如果DB中沒有索引中有的就添加到deletedList.

          l           deletedList中的所有記錄對應document從索引中刪除

           

          對于軟刪除,它們的狀態屬性active如果已經被爬到索引中,直接從索引中選擇出那些active=0document刪除,如果沒有,可以將刪除檢測的sql語句改成

          Select OID from X where active = 0 and oid>12001 and oid<24100 order by oid;

          其他步驟同上面硬刪除的部分

           

          備份

          備份時需要注意不僅要備份最后一次commit之前的所有索引,而且需要備份timeTrace文件. 恢復后只需要從timeTrace的時間開始爬就可以了.

           

          注意點

          主表updateTime沒有更新

          有時候,業務邏輯更新了子對象,比如JobOrder對象包含了很多個Container對象,一個JobOrder對應一個Lucene Document,當Container對象更新時,它并沒有更新JobOrderupdateTime,只是更新了ContainerupdateTime。這也沒關系,我們再增量服務和補償策略中同時也會查出子表updateTime在當前時間段的所有主表數據。

          container如果有刪除,就必須約定application必須要update主表的updateTime。否則用戶就會搜出他本不能訪問的被刪除的container



          posted on 2010-05-07 07:12 叱咤紅人 閱讀(2775) 評論(1)  編輯  收藏 所屬分類: Lucene

          評論

          # re: RDBMS的lucene爬蟲 2010-05-07 13:52 羅萊家紡
          阿那是表達式  回復  更多評論
            


          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          主站蜘蛛池模板: 会同县| 宕昌县| 阿城市| 怀远县| 梁平县| 温宿县| 水富县| 万州区| 浪卡子县| 都匀市| 安庆市| 福海县| 甘洛县| 甘南县| 乌鲁木齐市| 衡水市| 洞口县| 余干县| 昌宁县| 象山县| 万源市| 库车县| 林口县| 莱芜市| 雅安市| 体育| 牙克石市| 青海省| 安义县| 息烽县| 峨山| 沿河| 澳门| 鞍山市| 思南县| 西吉县| 郁南县| 卢氏县| 和林格尔县| 和硕县| 武宣县|