#
SpringReactorTest.java
package com.paul.testreactivestream.reactor;
import java.util.List;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class SpringReactorTest {
private void subscribeAndEnd(Flux<?> flux) {
flux.map(c -> String.format("[%s] %s", Thread.currentThread().getName(), c))
.subscribe(System.out::println);
flux.blockLast();
}
@Test
public void createAFlux_just() throws InterruptedException {
Flux<String> fruitFlux =
Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry")
.log()
;
fruitFlux.subscribe(
f -> System.out.println(
String.format("[%s] Here's some fruit: %s", Thread.currentThread().getName(), f)
)
)
;
fruitFlux.blockLast();
// Thread.currentThread().join();
}
@Test
public void zipFluxesToObject() {
Flux<String> characterFlux =
Flux.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux =
Flux.just("Lasagna", "Lollipops", "Apples");
Flux<String> zippedFlux =
Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
this.subscribeAndEnd(zippedFlux);
}
@Test
public void map() {
Flux<Player> playerFlux =
Flux.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
.map(n -> {
String[] split = n.split("\\s");
return new Player(split[0], split[1]);
})
;
this.subscribeAndEnd(playerFlux);
}
@Test
public void flatMap() {
Flux<Player> playerFlux =
Flux.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
.flatMap(
n -> Mono.just(n)
.map(p -> {
String[] split = p.split("\\s");
return new Player(split[0], split[1]);
})
.subscribeOn(Schedulers.parallel())
);
this.subscribeAndEnd(playerFlux);
}
@Test
public void buffer() {
Flux<List<String>> fruitFlux =
Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry"
)
.buffer(3);
this.subscribeAndEnd(fruitFlux);
}
@Test
public void bufferAsyn() {
Flux<String> flux =
Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry"
)
.buffer(3)
.flatMap(x ->
Flux.fromIterable(x)
.map(y -> y.toUpperCase())
.subscribeOn(Schedulers.parallel())
// .log()
);
this.subscribeAndEnd(flux);
}
@Test
public void all() {
Mono<Boolean> animalFlux =
Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo"
)
.all(c -> c.contains("a"))
;
animalFlux.map(c -> String.format("[%s] %s", Thread.currentThread().getName(), c))
.subscribe(System.out::println);
}
}
Supplier
beans, or functions that only publish messages in Spring Cloud Stream, are a bit special in that they aren't triggered by the receiving of events like Function
or Consumer
beans. This means that you often need a way to trigger them to be executed periodically.
For imperative functions the framework by default "polls" a Supplier
function every 1 second, but that duration is configurable using the spring.cloud.stream.poller.fixed-delay
property.
However, for reactive functions supplying a Flux
it is only triggered once by default. This is because a Flux itself is potentially an infinite stream of events so in many cases it will only need to be triggered once. But don't worry, if you want to periodically trigger a reactive Supplier
because you are producing a finite stream of events you can still do so using @PollableBean
. This annotation then allows you to configure how often the function is triggered using the same spring.cloud.stream.poller.fixed-delay
property!
One example use case here could be periodically querying a data store and publishing each entry/row as an event. The number of rows in your data store is a finite number at any given time.
Example code:
@PollableBean
public Supplier<Flux<String>> stringSupplier() { return () -> Flux.just("foo","bar","baz"); }
Reference:
https://solace.community/discussion/360/pollablebean-for-reactive-suppliers-in-spring-cloud-stream
在SPRING INTEGRATION中,如果要從非SPRING INTEGRATION代碼發(fā)送MESSAGE到SPRING INTEGRATION程序,通常用BUS GATEWAY。
那么在SPRING CLOUD STREAM中,如果要從非SPRING CLOUD STREAM代碼發(fā)送MESSAGE到SPRING CLOUD STREAM程序,通常就要先通知框架自動生成一個SOURCE。
application.property
spring.cloud.stream.source=supplier
spring.cloud.stream.bindings.supplier-out-0.destination=notification-events
java
streamBridge.send("supplier-out-0", userDto);
Reference:
https://blog.devgenius.io/event-driven-microservices-with-spring-cloud-stream-e034eee3f394
如果Function中拋出異常,系統(tǒng)沒有配置捕獲異常,則異常消息會被丟棄。通常會進行配置。
@ServiceActivator(inputChannel = "my-destination.my-group.errors")
public void handleError(ErrorMessage message) {
Throwable throwable = message.getPayload();
log.error("截獲異常", throwable);
Message<?> originalMessage = message.getOriginalMessage();
assert originalMessage != null;
log.info("原始消息體 = {}", new String((byte[]) originalMessage.getPayload()));
}
詳情參考:
https://www.itmuch.com/spring-cloud/spring-cloud-stream-error-handling/
SPRING CLOUD STREAM內(nèi)置了一個RoutingFunction,能將MESSAGE路由到應(yīng)用的其他FUNCTION中。
對接RoutingFunction可發(fā)送消息到其外部DESTINATION中或用“|”連接符連接。
application.yaml
# This setting can increase or decrease the rate of message production (1000 = 1s)
# spring.cloud.stream.poller.fixed-delay=1000
# DefaultPollerProperties
# This setting can control which function method in our code will be triggered if there are multiple
# spring.cloud.function.definition=supplyLoan
# Give the autogenerated binding a friendlier name
spring:
application:
name: loan-check-rabbit
banner:
location: classpath:/banner-rabbit.txt
cloud:
#BindingServiceProperties
stream:
#StreamFunctionProperties
function:
definition: loadCheckerFunction;loanCheckerDecieder;loanCheckerConsumer;\
loanDeclinedConsumer;loanApprovedConsumer;loanCheckerProcessor|functionRouter
routing:
enabled: true
#BindingProperties
bindings:
loanCheckerProcessor|functionRouter-in-0:
destination: queue.pretty.log.messages
binder: local_rabbit
loanApprovedConsumer-in-0:
destination: load.approved
binder: local_rabbit
loanDeclinedConsumer-in-0:
destination: load.declined
binder: local_rabbit
loanCheckerDecieder-in-0:
destination: queue.pretty.log.messages.222
binder: local_rabbit
loanCheckerDecieder-out-0:
destination: queue.pretty.approved.messages
binder: local_rabbit
loanCheckerConsumer-in-0:
destination: queue.pretty.approved.messages
binder: local_rabbit
#BinderProperties
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 10.80.27.69
port: 5672
username: guest
password: guest
virtual-host: my-virtual-host
logging:
level:
root: info
org.springframework:
cloud.function: debug
#retry: debug
LoanCheckConfiguration.java
package com.paul.testspringcloudstream.loancheck.config;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.function.context.MessageRoutingCallback;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import com.paul.testspringcloudstream.common.model.Loan;
import com.paul.testspringcloudstream.common.model.Status;
import com.paul.testspringcloudstream.loancheck.router.LoanCheckerRouter;
import com.paul.testspringcloudstream.loancheck.service.LoanProcessor;
import com.paul.testspringcloudstream.loancheck.service.LoanService;
@Configuration
public class LoanCheckConfiguration {
private static final Logger log = LoggerFactory.getLogger(LoanCheckConfiguration.class);
private static final Long MAX_AMOUNT = 10000L;
private static final String LOG_PATTERN = "{} - {} {} for ${} for {}";
@Autowired
public void test(Consumer<Loan> loanCheckerConsumer) {
log.info("{}", loanCheckerConsumer.getClass());
}
@Bean
public Consumer<Loan> loanCheckerConsumer(){
return loan ->
log.info(LOG_PATTERN, "loanCheckerConsumer", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
}
@Bean
public Consumer<Loan> loanDeclinedConsumer(){
return loan ->
log.info(LOG_PATTERN, "loanDeclinedConsumer", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
}
@Bean
public Consumer<Loan> loanApprovedConsumer(){
return loan ->
log.info(LOG_PATTERN, "loanApprovedConsumer", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
}
@Bean
public MessageRoutingCallback loanCheckerRouter() {
return new LoanCheckerRouter();
}
@Bean
public Function<Loan, Loan> loanCheckerProcessor(
LoanService loanService
){
return loan -> loanService.check(loan);
}
@Bean
public Function<Loan, Message<Loan>> loanCheckerProcessorBak(
LoanService loanService
){
return loan -> {
Loan result = loanService.check(loan);
String sendTo = Status.DECLINED.name().equals(result.getStatus()) ?
LoanProcessor.DECLINED_OUT : LoanProcessor.APPROVED_OUT;
return MessageBuilder.withPayload(result)
.setHeader("spring.cloud.stream.sendto.destination", sendTo)
.build();
};
}
@Bean
public Consumer<Loan> loanCheckerDecieder(StreamBridge streamBridge){
return loan -> {
log.info(LOG_PATTERN, "loanCheckerDecieder", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
if (loan.getAmount() > MAX_AMOUNT) {
loan.setStatus(Status.DECLINED.name());
streamBridge.send(LoanProcessor.DECLINED_OUT, "local_rabbit", loan);
} else {
loan.setStatus(Status.APPROVED.name());
streamBridge.send(LoanProcessor.APPROVED_OUT, "local_rabbit", loan);
}
log.info(LOG_PATTERN, "loanCheckerDecieder", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
};
}
}
LoanCheckerRouter.java,將路由條件統(tǒng)一在此處
package com.paul.testspringcloudstream.loancheck.router;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.function.context.MessageRoutingCallback;
import org.springframework.messaging.Message;
import com.paul.testspringcloudstream.common.model.Loan;
import com.paul.testspringcloudstream.common.model.Status;
public class LoanCheckerRouter implements MessageRoutingCallback{
private static final Logger log = LoggerFactory.getLogger(LoanCheckerRouter.class);
@Override
public String functionDefinition(Message<?> message) {
// byte[] resultByte = (byte[])message.getPayload();
// String resultString = new String(resultByte);
//
// return "loanDeclinedConsumer";
Loan result = (Loan)message.getPayload();
log.info("Loan status: {}", result.getStatus());
return Status.DECLINED.name().equals(result.getStatus()) ?
"loanDeclinedConsumer" : "loanApprovedConsumer";
}
}
SPRING CLOUD STREAM 3.x 版本時,之前的一些編程模式,如@Enablebindding,@StreamListenner等注釋被廢棄了,這是由于一些框架的代碼必需由用戶編寫,如配置框架用的Input MessageChannel,Output MessageChannel,連接MessageHandler與MessageChannel等,被視為不必要的動作。為了簡化用戶代碼,于是推出Functional Programming Model。
引入了新名詞:Supplier、Function與Consumer。實際上這幾個類可視為Adapter,如果之前已經(jīng)有存在的Service類,且方法名為各種各樣,可以重新包裝成Supplier、Function與Consumer,并在固定的方法名:apply/get/accept中調(diào)用Service的方法。
Supplier
當(dāng)在配置文件中注入此類型的Bean,并在spring.cloud.stream.function.definition加入此Bean的名稱,SPRING CLOUD STREAM就會幫你生成一個Output MessageChannel,并連接上此Bean,后續(xù)只需要在BINDDING中加入對應(yīng)的Destination Name,即可向BROKER發(fā)消息了。
Consumer
當(dāng)在配置文件中注入此類型的Bean,并在spring.cloud.stream.function.definition加入此Bean的名稱,SPRING CLOUD STREAM就會幫你生成一個Input MessageChannel,并連接上此Bean,后續(xù)只需要在BINDDING中加入對應(yīng)的Destination Name,即可收到BROKER推送關(guān)于此Destination的消息了。
Function
當(dāng)在配置文件中注入此類型的Bean,并在spring.cloud.stream.function.definition加入此Bean的名稱,SPRING CLOUD STREAM就會幫你生成一個Input和Output MessageChannel,并連接上此Bean,后續(xù)只需要在BINDDING中分別對Input和Output MessageChannel加入對應(yīng)的Destination Name1/Name2,即可收到BROKER推送關(guān)于此Destination的消息,也可以向BROKER發(fā)消息了。
與SPRING INTEGRATION的整合
如果要對消息進行復(fù)雜處理,如拆分消息、聚合消息、IF ELSE消息等,就要借助SPRING INTEGRATION了。
@Bean
public IntegrationFlow upperCaseFlow(LoanService loanService) {
return IntegrationFlows
//turn this IntegrationFlow as a gateway, here is a Function interface
//with loadCheckerFunction as bean name
.from(LoadCheckerFunction.class, gateway -> gateway.beanName("loadCheckerFunction"))
.handle(loanService, "check")
.logAndReply(LoggingHandler.Level.WARN);
}
public interface LoadCheckerFunction extends Function<Loan, Loan>{
}
IntegrationFlows.from(Class<?> serviceInterface)是可以將本IntegrationFlow包裝成serviceInterface的實現(xiàn)類,如果調(diào)用此接口,最終會返回IntegrationFlow最后一個步驟的實體,如果這個serviceInterface是Function的話,剛好和SPRING CLOUD STREAM對接上。
后續(xù)在spring.cloud.stream.function.definition加入此Bean的名稱loadCheckerFunction,SPRING CLOUD STREAM就會幫你生成一個Input和Output MessageChannel,并連接上此Bean,再在BINDDING中分別對Input和Output MessageChannel加入對應(yīng)的Destination Name1/Name2,即可收到BROKER推送關(guān)于此Destination的消息,也可以向BROKER發(fā)消息。
application.yaml
# This setting can increase or decrease the rate of message production (1000 = 1s)
# spring.cloud.stream.poller.fixed-delay=1000
# This setting can control which function method in our code will be triggered if there are multiple
# spring.cloud.function.definition=supplyLoan
# Give the autogenerated binding a friendlier name
spring:
application:
name: loan-check-rabbit
banner:
location: classpath:/banner-rabbit.txt
cloud:
stream:
function.definition: loadCheckerFunction
#BindingProperties
bindings:
loadCheckerFunction-in-0:
destination: queue.pretty.log.messages
binder: local_rabbit
loadCheckerFunction-out-0:
destination: queue.pretty.approved.messages
binder: local_rabbit
#BinderProperties
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 10.80.27.69
port: 5672
username: guest
password: guest
virtual-host: my-virtual-host
Reference
https://spring.io/blog/2019/10/25/spring-cloud-stream-and-spring-integration
安裝ERLANG
從這里下載0依賴的ERLANG安裝包:
https://github.com/rabbitmq/erlang-rpm/releases 象這種erlang-23.3.4.8-1.el7.x86_64.rpm含el7的是CENTOS7版本,含el8的是CENTOS8版本,安裝腳本
yum install -y erlang-23.3.4.8-1.el7.x86_64.rpm
安裝RABBITMQ
下載地址:
https://github.com/rabbitmq/rabbitmq-server/releases安裝腳本:
yum install -y erlang-23.3.4.8-1.el7.x86_64.rpm拷貝配置文件
下載配置文件樣例:
https://github.com/rabbitmq/rabbitmq-server/blob/master/deps/rabbit/docs/rabbitmq.conf.example粘貼并重命名文件:
/etc/rabbitmq/rabbitmq.conf開啟WEB控制臺
/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management
配置guest可遠程訪問
## Uncomment the following line if you want to allow access to the
## guest user from anywhere on the network.
loopback_users.guest = false
配置開機啟動
chkconfig rabbitmq-server on
啟動實例
systemctl start rabbitmq-serve
systemctl stop rabbitmq-serve
訪問控制臺,guest/guest
http://10.80.27.69:15672/#/
Reference
https://www.cnblogs.com/ZhuChangwu/p/14093107.htmlhttps://juejin.cn/post/6933040530519506957