浏览代码

Refactor AutoFollowCoordinator to track leader indices per remote cluster (#36031)

and replaced poll interval setting with a hardcoded poll interval.
The hard coded interval will be removed in a follow up change to make
use of cluster state API's wait_for_metatdata_version.

Before the auto following was bootstrapped from thread pool scheduler,
but now auto followers for new remote clusters are bootstrapped when
a new cluster state is published.

Originates from #35895
Relates to #33007
Martijn van Groningen 6 年之前
父节点
当前提交
a264cb6ddb

+ 1 - 0
x-pack/plugin/ccr/qa/chain/build.gradle

@@ -46,6 +46,7 @@ followClusterTestCluster {
     numNodes = 1
     clusterName = 'follow-cluster'
     setting 'xpack.license.self_generated.type', 'trial'
+    setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
     setting 'cluster.remote.middle_cluster.seeds', "\"${-> middleClusterTest.nodes.get(0).transportUri()}\""
     setting 'node.name', 'follow'
 }

+ 69 - 0
x-pack/plugin/ccr/qa/chain/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java

@@ -0,0 +1,69 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.ccr;
+
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
+
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class AutoFollowIT extends ESCCRRestTestCase {
+
+    public void testAutoFollowPatterns() throws Exception {
+        if ("follow".equals(targetCluster) == false) {
+            return;
+        }
+        Request putPatternRequest = new Request("PUT", "/_ccr/auto_follow/leader_cluster_pattern");
+        putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"leader_cluster\"}");
+        assertOK(client().performRequest(putPatternRequest));
+        putPatternRequest = new Request("PUT", "/_ccr/auto_follow/middle_cluster_pattern");
+        putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"middle_cluster\"}");
+        assertOK(client().performRequest(putPatternRequest));
+        try (RestClient leaderClient = buildLeaderClient()) {
+            Settings settings = Settings.builder()
+                .put("index.soft_deletes.enabled", true)
+                .build();
+            Request request = new Request("PUT", "/logs-20190101");
+            request.setJsonEntity("{\"settings\": " + Strings.toString(settings) +
+                ", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }");
+            assertOK(leaderClient.performRequest(request));
+            for (int i = 0; i < 5; i++) {
+                String id = Integer.toString(i);
+                index(leaderClient, "logs-20190101", id, "field", i, "filtered_field", "true");
+            }
+        }
+        try (RestClient middleClient = buildMiddleClient()) {
+            Settings settings = Settings.builder()
+                .put("index.soft_deletes.enabled", true)
+                .build();
+            Request request = new Request("PUT", "/logs-20200101");
+            request.setJsonEntity("{\"settings\": " + Strings.toString(settings) +
+                ", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }");
+            assertOK(middleClient.performRequest(request));
+            for (int i = 0; i < 5; i++) {
+                String id = Integer.toString(i);
+                index(middleClient, "logs-20200101", id, "field", i, "filtered_field", "true");
+            }
+        }
+        assertBusy(() -> {
+            Request statsRequest = new Request("GET", "/_ccr/stats");
+            Map<?, ?> response = toMap(client().performRequest(statsRequest));
+            Map<?, ?> autoFollowStats = (Map<?, ?>) response.get("auto_follow_stats");
+            assertThat(autoFollowStats.get("number_of_successful_follow_indices"), equalTo(2));
+
+            ensureYellow("logs-20190101");
+            ensureYellow("logs-20200101");
+            verifyDocuments("logs-20190101", 5, "filtered_field:true");
+            verifyDocuments("logs-20200101", 5, "filtered_field:true");
+        });
+    }
+
+}

+ 1 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

@@ -155,7 +155,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
 
         return Arrays.asList(
             ccrLicenseChecker,
-            new AutoFollowCoordinator(settings, client, threadPool, clusterService, ccrLicenseChecker)
+            new AutoFollowCoordinator(client, threadPool, clusterService, ccrLicenseChecker)
         );
     }
 

+ 1 - 9
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java

@@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ccr;
 
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.xpack.core.XPackSettings;
 
 import java.util.Arrays;
@@ -29,12 +28,6 @@ public final class CcrSettings {
     public static final Setting<Boolean> CCR_FOLLOWING_INDEX_SETTING =
             Setting.boolSetting("index.xpack.ccr.following_index", false, Property.IndexScope, Property.InternalIndex);
 
-    /**
-     * Setting for controlling the interval in between polling leader clusters to check whether there are indices to follow
-     */
-    public static final Setting<TimeValue> CCR_AUTO_FOLLOW_POLL_INTERVAL =
-        Setting.timeSetting("xpack.ccr.auto_follow.poll_interval", TimeValue.timeValueMillis(2500), Property.NodeScope);
-
     /**
      * The settings defined by CCR.
      *
@@ -43,8 +36,7 @@ public final class CcrSettings {
     static List<Setting<?>> getSettings() {
         return Arrays.asList(
                 XPackSettings.CCR_ENABLED_SETTING,
-                CCR_FOLLOWING_INDEX_SETTING,
-                CCR_AUTO_FOLLOW_POLL_INTERVAL);
+                CCR_FOLLOWING_INDEX_SETTING);
     }
 
 }

+ 153 - 118
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java

@@ -15,14 +15,14 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateApplier;
+import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.collect.CopyOnWriteHashMap;
 import org.elasticsearch.common.collect.Tuple;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.common.util.concurrent.CountDown;
@@ -31,7 +31,6 @@ import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
-import org.elasticsearch.xpack.ccr.CcrSettings;
 import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
 import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
 import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
@@ -45,28 +44,29 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 /**
  * A component that runs only on the elected master node and follows leader indices automatically
  * if they match with a auto follow pattern that is defined in {@link AutoFollowMetadata}.
  */
-public class AutoFollowCoordinator implements ClusterStateApplier {
+public class AutoFollowCoordinator implements ClusterStateListener {
 
     private static final Logger LOGGER = LogManager.getLogger(AutoFollowCoordinator.class);
     private static final int MAX_AUTO_FOLLOW_ERRORS = 256;
 
     private final Client client;
-    private final TimeValue pollInterval;
     private final ThreadPool threadPool;
     private final ClusterService clusterService;
     private final CcrLicenseChecker ccrLicenseChecker;
 
-    private volatile boolean localNodeMaster = false;
+    private volatile Map<String, AutoFollower> autoFollowers = Collections.emptyMap();
 
     // The following fields are read and updated under a lock:
     private long numberOfSuccessfulIndicesAutoFollowed = 0;
@@ -75,7 +75,6 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
     private final LinkedHashMap<String, ElasticsearchException> recentAutoFollowErrors;
 
     public AutoFollowCoordinator(
-            Settings settings,
             Client client,
             ThreadPool threadPool,
             ClusterService clusterService,
@@ -84,10 +83,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
         this.threadPool = threadPool;
         this.clusterService = clusterService;
         this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker");
-
-        this.pollInterval = CcrSettings.CCR_AUTO_FOLLOW_POLL_INTERVAL.get(settings);
-        clusterService.addStateApplier(this);
-
+        clusterService.addListener(this);
         this.recentAutoFollowErrors = new LinkedHashMap<String, ElasticsearchException>() {
             @Override
             protected boolean removeEldestEntry(final Map.Entry<String, ElasticsearchException> eldest) {
@@ -130,151 +126,189 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
         }
     }
 
-    private void doAutoFollow() {
-        if (localNodeMaster == false) {
-            return;
-        }
-        ClusterState followerClusterState = clusterService.state();
+    void updateAutoFollowers(ClusterState followerClusterState) {
         AutoFollowMetadata autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE);
         if (autoFollowMetadata == null) {
-            threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow);
-            return;
-        }
-
-        if (autoFollowMetadata.getPatterns().isEmpty()) {
-            threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow);
             return;
         }
 
         if (ccrLicenseChecker.isCcrAllowed() == false) {
             // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
             LOGGER.warn("skipping auto-follower coordination", LicenseUtils.newComplianceException("ccr"));
-            threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow);
             return;
         }
 
-        Consumer<List<AutoFollowResult>> handler = results -> {
-            updateStats(results);
-            threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow);
-        };
-        AutoFollower operation = new AutoFollower(handler, followerClusterState) {
+        final CopyOnWriteHashMap<String, AutoFollower> autoFollowers = CopyOnWriteHashMap.copyOf(this.autoFollowers);
+        Set<String> newRemoteClusters = autoFollowMetadata.getPatterns().entrySet().stream()
+            .map(entry -> entry.getValue().getRemoteCluster())
+            .filter(remoteCluster -> autoFollowers.containsKey(remoteCluster) == false)
+            .collect(Collectors.toSet());
+
+        Map<String, AutoFollower> newAutoFollowers = new HashMap<>(newRemoteClusters.size());
+        for (String remoteCluster : newRemoteClusters) {
+            AutoFollower autoFollower = new AutoFollower(remoteCluster, threadPool, this::updateStats, clusterService::state) {
+
+                @Override
+                void getLeaderClusterState(final String remoteCluster,
+                                           final BiConsumer<ClusterState, Exception> handler) {
+                    final ClusterStateRequest request = new ClusterStateRequest();
+                    request.clear();
+                    request.metaData(true);
+                    request.routingTable(true);
+                    // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
+                    ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
+                        client,
+                        remoteCluster,
+                        request,
+                        e -> handler.accept(null, e),
+                        leaderClusterState -> handler.accept(leaderClusterState, null));
+                }
 
-            @Override
-            void getLeaderClusterState(final String remoteCluster,
-                                       final BiConsumer<ClusterState, Exception> handler) {
-                final ClusterStateRequest request = new ClusterStateRequest();
-                request.clear();
-                request.metaData(true);
-                request.routingTable(true);
-                // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
-                ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
-                    client,
-                    remoteCluster,
-                    request,
-                    e -> handler.accept(null, e),
-                    leaderClusterState -> handler.accept(leaderClusterState, null));
-            }
+                @Override
+                void createAndFollow(Map<String, String> headers,
+                                     PutFollowAction.Request request,
+                                     Runnable successHandler,
+                                     Consumer<Exception> failureHandler) {
+                    Client followerClient = CcrLicenseChecker.wrapClient(client, headers);
+                    followerClient.execute(
+                        PutFollowAction.INSTANCE,
+                        request,
+                        ActionListener.wrap(r -> successHandler.run(), failureHandler)
+                    );
+                }
 
-            @Override
-            void createAndFollow(Map<String, String> headers,
-                                 PutFollowAction.Request request,
-                                 Runnable successHandler,
-                                 Consumer<Exception> failureHandler) {
-                Client followerClient = CcrLicenseChecker.wrapClient(client, headers);
-                followerClient.execute(
-                    PutFollowAction.INSTANCE,
-                    request,
-                    ActionListener.wrap(r -> successHandler.run(), failureHandler)
-                );
-            }
+                @Override
+                void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction,
+                                              Consumer<Exception> handler) {
+                    clusterService.submitStateUpdateTask("update_auto_follow_metadata", new ClusterStateUpdateTask() {
 
-            @Override
-            void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction,
-                                          Consumer<Exception> handler) {
-                clusterService.submitStateUpdateTask("update_auto_follow_metadata", new ClusterStateUpdateTask() {
+                        @Override
+                        public ClusterState execute(ClusterState currentState) throws Exception {
+                            return updateFunction.apply(currentState);
+                        }
 
-                    @Override
-                    public ClusterState execute(ClusterState currentState) throws Exception {
-                        return updateFunction.apply(currentState);
-                    }
+                        @Override
+                        public void onFailure(String source, Exception e) {
+                            handler.accept(e);
+                        }
 
-                    @Override
-                    public void onFailure(String source, Exception e) {
-                        handler.accept(e);
-                    }
+                        @Override
+                        public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                            handler.accept(null);
+                        }
+                    });
+                }
 
-                    @Override
-                    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                        handler.accept(null);
-                    }
-                });
-            }
+            };
+            newAutoFollowers.put(remoteCluster, autoFollower);
+            autoFollower.autoFollowIndices();
+        }
 
-        };
-        operation.autoFollowIndices();
+        List<String> removedRemoteClusters = new ArrayList<>();
+        for (String remoteCluster : autoFollowers.keySet()) {
+            boolean exist = autoFollowMetadata.getPatterns().values().stream()
+                .anyMatch(pattern -> pattern.getRemoteCluster().equals(remoteCluster));
+            if (exist == false) {
+                removedRemoteClusters.add(remoteCluster);
+            }
+        }
+        this.autoFollowers = autoFollowers
+            .copyAndPutAll(newAutoFollowers)
+            .copyAndRemoveAll(removedRemoteClusters);
     }
 
     @Override
-    public void applyClusterState(ClusterChangedEvent event) {
-        final boolean beforeLocalMasterNode = localNodeMaster;
-        localNodeMaster = event.localNodeMaster();
-        if (beforeLocalMasterNode == false && localNodeMaster) {
-            threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow);
+    public void clusterChanged(ClusterChangedEvent event) {
+        if (event.localNodeMaster()) {
+            updateAutoFollowers(event.state());
         }
     }
 
+    /**
+     * Each auto follower independently monitors a remote cluster for new leader indices that should be auto followed.
+     * The reason that this should happen independently, is that when auto followers start to make use of cluster state
+     * API's 'wait_for_metadata_version' feature, it may take sometime before a remote cluster responds with a new
+     * cluster state or times out. Other auto follow patterns for different remote clusters are then forced to wait,
+     * which can cause new follower indices to unnecessarily start with a large backlog of operations that need to be
+     * replicated.
+     */
     abstract static class AutoFollower {
 
-        private final Consumer<List<AutoFollowResult>> handler;
-        private final ClusterState followerClusterState;
-        private final AutoFollowMetadata autoFollowMetadata;
-
-        private final CountDown autoFollowPatternsCountDown;
-        private final AtomicArray<AutoFollowResult> autoFollowResults;
-
-        AutoFollower(final Consumer<List<AutoFollowResult>> handler, final ClusterState followerClusterState) {
-            this.handler = handler;
-            this.followerClusterState = followerClusterState;
-            this.autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE);
-            this.autoFollowPatternsCountDown = new CountDown(autoFollowMetadata.getPatterns().size());
-            this.autoFollowResults = new AtomicArray<>(autoFollowMetadata.getPatterns().size());
+        private final String remoteCluster;
+        private final ThreadPool threadPool;
+        private final Consumer<List<AutoFollowResult>> statsUpdater;
+        private final Supplier<ClusterState> followerClusterStateSupplier;
+
+        private volatile CountDown autoFollowPatternsCountDown;
+        private volatile AtomicArray<AutoFollowResult> autoFollowResults;
+
+        AutoFollower(final String remoteCluster,
+                     final ThreadPool threadPool,
+                     final Consumer<List<AutoFollowResult>> statsUpdater,
+                     final Supplier<ClusterState> followerClusterStateSupplier) {
+            this.remoteCluster = remoteCluster;
+            this.threadPool = threadPool;
+            this.statsUpdater = statsUpdater;
+            this.followerClusterStateSupplier = followerClusterStateSupplier;
         }
 
         void autoFollowIndices() {
-            int i = 0;
-            for (Map.Entry<String, AutoFollowPattern> entry : autoFollowMetadata.getPatterns().entrySet()) {
-                final int slot = i;
-                final String autoFollowPattenName = entry.getKey();
-                final AutoFollowPattern autoFollowPattern = entry.getValue();
-                final String remoteCluster = autoFollowPattern.getRemoteCluster();
-
-                Map<String, String> headers = autoFollowMetadata.getHeaders().get(autoFollowPattenName);
-                getLeaderClusterState(remoteCluster, (leaderClusterState, e) -> {
-                    if (leaderClusterState != null) {
-                        assert e == null;
-                        final List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPattenName);
-                        final List<Index> leaderIndicesToFollow = getLeaderIndicesToFollow(remoteCluster, autoFollowPattern,
-                            leaderClusterState, followerClusterState, followedIndices);
+            final ClusterState followerClusterState = followerClusterStateSupplier.get();
+            final AutoFollowMetadata autoFollowMetadata = followerClusterState.metaData().custom(AutoFollowMetadata.TYPE);
+            if (autoFollowMetadata == null) {
+                LOGGER.info("AutoFollower for cluster [{}] has stopped, because there is no autofollow metadata", remoteCluster);
+                return;
+            }
+
+            final List<String> patterns = autoFollowMetadata.getPatterns().entrySet().stream()
+                .filter(entry -> entry.getValue().getRemoteCluster().equals(remoteCluster))
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toList());
+            if (patterns.isEmpty()) {
+                LOGGER.info("AutoFollower for cluster [{}] has stopped, because there are no more patterns", remoteCluster);
+                return;
+            }
+
+            this.autoFollowPatternsCountDown = new CountDown(patterns.size());
+            this.autoFollowResults = new AtomicArray<>(patterns.size());
+
+            getLeaderClusterState(remoteCluster, (leaderClusterState, e) -> {
+                if (leaderClusterState != null) {
+                    assert e == null;
+
+                    int i = 0;
+                    for (String autoFollowPatternName : patterns) {
+                        final int slot = i;
+                        AutoFollowPattern autoFollowPattern = autoFollowMetadata.getPatterns().get(autoFollowPatternName);
+                        Map<String, String> headers = autoFollowMetadata.getHeaders().get(autoFollowPatternName);
+                        List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPatternName);
+
+                        final List<Index> leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState,
+                            followerClusterState, followedIndices);
                         if (leaderIndicesToFollow.isEmpty()) {
-                            finalise(slot, new AutoFollowResult(autoFollowPattenName));
+                            finalise(slot, new AutoFollowResult(autoFollowPatternName));
                         } else {
                             List<Tuple<String, AutoFollowPattern>> patternsForTheSameLeaderCluster = autoFollowMetadata.getPatterns()
                                 .entrySet().stream()
-                                .filter(item -> autoFollowPattenName.equals(item.getKey()) == false)
+                                .filter(item -> autoFollowPatternName.equals(item.getKey()) == false)
                                 .filter(item -> remoteCluster.equals(item.getValue().getRemoteCluster()))
                                 .map(item -> new Tuple<>(item.getKey(), item.getValue()))
                                 .collect(Collectors.toList());
 
                             Consumer<AutoFollowResult> resultHandler = result -> finalise(slot, result);
-                            checkAutoFollowPattern(autoFollowPattenName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers,
+                            checkAutoFollowPattern(autoFollowPatternName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers,
                                 patternsForTheSameLeaderCluster, resultHandler);
                         }
-                    } else {
-                        finalise(slot, new AutoFollowResult(autoFollowPattenName, e));
+                        i++;
                     }
-                });
-                i++;
-            }
+                } else {
+                    List<AutoFollowResult> results = new ArrayList<>(patterns.size());
+                    for (String autoFollowPatternName : patterns) {
+                        results.add(new AutoFollowResult(autoFollowPatternName, e));
+                    }
+                    statsUpdater.accept(results);
+                }
+            });
         }
 
         private void checkAutoFollowPattern(String autoFollowPattenName,
@@ -357,12 +391,13 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
             assert autoFollowResults.get(slot) == null;
             autoFollowResults.set(slot, result);
             if (autoFollowPatternsCountDown.countDown()) {
-                handler.accept(autoFollowResults.asList());
+                statsUpdater.accept(autoFollowResults.asList());
+                // TODO: Remove scheduling here with using cluster state API's waitForMetadataVersion:
+                threadPool.schedule(TimeValue.timeValueMillis(2500), ThreadPool.Names.GENERIC, this::autoFollowIndices);
             }
         }
 
-        static List<Index> getLeaderIndicesToFollow(String remoteCluster,
-                                                    AutoFollowPattern autoFollowPattern,
+        static List<Index> getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern,
                                                     ClusterState leaderClusterState,
                                                     ClusterState followerClusterState,
                                                     List<String> followedIndexUUIDs) {

+ 47 - 43
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java

@@ -140,60 +140,64 @@ public class CcrLicenseIT extends CcrSingleNodeTestCase {
     }
 
     public void testAutoFollowCoordinatorLogsSkippingAutoFollowCoordinationWithNonCompliantLicense() throws Exception {
-        // Update the cluster state so that we have auto follow patterns and verify that we log a warning in case of incompatible license:
-        CountDownLatch latch = new CountDownLatch(1);
-        ClusterService clusterService = getInstanceFromNode(ClusterService.class);
-        clusterService.submitStateUpdateTask("test-add-auto-follow-pattern", new ClusterStateUpdateTask() {
-
-            @Override
-            public ClusterState execute(ClusterState currentState) throws Exception {
-                AutoFollowPattern autoFollowPattern = new AutoFollowPattern("test_alias", Collections.singletonList("logs-*"),
-                    null, null, null, null, null, null, null, null, null, null, null);
-                AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(
-                    Collections.singletonMap("test_alias", autoFollowPattern),
-                    Collections.emptyMap(),
-                    Collections.emptyMap());
-
-                ClusterState.Builder newState = ClusterState.builder(currentState);
-                newState.metaData(MetaData.builder(currentState.getMetaData())
-                    .putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)
-                    .build());
-                return newState.build();
-            }
-
-            @Override
-            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                latch.countDown();
-            }
-
-            @Override
-            public void onFailure(String source, Exception e) {
-                latch.countDown();
-                fail("unexpected error [" + e.getMessage() + "]");
-            }
-        });
-        latch.await();
-
         final Logger logger = LogManager.getLogger(AutoFollowCoordinator.class);
         final MockLogAppender appender = new MockLogAppender();
         appender.start();
         appender.addExpectation(
-                new MockLogAppender.ExceptionSeenEventExpectation(
-                        getTestName(),
-                        logger.getName(),
-                        Level.WARN,
-                        "skipping auto-follower coordination",
-                        ElasticsearchSecurityException.class,
-                        "current license is non-compliant for [ccr]"));
-        Loggers.addAppender(logger, appender);
+            new MockLogAppender.ExceptionSeenEventExpectation(
+                getTestName(),
+                logger.getName(),
+                Level.WARN,
+                "skipping auto-follower coordination",
+                ElasticsearchSecurityException.class,
+                "current license is non-compliant for [ccr]"));
+
         try {
-            assertBusy(appender::assertAllExpectationsMatched);
+            // Need to add mock log appender before submitting CS update, otherwise we miss the expected log:
+            // (Auto followers for new remote clusters are bootstrapped when a new cluster state is published)
+            Loggers.addAppender(logger, appender);
+            // Update the cluster state so that we have auto follow patterns and verify that we log a warning
+            // in case of incompatible license:
+            CountDownLatch latch = new CountDownLatch(1);
+            ClusterService clusterService = getInstanceFromNode(ClusterService.class);
+            clusterService.submitStateUpdateTask("test-add-auto-follow-pattern", new ClusterStateUpdateTask() {
+
+                @Override
+                public ClusterState execute(ClusterState currentState) throws Exception {
+                    AutoFollowPattern autoFollowPattern = new AutoFollowPattern("test_alias", Collections.singletonList("logs-*"),
+                        null, null, null, null, null, null, null, null, null, null, null);
+                    AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(
+                        Collections.singletonMap("test_alias", autoFollowPattern),
+                        Collections.emptyMap(),
+                        Collections.emptyMap());
+
+                    ClusterState.Builder newState = ClusterState.builder(currentState);
+                    newState.metaData(MetaData.builder(currentState.getMetaData())
+                        .putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)
+                        .build());
+                    return newState.build();
+                }
+
+                @Override
+                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                    latch.countDown();
+                }
+
+                @Override
+                public void onFailure(String source, Exception e) {
+                    latch.countDown();
+                    fail("unexpected error [" + e.getMessage() + "]");
+                }
+            });
+            latch.await();
+            appender.assertAllExpectationsMatched();
         } finally {
             Loggers.removeAppender(logger, appender);
             appender.stop();
         }
     }
 
+
     private void assertNonCompliantLicense(final Exception e) {
         assertThat(e, instanceOf(ElasticsearchSecurityException.class));
         assertThat(e.getMessage(), equalTo("current license is non-compliant for [ccr]"));

+ 45 - 9
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java

@@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
 import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower;
 import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
@@ -34,11 +35,13 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction;
 import static org.hamcrest.Matchers.equalTo;
@@ -46,7 +49,9 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -54,6 +59,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
 
     public void testAutoFollower() {
         Client client = mock(Client.class);
+        ThreadPool threadPool = mockThreadPool();
         when(client.getRemoteClusterClient(anyString())).thenReturn(client);
 
         ClusterState leaderState = createRemoteClusterState("logs-20190101");
@@ -83,7 +89,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101"));
             assertThat(entries.get(0).getValue(), nullValue());
         };
-        AutoFollower autoFollower = new AutoFollower(handler, currentState) {
+        AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(currentState)) {
             @Override
             void getLeaderClusterState(String remoteCluster,
                                        BiConsumer<ClusterState, Exception> handler) {
@@ -119,6 +125,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
 
     public void testAutoFollowerClusterStateApiFailure() {
         Client client = mock(Client.class);
+        ThreadPool threadPool = mockThreadPool();
         when(client.getRemoteClusterClient(anyString())).thenReturn(client);
 
         AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
@@ -142,7 +149,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             assertThat(results.get(0).clusterStateFetchException, sameInstance(failure));
             assertThat(results.get(0).autoFollowExecutionResults.entrySet().size(), equalTo(0));
         };
-        AutoFollower autoFollower = new AutoFollower(handler, followerState) {
+        AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(followerState)) {
             @Override
             void getLeaderClusterState(String remoteCluster,
                                        BiConsumer<ClusterState, Exception> handler) {
@@ -169,6 +176,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
 
     public void testAutoFollowerUpdateClusterStateFailure() {
         Client client = mock(Client.class);
+        ThreadPool threadPool = mockThreadPool();
         when(client.getRemoteClusterClient(anyString())).thenReturn(client);
         ClusterState leaderState = createRemoteClusterState("logs-20190101");
 
@@ -196,7 +204,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101"));
             assertThat(entries.get(0).getValue(), sameInstance(failure));
         };
-        AutoFollower autoFollower = new AutoFollower(handler, followerState) {
+        AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(followerState)) {
             @Override
             void getLeaderClusterState(String remoteCluster,
                                        BiConsumer<ClusterState, Exception> handler) {
@@ -225,6 +233,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
 
     public void testAutoFollowerCreateAndFollowApiCallFailure() {
         Client client = mock(Client.class);
+        ThreadPool  threadPool = mockThreadPool();
         when(client.getRemoteClusterClient(anyString())).thenReturn(client);
         ClusterState leaderState = createRemoteClusterState("logs-20190101");
 
@@ -252,7 +261,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101"));
             assertThat(entries.get(0).getValue(), sameInstance(failure));
         };
-        AutoFollower autoFollower = new AutoFollower(handler, followerState) {
+        AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(followerState)) {
             @Override
             void getLeaderClusterState(String remoteCluster,
                                        BiConsumer<ClusterState, Exception> handler) {
@@ -324,7 +333,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             .routingTable(routingTableBuilder.build())
             .build();
 
-        List<Index> result = AutoFollower.getLeaderIndicesToFollow("remote", autoFollowPattern, leaderState, followerState,
+        List<Index> result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState,
             Collections.emptyList());
         result.sort(Comparator.comparing(Index::getName));
         assertThat(result.size(), equalTo(3));
@@ -333,7 +342,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
         assertThat(result.get(2).getName(), equalTo("metrics-4"));
 
         List<String> followedIndexUUIDs = Collections.singletonList(leaderState.metaData().index("metrics-2").getIndexUUID());
-        result = AutoFollower.getLeaderIndicesToFollow("remote", autoFollowPattern, leaderState, followerState, followedIndexUUIDs);
+        result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, followedIndexUUIDs);
         result.sort(Comparator.comparing(Index::getName));
         assertThat(result.size(), equalTo(2));
         assertThat(result.get(0).getName(), equalTo("metrics-0"));
@@ -365,7 +374,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             .routingTable(RoutingTable.builder(leaderState.routingTable()).add(indexRoutingTable).build())
             .build();
 
-        List<Index> result = AutoFollower.getLeaderIndicesToFollow("remote", autoFollowPattern, leaderState, followerState,
+        List<Index> result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState,
             Collections.emptyList());
         assertThat(result.size(), equalTo(1));
         assertThat(result.get(0).getName(), equalTo("index1"));
@@ -379,7 +388,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             .routingTable(RoutingTable.builder(leaderState.routingTable()).add(indexRoutingTable).build())
             .build();
 
-        result = AutoFollower.getLeaderIndicesToFollow("remote", autoFollowPattern, leaderState, followerState, Collections.emptyList());
+        result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, Collections.emptyList());
         assertThat(result.size(), equalTo(2));
         result.sort(Comparator.comparing(Index::getName));
         assertThat(result.get(0).getName(), equalTo("index1"));
@@ -429,7 +438,6 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
 
     public void testStats() {
         AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
-            Settings.EMPTY,
             null,
             null,
             mock(ClusterService.class),
@@ -503,4 +511,32 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
         return csBuilder.build();
     }
 
+    private static Supplier<ClusterState> followerClusterStateSupplier(ClusterState... states) {
+        final AutoFollowMetadata emptyAutoFollowMetadata =
+            new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
+        final ClusterState lastState = ClusterState.builder(new ClusterName("remote"))
+            .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, emptyAutoFollowMetadata))
+            .build();
+        final LinkedList<ClusterState> queue = new LinkedList<>(Arrays.asList(states));
+        return () -> {
+            final ClusterState current = queue.poll();
+            if (current != null) {
+                return current;
+            } else {
+                return lastState;
+            }
+        };
+    }
+
+    private static ThreadPool mockThreadPool() {
+        ThreadPool threadPool = mock(ThreadPool.class);
+        doAnswer(invocation -> {
+            Object[] args = invocation.getArguments();
+            Runnable task = (Runnable) args[2];
+            task.run();
+            return null;
+        }).when(threadPool).schedule(any(), anyString(), any());
+        return threadPool;
+    }
+
 }