Browse Source

Restrict failure stores from replicating via CCR (#126355) (#126557)

Checks to see if an index belongs to a data stream's failure store before following it. If the index is a 
failure index, the follow operation is rejected. Also updates the logic in the auto follower API's to 
exclude failure indices on data streams from being followed if their parent data stream matches the 
follow pattern.
James Baiera 6 months ago
parent
commit
e724dafaac

+ 30 - 1
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java

@@ -130,7 +130,7 @@ public class CcrLicenseChecker {
             client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME),
             RemoteClusterService.DisconnectedStrategy.RECONNECT_IF_DISCONNECTED
         );
-        checkRemoteClusterLicenseAndFetchClusterState(
+        doCheckRemoteClusterLicenseAndFetchClusterState(
             client,
             clusterAlias,
             remoteClient,
@@ -165,6 +165,12 @@ public class CcrLicenseChecker {
                 final DataStream remoteDataStream = indexAbstraction.getParentDataStream() != null
                     ? indexAbstraction.getParentDataStream()
                     : null;
+                // Ensure that this leader index is not a failure store index, because they are not yet supported in CCR
+                if (remoteDataStream != null && remoteDataStream.isFailureStoreIndex(leaderIndex)) {
+                    String message = String.format(Locale.ROOT, "cannot follow [%s], because it is a failure store index", leaderIndex);
+                    onFailure.accept(new IllegalArgumentException(message));
+                    return;
+                }
                 hasPrivilegesToFollowIndices(client.threadPool().getThreadContext(), remoteClient, new String[] { leaderIndex }, e -> {
                     if (e == null) {
                         fetchLeaderHistoryUUIDs(
@@ -228,6 +234,29 @@ public class CcrLicenseChecker {
         }
     }
 
+    // overridable for testing
+    protected void doCheckRemoteClusterLicenseAndFetchClusterState(
+        final Client client,
+        final String clusterAlias,
+        final RemoteClusterClient remoteClient,
+        final ClusterStateRequest request,
+        final Consumer<Exception> onFailure,
+        final Consumer<ClusterStateResponse> leaderClusterStateConsumer,
+        final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
+        final Function<Exception, ElasticsearchStatusException> unknownLicense
+    ) {
+        checkRemoteClusterLicenseAndFetchClusterState(
+            client,
+            clusterAlias,
+            remoteClient,
+            request,
+            onFailure,
+            leaderClusterStateConsumer,
+            nonCompliantLicense,
+            unknownLicense
+        );
+    }
+
     /**
      * Fetches the leader cluster state from the remote cluster by the specified cluster state request. Before fetching the cluster state,
      * the remote cluster is checked for license compliance with CCR. If the remote cluster is not licensed for CCR,

+ 192 - 0
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseCheckerTests.java

@@ -7,18 +7,50 @@
 
 package org.elasticsearch.xpack.ccr;
 
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.client.internal.RemoteClusterClient;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.AliasMetadata;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.DataStreamAlias;
+import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.IndexVersion;
+import org.elasticsearch.indices.IndexClosedException;
+import org.elasticsearch.license.RemoteClusterLicenseChecker;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.security.user.User;
+import org.mockito.ArgumentCaptor;
 
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasToString;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
 
 public class CcrLicenseCheckerTests extends ESTestCase {
 
@@ -46,4 +78,164 @@ public class CcrLicenseCheckerTests extends ESTestCase {
         assertTrue(invoked.get());
     }
 
+    /**
+     * Tests all validation logic after obtaining the remote cluster state and before executing the check for follower privileges.
+     */
+    public void testRemoteIndexValidation() {
+        // A cluster state with
+        // - a data stream, containing a backing index and a failure index
+        // - an alias that points to said data stream
+        // - a standalone index
+        // - an alias that points to said standalone index
+        // - a closed index
+        String indexName = "random-index";
+        String closedIndexName = "closed-index";
+        String dataStreamName = "logs-test-data";
+        String aliasName = "foo-alias";
+        String dsAliasName = "ds-alias";
+        IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
+            .putAlias(AliasMetadata.builder(aliasName))
+            .settings(settings(IndexVersion.current()))
+            .numberOfShards(5)
+            .numberOfReplicas(1)
+            .build();
+        IndexMetadata closedIndexMetadata = IndexMetadata.builder(closedIndexName)
+            .settings(settings(IndexVersion.current()))
+            .numberOfShards(5)
+            .numberOfReplicas(1)
+            .state(IndexMetadata.State.CLOSE)
+            .build();
+        IndexMetadata firstBackingIndex = DataStreamTestHelper.createFirstBackingIndex(dataStreamName).build();
+        IndexMetadata firstFailureStore = DataStreamTestHelper.createFirstFailureStore(dataStreamName).build();
+        DataStream dataStream = DataStreamTestHelper.newInstance(
+            dataStreamName,
+            List.of(firstBackingIndex.getIndex()),
+            List.of(firstFailureStore.getIndex())
+        );
+        ClusterState remoteClusterState = ClusterState.builder(new ClusterName(randomIdentifier()))
+            .metadata(
+                Metadata.builder()
+                    .put(indexMetadata, false)
+                    .put(closedIndexMetadata, false)
+                    .put(firstBackingIndex, false)
+                    .put(firstFailureStore, false)
+                    .dataStreams(
+                        Map.of(dataStreamName, dataStream),
+                        Map.of(dsAliasName, new DataStreamAlias(dsAliasName, List.of(dataStreamName), dataStreamName, Map.of()))
+                    )
+            )
+            .build();
+
+        final boolean isCcrAllowed = randomBoolean();
+        final CcrLicenseChecker checker = new CcrLicenseChecker(() -> isCcrAllowed, () -> true) {
+            @Override
+            User getUser(ThreadContext threadContext) {
+                return null;
+            }
+
+            @Override
+            protected void doCheckRemoteClusterLicenseAndFetchClusterState(
+                Client client,
+                String clusterAlias,
+                RemoteClusterClient remoteClient,
+                ClusterStateRequest request,
+                Consumer<Exception> onFailure,
+                Consumer<ClusterStateResponse> leaderClusterStateConsumer,
+                Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
+                Function<Exception, ElasticsearchStatusException> unknownLicense
+            ) {
+                leaderClusterStateConsumer.accept(new ClusterStateResponse(remoteClusterState.getClusterName(), remoteClusterState, false));
+            }
+
+            @Override
+            public void hasPrivilegesToFollowIndices(
+                ThreadContext threadContext,
+                RemoteClusterClient remoteClient,
+                String[] indices,
+                Consumer<Exception> handler
+            ) {
+                fail("Test case should fail before this code is called");
+            }
+        };
+
+        String clusterAlias = randomIdentifier();
+
+        ExecutorService mockExecutor = mock(ExecutorService.class);
+        ThreadPool mockThreadPool = mock(ThreadPool.class);
+        when(mockThreadPool.executor(eq(Ccr.CCR_THREAD_POOL_NAME))).thenReturn(mockExecutor);
+        RemoteClusterClient mockRemoteClient = mock(RemoteClusterClient.class);
+
+        Client mockClient = mock(Client.class);
+        when(mockClient.threadPool()).thenReturn(mockThreadPool);
+        when(mockClient.getRemoteClusterClient(eq(clusterAlias), eq(mockExecutor), any())).thenReturn(mockRemoteClient);
+
+        // When following an index that does not exist, throw IndexNotFoundException
+        {
+            Exception exception = executeExpectingException(checker, mockClient, clusterAlias, "non-existent-index");
+            assertThat(exception, instanceOf(IndexNotFoundException.class));
+            assertThat(exception.getMessage(), equalTo("no such index [non-existent-index]"));
+        }
+
+        // When following an alias, throw IllegalArgumentException
+        {
+            Exception exception = executeExpectingException(checker, mockClient, clusterAlias, aliasName);
+            assertThat(exception, instanceOf(IllegalArgumentException.class));
+            assertThat(exception.getMessage(), equalTo("cannot follow [" + aliasName + "], because it is a ALIAS"));
+        }
+
+        // When following a data stream, throw IllegalArgumentException
+        {
+            Exception exception = executeExpectingException(checker, mockClient, clusterAlias, dataStreamName);
+            assertThat(exception, instanceOf(IllegalArgumentException.class));
+            assertThat(exception.getMessage(), equalTo("cannot follow [" + dataStreamName + "], because it is a DATA_STREAM"));
+        }
+
+        // When following a data stream alias, throw IllegalArgumentException
+        {
+            Exception exception = executeExpectingException(checker, mockClient, clusterAlias, dsAliasName);
+            assertThat(exception, instanceOf(IllegalArgumentException.class));
+            assertThat(exception.getMessage(), equalTo("cannot follow [" + dsAliasName + "], because it is a ALIAS"));
+        }
+
+        // When following a closed index, throw IndexClosedException
+        {
+            Exception exception = executeExpectingException(checker, mockClient, clusterAlias, closedIndexName);
+            assertThat(exception, instanceOf(IndexClosedException.class));
+            assertThat(exception.getMessage(), equalTo("closed"));
+        }
+
+        // When following a failure store index, throw IllegalArgumentException
+        {
+            Exception exception = executeExpectingException(checker, mockClient, clusterAlias, firstFailureStore.getIndex().getName());
+            assertThat(exception, instanceOf(IllegalArgumentException.class));
+            assertThat(
+                exception.getMessage(),
+                equalTo("cannot follow [" + firstFailureStore.getIndex().getName() + "], because it is a failure store index")
+            );
+        }
+    }
+
+    private static Exception executeExpectingException(
+        CcrLicenseChecker checker,
+        Client mockClient,
+        String clusterAlias,
+        String leaderIndex
+    ) {
+        @SuppressWarnings("unchecked")
+        Consumer<Exception> onFailure = mock(Consumer.class);
+        @SuppressWarnings("unchecked")
+        BiConsumer<String[], Tuple<IndexMetadata, DataStream>> consumer = mock(BiConsumer.class);
+        checker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
+            mockClient,
+            clusterAlias,
+            leaderIndex,
+            onFailure,
+            consumer
+        );
+        ArgumentCaptor<Exception> captor = ArgumentCaptor.forClass(Exception.class);
+        verify(onFailure, times(1)).accept(captor.capture());
+        verifyNoInteractions(consumer);
+        return captor.getValue();
+    }
+
 }

+ 53 - 8
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java

@@ -169,7 +169,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
         Client client = mock(Client.class);
         when(client.getRemoteClusterClient(anyString(), any(), any())).thenReturn(new RedirectToLocalClusterRemoteClusterClient(client));
 
-        ClusterState remoteState = createRemoteClusterStateWithDataStream("logs-foobar");
+        ClusterState remoteState = createRemoteClusterStateWithDataStream("logs-foobar", false, true);
 
         AutoFollowPattern autoFollowPattern = createAutoFollowPattern("remote", "logs-*");
         Map<String, AutoFollowPattern> patterns = new HashMap<>();
@@ -2616,23 +2616,53 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
     }
 
     private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName) {
-        return createRemoteClusterStateWithDataStream(dataStreamName, false);
+        return createRemoteClusterStateWithDataStream(dataStreamName, false, false);
     }
 
-    private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName, boolean system) {
+    private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName, boolean system, boolean withFailures) {
+        long currentTimeMillis = System.currentTimeMillis();
+
         Settings.Builder indexSettings = settings(IndexVersion.current());
         indexSettings.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()));
         indexSettings.put("index.hidden", true);
 
-        IndexMetadata indexMetadata = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1))
+        IndexMetadata indexMetadata = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1, currentTimeMillis))
             .settings(indexSettings)
             .numberOfShards(1)
             .numberOfReplicas(0)
             .system(system)
             .build();
-        DataStream dataStream = DataStream.builder(dataStreamName, List.of(indexMetadata.getIndex())).setSystem(system).build();
-        ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
-            .metadata(Metadata.builder().put(indexMetadata, true).put(dataStream).version(0L));
+
+        IndexMetadata failureIndexMetadata = null;
+        DataStream.DataStreamIndices failureStore = null;
+        if (withFailures) {
+            Settings.Builder failureIndexSettings = settings(IndexVersion.current());
+            failureIndexSettings.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()));
+            failureIndexSettings.put("index.hidden", true);
+
+            String defaultFailureStoreName = DataStream.getDefaultFailureStoreName(dataStreamName, 1, currentTimeMillis);
+            failureIndexMetadata = IndexMetadata.builder(defaultFailureStoreName)
+                .settings(failureIndexSettings)
+                .numberOfShards(1)
+                .numberOfReplicas(0)
+                .system(system)
+                .build();
+
+            failureStore = DataStream.DataStreamIndices.failureIndicesBuilder(List.of(failureIndexMetadata.getIndex())).build();
+        }
+
+        var dataStreamBuilder = DataStream.builder(dataStreamName, List.of(indexMetadata.getIndex())).setSystem(system);
+        if (withFailures) {
+            dataStreamBuilder.setFailureIndices(failureStore);
+        }
+        DataStream dataStream = dataStreamBuilder.build();
+
+        var mdBuilder = Metadata.builder().put(indexMetadata, true).put(dataStream).version(0L);
+        if (withFailures) {
+            mdBuilder.put(failureIndexMetadata, true);
+        }
+
+        var routingTableBuilder = RoutingTable.builder();
 
         ShardRouting shardRouting = TestShardRouting.newShardRouting(
             new ShardId(indexMetadata.getIndex(), 0),
@@ -2641,7 +2671,22 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             ShardRoutingState.INITIALIZING
         ).moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
         IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()).addShard(shardRouting).build();
-        return csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
+        routingTableBuilder.add(indexRoutingTable);
+
+        if (withFailures) {
+            ShardRouting failureShardRouting = TestShardRouting.newShardRouting(
+                new ShardId(failureIndexMetadata.getIndex(), 0),
+                "1",
+                true,
+                ShardRoutingState.INITIALIZING
+            ).moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
+            IndexRoutingTable failureIndexRoutingTable = IndexRoutingTable.builder(failureIndexMetadata.getIndex())
+                .addShard(failureShardRouting)
+                .build();
+            routingTableBuilder.add(failureIndexRoutingTable);
+        }
+
+        return ClusterState.builder(new ClusterName("remote")).metadata(mdBuilder).routingTable(routingTableBuilder.build()).build();
     }
 
 }

+ 1 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java

@@ -313,6 +313,7 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<Metadata.Custom> i
                 final DataStream parentDataStream = indexAbstraction.getParentDataStream();
                 return parentDataStream != null
                     && parentDataStream.isSystem() == false
+                    && parentDataStream.isFailureStoreIndex(indexAbstraction.getName()) == false
                     && Regex.simpleMatch(leaderIndexExclusionPatterns, indexAbstraction.getParentDataStream().getName()) == false
                     && Regex.simpleMatch(leaderIndexPatterns, indexAbstraction.getParentDataStream().getName());
             }