大大毛 的筆記

            DDM's Note

          哪怕沒有辦法一定有說法,
          就算沒有鴿子一定有烏鴉,
          固執無罪 夢想有價,
          讓他們驚訝.

          posts - 14, comments - 23, trackbacks - 0, articles - 58
             :: 首頁 ::  :: 聯系 ::  :: 管理

          Oracle資料推送MQTT

          Posted on 2019-04-10 15:25 大大毛 閱讀(409) 評論(0)  編輯  收藏 所屬分類: Nifi
          需求
          將資料從Oracle推至MQTT,資料結果使用JSON格式
          場景1:直接推送
          場景2:僅當資料有變更時才推送

          解決方案
          QueryDatabaseTable --> ConvertAvroToJSON --> PublishMQTT


          Processor及其設定:
          QueryDatabaseTable作用是從DB中撈取資料,可以想象Nifi把它轉成一個Select語句在執行


          • Database Connection Pooling Service:在Configure中設定的數據庫連接,可以在Processor Group中被共用
          • Database Type:這個設定的是Oracle。其實它與數據庫連接設定有點重疊,那邊已經有指定是哪一種類型的DB,這里需要再指定,我想會不會是利用它來生成不同的Select查詢語法?
          • Table Name:表名,我這里使用的是View名稱,View其實就已經對Table做出一些限定,可以挑選列及設定查詢條件
          • Maximum-value Columns:這個屬性很重要,如果不設定則Nifi在查詢的時候會撈取所有的資料,如果有設定某個列,則Nifi僅會撈取“新”資料
            • 例如圖上設定“BATCHID”這個列。第一次Nifi啟動時撈取資料中最大BATCHID = 10,則下一次Nifi再次啟動時就只會撈取BATCHID>10的資料,并且會自動記錄下已經撈過的最大值
            • 最大值保存在下面這里,Processor上右鍵選擇“View State”


            • 下圖可以看到當前最新的值,點“Clear State”則可以將保留值清空(Nifi下次啟動時就會撈取所有資料)

          • 對于僅需要簡單拉取資料的場景1來說,“Maximum-value Columns”置空即可;而對于僅在資料有更新時才要拉取的場景2來說,則需要設定并且在View中做出一些調整才可以達成
            • 當有資料更新時,則被更新資料的BatchID會是更新的值,所以只要在View中虛擬BatchID列 = Max(BatchID),就可以達成有資料更新才要拉取的效果
          ConvertAvroToJSON,將Avro類型資料轉換為JSON,這個可以不用改設定

          PublishMQTT,將資料Publish到MQTT指定Topic
          • Broker URIMQTT的Broker地址
          • Client ID發布MQTT的Client端ID(注意不要多個Processor使用相同的Client ID,這樣Processor容易被卡死)
          • Topic發布至MQTT的Topic名稱
          • Retain MessageMQTT的遺言屬性,即是否保留推送的消息(若設為false,則僅有當前連上MQTT的客戶端才能收到這筆消息)

          i am ddm

          主站蜘蛛池模板: 鄂州市| 恩平市| 宁乡县| 鄂托克旗| 怀化市| 依安县| 海伦市| 旌德县| 石首市| 丰原市| 阿巴嘎旗| 云浮市| 灵寿县| 高碑店市| 罗田县| 湘潭县| 资溪县| 香格里拉县| 乡宁县| 会昌县| 林西县| 高雄县| 广安市| 宾川县| 荣成市| 万荣县| 曲靖市| 栾川县| 石景山区| 修文县| 文山县| 黄石市| 府谷县| 桃江县| 调兵山市| 阳曲县| 南通市| 白玉县| 南和县| 昌都县| 阜平县|