paulwong

          SPRING REACTOR 使用樣例

          SpringReactorTest.java

          package com.paul.testreactivestream.reactor;


          import java.util.List;

          import org.junit.jupiter.api.Test;

          import reactor.core.publisher.Flux;
          import reactor.core.publisher.Mono;
          import reactor.core.scheduler.Schedulers;

          public class SpringReactorTest {
              
              private void subscribeAndEnd(Flux<?> flux) {
                  
                  flux.map(c -> String.format("[%s] %s", Thread.currentThread().getName(), c))
                      .subscribe(System.out::println);
                  
                  flux.blockLast();
              }
              
              @Test
              public void createAFlux_just() throws InterruptedException {
                  Flux<String> fruitFlux = 
                          Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry")
                              .log()
                              ;
                  fruitFlux.subscribe(
                               f -> System.out.println(
                                           String.format("[%s] Here's some fruit: %s", Thread.currentThread().getName(), f)
                                       )
                            )
                           ;
                  fruitFlux.blockLast();
                  
          //        Thread.currentThread().join();
              }
              
              @Test
              public void zipFluxesToObject() {
                  Flux<String> characterFlux = 
                          Flux.just("Garfield", "Kojak", "Barbossa");
                  
                  Flux<String> foodFlux = 
                          Flux.just("Lasagna", "Lollipops", "Apples");
                  
                  Flux<String> zippedFlux = 
                          Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
                  
                  this.subscribeAndEnd(zippedFlux);
              }
              
              @Test
              public void map() {
                  Flux<Player> playerFlux = 
                          Flux.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
                              .map(n -> {
                                  String[] split = n.split("\\s");
                                  return new Player(split[0], split[1]);
                              })
                              ;
                  this.subscribeAndEnd(playerFlux);
              }
              
              @Test
              public void flatMap() {
                  Flux<Player> playerFlux = 
                          Flux.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
                              .flatMap(
                                  n -> Mono.just(n)
                                           .map(p -> {
                                              String[] split = p.split("\\s");
                                              return new Player(split[0], split[1]);
                                            })
                                           .subscribeOn(Schedulers.parallel())
                               );
                  this.subscribeAndEnd(playerFlux);
              }
              
              @Test
              public void buffer() {
                  Flux<List<String>> fruitFlux = 
                          Flux.just(
                                  "apple", "orange", "banana", "kiwi", "strawberry"
                               )
                              .buffer(3);
                  this.subscribeAndEnd(fruitFlux);
              }
              
              @Test
              public void bufferAsyn() {
                  Flux<String> flux =
                      Flux.just(
                              "apple", "orange", "banana", "kiwi", "strawberry"
                           )
                          .buffer(3)
                          .flatMap(x ->
                              Flux.fromIterable(x)
                                  .map(y -> y.toUpperCase())
                                  .subscribeOn(Schedulers.parallel())
              //                    .log()
                           );
                  this.subscribeAndEnd(flux);
              }
              
              @Test
              public void all() {
                  Mono<Boolean> animalFlux = 
                          Flux.just(
                                  "aardvark", "elephant", "koala", "eagle", "kangaroo"
                               )
                              .all(c -> c.contains("a"))
                              ;
                  animalFlux.map(c -> String.format("[%s] %s", Thread.currentThread().getName(), c))
                            .subscribe(System.out::println);
              
              }

          }

          posted on 2021-11-23 13:59 paulwong 閱讀(341) 評論(0)  編輯  收藏 所屬分類: REACTIVE STREAMS

          主站蜘蛛池模板: 宕昌县| 留坝县| 赤峰市| 平塘县| 湘潭市| 江西省| 广西| 泽库县| 丹东市| 余庆县| 青河县| 礼泉县| 临洮县| 芒康县| 确山县| 福鼎市| 新建县| 绥中县| 漳州市| 托克托县| 通州区| 江都市| 金秀| 安阳市| 鄂尔多斯市| 合山市| 遵义市| 北安市| 井研县| 江山市| 革吉县| 舞钢市| 青神县| 土默特右旗| 资阳市| 凤台县| 南部县| 金堂县| 霍林郭勒市| 潮州市| 云安县|