|
@@ -7,6 +7,8 @@ package org.elasticsearch.xpack.ccr.action;
|
|
|
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
+import org.apache.logging.log4j.message.ParameterizedMessage;
|
|
|
+import org.elasticsearch.ElasticsearchException;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
|
|
import org.elasticsearch.client.Client;
|
|
@@ -17,8 +19,10 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|
|
import org.elasticsearch.cluster.metadata.MetaData;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
+import org.elasticsearch.common.collect.Tuple;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
+import org.elasticsearch.common.util.concurrent.AtomicArray;
|
|
|
import org.elasticsearch.common.util.concurrent.CountDown;
|
|
|
import org.elasticsearch.index.Index;
|
|
|
import org.elasticsearch.license.LicenseUtils;
|
|
@@ -27,15 +31,18 @@ import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
|
|
|
import org.elasticsearch.xpack.ccr.CcrSettings;
|
|
|
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
|
|
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
|
|
|
+import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
|
|
|
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
|
|
|
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.LinkedHashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
-import java.util.concurrent.atomic.AtomicReference;
|
|
|
+import java.util.TreeMap;
|
|
|
import java.util.function.BiConsumer;
|
|
|
import java.util.function.Consumer;
|
|
|
import java.util.function.Function;
|
|
@@ -47,6 +54,7 @@ import java.util.function.Function;
|
|
|
public class AutoFollowCoordinator implements ClusterStateApplier {
|
|
|
|
|
|
private static final Logger LOGGER = LogManager.getLogger(AutoFollowCoordinator.class);
|
|
|
+ private static final int MAX_AUTO_FOLLOW_ERRORS = 256;
|
|
|
|
|
|
private final Client client;
|
|
|
private final TimeValue pollInterval;
|
|
@@ -56,6 +64,12 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
|
|
|
|
|
|
private volatile boolean localNodeMaster = false;
|
|
|
|
|
|
+ // The following fields are read and updated under a lock:
|
|
|
+ private long numberOfSuccessfulIndicesAutoFollowed = 0;
|
|
|
+ private long numberOfFailedIndicesAutoFollowed = 0;
|
|
|
+ private long numberOfFailedRemoteClusterStateRequests = 0;
|
|
|
+ private final LinkedHashMap<String, ElasticsearchException> recentAutoFollowErrors;
|
|
|
+
|
|
|
public AutoFollowCoordinator(
|
|
|
Settings settings,
|
|
|
Client client,
|
|
@@ -69,6 +83,47 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
|
|
|
|
|
|
this.pollInterval = CcrSettings.CCR_AUTO_FOLLOW_POLL_INTERVAL.get(settings);
|
|
|
clusterService.addStateApplier(this);
|
|
|
+
|
|
|
+ this.recentAutoFollowErrors = new LinkedHashMap<String, ElasticsearchException>() {
|
|
|
+ @Override
|
|
|
+ protected boolean removeEldestEntry(final Map.Entry<String, ElasticsearchException> eldest) {
|
|
|
+ return size() > MAX_AUTO_FOLLOW_ERRORS;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ public synchronized AutoFollowStats getStats() {
|
|
|
+ return new AutoFollowStats(
|
|
|
+ numberOfFailedIndicesAutoFollowed,
|
|
|
+ numberOfFailedRemoteClusterStateRequests,
|
|
|
+ numberOfSuccessfulIndicesAutoFollowed,
|
|
|
+ new TreeMap<>(recentAutoFollowErrors)
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized void updateStats(List<AutoFollowResult> results) {
|
|
|
+ for (AutoFollowResult result : results) {
|
|
|
+ if (result.clusterStateFetchException != null) {
|
|
|
+ recentAutoFollowErrors.put(result.clusterAlias,
|
|
|
+ new ElasticsearchException(result.clusterStateFetchException));
|
|
|
+ numberOfFailedRemoteClusterStateRequests++;
|
|
|
+ LOGGER.warn(new ParameterizedMessage("failure occurred while fetching cluster state in leader cluster [{}]",
|
|
|
+ result.clusterAlias), result.clusterStateFetchException);
|
|
|
+ } else {
|
|
|
+ for (Map.Entry<Index, Exception> entry : result.autoFollowExecutionResults.entrySet()) {
|
|
|
+ if (entry.getValue() != null) {
|
|
|
+ numberOfFailedIndicesAutoFollowed++;
|
|
|
+ recentAutoFollowErrors.put(result.clusterAlias + ":" + entry.getKey().getName(),
|
|
|
+ new ElasticsearchException(entry.getValue()));
|
|
|
+ LOGGER.warn(new ParameterizedMessage("failure occurred while auto following index [{}] in leader cluster [{}]",
|
|
|
+ entry.getKey(), result.clusterAlias), entry.getValue());
|
|
|
+ } else {
|
|
|
+ numberOfSuccessfulIndicesAutoFollowed++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void doAutoFollow() {
|
|
@@ -94,10 +149,8 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- Consumer<Exception> handler = e -> {
|
|
|
- if (e != null) {
|
|
|
- LOGGER.warn("failure occurred during auto-follower coordination", e);
|
|
|
- }
|
|
|
+ Consumer<List<AutoFollowResult>> handler = results -> {
|
|
|
+ updateStats(results);
|
|
|
threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow);
|
|
|
};
|
|
|
AutoFollower operation = new AutoFollower(handler, followerClusterState) {
|
|
@@ -178,101 +231,97 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
|
|
|
|
|
|
abstract static class AutoFollower {
|
|
|
|
|
|
- private final Consumer<Exception> handler;
|
|
|
+ private final Consumer<List<AutoFollowResult>> handler;
|
|
|
private final ClusterState followerClusterState;
|
|
|
private final AutoFollowMetadata autoFollowMetadata;
|
|
|
|
|
|
private final CountDown autoFollowPatternsCountDown;
|
|
|
- private final AtomicReference<Exception> autoFollowPatternsErrorHolder = new AtomicReference<>();
|
|
|
+ private final AtomicArray<AutoFollowResult> autoFollowResults;
|
|
|
|
|
|
- AutoFollower(final Consumer<Exception> handler, final ClusterState followerClusterState) {
|
|
|
+ AutoFollower(final Consumer<List<AutoFollowResult>> handler, final ClusterState followerClusterState) {
|
|
|
this.handler = handler;
|
|
|
this.followerClusterState = followerClusterState;
|
|
|
this.autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE);
|
|
|
this.autoFollowPatternsCountDown = new CountDown(autoFollowMetadata.getPatterns().size());
|
|
|
+ this.autoFollowResults = new AtomicArray<>(autoFollowMetadata.getPatterns().size());
|
|
|
}
|
|
|
|
|
|
void autoFollowIndices() {
|
|
|
+ int i = 0;
|
|
|
for (Map.Entry<String, AutoFollowPattern> entry : autoFollowMetadata.getPatterns().entrySet()) {
|
|
|
- String clusterAlias = entry.getKey();
|
|
|
- AutoFollowPattern autoFollowPattern = entry.getValue();
|
|
|
- List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias);
|
|
|
+ final int slot = i;
|
|
|
+ final String clusterAlias = entry.getKey();
|
|
|
+ final AutoFollowPattern autoFollowPattern = entry.getValue();
|
|
|
|
|
|
getLeaderClusterState(autoFollowPattern.getHeaders(), clusterAlias, (leaderClusterState, e) -> {
|
|
|
if (leaderClusterState != null) {
|
|
|
assert e == null;
|
|
|
- handleClusterAlias(clusterAlias, autoFollowPattern, followedIndices, leaderClusterState);
|
|
|
+ final List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias);
|
|
|
+ final List<Index> leaderIndicesToFollow =
|
|
|
+ getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, followerClusterState, followedIndices);
|
|
|
+ if (leaderIndicesToFollow.isEmpty()) {
|
|
|
+ finalise(slot, new AutoFollowResult(clusterAlias));
|
|
|
+ } else {
|
|
|
+ Consumer<AutoFollowResult> resultHandler = result -> finalise(slot, result);
|
|
|
+ checkAutoFollowPattern(clusterAlias, autoFollowPattern, leaderIndicesToFollow, resultHandler);
|
|
|
+ }
|
|
|
} else {
|
|
|
- finalise(e);
|
|
|
+ finalise(slot, new AutoFollowResult(clusterAlias, e));
|
|
|
}
|
|
|
});
|
|
|
+ i++;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void handleClusterAlias(String clusterAlias, AutoFollowPattern autoFollowPattern,
|
|
|
- List<String> followedIndexUUIDs, ClusterState leaderClusterState) {
|
|
|
- final List<Index> leaderIndicesToFollow =
|
|
|
- getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, followerClusterState, followedIndexUUIDs);
|
|
|
- if (leaderIndicesToFollow.isEmpty()) {
|
|
|
- finalise(null);
|
|
|
- } else {
|
|
|
- final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size());
|
|
|
- final AtomicReference<Exception> leaderIndicesErrorHolder = new AtomicReference<>();
|
|
|
- for (Index indexToFollow : leaderIndicesToFollow) {
|
|
|
- final String leaderIndexName = indexToFollow.getName();
|
|
|
- final String followIndexName = getFollowerIndexName(autoFollowPattern, leaderIndexName);
|
|
|
-
|
|
|
- String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? leaderIndexName :
|
|
|
- clusterAlias + ":" + leaderIndexName;
|
|
|
- FollowIndexAction.Request followRequest =
|
|
|
- new FollowIndexAction.Request(leaderIndexNameWithClusterAliasPrefix, followIndexName,
|
|
|
- autoFollowPattern.getMaxBatchOperationCount(), autoFollowPattern.getMaxConcurrentReadBatches(),
|
|
|
- autoFollowPattern.getMaxOperationSizeInBytes(), autoFollowPattern.getMaxConcurrentWriteBatches(),
|
|
|
- autoFollowPattern.getMaxWriteBufferSize(), autoFollowPattern.getMaxRetryDelay(),
|
|
|
- autoFollowPattern.getIdleShardRetryDelay());
|
|
|
-
|
|
|
- // Execute if the create and follow api call succeeds:
|
|
|
- Runnable successHandler = () -> {
|
|
|
- LOGGER.info("Auto followed leader index [{}] as follow index [{}]", leaderIndexName, followIndexName);
|
|
|
-
|
|
|
- // This function updates the auto follow metadata in the cluster to record that the leader index has been followed:
|
|
|
- // (so that we do not try to follow it in subsequent auto follow runs)
|
|
|
- Function<ClusterState, ClusterState> function = recordLeaderIndexAsFollowFunction(clusterAlias, indexToFollow);
|
|
|
- // The coordinator always runs on the elected master node, so we can update cluster state here:
|
|
|
- updateAutoFollowMetadata(function, updateError -> {
|
|
|
- if (updateError != null) {
|
|
|
- LOGGER.error("Failed to mark leader index [" + leaderIndexName + "] as auto followed", updateError);
|
|
|
- if (leaderIndicesErrorHolder.compareAndSet(null, updateError) == false) {
|
|
|
- leaderIndicesErrorHolder.get().addSuppressed(updateError);
|
|
|
- }
|
|
|
- } else {
|
|
|
- LOGGER.debug("Successfully marked leader index [{}] as auto followed", leaderIndexName);
|
|
|
- }
|
|
|
- if (leaderIndicesCountDown.countDown()) {
|
|
|
- finalise(leaderIndicesErrorHolder.get());
|
|
|
- }
|
|
|
- });
|
|
|
- };
|
|
|
- // Execute if the create and follow apu call fails:
|
|
|
- Consumer<Exception> failureHandler = followError -> {
|
|
|
- assert followError != null;
|
|
|
- LOGGER.warn("Failed to auto follow leader index [" + leaderIndexName + "]", followError);
|
|
|
- if (leaderIndicesCountDown.countDown()) {
|
|
|
- finalise(followError);
|
|
|
- }
|
|
|
- };
|
|
|
- createAndFollow(autoFollowPattern.getHeaders(), followRequest, successHandler, failureHandler);
|
|
|
- }
|
|
|
+ private void checkAutoFollowPattern(String clusterAlias, AutoFollowPattern autoFollowPattern,
|
|
|
+ List<Index> leaderIndicesToFollow, Consumer<AutoFollowResult> resultHandler) {
|
|
|
+
|
|
|
+ final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size());
|
|
|
+ final AtomicArray<Tuple<Index, Exception>> results = new AtomicArray<>(leaderIndicesToFollow.size());
|
|
|
+ for (int i = 0; i < leaderIndicesToFollow.size(); i++) {
|
|
|
+ final Index indexToFollow = leaderIndicesToFollow.get(i);
|
|
|
+ final int slot = i;
|
|
|
+ followLeaderIndex(clusterAlias, indexToFollow, autoFollowPattern, error -> {
|
|
|
+ results.set(slot, new Tuple<>(indexToFollow, error));
|
|
|
+ if (leaderIndicesCountDown.countDown()) {
|
|
|
+ resultHandler.accept(new AutoFollowResult(clusterAlias, results.asList()));
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void finalise(Exception failure) {
|
|
|
- if (autoFollowPatternsErrorHolder.compareAndSet(null, failure) == false) {
|
|
|
- autoFollowPatternsErrorHolder.get().addSuppressed(failure);
|
|
|
- }
|
|
|
+ private void followLeaderIndex(String clusterAlias, Index indexToFollow,
|
|
|
+ AutoFollowPattern pattern, Consumer<Exception> onResult) {
|
|
|
+ final String leaderIndexName = indexToFollow.getName();
|
|
|
+ final String followIndexName = getFollowerIndexName(pattern, leaderIndexName);
|
|
|
+
|
|
|
+ String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? leaderIndexName :
|
|
|
+ clusterAlias + ":" + leaderIndexName;
|
|
|
+ FollowIndexAction.Request request =
|
|
|
+ new FollowIndexAction.Request(leaderIndexNameWithClusterAliasPrefix, followIndexName,
|
|
|
+ pattern.getMaxBatchOperationCount(), pattern.getMaxConcurrentReadBatches(),
|
|
|
+ pattern.getMaxOperationSizeInBytes(), pattern.getMaxConcurrentWriteBatches(),
|
|
|
+ pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(),
|
|
|
+ pattern.getIdleShardRetryDelay());
|
|
|
+
|
|
|
+ // Execute if the create and follow api call succeeds:
|
|
|
+ Runnable successHandler = () -> {
|
|
|
+ LOGGER.info("Auto followed leader index [{}] as follow index [{}]", leaderIndexName, followIndexName);
|
|
|
+
|
|
|
+ // This function updates the auto follow metadata in the cluster to record that the leader index has been followed:
|
|
|
+ // (so that we do not try to follow it in subsequent auto follow runs)
|
|
|
+ Function<ClusterState, ClusterState> function = recordLeaderIndexAsFollowFunction(clusterAlias, indexToFollow);
|
|
|
+ // The coordinator always runs on the elected master node, so we can update cluster state here:
|
|
|
+ updateAutoFollowMetadata(function, onResult);
|
|
|
+ };
|
|
|
+ createAndFollow(pattern.getHeaders(), request, successHandler, onResult);
|
|
|
+ }
|
|
|
|
|
|
+ private void finalise(int slot, AutoFollowResult result) {
|
|
|
+ assert autoFollowResults.get(slot) == null;
|
|
|
+ autoFollowResults.set(slot, result);
|
|
|
if (autoFollowPatternsCountDown.countDown()) {
|
|
|
- handler.accept(autoFollowPatternsErrorHolder.get());
|
|
|
+ handler.accept(autoFollowResults.asList());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -347,4 +396,33 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
|
|
|
);
|
|
|
|
|
|
}
|
|
|
+
|
|
|
+ static class AutoFollowResult {
|
|
|
+
|
|
|
+ final String clusterAlias;
|
|
|
+ final Exception clusterStateFetchException;
|
|
|
+ final Map<Index, Exception> autoFollowExecutionResults;
|
|
|
+
|
|
|
+ AutoFollowResult(String clusterAlias, List<Tuple<Index, Exception>> results) {
|
|
|
+ this.clusterAlias = clusterAlias;
|
|
|
+
|
|
|
+ Map<Index, Exception> autoFollowExecutionResults = new HashMap<>();
|
|
|
+ for (Tuple<Index, Exception> result : results) {
|
|
|
+ autoFollowExecutionResults.put(result.v1(), result.v2());
|
|
|
+ }
|
|
|
+
|
|
|
+ this.clusterStateFetchException = null;
|
|
|
+ this.autoFollowExecutionResults = Collections.unmodifiableMap(autoFollowExecutionResults);
|
|
|
+ }
|
|
|
+
|
|
|
+ AutoFollowResult(String clusterAlias, Exception e) {
|
|
|
+ this.clusterAlias = clusterAlias;
|
|
|
+ this.clusterStateFetchException = e;
|
|
|
+ this.autoFollowExecutionResults = Collections.emptyMap();
|
|
|
+ }
|
|
|
+
|
|
|
+ AutoFollowResult(String clusterAlias) {
|
|
|
+ this(clusterAlias, (Exception) null);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|