paulwong

          EVEN DRIVEN - SPRING CLOUD STREAM - Routing Function

          SPRING CLOUD STREAM內置了一個RoutingFunction,能將MESSAGE路由到應用的其他FUNCTION中。
          對接RoutingFunction可發送消息到其外部DESTINATION中或用“|”連接符連接。

          application.yaml
          # This setting can increase or decrease the rate of message production (1000 = 1s)
          #
           spring.cloud.stream.poller.fixed-delay=1000
          #
           DefaultPollerProperties

          # This setting can control which function method in our code will be triggered if there are multiple
          #
           spring.cloud.function.definition=supplyLoan

          # Give the autogenerated binding a friendlier name
          spring:
             application:
                name: loan-check-rabbit
             banner:
                location: classpath:/banner-rabbit.txt
             cloud:
                #BindingServiceProperties
                stream:
                   #StreamFunctionProperties
                   function:
                      definition: loadCheckerFunction;loanCheckerDecieder;loanCheckerConsumer;\
                                  loanDeclinedConsumer;loanApprovedConsumer;loanCheckerProcessor|functionRouter
                      routing:
                         enabled: true
                   #BindingProperties
                   bindings:
                      loanCheckerProcessor|functionRouter-in-0:
                         destination: queue.pretty.log.messages
                         binder: local_rabbit
                         
                      loanApprovedConsumer-in-0:
                         destination: load.approved
                         binder: local_rabbit
                      loanDeclinedConsumer-in-0:
                         destination: load.declined
                         binder: local_rabbit
                         
                      loanCheckerDecieder-in-0:
                         destination: queue.pretty.log.messages.222
                         binder: local_rabbit
                      loanCheckerDecieder-out-0:
                         destination: queue.pretty.approved.messages
                         binder: local_rabbit
                      loanCheckerConsumer-in-0:
                         destination: queue.pretty.approved.messages
                         binder: local_rabbit
                   #BinderProperties
                   binders:
                      local_rabbit:
                         type: rabbit
                         environment:
                            spring:
                               rabbitmq:
                                  host: 10.80.27.69
                                  port: 5672
                                  username: guest
                                  password: guest
                                  virtual-host: my-virtual-host
                                  
                                  
          logging:
             level:
                root: info
                org.springframework:
                   cloud.function: debug
                   #retry: debug


          LoanCheckConfiguration.java
          package com.paul.testspringcloudstream.loancheck.config;

          import java.util.function.Consumer;
          import java.util.function.Function;

          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.cloud.function.context.MessageRoutingCallback;
          import org.springframework.cloud.stream.function.StreamBridge;
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          import org.springframework.integration.support.MessageBuilder;
          import org.springframework.messaging.Message;

          import com.paul.testspringcloudstream.common.model.Loan;
          import com.paul.testspringcloudstream.common.model.Status;
          import com.paul.testspringcloudstream.loancheck.router.LoanCheckerRouter;
          import com.paul.testspringcloudstream.loancheck.service.LoanProcessor;
          import com.paul.testspringcloudstream.loancheck.service.LoanService;

          @Configuration
          public class LoanCheckConfiguration {
              
              private static final Logger log = LoggerFactory.getLogger(LoanCheckConfiguration.class);
              private static final Long MAX_AMOUNT = 10000L;
              private static final String LOG_PATTERN = "{} - {} {} for ${} for {}";
              
              @Autowired
              public void test(Consumer<Loan> loanCheckerConsumer) {
                  log.info("{}", loanCheckerConsumer.getClass());
              }
              
              
              @Bean
              public Consumer<Loan> loanCheckerConsumer(){
                  return loan -> 
                      log.info(LOG_PATTERN, "loanCheckerConsumer", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
              }
              
              @Bean
              public Consumer<Loan> loanDeclinedConsumer(){
                  return loan -> 
                      log.info(LOG_PATTERN, "loanDeclinedConsumer", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
              }
              
              @Bean
              public Consumer<Loan> loanApprovedConsumer(){
                  return loan -> 
                      log.info(LOG_PATTERN, "loanApprovedConsumer", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
              }
              
              @Bean
              public MessageRoutingCallback loanCheckerRouter() {
                  return new LoanCheckerRouter();
              }
              
              @Bean
              public Function<Loan, Loan> loanCheckerProcessor(
                  LoanService loanService
              ){
                  return loan -> loanService.check(loan);
              }
              
              @Bean
              public Function<Loan, Message<Loan>> loanCheckerProcessorBak(
                  LoanService loanService
              ){
                  return loan -> {
                      Loan result = loanService.check(loan);
                      String sendTo = Status.DECLINED.name().equals(result.getStatus()) ? 
                                  LoanProcessor.DECLINED_OUT : LoanProcessor.APPROVED_OUT;
                      
                      return MessageBuilder.withPayload(result)
                                  .setHeader("spring.cloud.stream.sendto.destination", sendTo)
                                  .build();
                  };
              }
              
              @Bean
              public Consumer<Loan> loanCheckerDecieder(StreamBridge streamBridge){
                  return loan -> {
                      log.info(LOG_PATTERN, "loanCheckerDecieder", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());

                      if (loan.getAmount() > MAX_AMOUNT) {
                          loan.setStatus(Status.DECLINED.name());
                          streamBridge.send(LoanProcessor.DECLINED_OUT, "local_rabbit", loan);
                      } else {
                          loan.setStatus(Status.APPROVED.name());
                          streamBridge.send(LoanProcessor.APPROVED_OUT, "local_rabbit", loan);
                      }

                      log.info(LOG_PATTERN, "loanCheckerDecieder", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
                  };
              }

          }


          LoanCheckerRouter.java,將路由條件統一在此處
          package com.paul.testspringcloudstream.loancheck.router;

          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;
          import org.springframework.cloud.function.context.MessageRoutingCallback;
          import org.springframework.messaging.Message;

          import com.paul.testspringcloudstream.common.model.Loan;
          import com.paul.testspringcloudstream.common.model.Status;

          public class LoanCheckerRouter implements MessageRoutingCallback{
              
              private static final Logger log = LoggerFactory.getLogger(LoanCheckerRouter.class);

              @Override
              public String functionDefinition(Message<?> message) {
                  
          //        byte[] resultByte = (byte[])message.getPayload();
          //        String resultString = new String(resultByte);
          //        
          //        return "loanDeclinedConsumer";
                  
                  Loan result = (Loan)message.getPayload();
                  
                  log.info("Loan status: {}", result.getStatus());
                  
                  return Status.DECLINED.name().equals(result.getStatus()) ? 
                              "loanDeclinedConsumer" : "loanApprovedConsumer";
              }

          }

          posted on 2021-11-15 14:46 paulwong 閱讀(313) 評論(0)  編輯  收藏 所屬分類: SPRING CLOUD

          主站蜘蛛池模板: 松滋市| 偏关县| 江达县| 泰兴市| 固镇县| 个旧市| 肥城市| 石渠县| 广南县| 太保市| 柳河县| 溧水县| 通化市| 淮南市| 清河县| 会宁县| 垣曲县| 盐池县| 广宁县| 兰西县| 曲松县| 泰顺县| 湾仔区| 齐河县| 霍州市| 松潘县| 崇文区| 鹤山市| 和政县| 安庆市| 龙州县| 江华| 淮南市| 梅州市| 达州市| 德令哈市| 宁夏| 治县。| 策勒县| 甘谷县| 资中县|