paulwong

          EVEN DRIVEN - SPRING CLOUD STREAM 3.x - Functional Programming Model

          SPRING CLOUD STREAM 3.x 版本時,之前的一些編程模式,如@Enablebindding,@StreamListenner等注釋被廢棄了,這是由于一些框架的代碼必需由用戶編寫,如配置框架用的Input MessageChannel,Output  MessageChannel,連接MessageHandler與MessageChannel等,被視為不必要的動作。為了簡化用戶代碼,于是推出Functional Programming Model。

          引入了新名詞:Supplier、Function與Consumer。實際上這幾個類可視為Adapter,如果之前已經有存在的Service類,且方法名為各種各樣,可以重新包裝成Supplier、Function與Consumer,并在固定的方法名:apply/get/accept中調用Service的方法。

          Supplier

          當在配置文件中注入此類型的Bean,并在spring.cloud.stream.function.definition加入此Bean的名稱,SPRING CLOUD STREAM就會幫你生成一個Output  MessageChannel,并連接上此Bean,后續只需要在BINDDING中加入對應的Destination Name,即可向BROKER發消息了。

          Consumer

          當在配置文件中注入此類型的Bean,并在spring.cloud.stream.function.definition加入此Bean的名稱,SPRING CLOUD STREAM就會幫你生成一個Input  MessageChannel,并連接上此Bean,后續只需要在BINDDING中加入對應的Destination Name,即可收到BROKER推送關于此Destination的消息了。

          Function

          當在配置文件中注入此類型的Bean,并在spring.cloud.stream.function.definition加入此Bean的名稱,SPRING CLOUD STREAM就會幫你生成一個Input和Output  MessageChannel,并連接上此Bean,后續只需要在BINDDING中分別對Input和Output MessageChannel加入對應的Destination Name1/Name2,即可收到BROKER推送關于此Destination的消息,也可以向BROKER發消息了。

          與SPRING INTEGRATION的整合

          如果要對消息進行復雜處理,如拆分消息、聚合消息、IF ELSE消息等,就要借助SPRING INTEGRATION了。

          @Bean
              public IntegrationFlow upperCaseFlow(LoanService loanService) {
                  return IntegrationFlows
                              //turn this IntegrationFlow as a gateway, here is a Function interface 
                              
          //with loadCheckerFunction as bean name
                              .from(LoadCheckerFunction.class, gateway -> gateway.beanName("loadCheckerFunction"))
                              .handle(loanService, "check")
                              .logAndReply(LoggingHandler.Level.WARN);
              }

              public interface LoadCheckerFunction extends Function<Loan, Loan>{

              }

          IntegrationFlows.from(Class<?> serviceInterface)是可以將本IntegrationFlow包裝成serviceInterface的實現類,如果調用此接口,最終會返回IntegrationFlow最后一個步驟的實體,如果這個serviceInterface是Function的話,剛好和SPRING CLOUD STREAM對接上。

          后續在spring.cloud.stream.function.definition加入此Bean的名稱loadCheckerFunction,SPRING CLOUD STREAM就會幫你生成一個Input和Output  MessageChannel,并連接上此Bean,再在BINDDING中分別對Input和Output MessageChannel加入對應的Destination Name1/Name2,即可收到BROKER推送關于此Destination的消息,也可以向BROKER發消息。

          application.yaml
          # This setting can increase or decrease the rate of message production (1000 = 1s)
          # spring.cloud.stream.poller.fixed-delay=1000

          # This setting can control which function method in our code will be triggered if there are multiple
          # spring.cloud.function.definition=supplyLoan

          # Give the autogenerated binding a friendlier name

          spring:
             application:
                name: loan-check-rabbit
             banner:
                location: classpath:/banner-rabbit.txt
             cloud:
                stream:
                   function.definition: loadCheckerFunction
                   #BindingProperties
                   bindings:
                      loadCheckerFunction-in-0:
                         destination: queue.pretty.log.messages
                         binder: local_rabbit
                      loadCheckerFunction-out-0:
                         destination: queue.pretty.approved.messages
                         binder: local_rabbit
                   #BinderProperties
                   binders:
                      local_rabbit:
                         type: rabbit
                         environment:
                            spring:
                               rabbitmq:
                                  host: 10.80.27.69
                                  port: 5672
                                  username: guest
                                  password: guest
                                  virtual-host: my-virtual-host

          Reference

          https://spring.io/blog/2019/10/25/spring-cloud-stream-and-spring-integration

          posted on 2021-11-10 15:10 paulwong 閱讀(396) 評論(0)  編輯  收藏 所屬分類: SPRING INTERGRATIONSPRING CLOUDEVEN DRIVEN ARCHITECT

          主站蜘蛛池模板: 土默特右旗| 富源县| 清水河县| 石家庄市| 丽江市| 乐都县| 团风县| 秦皇岛市| 上饶市| 岳池县| 比如县| 于田县| 乡宁县| 延吉市| 江阴市| 云和县| 杨浦区| 五常市| 汝南县| 黄梅县| 四平市| 固原市| 峨眉山市| 桂林市| 南昌市| 科尔| 大余县| 葫芦岛市| 阿荣旗| 金沙县| 怀仁县| 都昌县| 泾源县| 托克托县| 红安县| 沾益县| 广南县| 宁晋县| 磐安县| 吉水县| 安溪县|