大大毛 的筆記

            DDM's Note

          哪怕沒有辦法一定有說法,
          就算沒有鴿子一定有烏鴉,
          固執(zhí)無罪 夢想有價(jià),
          讓他們驚訝.

          posts - 14, comments - 23, trackbacks - 0, articles - 58
             :: 首頁 ::  :: 聯(lián)系 ::  :: 管理

          日歷

          <2025年7月>
          293012345
          6789101112
          13141516171819
          20212223242526
          272829303112
          3456789

          公告

          果然是不能想得太好。

          隨筆分類(4)

          積分與排名

          • 積分 - 60917
          • 排名 - 865

          最新評論

          Oracle資料推送MQTT

          Posted on 2019-04-10 15:25 大大毛 閱讀(415) 評論(0)  編輯  收藏 所屬分類: Nifi
          需求
          將資料從Oracle推至MQTT,資料結(jié)果使用JSON格式
          場景1:直接推送
          場景2:僅當(dāng)資料有變更時(shí)才推送

          解決方案
          QueryDatabaseTable --> ConvertAvroToJSON --> PublishMQTT


          Processor及其設(shè)定:
          QueryDatabaseTable作用是從DB中撈取資料,可以想象Nifi把它轉(zhuǎn)成一個(gè)Select語句在執(zhí)行


          • Database Connection Pooling Service:在Configure中設(shè)定的數(shù)據(jù)庫連接,可以在Processor Group中被共用
          • Database Type:這個(gè)設(shè)定的是Oracle。其實(shí)它與數(shù)據(jù)庫連接設(shè)定有點(diǎn)重疊,那邊已經(jīng)有指定是哪一種類型的DB,這里需要再指定,我想會(huì)不會(huì)是利用它來生成不同的Select查詢語法?
          • Table Name:表名,我這里使用的是View名稱,View其實(shí)就已經(jīng)對Table做出一些限定,可以挑選列及設(shè)定查詢條件
          • Maximum-value Columns:這個(gè)屬性很重要,如果不設(shè)定則Nifi在查詢的時(shí)候會(huì)撈取所有的資料,如果有設(shè)定某個(gè)列,則Nifi僅會(huì)撈取“新”資料
            • 例如圖上設(shè)定“BATCHID”這個(gè)列。第一次Nifi啟動(dòng)時(shí)撈取資料中最大BATCHID = 10,則下一次Nifi再次啟動(dòng)時(shí)就只會(huì)撈取BATCHID>10的資料,并且會(huì)自動(dòng)記錄下已經(jīng)撈過的最大值
            • 最大值保存在下面這里,Processor上右鍵選擇“View State”


            • 下圖可以看到當(dāng)前最新的值,點(diǎn)“Clear State”則可以將保留值清空(Nifi下次啟動(dòng)時(shí)就會(huì)撈取所有資料)

          • 對于僅需要簡單拉取資料的場景1來說,“Maximum-value Columns”置空即可;而對于僅在資料有更新時(shí)才要拉取的場景2來說,則需要設(shè)定并且在View中做出一些調(diào)整才可以達(dá)成
            • 當(dāng)有資料更新時(shí),則被更新資料的BatchID會(huì)是更新的值,所以只要在View中虛擬BatchID列 = Max(BatchID),就可以達(dá)成有資料更新才要拉取的效果
          ConvertAvroToJSON,將Avro類型資料轉(zhuǎn)換為JSON,這個(gè)可以不用改設(shè)定

          PublishMQTT,將資料Publish到MQTT指定Topic
          • Broker URIMQTT的Broker地址
          • Client ID發(fā)布MQTT的Client端ID(注意不要多個(gè)Processor使用相同的Client ID,這樣Processor容易被卡死)
          • Topic發(fā)布至MQTT的Topic名稱
          • Retain MessageMQTT的遺言屬性,即是否保留推送的消息(若設(shè)為false,則僅有當(dāng)前連上MQTT的客戶端才能收到這筆消息)

          i am ddm

          主站蜘蛛池模板: 河东区| 如皋市| 岳阳市| 鄂尔多斯市| 镇远县| 从江县| 沈阳市| 天气| 舞阳县| 泸水县| 文安县| 朝阳市| 土默特左旗| 潼南县| 和平县| 五指山市| 广河县| 墨竹工卡县| 额尔古纳市| 宾川县| 奉节县| 奉化市| 漳平市| 调兵山市| 永清县| 大方县| 安阳市| 大兴区| 咸丰县| 南陵县| 临夏县| 铜鼓县| 宁强县| 临猗县| 六枝特区| 文安县| 安图县| 洪泽县| 建德市| 原阳县| 泸水县|