|
@@ -26,6 +26,8 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.collect.CopyOnWriteHashMap;
|
|
|
import org.elasticsearch.common.collect.Tuple;
|
|
|
+import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
|
|
+import org.elasticsearch.common.component.Lifecycle;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
|
@@ -66,7 +68,7 @@ import static org.elasticsearch.xpack.core.ccr.AutoFollowStats.AutoFollowedClust
|
|
|
* A component that runs only on the elected master node and follows leader indices automatically
|
|
|
* if they match with a auto follow pattern that is defined in {@link AutoFollowMetadata}.
|
|
|
*/
|
|
|
-public class AutoFollowCoordinator implements ClusterStateListener {
|
|
|
+public class AutoFollowCoordinator extends AbstractLifecycleComponent implements ClusterStateListener {
|
|
|
|
|
|
private static final Logger LOGGER = LogManager.getLogger(AutoFollowCoordinator.class);
|
|
|
private static final int MAX_AUTO_FOLLOW_ERRORS = 256;
|
|
@@ -117,6 +119,26 @@ public class AutoFollowCoordinator implements ClusterStateListener {
|
|
|
waitForMetadataTimeOut = CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT.get(settings);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ protected void doStart() {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void doStop() {
|
|
|
+ LOGGER.trace("stopping all auto-followers");
|
|
|
+ /*
|
|
|
+ * Synchronization is not necessary here; the field is volatile and the map is a copy-on-write map, any new auto-followers will not
|
|
|
+ * start since we check started status of the coordinator before starting them.
|
|
|
+ */
|
|
|
+ autoFollowers.values().forEach(AutoFollower::stop);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void doClose() {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
public synchronized AutoFollowStats getStats() {
|
|
|
final Map<String, AutoFollower> autoFollowers = this.autoFollowers;
|
|
|
final TreeMap<String, AutoFollowedCluster> timesSinceLastAutoFollowPerRemoteCluster = new TreeMap<>();
|
|
@@ -246,8 +268,10 @@ public class AutoFollowCoordinator implements ClusterStateListener {
|
|
|
|
|
|
};
|
|
|
newAutoFollowers.put(remoteCluster, autoFollower);
|
|
|
- LOGGER.info("starting auto follower for remote cluster [{}]", remoteCluster);
|
|
|
- autoFollower.start();
|
|
|
+ LOGGER.info("starting auto-follower for remote cluster [{}]", remoteCluster);
|
|
|
+ if (lifecycleState() == Lifecycle.State.STARTED) {
|
|
|
+ autoFollower.start();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
List<String> removedRemoteClusters = new ArrayList<>();
|
|
@@ -257,13 +281,15 @@ public class AutoFollowCoordinator implements ClusterStateListener {
|
|
|
boolean exist = autoFollowMetadata.getPatterns().values().stream()
|
|
|
.anyMatch(pattern -> pattern.getRemoteCluster().equals(remoteCluster));
|
|
|
if (exist == false) {
|
|
|
- LOGGER.info("removing auto follower for remote cluster [{}]", remoteCluster);
|
|
|
+ LOGGER.info("removing auto-follower for remote cluster [{}]", remoteCluster);
|
|
|
autoFollower.removed = true;
|
|
|
removedRemoteClusters.add(remoteCluster);
|
|
|
} else if (autoFollower.remoteClusterConnectionMissing) {
|
|
|
- LOGGER.info("retrying auto follower [{}] after remote cluster connection was missing", remoteCluster);
|
|
|
+ LOGGER.info("retrying auto-follower for remote cluster [{}] after remote cluster connection was missing", remoteCluster);
|
|
|
autoFollower.remoteClusterConnectionMissing = false;
|
|
|
- autoFollower.start();
|
|
|
+ if (lifecycleState() == Lifecycle.State.STARTED) {
|
|
|
+ autoFollower.start();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
assert assertNoOtherActiveAutoFollower(newAutoFollowers);
|
|
@@ -313,6 +339,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
|
|
|
volatile boolean removed = false;
|
|
|
private volatile CountDown autoFollowPatternsCountDown;
|
|
|
private volatile AtomicArray<AutoFollowResult> autoFollowResults;
|
|
|
+ private volatile boolean stop;
|
|
|
|
|
|
AutoFollower(final String remoteCluster,
|
|
|
final Consumer<List<AutoFollowResult>> statsUpdater,
|
|
@@ -325,6 +352,10 @@ public class AutoFollowCoordinator implements ClusterStateListener {
|
|
|
}
|
|
|
|
|
|
void start() {
|
|
|
+ if (stop) {
|
|
|
+ LOGGER.trace("auto-follower is stopped for remote cluster [{}]", remoteCluster);
|
|
|
+ return;
|
|
|
+ }
|
|
|
if (removed) {
|
|
|
// This check exists to avoid two AutoFollower instances a single remote cluster.
|
|
|
// (If an auto follow pattern is deleted and then added back quickly enough then
|
|
@@ -389,6 +420,11 @@ public class AutoFollowCoordinator implements ClusterStateListener {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ void stop() {
|
|
|
+ LOGGER.trace("stopping auto-follower for remote cluster [{}]", remoteCluster);
|
|
|
+ stop = true;
|
|
|
+ }
|
|
|
+
|
|
|
private void autoFollowIndices(final AutoFollowMetadata autoFollowMetadata,
|
|
|
final ClusterState clusterState,
|
|
|
final ClusterState remoteClusterState,
|