paulwong

          SPRING INTEGRATION - 集群選主、分布式鎖

          集群通常是有多個相同的實例,但對于定時任務場景,只希望有一個實例工作即可,如果這個實例掛了,其他實例可以頂替。

          這個問題的方案則是集群選主,一個集群中,只有一個LEADER,由LEADER負責執行定時任務工作。當LEADER被取消時,會在剩下的實例中再選LEADER。

          持有分布式鎖的實例則是LEADER。

          SPRING INTEGRATION JDBC 則已提供相關功能。

          pom.xml
                  <dependency>
                     <groupId>org.springframework.integration</groupId>
                     <artifactId>spring-integration-jdbc</artifactId>
                  </dependency>

                  <dependency>
                      <groupId>org.springframework.boot</groupId>
                      <artifactId>spring-boot-starter-jdbc</artifactId>
                  </dependency>

                  <dependency>
                     <groupId>org.flywaydb</groupId>
                     <artifactId>flyway-core</artifactId>
                  </dependency>
                  
                  <dependency>
                      <groupId>org.mariadb.jdbc</groupId>
                      <artifactId>mariadb-java-client</artifactId>
                  </dependency>

          LeaderElectionIntegrationConfig.java
          import java.util.List;
          import java.util.concurrent.CopyOnWriteArrayList;

          import javax.sql.DataSource;

          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          import org.springframework.integration.jdbc.lock.DefaultLockRepository;
          import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
          import org.springframework.integration.jdbc.lock.LockRepository;
          import org.springframework.integration.support.leader.LockRegistryLeaderInitiator;

          import com.paul.integration.leader.ControlBusGateway;
          import com.paul.integration.leader.MyCandidate;

          @Configuration
          public class LeaderElectionIntegrationConfig {
              
              @Bean
              public List<String> needToStartupAdapterList(){
                  return new CopyOnWriteArrayList<>();
              }
              
              @Bean
              public DefaultLockRepository defaultLockRepository(DataSource dataSource){
                  DefaultLockRepository defaultLockRepository =
                          new DefaultLockRepository(dataSource);
          //        defaultLockRepository.setTimeToLive(60_000);
                  return defaultLockRepository;
              }

              @Bean
              public JdbcLockRegistry jdbcLockRegistry(LockRepository lockRepository){
                  return new JdbcLockRegistry(lockRepository);
              }
              
              @Bean
              public MyCandidate myCandidate(
                  ControlBusGateway controlBusGateway,
                  List<String> needToStartupAdapterList
              ) {
                  return new MyCandidate(controlBusGateway, needToStartupAdapterList);
              }
              
              @Bean
              public LockRegistryLeaderInitiator leaderInitiator() {
                  return new LockRegistryLeaderInitiator(
                              jdbcLockRegistry(null), myCandidate(nullnull)
                         );
              }
              
              
          }


          MyCandidate.java
          import java.util.List;

          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;
          import org.springframework.integration.leader.Context;
          import org.springframework.integration.leader.DefaultCandidate;

          import com.novacredit.mcra.mcracommon.integration.gateway.ControlBusGateway;

          public class MyCandidate extends DefaultCandidate{
              
              private static final Logger LOG = LoggerFactory.getLogger(MyCandidate.class);
              
              private List<String> needToStartupAdapterList;
              
              private ControlBusGateway controlBusGateway;
              
              public MyCandidate(
                  ControlBusGateway controlBusGateway,
                  List<String> needToStartupAdapterList
              ) {
                  this.controlBusGateway = controlBusGateway;
                  this.needToStartupAdapterList = needToStartupAdapterList;
              }
              
              @Override
              public void onGranted(Context context) {
                  super.onGranted(context);
                  LOG.info("*** Leadership granted ***");
                  LOG.info("STARTING MONGODB POLLER");
                  needToStartupAdapterList
                      .forEach(
                          c -> {
          //                    c = "@'testIntegrationFlow.org.springframework.integration.config."
          //                            + "SourcePollingChannelAdapterFactoryBean#0'";
                              String command = c + ".start()";
                              LOG.info("-----{}", command);
                              controlBusGateway.sendCommand(command);
                          }
                       );
                  LOG.info("STARTUP MESSAGE SENT");

              }

              @Override
              public void onRevoked(Context context) {
                  super.onRevoked(context);
                  LOG.info("*** Leadership revoked ***");
                  LOG.info("STOPPING MONGODB POLLER");
                  needToStartupAdapterList
                      .forEach(
                          c -> {
          //                    c = "@'testIntegrationConfig.testIntegrationFlow."
          //                            + "mongoMessageSource.inboundChannelAdapter'";
                              String command = c + ".stop()";
                              LOG.info("-----{}", command);
          //                    controlBusGateway.sendCommand(command);
                          }
                       );
                  LOG.info("SHUTDOWN MESSAGE SENT");
              }

          }


          ControlBusIntegrationConfig.java
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          import org.springframework.integration.dsl.IntegrationFlow;
          import org.springframework.integration.dsl.IntegrationFlows;
          import org.springframework.integration.dsl.MessageChannels;
          import org.springframework.integration.gateway.GatewayProxyFactoryBean;
          import org.springframework.integration.handler.LoggingHandler;
          import org.springframework.messaging.MessageChannel;

          import com.paul.integration.gateway.ControlBusGateway;

          @Configuration
          public class ControlBusIntegrationConfig {
              
              @Bean
              public MessageChannel controlBusChannel() {
                  return MessageChannels.direct().get();
              }
              
              @Bean
              public IntegrationFlow controlBusFlow() {
                  return IntegrationFlows.from(controlBusChannel())
                              .log(LoggingHandler.Level.INFO, "controlBusChannel")
                              .controlBus()
                              .get();
              }
              
              @Bean
              public GatewayProxyFactoryBean controlBusGateway() {
                  GatewayProxyFactoryBean gateway = new GatewayProxyFactoryBean(ControlBusGateway.class);
                  gateway.setDefaultRequestChannel(controlBusChannel());
                  gateway.setDefaultRequestTimeout(300l);
                  gateway.setDefaultReplyTimeout(300l);
                  return gateway;
              }
              
          }


          ControlBusGateway.java
          public interface ControlBusGateway {
              
              public void sendCommand(String command);

          }


          各個應用實例運行時,其中的LockRegistryLeaderInitiator會自動運行,搶奪LEADER數據,最終只有一個實例奪取。之后再執行MyCandidate中的代碼。







          posted on 2022-01-20 13:49 paulwong 閱讀(551) 評論(0)  編輯  收藏 所屬分類: SPRING INTERGRATION

          主站蜘蛛池模板: 微博| 安国市| 乐安县| 通渭县| 思茅市| 娄烦县| 雷州市| 奎屯市| 平遥县| 卢湾区| 堆龙德庆县| 开封市| 石嘴山市| 嵊州市| 宁都县| 长春市| 泗阳县| 彰武县| 盖州市| 密云县| 宣汉县| 蓝山县| 鹤岗市| 都安| 济阳县| 贺兰县| 连州市| 新闻| 田东县| 阜康市| 兴化市| 永福县| 时尚| 宝清县| 长汀县| 南宁市| 南丹县| 建水县| 德格县| 荔浦县| 龙门县|