paulwong

          #

          SPRING REACTOR 使用樣例

          SpringReactorTest.java

          package com.paul.testreactivestream.reactor;


          import java.util.List;

          import org.junit.jupiter.api.Test;

          import reactor.core.publisher.Flux;
          import reactor.core.publisher.Mono;
          import reactor.core.scheduler.Schedulers;

          public class SpringReactorTest {
              
              private void subscribeAndEnd(Flux<?> flux) {
                  
                  flux.map(c -> String.format("[%s] %s", Thread.currentThread().getName(), c))
                      .subscribe(System.out::println);
                  
                  flux.blockLast();
              }
              
              @Test
              public void createAFlux_just() throws InterruptedException {
                  Flux<String> fruitFlux = 
                          Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry")
                              .log()
                              ;
                  fruitFlux.subscribe(
                               f -> System.out.println(
                                           String.format("[%s] Here's some fruit: %s", Thread.currentThread().getName(), f)
                                       )
                            )
                           ;
                  fruitFlux.blockLast();
                  
          //        Thread.currentThread().join();
              }
              
              @Test
              public void zipFluxesToObject() {
                  Flux<String> characterFlux = 
                          Flux.just("Garfield", "Kojak", "Barbossa");
                  
                  Flux<String> foodFlux = 
                          Flux.just("Lasagna", "Lollipops", "Apples");
                  
                  Flux<String> zippedFlux = 
                          Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
                  
                  this.subscribeAndEnd(zippedFlux);
              }
              
              @Test
              public void map() {
                  Flux<Player> playerFlux = 
                          Flux.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
                              .map(n -> {
                                  String[] split = n.split("\\s");
                                  return new Player(split[0], split[1]);
                              })
                              ;
                  this.subscribeAndEnd(playerFlux);
              }
              
              @Test
              public void flatMap() {
                  Flux<Player> playerFlux = 
                          Flux.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
                              .flatMap(
                                  n -> Mono.just(n)
                                           .map(p -> {
                                              String[] split = p.split("\\s");
                                              return new Player(split[0], split[1]);
                                            })
                                           .subscribeOn(Schedulers.parallel())
                               );
                  this.subscribeAndEnd(playerFlux);
              }
              
              @Test
              public void buffer() {
                  Flux<List<String>> fruitFlux = 
                          Flux.just(
                                  "apple", "orange", "banana", "kiwi", "strawberry"
                               )
                              .buffer(3);
                  this.subscribeAndEnd(fruitFlux);
              }
              
              @Test
              public void bufferAsyn() {
                  Flux<String> flux =
                      Flux.just(
                              "apple", "orange", "banana", "kiwi", "strawberry"
                           )
                          .buffer(3)
                          .flatMap(x ->
                              Flux.fromIterable(x)
                                  .map(y -> y.toUpperCase())
                                  .subscribeOn(Schedulers.parallel())
              //                    .log()
                           );
                  this.subscribeAndEnd(flux);
              }
              
              @Test
              public void all() {
                  Mono<Boolean> animalFlux = 
                          Flux.just(
                                  "aardvark", "elephant", "koala", "eagle", "kangaroo"
                               )
                              .all(c -> c.contains("a"))
                              ;
                  animalFlux.map(c -> String.format("[%s] %s", Thread.currentThread().getName(), c))
                            .subscribe(System.out::println);
              
              }

          }

          posted @ 2021-11-23 13:59 paulwong 閱讀(338) | 評論 (0)編輯 收藏

          SPRING REACTOR資源

          Spring Reactor 入門與實踐
          https://www.jianshu.com/p/7ee89f70dfe5

          Project Reactor 核心原理解析
          https://www.jianshu.com/p/df395eb28f69

          Project Reactor 之 publishOn 與 subscribeOn
          https://www.jianshu.com/p/769f6e9824fb

          Spring響應(yīng)式編程
          https://blog.csdn.net/get_set/category_7484996.html









          posted @ 2021-11-23 13:51 paulwong 閱讀(117) | 評論 (0)編輯 收藏

          EVEN DRIVEN - SPRING CLOUD STREAM - @PollableBean for Reactive Suppliers

          Supplier beans, or functions that only publish messages in Spring Cloud Stream, are a bit special in that they aren't triggered by the receiving of events like Function or Consumer beans. This means that you often need a way to trigger them to be executed periodically.

          For imperative functions the framework by default "polls" a Supplier function every 1 second, but that duration is configurable using the spring.cloud.stream.poller.fixed-delay property.

          However, for reactive functions supplying a Flux it is only triggered once by default. This is because a Flux itself is potentially an infinite stream of events so in many cases it will only need to be triggered once. But don't worry, if you want to periodically trigger a reactive Supplier because you are producing a finite stream of events you can still do so using @PollableBean. This annotation then allows you to configure how often the function is triggered using the same spring.cloud.stream.poller.fixed-delay property!

          One example use case here could be periodically querying a data store and publishing each entry/row as an event. The number of rows in your data store is a finite number at any given time.

          Example code:

          @PollableBean 
          public Supplier<Flux<String>> stringSupplier() { return () -> Flux.just("foo","bar","baz"); }


          Reference:
          https://solace.community/discussion/360/pollablebean-for-reactive-suppliers-in-spring-cloud-stream

          posted @ 2021-11-23 10:03 paulwong 閱讀(249) | 評論 (0)編輯 收藏

          EVEN DRIVEN - SPRING CLOUD STREAM - 從非SCS組件發(fā)送消息到SCS組件

          在SPRING INTEGRATION中,如果要從非SPRING INTEGRATION代碼發(fā)送MESSAGE到SPRING INTEGRATION程序,通常用BUS GATEWAY。

          那么在SPRING CLOUD STREAM中,如果要從非SPRING CLOUD STREAM代碼發(fā)送MESSAGE到SPRING CLOUD STREAM程序,通常就要先通知框架自動生成一個SOURCE。

          application.property
          spring.cloud.stream.source=supplier
          spring.cloud.stream.bindings.supplier-out-0.destination=notification-events

          java
          streamBridge.send("supplier-out-0", userDto);

          Reference:
          https://blog.devgenius.io/event-driven-microservices-with-spring-cloud-stream-e034eee3f394

          posted @ 2021-11-19 11:47 paulwong 閱讀(257) | 評論 (0)編輯 收藏

          EVEN DRIVEN - SPRING CLOUD STREAM - Error Handling

          如果Function中拋出異常,系統(tǒng)沒有配置捕獲異常,則異常消息會被丟棄。通常會進行配置。

          @ServiceActivator(inputChannel = "my-destination.my-group.errors")
              public void handleError(ErrorMessage message) {
                  Throwable throwable = message.getPayload();
                  log.error("截獲異常", throwable);

                  Message<?> originalMessage = message.getOriginalMessage();
                  assert originalMessage != null;

                  log.info("原始消息體 = {}", new String((byte[]) originalMessage.getPayload()));
              }

          詳情參考:
          https://www.itmuch.com/spring-cloud/spring-cloud-stream-error-handling/

          posted @ 2021-11-17 10:50 paulwong 閱讀(241) | 評論 (0)編輯 收藏

          EVEN DRIVEN - SPRING CLOUD STREAM - Function Component

          如果要在SPRING CLOUD STREAM中和其他中間件打交道,如FILE、FTP、HTTP等,則要用到SPRING CLOUD FUNCTION。

          組件地址:
          https://github.com/spring-cloud/stream-applications/tree/main/functions

          特殊組件,將FUNCTION變成HTTP ENDPOINTS:
          https://github.com/spring-cloud/spring-cloud-function/tree/main/spring-cloud-starter-function-web
          https://github.com/spring-cloud/spring-cloud-function/tree/main/spring-cloud-starter-function-webflux

          posted @ 2021-11-15 17:40 paulwong 閱讀(221) | 評論 (0)編輯 收藏

          EVEN DRIVEN - SPRING CLOUD STREAM - Routing Function

          SPRING CLOUD STREAM內(nèi)置了一個RoutingFunction,能將MESSAGE路由到應(yīng)用的其他FUNCTION中。
          對接RoutingFunction可發(fā)送消息到其外部DESTINATION中或用“|”連接符連接。

          application.yaml
          # This setting can increase or decrease the rate of message production (1000 = 1s)
          #
           spring.cloud.stream.poller.fixed-delay=1000
          #
           DefaultPollerProperties

          # This setting can control which function method in our code will be triggered if there are multiple
          #
           spring.cloud.function.definition=supplyLoan

          # Give the autogenerated binding a friendlier name
          spring:
             application:
                name: loan-check-rabbit
             banner:
                location: classpath:/banner-rabbit.txt
             cloud:
                #BindingServiceProperties
                stream:
                   #StreamFunctionProperties
                   function:
                      definition: loadCheckerFunction;loanCheckerDecieder;loanCheckerConsumer;\
                                  loanDeclinedConsumer;loanApprovedConsumer;loanCheckerProcessor|functionRouter
                      routing:
                         enabled: true
                   #BindingProperties
                   bindings:
                      loanCheckerProcessor|functionRouter-in-0:
                         destination: queue.pretty.log.messages
                         binder: local_rabbit
                         
                      loanApprovedConsumer-in-0:
                         destination: load.approved
                         binder: local_rabbit
                      loanDeclinedConsumer-in-0:
                         destination: load.declined
                         binder: local_rabbit
                         
                      loanCheckerDecieder-in-0:
                         destination: queue.pretty.log.messages.222
                         binder: local_rabbit
                      loanCheckerDecieder-out-0:
                         destination: queue.pretty.approved.messages
                         binder: local_rabbit
                      loanCheckerConsumer-in-0:
                         destination: queue.pretty.approved.messages
                         binder: local_rabbit
                   #BinderProperties
                   binders:
                      local_rabbit:
                         type: rabbit
                         environment:
                            spring:
                               rabbitmq:
                                  host: 10.80.27.69
                                  port: 5672
                                  username: guest
                                  password: guest
                                  virtual-host: my-virtual-host
                                  
                                  
          logging:
             level:
                root: info
                org.springframework:
                   cloud.function: debug
                   #retry: debug


          LoanCheckConfiguration.java
          package com.paul.testspringcloudstream.loancheck.config;

          import java.util.function.Consumer;
          import java.util.function.Function;

          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.cloud.function.context.MessageRoutingCallback;
          import org.springframework.cloud.stream.function.StreamBridge;
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          import org.springframework.integration.support.MessageBuilder;
          import org.springframework.messaging.Message;

          import com.paul.testspringcloudstream.common.model.Loan;
          import com.paul.testspringcloudstream.common.model.Status;
          import com.paul.testspringcloudstream.loancheck.router.LoanCheckerRouter;
          import com.paul.testspringcloudstream.loancheck.service.LoanProcessor;
          import com.paul.testspringcloudstream.loancheck.service.LoanService;

          @Configuration
          public class LoanCheckConfiguration {
              
              private static final Logger log = LoggerFactory.getLogger(LoanCheckConfiguration.class);
              private static final Long MAX_AMOUNT = 10000L;
              private static final String LOG_PATTERN = "{} - {} {} for ${} for {}";
              
              @Autowired
              public void test(Consumer<Loan> loanCheckerConsumer) {
                  log.info("{}", loanCheckerConsumer.getClass());
              }
              
              
              @Bean
              public Consumer<Loan> loanCheckerConsumer(){
                  return loan -> 
                      log.info(LOG_PATTERN, "loanCheckerConsumer", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
              }
              
              @Bean
              public Consumer<Loan> loanDeclinedConsumer(){
                  return loan -> 
                      log.info(LOG_PATTERN, "loanDeclinedConsumer", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
              }
              
              @Bean
              public Consumer<Loan> loanApprovedConsumer(){
                  return loan -> 
                      log.info(LOG_PATTERN, "loanApprovedConsumer", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
              }
              
              @Bean
              public MessageRoutingCallback loanCheckerRouter() {
                  return new LoanCheckerRouter();
              }
              
              @Bean
              public Function<Loan, Loan> loanCheckerProcessor(
                  LoanService loanService
              ){
                  return loan -> loanService.check(loan);
              }
              
              @Bean
              public Function<Loan, Message<Loan>> loanCheckerProcessorBak(
                  LoanService loanService
              ){
                  return loan -> {
                      Loan result = loanService.check(loan);
                      String sendTo = Status.DECLINED.name().equals(result.getStatus()) ? 
                                  LoanProcessor.DECLINED_OUT : LoanProcessor.APPROVED_OUT;
                      
                      return MessageBuilder.withPayload(result)
                                  .setHeader("spring.cloud.stream.sendto.destination", sendTo)
                                  .build();
                  };
              }
              
              @Bean
              public Consumer<Loan> loanCheckerDecieder(StreamBridge streamBridge){
                  return loan -> {
                      log.info(LOG_PATTERN, "loanCheckerDecieder", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());

                      if (loan.getAmount() > MAX_AMOUNT) {
                          loan.setStatus(Status.DECLINED.name());
                          streamBridge.send(LoanProcessor.DECLINED_OUT, "local_rabbit", loan);
                      } else {
                          loan.setStatus(Status.APPROVED.name());
                          streamBridge.send(LoanProcessor.APPROVED_OUT, "local_rabbit", loan);
                      }

                      log.info(LOG_PATTERN, "loanCheckerDecieder", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
                  };
              }

          }


          LoanCheckerRouter.java,將路由條件統(tǒng)一在此處
          package com.paul.testspringcloudstream.loancheck.router;

          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;
          import org.springframework.cloud.function.context.MessageRoutingCallback;
          import org.springframework.messaging.Message;

          import com.paul.testspringcloudstream.common.model.Loan;
          import com.paul.testspringcloudstream.common.model.Status;

          public class LoanCheckerRouter implements MessageRoutingCallback{
              
              private static final Logger log = LoggerFactory.getLogger(LoanCheckerRouter.class);

              @Override
              public String functionDefinition(Message<?> message) {
                  
          //        byte[] resultByte = (byte[])message.getPayload();
          //        String resultString = new String(resultByte);
          //        
          //        return "loanDeclinedConsumer";
                  
                  Loan result = (Loan)message.getPayload();
                  
                  log.info("Loan status: {}", result.getStatus());
                  
                  return Status.DECLINED.name().equals(result.getStatus()) ? 
                              "loanDeclinedConsumer" : "loanApprovedConsumer";
              }

          }

          posted @ 2021-11-15 14:46 paulwong 閱讀(314) | 評論 (0)編輯 收藏

          RABBITMQ增加消息追蹤

          https://blog.csdn.net/as140507/article/details/101104900

          posted @ 2021-11-15 14:08 paulwong 閱讀(142) | 評論 (0)編輯 收藏

          EVEN DRIVEN - SPRING CLOUD STREAM 3.x - Functional Programming Model

          SPRING CLOUD STREAM 3.x 版本時,之前的一些編程模式,如@Enablebindding,@StreamListenner等注釋被廢棄了,這是由于一些框架的代碼必需由用戶編寫,如配置框架用的Input MessageChannel,Output  MessageChannel,連接MessageHandler與MessageChannel等,被視為不必要的動作。為了簡化用戶代碼,于是推出Functional Programming Model。

          引入了新名詞:Supplier、Function與Consumer。實際上這幾個類可視為Adapter,如果之前已經(jīng)有存在的Service類,且方法名為各種各樣,可以重新包裝成Supplier、Function與Consumer,并在固定的方法名:apply/get/accept中調(diào)用Service的方法。

          Supplier

          當(dāng)在配置文件中注入此類型的Bean,并在spring.cloud.stream.function.definition加入此Bean的名稱,SPRING CLOUD STREAM就會幫你生成一個Output  MessageChannel,并連接上此Bean,后續(xù)只需要在BINDDING中加入對應(yīng)的Destination Name,即可向BROKER發(fā)消息了。

          Consumer

          當(dāng)在配置文件中注入此類型的Bean,并在spring.cloud.stream.function.definition加入此Bean的名稱,SPRING CLOUD STREAM就會幫你生成一個Input  MessageChannel,并連接上此Bean,后續(xù)只需要在BINDDING中加入對應(yīng)的Destination Name,即可收到BROKER推送關(guān)于此Destination的消息了。

          Function

          當(dāng)在配置文件中注入此類型的Bean,并在spring.cloud.stream.function.definition加入此Bean的名稱,SPRING CLOUD STREAM就會幫你生成一個Input和Output  MessageChannel,并連接上此Bean,后續(xù)只需要在BINDDING中分別對Input和Output MessageChannel加入對應(yīng)的Destination Name1/Name2,即可收到BROKER推送關(guān)于此Destination的消息,也可以向BROKER發(fā)消息了。

          與SPRING INTEGRATION的整合

          如果要對消息進行復(fù)雜處理,如拆分消息、聚合消息、IF ELSE消息等,就要借助SPRING INTEGRATION了。

          @Bean
              public IntegrationFlow upperCaseFlow(LoanService loanService) {
                  return IntegrationFlows
                              //turn this IntegrationFlow as a gateway, here is a Function interface 
                              
          //with loadCheckerFunction as bean name
                              .from(LoadCheckerFunction.class, gateway -> gateway.beanName("loadCheckerFunction"))
                              .handle(loanService, "check")
                              .logAndReply(LoggingHandler.Level.WARN);
              }

              public interface LoadCheckerFunction extends Function<Loan, Loan>{

              }

          IntegrationFlows.from(Class<?> serviceInterface)是可以將本IntegrationFlow包裝成serviceInterface的實現(xiàn)類,如果調(diào)用此接口,最終會返回IntegrationFlow最后一個步驟的實體,如果這個serviceInterface是Function的話,剛好和SPRING CLOUD STREAM對接上。

          后續(xù)在spring.cloud.stream.function.definition加入此Bean的名稱loadCheckerFunction,SPRING CLOUD STREAM就會幫你生成一個Input和Output  MessageChannel,并連接上此Bean,再在BINDDING中分別對Input和Output MessageChannel加入對應(yīng)的Destination Name1/Name2,即可收到BROKER推送關(guān)于此Destination的消息,也可以向BROKER發(fā)消息。

          application.yaml
          # This setting can increase or decrease the rate of message production (1000 = 1s)
          # spring.cloud.stream.poller.fixed-delay=1000

          # This setting can control which function method in our code will be triggered if there are multiple
          # spring.cloud.function.definition=supplyLoan

          # Give the autogenerated binding a friendlier name

          spring:
             application:
                name: loan-check-rabbit
             banner:
                location: classpath:/banner-rabbit.txt
             cloud:
                stream:
                   function.definition: loadCheckerFunction
                   #BindingProperties
                   bindings:
                      loadCheckerFunction-in-0:
                         destination: queue.pretty.log.messages
                         binder: local_rabbit
                      loadCheckerFunction-out-0:
                         destination: queue.pretty.approved.messages
                         binder: local_rabbit
                   #BinderProperties
                   binders:
                      local_rabbit:
                         type: rabbit
                         environment:
                            spring:
                               rabbitmq:
                                  host: 10.80.27.69
                                  port: 5672
                                  username: guest
                                  password: guest
                                  virtual-host: my-virtual-host

          Reference

          https://spring.io/blog/2019/10/25/spring-cloud-stream-and-spring-integration

          posted @ 2021-11-10 15:10 paulwong 閱讀(397) | 評論 (0)編輯 收藏

          在CENTOS LINUX上安裝RABBITMQ

          安裝ERLANG

          從這里下載0依賴的ERLANG安裝包:
          https://github.com/rabbitmq/erlang-rpm/releases 
          象這種erlang-23.3.4.8-1.el7.x86_64.rpm含el7的是CENTOS7版本,含el8的是CENTOS8版本,安裝腳本
          yum install -y erlang-23.3.4.8-1.el7.x86_64.rpm

          安裝RABBITMQ

          下載地址:https://github.com/rabbitmq/rabbitmq-server/releases
          安裝腳本:yum install -y erlang-23.3.4.8-1.el7.x86_64.rpm

          拷貝配置文件

          下載配置文件樣例:https://github.com/rabbitmq/rabbitmq-server/blob/master/deps/rabbit/docs/rabbitmq.conf.example
          粘貼并重命名文件:/etc/rabbitmq/rabbitmq.conf

          開啟WEB控制臺

          /lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management

          配置guest可遠程訪問

          ## Uncomment the following line if you want to allow access to the
          ## guest user from anywhere on the network.
          loopback_users.guest = false

          配置開機啟動

          chkconfig rabbitmq-server on

          啟動實例

          systemctl start rabbitmq-serve
          systemctl stop rabbitmq-serve

          訪問控制臺,guest/guest

          http://10.80.27.69:15672/#/

          Reference
          https://www.cnblogs.com/ZhuChangwu/p/14093107.html
          https://juejin.cn/post/6933040530519506957

          posted @ 2021-11-08 09:27 paulwong 閱讀(245) | 評論 (0)編輯 收藏

          僅列出標(biāo)題
          共115頁: First 上一頁 5 6 7 8 9 10 11 12 13 下一頁 Last 
          主站蜘蛛池模板: 屏南县| 潢川县| 太仓市| 西吉县| 新丰县| 吴堡县| 礼泉县| 共和县| 旬阳县| 新兴县| 福安市| 桑日县| 洛阳市| 沙河市| 稻城县| 定边县| 鄂托克前旗| 浪卡子县| 鄂伦春自治旗| 怀集县| 综艺| 黔西县| 临猗县| 襄城县| 呼玛县| 五华县| 德州市| 蒲城县| 佛教| 邯郸县| 房产| 自治县| 兴海县| 衡东县| 宁化县| 内丘县| 六安市| 宕昌县| 吉安市| 项城市| 绥德县|