paulwong

          SPRING INTEGRATION - 集群選主、分布式鎖

          集群通常是有多個相同的實例,但對于定時任務(wù)場景,只希望有一個實例工作即可,如果這個實例掛了,其他實例可以頂替。

          這個問題的方案則是集群選主,一個集群中,只有一個LEADER,由LEADER負(fù)責(zé)執(zhí)行定時任務(wù)工作。當(dāng)LEADER被取消時,會在剩下的實例中再選LEADER。

          持有分布式鎖的實例則是LEADER。

          SPRING INTEGRATION JDBC 則已提供相關(guān)功能。

          pom.xml
                  <dependency>
                     <groupId>org.springframework.integration</groupId>
                     <artifactId>spring-integration-jdbc</artifactId>
                  </dependency>

                  <dependency>
                      <groupId>org.springframework.boot</groupId>
                      <artifactId>spring-boot-starter-jdbc</artifactId>
                  </dependency>

                  <dependency>
                     <groupId>org.flywaydb</groupId>
                     <artifactId>flyway-core</artifactId>
                  </dependency>
                  
                  <dependency>
                      <groupId>org.mariadb.jdbc</groupId>
                      <artifactId>mariadb-java-client</artifactId>
                  </dependency>

          LeaderElectionIntegrationConfig.java
          import java.util.List;
          import java.util.concurrent.CopyOnWriteArrayList;

          import javax.sql.DataSource;

          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          import org.springframework.integration.jdbc.lock.DefaultLockRepository;
          import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
          import org.springframework.integration.jdbc.lock.LockRepository;
          import org.springframework.integration.support.leader.LockRegistryLeaderInitiator;

          import com.paul.integration.leader.ControlBusGateway;
          import com.paul.integration.leader.MyCandidate;

          @Configuration
          public class LeaderElectionIntegrationConfig {
              
              @Bean
              public List<String> needToStartupAdapterList(){
                  return new CopyOnWriteArrayList<>();
              }
              
              @Bean
              public DefaultLockRepository defaultLockRepository(DataSource dataSource){
                  DefaultLockRepository defaultLockRepository =
                          new DefaultLockRepository(dataSource);
          //        defaultLockRepository.setTimeToLive(60_000);
                  return defaultLockRepository;
              }

              @Bean
              public JdbcLockRegistry jdbcLockRegistry(LockRepository lockRepository){
                  return new JdbcLockRegistry(lockRepository);
              }
              
              @Bean
              public MyCandidate myCandidate(
                  ControlBusGateway controlBusGateway,
                  List<String> needToStartupAdapterList
              ) {
                  return new MyCandidate(controlBusGateway, needToStartupAdapterList);
              }
              
              @Bean
              public LockRegistryLeaderInitiator leaderInitiator() {
                  return new LockRegistryLeaderInitiator(
                              jdbcLockRegistry(null), myCandidate(nullnull)
                         );
              }
              
              
          }


          MyCandidate.java
          import java.util.List;

          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;
          import org.springframework.integration.leader.Context;
          import org.springframework.integration.leader.DefaultCandidate;

          import com.novacredit.mcra.mcracommon.integration.gateway.ControlBusGateway;

          public class MyCandidate extends DefaultCandidate{
              
              private static final Logger LOG = LoggerFactory.getLogger(MyCandidate.class);
              
              private List<String> needToStartupAdapterList;
              
              private ControlBusGateway controlBusGateway;
              
              public MyCandidate(
                  ControlBusGateway controlBusGateway,
                  List<String> needToStartupAdapterList
              ) {
                  this.controlBusGateway = controlBusGateway;
                  this.needToStartupAdapterList = needToStartupAdapterList;
              }
              
              @Override
              public void onGranted(Context context) {
                  super.onGranted(context);
                  LOG.info("*** Leadership granted ***");
                  LOG.info("STARTING MONGODB POLLER");
                  needToStartupAdapterList
                      .forEach(
                          c -> {
          //                    c = "@'testIntegrationFlow.org.springframework.integration.config."
          //                            + "SourcePollingChannelAdapterFactoryBean#0'";
                              String command = c + ".start()";
                              LOG.info("-----{}", command);
                              controlBusGateway.sendCommand(command);
                          }
                       );
                  LOG.info("STARTUP MESSAGE SENT");

              }

              @Override
              public void onRevoked(Context context) {
                  super.onRevoked(context);
                  LOG.info("*** Leadership revoked ***");
                  LOG.info("STOPPING MONGODB POLLER");
                  needToStartupAdapterList
                      .forEach(
                          c -> {
          //                    c = "@'testIntegrationConfig.testIntegrationFlow."
          //                            + "mongoMessageSource.inboundChannelAdapter'";
                              String command = c + ".stop()";
                              LOG.info("-----{}", command);
          //                    controlBusGateway.sendCommand(command);
                          }
                       );
                  LOG.info("SHUTDOWN MESSAGE SENT");
              }

          }


          ControlBusIntegrationConfig.java
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          import org.springframework.integration.dsl.IntegrationFlow;
          import org.springframework.integration.dsl.IntegrationFlows;
          import org.springframework.integration.dsl.MessageChannels;
          import org.springframework.integration.gateway.GatewayProxyFactoryBean;
          import org.springframework.integration.handler.LoggingHandler;
          import org.springframework.messaging.MessageChannel;

          import com.paul.integration.gateway.ControlBusGateway;

          @Configuration
          public class ControlBusIntegrationConfig {
              
              @Bean
              public MessageChannel controlBusChannel() {
                  return MessageChannels.direct().get();
              }
              
              @Bean
              public IntegrationFlow controlBusFlow() {
                  return IntegrationFlows.from(controlBusChannel())
                              .log(LoggingHandler.Level.INFO, "controlBusChannel")
                              .controlBus()
                              .get();
              }
              
              @Bean
              public GatewayProxyFactoryBean controlBusGateway() {
                  GatewayProxyFactoryBean gateway = new GatewayProxyFactoryBean(ControlBusGateway.class);
                  gateway.setDefaultRequestChannel(controlBusChannel());
                  gateway.setDefaultRequestTimeout(300l);
                  gateway.setDefaultReplyTimeout(300l);
                  return gateway;
              }
              
          }


          ControlBusGateway.java
          public interface ControlBusGateway {
              
              public void sendCommand(String command);

          }


          各個應(yīng)用實例運行時,其中的LockRegistryLeaderInitiator會自動運行,搶奪LEADER數(shù)據(jù),最終只有一個實例奪取。之后再執(zhí)行MyCandidate中的代碼。







          posted on 2022-01-20 13:49 paulwong 閱讀(551) 評論(0)  編輯  收藏 所屬分類: SPRING INTERGRATION

          主站蜘蛛池模板: 建水县| 江津市| 绥宁县| 炉霍县| 丽江市| 宕昌县| 太康县| 千阳县| 全南县| 盐边县| 石景山区| 三河市| 丰城市| 隆安县| 陕西省| 闽侯县| 临朐县| 仁化县| 娄底市| 南澳县| 青州市| 林口县| 南京市| 招远市| 嵊州市| 邵阳县| 石首市| 永春县| 固阳县| 伊宁县| 威远县| 商水县| 新河县| 靖远县| 安康市| 珲春市| 静宁县| 上栗县| 揭阳市| 湘西| 林口县|