paulwong

          SPRING REACTOR 使用樣例

          SpringReactorTest.java

          package com.paul.testreactivestream.reactor;


          import java.util.List;

          import org.junit.jupiter.api.Test;

          import reactor.core.publisher.Flux;
          import reactor.core.publisher.Mono;
          import reactor.core.scheduler.Schedulers;

          public class SpringReactorTest {
              
              private void subscribeAndEnd(Flux<?> flux) {
                  
                  flux.map(c -> String.format("[%s] %s", Thread.currentThread().getName(), c))
                      .subscribe(System.out::println);
                  
                  flux.blockLast();
              }
              
              @Test
              public void createAFlux_just() throws InterruptedException {
                  Flux<String> fruitFlux = 
                          Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry")
                              .log()
                              ;
                  fruitFlux.subscribe(
                               f -> System.out.println(
                                           String.format("[%s] Here's some fruit: %s", Thread.currentThread().getName(), f)
                                       )
                            )
                           ;
                  fruitFlux.blockLast();
                  
          //        Thread.currentThread().join();
              }
              
              @Test
              public void zipFluxesToObject() {
                  Flux<String> characterFlux = 
                          Flux.just("Garfield", "Kojak", "Barbossa");
                  
                  Flux<String> foodFlux = 
                          Flux.just("Lasagna", "Lollipops", "Apples");
                  
                  Flux<String> zippedFlux = 
                          Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
                  
                  this.subscribeAndEnd(zippedFlux);
              }
              
              @Test
              public void map() {
                  Flux<Player> playerFlux = 
                          Flux.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
                              .map(n -> {
                                  String[] split = n.split("\\s");
                                  return new Player(split[0], split[1]);
                              })
                              ;
                  this.subscribeAndEnd(playerFlux);
              }
              
              @Test
              public void flatMap() {
                  Flux<Player> playerFlux = 
                          Flux.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
                              .flatMap(
                                  n -> Mono.just(n)
                                           .map(p -> {
                                              String[] split = p.split("\\s");
                                              return new Player(split[0], split[1]);
                                            })
                                           .subscribeOn(Schedulers.parallel())
                               );
                  this.subscribeAndEnd(playerFlux);
              }
              
              @Test
              public void buffer() {
                  Flux<List<String>> fruitFlux = 
                          Flux.just(
                                  "apple", "orange", "banana", "kiwi", "strawberry"
                               )
                              .buffer(3);
                  this.subscribeAndEnd(fruitFlux);
              }
              
              @Test
              public void bufferAsyn() {
                  Flux<String> flux =
                      Flux.just(
                              "apple", "orange", "banana", "kiwi", "strawberry"
                           )
                          .buffer(3)
                          .flatMap(x ->
                              Flux.fromIterable(x)
                                  .map(y -> y.toUpperCase())
                                  .subscribeOn(Schedulers.parallel())
              //                    .log()
                           );
                  this.subscribeAndEnd(flux);
              }
              
              @Test
              public void all() {
                  Mono<Boolean> animalFlux = 
                          Flux.just(
                                  "aardvark", "elephant", "koala", "eagle", "kangaroo"
                               )
                              .all(c -> c.contains("a"))
                              ;
                  animalFlux.map(c -> String.format("[%s] %s", Thread.currentThread().getName(), c))
                            .subscribe(System.out::println);
              
              }

          }

          posted on 2021-11-23 13:59 paulwong 閱讀(338) 評(píng)論(0)  編輯  收藏 所屬分類: REACTIVE STREAMS


          只有注冊(cè)用戶登錄后才能發(fā)表評(píng)論。


          網(wǎng)站導(dǎo)航:
           
          主站蜘蛛池模板: 来凤县| 宁陕县| 合川市| 赤水市| 石棉县| 紫云| 蒲江县| 凤庆县| 满洲里市| 平乡县| 临泉县| 邵阳市| 龙川县| 浪卡子县| 麻江县| 宁陵县| 米易县| 始兴县| 廉江市| 灵璧县| 绍兴市| 承德县| 大新县| 荥阳市| 安国市| 长宁县| 襄垣县| 喜德县| 鄂托克旗| 麻城市| 赞皇县| 湛江市| 杂多县| 蚌埠市| 北流市| 黎平县| 荣昌县| 新化县| 盐源县| 溆浦县| 卫辉市|