需求
將資料從Oracle推至MQTT,資料結果使用JSON格式
場景1:直接推送
場景2:僅當資料有變更時才推送
解決方案
QueryDatabaseTable --> ConvertAvroToJSON --> PublishMQTT
Processor及其設定:
QueryDatabaseTable,作用是從DB中撈取資料,可以想象Nifi把它轉成一個Select語句在執行

- Database Connection Pooling Service:在Configure中設定的數據庫連接,可以在Processor Group中被共用
- Database Type:這個設定的是Oracle。其實它與數據庫連接設定有點重疊,那邊已經有指定是哪一種類型的DB,這里需要再指定,我想會不會是利用它來生成不同的Select查詢語法?
- Table Name:表名,我這里使用的是View名稱,View其實就已經對Table做出一些限定,可以挑選列及設定查詢條件
- Maximum-value Columns:這個屬性很重要,如果不設定則Nifi在查詢的時候會撈取所有的資料,如果有設定某個列,則Nifi僅會撈取“新”資料
- 例如圖上設定“BATCHID”這個列。第一次Nifi啟動時撈取資料中最大BATCHID = 10,則下一次Nifi再次啟動時就只會撈取BATCHID>10的資料,并且會自動記錄下已經撈過的最大值
- 最大值保存在下面這里,Processor上右鍵選擇“View State”
- 下圖可以看到當前最新的值,點“Clear State”則可以將保留值清空(Nifi下次啟動時就會撈取所有資料)
- 對于僅需要簡單拉取資料的場景1來說,“Maximum-value Columns”置空即可;而對于僅在資料有更新時才要拉取的場景2來說,則需要設定并且在View中做出一些調整才可以達成
- 當有資料更新時,則被更新資料的BatchID會是更新的值,所以只要在View中虛擬BatchID列 = Max(BatchID),就可以達成有資料更新才要拉取的效果
ConvertAvroToJSON,將Avro類型資料轉換為JSON,這個可以不用改設定
PublishMQTT,將資料Publish到MQTT指定Topic

- Broker URI:MQTT的Broker地址
- Client ID:發布MQTT的Client端ID(注意不要多個Processor使用相同的Client ID,這樣Processor容易被卡死)
- Topic:發布至MQTT的Topic名稱
- Retain Message:MQTT的遺言屬性,即是否保留推送的消息(若設為false,則僅有當前連上MQTT的客戶端才能收到這筆消息)