paulwong

          SPRING REACTOR 使用樣例

          SpringReactorTest.java

          package com.paul.testreactivestream.reactor;


          import java.util.List;

          import org.junit.jupiter.api.Test;

          import reactor.core.publisher.Flux;
          import reactor.core.publisher.Mono;
          import reactor.core.scheduler.Schedulers;

          public class SpringReactorTest {
              
              private void subscribeAndEnd(Flux<?> flux) {
                  
                  flux.map(c -> String.format("[%s] %s", Thread.currentThread().getName(), c))
                      .subscribe(System.out::println);
                  
                  flux.blockLast();
              }
              
              @Test
              public void createAFlux_just() throws InterruptedException {
                  Flux<String> fruitFlux = 
                          Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry")
                              .log()
                              ;
                  fruitFlux.subscribe(
                               f -> System.out.println(
                                           String.format("[%s] Here's some fruit: %s", Thread.currentThread().getName(), f)
                                       )
                            )
                           ;
                  fruitFlux.blockLast();
                  
          //        Thread.currentThread().join();
              }
              
              @Test
              public void zipFluxesToObject() {
                  Flux<String> characterFlux = 
                          Flux.just("Garfield", "Kojak", "Barbossa");
                  
                  Flux<String> foodFlux = 
                          Flux.just("Lasagna", "Lollipops", "Apples");
                  
                  Flux<String> zippedFlux = 
                          Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
                  
                  this.subscribeAndEnd(zippedFlux);
              }
              
              @Test
              public void map() {
                  Flux<Player> playerFlux = 
                          Flux.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
                              .map(n -> {
                                  String[] split = n.split("\\s");
                                  return new Player(split[0], split[1]);
                              })
                              ;
                  this.subscribeAndEnd(playerFlux);
              }
              
              @Test
              public void flatMap() {
                  Flux<Player> playerFlux = 
                          Flux.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
                              .flatMap(
                                  n -> Mono.just(n)
                                           .map(p -> {
                                              String[] split = p.split("\\s");
                                              return new Player(split[0], split[1]);
                                            })
                                           .subscribeOn(Schedulers.parallel())
                               );
                  this.subscribeAndEnd(playerFlux);
              }
              
              @Test
              public void buffer() {
                  Flux<List<String>> fruitFlux = 
                          Flux.just(
                                  "apple", "orange", "banana", "kiwi", "strawberry"
                               )
                              .buffer(3);
                  this.subscribeAndEnd(fruitFlux);
              }
              
              @Test
              public void bufferAsyn() {
                  Flux<String> flux =
                      Flux.just(
                              "apple", "orange", "banana", "kiwi", "strawberry"
                           )
                          .buffer(3)
                          .flatMap(x ->
                              Flux.fromIterable(x)
                                  .map(y -> y.toUpperCase())
                                  .subscribeOn(Schedulers.parallel())
              //                    .log()
                           );
                  this.subscribeAndEnd(flux);
              }
              
              @Test
              public void all() {
                  Mono<Boolean> animalFlux = 
                          Flux.just(
                                  "aardvark", "elephant", "koala", "eagle", "kangaroo"
                               )
                              .all(c -> c.contains("a"))
                              ;
                  animalFlux.map(c -> String.format("[%s] %s", Thread.currentThread().getName(), c))
                            .subscribe(System.out::println);
              
              }

          }

          posted on 2021-11-23 13:59 paulwong 閱讀(338) 評論(0)  編輯  收藏 所屬分類: REACTIVE STREAMS


          只有注冊用戶登錄后才能發(fā)表評論。


          網(wǎng)站導(dǎo)航:
           
          主站蜘蛛池模板: 清水河县| 茌平县| 营口市| 莱阳市| 万源市| 东港市| 察隅县| 抚顺市| 茶陵县| 双流县| 新龙县| 惠东县| 荆州市| 卢氏县| 光泽县| 灌南县| 前郭尔| 沙河市| 西藏| 永昌县| 岗巴县| 潜江市| 寿光市| 库车县| 江孜县| 北安市| 图木舒克市| 连城县| 称多县| 锡林郭勒盟| 沙坪坝区| 娄烦县| 自治县| 泊头市| 娄底市| 合川市| 蒙自县| 清涧县| 萨嘎县| 夹江县| 岳阳市|