paulwong

          EVEN DRIVEN - SPRING CLOUD STREAM 3.x - Functional Programming Model

          SPRING CLOUD STREAM 3.x 版本時(shí),之前的一些編程模式,如@Enablebindding,@StreamListenner等注釋被廢棄了,這是由于一些框架的代碼必需由用戶編寫,如配置框架用的Input MessageChannel,Output  MessageChannel,連接MessageHandler與MessageChannel等,被視為不必要的動(dòng)作。為了簡(jiǎn)化用戶代碼,于是推出Functional Programming Model。

          引入了新名詞:Supplier、Function與Consumer。實(shí)際上這幾個(gè)類可視為Adapter,如果之前已經(jīng)有存在的Service類,且方法名為各種各樣,可以重新包裝成Supplier、Function與Consumer,并在固定的方法名:apply/get/accept中調(diào)用Service的方法。

          Supplier

          當(dāng)在配置文件中注入此類型的Bean,并在spring.cloud.stream.function.definition加入此Bean的名稱,SPRING CLOUD STREAM就會(huì)幫你生成一個(gè)Output  MessageChannel,并連接上此Bean,后續(xù)只需要在BINDDING中加入對(duì)應(yīng)的Destination Name,即可向BROKER發(fā)消息了。

          Consumer

          當(dāng)在配置文件中注入此類型的Bean,并在spring.cloud.stream.function.definition加入此Bean的名稱,SPRING CLOUD STREAM就會(huì)幫你生成一個(gè)Input  MessageChannel,并連接上此Bean,后續(xù)只需要在BINDDING中加入對(duì)應(yīng)的Destination Name,即可收到BROKER推送關(guān)于此Destination的消息了。

          Function

          當(dāng)在配置文件中注入此類型的Bean,并在spring.cloud.stream.function.definition加入此Bean的名稱,SPRING CLOUD STREAM就會(huì)幫你生成一個(gè)Input和Output  MessageChannel,并連接上此Bean,后續(xù)只需要在BINDDING中分別對(duì)Input和Output MessageChannel加入對(duì)應(yīng)的Destination Name1/Name2,即可收到BROKER推送關(guān)于此Destination的消息,也可以向BROKER發(fā)消息了。

          與SPRING INTEGRATION的整合

          如果要對(duì)消息進(jìn)行復(fù)雜處理,如拆分消息、聚合消息、IF ELSE消息等,就要借助SPRING INTEGRATION了。

          @Bean
              public IntegrationFlow upperCaseFlow(LoanService loanService) {
                  return IntegrationFlows
                              //turn this IntegrationFlow as a gateway, here is a Function interface 
                              
          //with loadCheckerFunction as bean name
                              .from(LoadCheckerFunction.class, gateway -> gateway.beanName("loadCheckerFunction"))
                              .handle(loanService, "check")
                              .logAndReply(LoggingHandler.Level.WARN);
              }

              public interface LoadCheckerFunction extends Function<Loan, Loan>{

              }

          IntegrationFlows.from(Class<?> serviceInterface)是可以將本IntegrationFlow包裝成serviceInterface的實(shí)現(xiàn)類,如果調(diào)用此接口,最終會(huì)返回IntegrationFlow最后一個(gè)步驟的實(shí)體,如果這個(gè)serviceInterface是Function的話,剛好和SPRING CLOUD STREAM對(duì)接上。

          后續(xù)在spring.cloud.stream.function.definition加入此Bean的名稱loadCheckerFunction,SPRING CLOUD STREAM就會(huì)幫你生成一個(gè)Input和Output  MessageChannel,并連接上此Bean,再在BINDDING中分別對(duì)Input和Output MessageChannel加入對(duì)應(yīng)的Destination Name1/Name2,即可收到BROKER推送關(guān)于此Destination的消息,也可以向BROKER發(fā)消息。

          application.yaml
          # This setting can increase or decrease the rate of message production (1000 = 1s)
          # spring.cloud.stream.poller.fixed-delay=1000

          # This setting can control which function method in our code will be triggered if there are multiple
          # spring.cloud.function.definition=supplyLoan

          # Give the autogenerated binding a friendlier name

          spring:
             application:
                name: loan-check-rabbit
             banner:
                location: classpath:/banner-rabbit.txt
             cloud:
                stream:
                   function.definition: loadCheckerFunction
                   #BindingProperties
                   bindings:
                      loadCheckerFunction-in-0:
                         destination: queue.pretty.log.messages
                         binder: local_rabbit
                      loadCheckerFunction-out-0:
                         destination: queue.pretty.approved.messages
                         binder: local_rabbit
                   #BinderProperties
                   binders:
                      local_rabbit:
                         type: rabbit
                         environment:
                            spring:
                               rabbitmq:
                                  host: 10.80.27.69
                                  port: 5672
                                  username: guest
                                  password: guest
                                  virtual-host: my-virtual-host

          Reference

          https://spring.io/blog/2019/10/25/spring-cloud-stream-and-spring-integration

          posted on 2021-11-10 15:10 paulwong 閱讀(400) 評(píng)論(0)  編輯  收藏 所屬分類: SPRING INTERGRATIONSPRING CLOUDEVEN DRIVEN ARCHITECT

          主站蜘蛛池模板: 台北县| 旺苍县| 平谷区| 迁西县| 临高县| 本溪市| 梁山县| 普陀区| 洛扎县| 平顺县| 巴彦淖尔市| 临桂县| 巫山县| 安仁县| 冷水江市| 象州县| 广饶县| 墨竹工卡县| 黄骅市| 静宁县| 阿城市| 长春市| 望谟县| 洞口县| 石台县| 永寿县| 马鞍山市| 襄垣县| 江口县| 兴安盟| 中江县| 苗栗县| 开原市| 黄石市| 彩票| 静乐县| 名山县| 玛沁县| 乌兰察布市| 上林县| 资兴市|