paulwong

          My Links

          Blog Stats

          常用鏈接

          留言簿(67)

          隨筆分類(1389)

          隨筆檔案(1147)

          文章分類(7)

          文章檔案(10)

          相冊(cè)

          收藏夾(2)

          AI

          Develop

          E-BOOK

          Other

          養(yǎng)生

          微服務(wù)

          搜索

          最新評(píng)論

          閱讀排行榜

          評(píng)論排行榜

          60天內(nèi)閱讀排行

          EVEN DRIVEN - SPRING CLOUD STREAM - Routing Function

          SPRING CLOUD STREAM內(nèi)置了一個(gè)RoutingFunction,能將MESSAGE路由到應(yīng)用的其他FUNCTION中。
          對(duì)接RoutingFunction可發(fā)送消息到其外部DESTINATION中或用“|”連接符連接。

          application.yaml
          # This setting can increase or decrease the rate of message production (1000 = 1s)
          #
           spring.cloud.stream.poller.fixed-delay=1000
          #
           DefaultPollerProperties

          # This setting can control which function method in our code will be triggered if there are multiple
          #
           spring.cloud.function.definition=supplyLoan

          # Give the autogenerated binding a friendlier name
          spring:
             application:
                name: loan-check-rabbit
             banner:
                location: classpath:/banner-rabbit.txt
             cloud:
                #BindingServiceProperties
                stream:
                   #StreamFunctionProperties
                   function:
                      definition: loadCheckerFunction;loanCheckerDecieder;loanCheckerConsumer;\
                                  loanDeclinedConsumer;loanApprovedConsumer;loanCheckerProcessor|functionRouter
                      routing:
                         enabled: true
                   #BindingProperties
                   bindings:
                      loanCheckerProcessor|functionRouter-in-0:
                         destination: queue.pretty.log.messages
                         binder: local_rabbit
                         
                      loanApprovedConsumer-in-0:
                         destination: load.approved
                         binder: local_rabbit
                      loanDeclinedConsumer-in-0:
                         destination: load.declined
                         binder: local_rabbit
                         
                      loanCheckerDecieder-in-0:
                         destination: queue.pretty.log.messages.222
                         binder: local_rabbit
                      loanCheckerDecieder-out-0:
                         destination: queue.pretty.approved.messages
                         binder: local_rabbit
                      loanCheckerConsumer-in-0:
                         destination: queue.pretty.approved.messages
                         binder: local_rabbit
                   #BinderProperties
                   binders:
                      local_rabbit:
                         type: rabbit
                         environment:
                            spring:
                               rabbitmq:
                                  host: 10.80.27.69
                                  port: 5672
                                  username: guest
                                  password: guest
                                  virtual-host: my-virtual-host
                                  
                                  
          logging:
             level:
                root: info
                org.springframework:
                   cloud.function: debug
                   #retry: debug


          LoanCheckConfiguration.java
          package com.paul.testspringcloudstream.loancheck.config;

          import java.util.function.Consumer;
          import java.util.function.Function;

          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.cloud.function.context.MessageRoutingCallback;
          import org.springframework.cloud.stream.function.StreamBridge;
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          import org.springframework.integration.support.MessageBuilder;
          import org.springframework.messaging.Message;

          import com.paul.testspringcloudstream.common.model.Loan;
          import com.paul.testspringcloudstream.common.model.Status;
          import com.paul.testspringcloudstream.loancheck.router.LoanCheckerRouter;
          import com.paul.testspringcloudstream.loancheck.service.LoanProcessor;
          import com.paul.testspringcloudstream.loancheck.service.LoanService;

          @Configuration
          public class LoanCheckConfiguration {
              
              private static final Logger log = LoggerFactory.getLogger(LoanCheckConfiguration.class);
              private static final Long MAX_AMOUNT = 10000L;
              private static final String LOG_PATTERN = "{} - {} {} for ${} for {}";
              
              @Autowired
              public void test(Consumer<Loan> loanCheckerConsumer) {
                  log.info("{}", loanCheckerConsumer.getClass());
              }
              
              
              @Bean
              public Consumer<Loan> loanCheckerConsumer(){
                  return loan -> 
                      log.info(LOG_PATTERN, "loanCheckerConsumer", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
              }
              
              @Bean
              public Consumer<Loan> loanDeclinedConsumer(){
                  return loan -> 
                      log.info(LOG_PATTERN, "loanDeclinedConsumer", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
              }
              
              @Bean
              public Consumer<Loan> loanApprovedConsumer(){
                  return loan -> 
                      log.info(LOG_PATTERN, "loanApprovedConsumer", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
              }
              
              @Bean
              public MessageRoutingCallback loanCheckerRouter() {
                  return new LoanCheckerRouter();
              }
              
              @Bean
              public Function<Loan, Loan> loanCheckerProcessor(
                  LoanService loanService
              ){
                  return loan -> loanService.check(loan);
              }
              
              @Bean
              public Function<Loan, Message<Loan>> loanCheckerProcessorBak(
                  LoanService loanService
              ){
                  return loan -> {
                      Loan result = loanService.check(loan);
                      String sendTo = Status.DECLINED.name().equals(result.getStatus()) ? 
                                  LoanProcessor.DECLINED_OUT : LoanProcessor.APPROVED_OUT;
                      
                      return MessageBuilder.withPayload(result)
                                  .setHeader("spring.cloud.stream.sendto.destination", sendTo)
                                  .build();
                  };
              }
              
              @Bean
              public Consumer<Loan> loanCheckerDecieder(StreamBridge streamBridge){
                  return loan -> {
                      log.info(LOG_PATTERN, "loanCheckerDecieder", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());

                      if (loan.getAmount() > MAX_AMOUNT) {
                          loan.setStatus(Status.DECLINED.name());
                          streamBridge.send(LoanProcessor.DECLINED_OUT, "local_rabbit", loan);
                      } else {
                          loan.setStatus(Status.APPROVED.name());
                          streamBridge.send(LoanProcessor.APPROVED_OUT, "local_rabbit", loan);
                      }

                      log.info(LOG_PATTERN, "loanCheckerDecieder", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
                  };
              }

          }


          LoanCheckerRouter.java,將路由條件統(tǒng)一在此處
          package com.paul.testspringcloudstream.loancheck.router;

          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;
          import org.springframework.cloud.function.context.MessageRoutingCallback;
          import org.springframework.messaging.Message;

          import com.paul.testspringcloudstream.common.model.Loan;
          import com.paul.testspringcloudstream.common.model.Status;

          public class LoanCheckerRouter implements MessageRoutingCallback{
              
              private static final Logger log = LoggerFactory.getLogger(LoanCheckerRouter.class);

              @Override
              public String functionDefinition(Message<?> message) {
                  
          //        byte[] resultByte = (byte[])message.getPayload();
          //        String resultString = new String(resultByte);
          //        
          //        return "loanDeclinedConsumer";
                  
                  Loan result = (Loan)message.getPayload();
                  
                  log.info("Loan status: {}", result.getStatus());
                  
                  return Status.DECLINED.name().equals(result.getStatus()) ? 
                              "loanDeclinedConsumer" : "loanApprovedConsumer";
              }

          }

          posted on 2021-11-15 14:46 paulwong 閱讀(314) 評(píng)論(0)  編輯  收藏 所屬分類: SPRING CLOUD

          主站蜘蛛池模板: 左权县| 大邑县| 平潭县| 贡觉县| 永城市| 钟祥市| 西乡县| 崇文区| 凌海市| 惠东县| 华宁县| 湖州市| 汨罗市| 万州区| 千阳县| 博乐市| 光山县| 临沂市| 治县。| 内乡县| 宁远县| 丹阳市| 嘉定区| 甘德县| 丰顺县| 松原市| 阿克苏市| 五指山市| 香港| 将乐县| 尚义县| 依安县| 巧家县| 普宁市| 静乐县| 大同市| 余庆县| 外汇| 浙江省| 五指山市| 平乐县|