|
@@ -7,14 +7,15 @@
|
|
|
package org.elasticsearch.xpack.ccr.action;
|
|
|
|
|
|
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
|
|
+
|
|
|
import org.elasticsearch.Version;
|
|
|
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
|
|
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
|
|
|
import org.elasticsearch.client.Client;
|
|
|
import org.elasticsearch.cluster.ClusterName;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
-import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
|
|
|
import org.elasticsearch.cluster.metadata.DataStream;
|
|
|
+import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
import org.elasticsearch.cluster.metadata.Metadata;
|
|
|
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
|
@@ -24,13 +25,13 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
|
|
|
import org.elasticsearch.cluster.routing.TestShardRouting;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.UUIDs;
|
|
|
-import org.elasticsearch.core.Tuple;
|
|
|
import org.elasticsearch.common.settings.ClusterSettings;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
|
|
-import org.elasticsearch.core.TimeValue;
|
|
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|
|
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
|
|
+import org.elasticsearch.core.TimeValue;
|
|
|
+import org.elasticsearch.core.Tuple;
|
|
|
import org.elasticsearch.index.Index;
|
|
|
import org.elasticsearch.index.IndexSettings;
|
|
|
import org.elasticsearch.test.ESTestCase;
|
|
@@ -71,6 +72,7 @@ import static java.util.Collections.singletonList;
|
|
|
import static java.util.Collections.singletonMap;
|
|
|
import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.cleanFollowedRemoteIndices;
|
|
|
import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction;
|
|
|
+import static org.hamcrest.Matchers.anEmptyMap;
|
|
|
import static org.hamcrest.Matchers.containsInAnyOrder;
|
|
|
import static org.hamcrest.Matchers.empty;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
@@ -2150,11 +2152,133 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public void testSystemIndicesAreNotAutoFollowed() {
|
|
|
+ ClusterState clusterState = null;
|
|
|
+ final int nbLeaderSystemIndices = randomIntBetween(1, 15);
|
|
|
+ for (int i = 0; i < nbLeaderSystemIndices; i++) {
|
|
|
+ String indexName = "." + i;
|
|
|
+ if (clusterState == null) {
|
|
|
+ clusterState = createRemoteClusterState(indexName, true, 0, true);
|
|
|
+ } else {
|
|
|
+ clusterState = createRemoteClusterState(clusterState, true, indexName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Tuple<List<AutoFollowCoordinator.AutoFollowResult>, Set<String>> autoFollowResults = executeAutoFollow(".*", clusterState);
|
|
|
+ assertThat(autoFollowResults.v1().size(), equalTo(1));
|
|
|
+ assertThat(autoFollowResults.v1().get(0).autoFollowExecutionResults, is(anEmptyMap()));
|
|
|
+ assertThat(autoFollowResults.v2(), is(empty()));
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testSystemDataStreamsAreNotAutoFollowed() {
|
|
|
+ Tuple<List<AutoFollowCoordinator.AutoFollowResult>, Set<String>> autoFollowResults =
|
|
|
+ executeAutoFollow("*.", createRemoteClusterStateWithDataStream(".test-data-stream"));
|
|
|
+
|
|
|
+ assertThat(autoFollowResults.v1().size(), equalTo(1));
|
|
|
+ assertThat(autoFollowResults.v1().get(0).autoFollowExecutionResults, is(anEmptyMap()));
|
|
|
+ assertThat(autoFollowResults.v2(), is(empty()));
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testFollowerIndexIsCreatedInExecuteAutoFollow() {
|
|
|
+ final String indexName = "idx-1";
|
|
|
+ ClusterState clusterState = createRemoteClusterState(indexName, true, 0, false);
|
|
|
+
|
|
|
+ Tuple<List<AutoFollowCoordinator.AutoFollowResult>, Set<String>> autoFollowResults = executeAutoFollow("idx-*", clusterState);
|
|
|
+ assertThat(autoFollowResults.v1().size(), equalTo(1));
|
|
|
+ assertThat(autoFollowResults.v1().get(0).autoFollowExecutionResults.size(), equalTo(1));
|
|
|
+ for (Map.Entry<Index, Exception> autoFollowEntry : autoFollowResults.v1().get(0).autoFollowExecutionResults.entrySet()) {
|
|
|
+ assertThat(autoFollowEntry.getKey().getName(), equalTo(indexName));
|
|
|
+ assertThat(autoFollowEntry.getValue(), nullValue());
|
|
|
+ }
|
|
|
+ assertThat(autoFollowResults.v2().contains(indexName), equalTo(true));
|
|
|
+ }
|
|
|
+
|
|
|
+ private Tuple<List<AutoFollowCoordinator.AutoFollowResult>, Set<String>> executeAutoFollow(String indexPattern,
|
|
|
+ ClusterState finalRemoteState) {
|
|
|
+ final Client client = mock(Client.class);
|
|
|
+ when(client.getRemoteClusterClient(anyString())).thenReturn(client);
|
|
|
+
|
|
|
+ final String pattern = "pattern1";
|
|
|
+ final ClusterState localState = ClusterState.builder(new ClusterName("local"))
|
|
|
+ .metadata(Metadata.builder()
|
|
|
+ .putCustom(AutoFollowMetadata.TYPE,
|
|
|
+ new AutoFollowMetadata(
|
|
|
+ Map.of(
|
|
|
+ pattern,
|
|
|
+ new AutoFollowPattern(
|
|
|
+ "remote",
|
|
|
+ List.of(indexPattern),
|
|
|
+ Collections.emptyList(),
|
|
|
+ null,
|
|
|
+ Settings.EMPTY,
|
|
|
+ true,
|
|
|
+ null,
|
|
|
+ null,
|
|
|
+ null,
|
|
|
+ null,
|
|
|
+ null,
|
|
|
+ null,
|
|
|
+ null,
|
|
|
+ null,
|
|
|
+ null,
|
|
|
+ null
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ Map.of(pattern, List.of()),
|
|
|
+ Map.of(pattern, Map.of()))))
|
|
|
+ .build();
|
|
|
+
|
|
|
+ final AtomicReference<ClusterState> lastModifiedClusterState = new AtomicReference<>(localState);
|
|
|
+ final List<AutoFollowCoordinator.AutoFollowResult> results = new ArrayList<>();
|
|
|
+ final Set<String> followedIndices = ConcurrentCollections.newConcurrentSet();
|
|
|
+ final AutoFollower autoFollower =
|
|
|
+ new AutoFollower("remote", results::addAll, localClusterStateSupplier(localState), () -> 1L, Runnable::run) {
|
|
|
+ @Override
|
|
|
+ void getRemoteClusterState(String remoteCluster,
|
|
|
+ long metadataVersion,
|
|
|
+ BiConsumer<ClusterStateResponse, Exception> handler) {
|
|
|
+ assertThat(remoteCluster, equalTo("remote"));
|
|
|
+ handler.accept(new ClusterStateResponse(new ClusterName("remote"), finalRemoteState, false), null);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void createAndFollow(Map<String, String> headers,
|
|
|
+ PutFollowAction.Request followRequest,
|
|
|
+ Runnable successHandler,
|
|
|
+ Consumer<Exception> failureHandler) {
|
|
|
+ followedIndices.add(followRequest.getLeaderIndex());
|
|
|
+ successHandler.run();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler) {
|
|
|
+ lastModifiedClusterState.updateAndGet(updateFunction::apply);
|
|
|
+ handler.accept(null);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List<String> patterns) {
|
|
|
+ // Ignore, to avoid invoking updateAutoFollowMetadata(...) twice
|
|
|
+ }
|
|
|
+ };
|
|
|
+ autoFollower.start();
|
|
|
+
|
|
|
+ assertThat(results, notNullValue());
|
|
|
+ return Tuple.tuple(results, followedIndices);
|
|
|
+ }
|
|
|
+
|
|
|
private static ClusterState createRemoteClusterState(String indexName, boolean enableSoftDeletes) {
|
|
|
return createRemoteClusterState(indexName, enableSoftDeletes, 0L);
|
|
|
}
|
|
|
|
|
|
private static ClusterState createRemoteClusterState(String indexName, boolean enableSoftDeletes, long metadataVersion) {
|
|
|
+ return createRemoteClusterState(indexName, enableSoftDeletes, metadataVersion, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static ClusterState createRemoteClusterState(String indexName,
|
|
|
+ boolean enableSoftDeletes,
|
|
|
+ long metadataVersion,
|
|
|
+ boolean systemIndex) {
|
|
|
Settings.Builder indexSettings;
|
|
|
if (enableSoftDeletes == false) {
|
|
|
indexSettings = settings(VersionUtils.randomPreviousCompatibleVersion(random(), Version.V_8_0_0))
|
|
@@ -2168,6 +2292,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|
|
.settings(indexSettings)
|
|
|
.numberOfShards(1)
|
|
|
.numberOfReplicas(0)
|
|
|
+ .system(systemIndex)
|
|
|
.build();
|
|
|
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
|
|
|
.metadata(Metadata.builder()
|
|
@@ -2180,7 +2305,12 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|
|
return csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
|
|
|
}
|
|
|
|
|
|
+
|
|
|
private static ClusterState createRemoteClusterState(final ClusterState previous, final String... indices) {
|
|
|
+ return createRemoteClusterState(previous, false, indices);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static ClusterState createRemoteClusterState(final ClusterState previous, boolean systemIndices, final String... indices) {
|
|
|
if (indices == null) {
|
|
|
return previous;
|
|
|
}
|
|
@@ -2192,6 +2322,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|
|
.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())))
|
|
|
.numberOfShards(1)
|
|
|
.numberOfReplicas(0)
|
|
|
+ .system(systemIndices)
|
|
|
.build();
|
|
|
metadataBuilder.put(indexMetadata, true);
|
|
|
routingTableBuilder.add(IndexRoutingTable.builder(indexMetadata.getIndex())
|
|
@@ -2230,6 +2361,10 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName) {
|
|
|
+ return createRemoteClusterStateWithDataStream(dataStreamName, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName, boolean system) {
|
|
|
Settings.Builder indexSettings = settings(Version.CURRENT);
|
|
|
indexSettings.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()));
|
|
|
indexSettings.put("index.hidden", true);
|
|
@@ -2238,9 +2373,10 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|
|
.settings(indexSettings)
|
|
|
.numberOfShards(1)
|
|
|
.numberOfReplicas(0)
|
|
|
+ .system(system)
|
|
|
.build();
|
|
|
DataStream dataStream = new DataStream(dataStreamName, new DataStream.TimestampField("@timestamp"),
|
|
|
- List.of(indexMetadata.getIndex()));
|
|
|
+ List.of(indexMetadata.getIndex()), 1, null, false, false, system);
|
|
|
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
|
|
|
.metadata(Metadata.builder()
|
|
|
.put(indexMetadata, true)
|