Bladeren bron

Updating TransportRolloverAction.checkBlock so that non-write-index blocks do not prevent data stream rollover (#122905)

Keith Massey 8 maanden geleden
bovenliggende
commit
41dae025e7

+ 6 - 0
docs/changelog/122905.yaml

@@ -0,0 +1,6 @@
+pr: 122905
+summary: Updating `TransportRolloverAction.checkBlock` so that non-write-index blocks
+  do not prevent data stream rollover
+area: Data streams
+type: bug
+issues: []

+ 5 - 11
modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java

@@ -104,6 +104,7 @@ import static org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycle
 import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE;
 import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -785,14 +786,10 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
                 ).get();
                 DataStreamLifecycleHealthInfo dslHealthInfoOnHealthNode = healthNodeResponse.getHealthInfo().dslHealthInfo();
                 assertThat(dslHealthInfoOnHealthNode, is(not(DataStreamLifecycleHealthInfo.NO_DSL_ERRORS)));
-                // perhaps surprisingly rollover and delete are error-ing due to the read_only block on the first generation
-                // index which prevents metadata updates so rolling over the data stream is also blocked (note that both indices error at
-                // the same time so they'll have an equal retry count - the order becomes of the results, usually ordered by retry count,
-                // becomes non deterministic, hence the dynamic matching of index name)
-                assertThat(dslHealthInfoOnHealthNode.dslErrorsInfo().size(), is(2));
+                assertThat(dslHealthInfoOnHealthNode.dslErrorsInfo().size(), is(1));
                 DslErrorInfo errorInfo = dslHealthInfoOnHealthNode.dslErrorsInfo().get(0);
                 assertThat(errorInfo.retryCount(), greaterThanOrEqualTo(3));
-                assertThat(List.of(firstGenerationIndex, secondGenerationIndex).contains(errorInfo.indexName()), is(true));
+                assertThat(errorInfo.indexName(), equalTo(firstGenerationIndex));
             });
 
             GetHealthAction.Response healthResponse = client().execute(GetHealthAction.INSTANCE, new GetHealthAction.Request(true, 1000))
@@ -808,15 +805,12 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
                 assertThat(dslIndicator.impacts(), is(STAGNATING_INDEX_IMPACT));
                 assertThat(
                     dslIndicator.symptom(),
-                    is("2 backing indices have repeatedly encountered errors whilst trying to advance in its lifecycle")
+                    is("A backing index has repeatedly encountered errors whilst trying to advance in its lifecycle")
                 );
 
                 Diagnosis diagnosis = dslIndicator.diagnosisList().get(0);
                 assertThat(diagnosis.definition(), is(STAGNATING_BACKING_INDICES_DIAGNOSIS_DEF));
-                assertThat(
-                    diagnosis.affectedResources().get(0).getValues(),
-                    containsInAnyOrder(firstGenerationIndex, secondGenerationIndex)
-                );
+                assertThat(diagnosis.affectedResources().get(0).getValues(), contains(firstGenerationIndex));
             }
 
             // let's mark the index as writeable and make sure it's deleted and the error store is empty

+ 27 - 6
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java

@@ -154,12 +154,33 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
                 .build(),
             IndicesOptions.GatekeeperOptions.DEFAULT
         );
-
-        return state.blocks()
-            .indicesBlockedException(
-                ClusterBlockLevel.METADATA_WRITE,
-                indexNameExpressionResolver.concreteIndexNames(state, indicesOptions, request)
-            );
+        ResolvedExpression resolvedRolloverTarget = SelectorResolver.parseExpression(request.getRolloverTarget(), request.indicesOptions());
+        final IndexAbstraction indexAbstraction = state.metadata().getIndicesLookup().get(resolvedRolloverTarget.resource());
+        final String[] indicesToCheck;
+        if (indexAbstraction.getType().equals(IndexAbstraction.Type.DATA_STREAM)) {
+            DataStream dataStream = (DataStream) indexAbstraction;
+            boolean targetFailureStore = resolvedRolloverTarget.selector() != null
+                && resolvedRolloverTarget.selector().shouldIncludeFailures();
+            if (targetFailureStore == false) {
+                assert dataStream.getWriteIndex() != null : dataStream.getName() + " is a data stream but has no write index";
+                assert dataStream.getWriteIndex().getName() != null
+                    : dataStream.getName() + " is a data stream but the write index is null";
+                indicesToCheck = new String[] { dataStream.getWriteIndex().getName() };
+            } else if (dataStream.getWriteFailureIndex() != null) {
+                assert dataStream.getWriteFailureIndex().getName() != null
+                    : "the write index for the data stream " + dataStream.getName() + " is null";
+                indicesToCheck = new String[] { dataStream.getWriteFailureIndex().getName() };
+            } else {
+                indicesToCheck = null;
+            }
+        } else {
+            indicesToCheck = indexNameExpressionResolver.concreteIndexNames(state, indicesOptions, request);
+        }
+        if (indicesToCheck == null) {
+            return null;
+        } else {
+            return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, indicesToCheck);
+        }
     }
 
     @Override

+ 222 - 0
server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java

@@ -11,6 +11,7 @@ package org.elasticsearch.action.admin.indices.rollover;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.admin.indices.stats.CommonStats;
 import org.elasticsearch.action.admin.indices.stats.IndexStats;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
@@ -19,11 +20,14 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsTests;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlocks;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -43,6 +47,7 @@ import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.cache.query.QueryCacheStats;
@@ -578,6 +583,223 @@ public class TransportRolloverActionTests extends ESTestCase {
         assertThat(illegalStateException.getMessage(), containsString("Aliases to data streams cannot be rolled over."));
     }
 
+    public void testCheckBlockForIndices() {
+        final TransportRolloverAction transportRolloverAction = new TransportRolloverAction(
+            mock(TransportService.class),
+            mockClusterService,
+            mockThreadPool,
+            mockActionFilters,
+            mockIndexNameExpressionResolver,
+            rolloverService,
+            mockClient,
+            mockAllocationService,
+            mockMetadataDataStreamService,
+            dataStreamAutoShardingService
+        );
+        final IndexMetadata.Builder indexMetadata1 = IndexMetadata.builder("my-index-1")
+            .putAlias(AliasMetadata.builder("my-alias").writeIndex(true).build())
+            .settings(settings(IndexVersion.current()))
+            .numberOfShards(1)
+            .numberOfReplicas(1);
+        final IndexMetadata indexMetadata2 = IndexMetadata.builder("my-index-2")
+            .settings(settings(IndexVersion.current()).put(IndexMetadata.INDEX_READ_ONLY_SETTING.getKey(), true))
+            .numberOfShards(1)
+            .numberOfReplicas(1)
+            .build();
+        final ClusterState stateBefore = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(Metadata.builder().put(indexMetadata1).put(indexMetadata2, false))
+            .blocks(ClusterBlocks.builder().addBlocks(indexMetadata2))
+            .build();
+        {
+            RolloverRequest rolloverRequest = new RolloverRequest("my-alias", "my-new-index");
+            when(mockIndexNameExpressionResolver.concreteIndexNames(any(), any(), (IndicesRequest) any())).thenReturn(
+                new String[] { "my-index-1" }
+            );
+            assertNull(transportRolloverAction.checkBlock(rolloverRequest, stateBefore));
+        }
+        {
+            RolloverRequest rolloverRequest = new RolloverRequest("my-index-2", "my-new-index");
+            when(mockIndexNameExpressionResolver.concreteIndexNames(any(), any(), (IndicesRequest) any())).thenReturn(
+                new String[] { "my-index-2" }
+            );
+            assertNotNull(transportRolloverAction.checkBlock(rolloverRequest, stateBefore));
+        }
+    }
+
+    public void testCheckBlockForDataStreams() {
+        final TransportRolloverAction transportRolloverAction = new TransportRolloverAction(
+            mock(TransportService.class),
+            mockClusterService,
+            mockThreadPool,
+            mockActionFilters,
+            mockIndexNameExpressionResolver,
+            rolloverService,
+            mockClient,
+            mockAllocationService,
+            mockMetadataDataStreamService,
+            dataStreamAutoShardingService
+        );
+        String dataStreamName = randomAlphaOfLength(20);
+        {
+            // First, make sure checkBlock returns null when there are no blocks
+            final ClusterState clusterState = createDataStream(
+                dataStreamName,
+                false,
+                false,
+                randomBoolean(),
+                randomBoolean(),
+                randomBoolean()
+            );
+            RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName, null);
+            assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
+        }
+        {
+            // Make sure checkBlock returns null when indices other than the write index have blocks
+            final ClusterState clusterState = createDataStream(
+                dataStreamName,
+                false,
+                true,
+                randomBoolean(),
+                randomBoolean(),
+                randomBoolean()
+            );
+            RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName, null);
+            assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
+        }
+        {
+            // Make sure checkBlock returns null when indices other than the write index have blocks and we use "::data"
+            final ClusterState clusterState = createDataStream(
+                dataStreamName,
+                false,
+                true,
+                randomBoolean(),
+                randomBoolean(),
+                randomBoolean()
+            );
+            RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::data", null);
+            assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
+        }
+        {
+            // Make sure checkBlock returns an exception when the write index has a block
+            ClusterState clusterState = createDataStream(
+                dataStreamName,
+                true,
+                randomBoolean(),
+                randomBoolean(),
+                randomBoolean(),
+                randomBoolean()
+            );
+            RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName, null);
+            if (randomBoolean()) {
+                rolloverRequest.setIndicesOptions(IndicesOptions.lenientExpandOpenNoSelectors());
+            }
+            ClusterBlockException e = transportRolloverAction.checkBlock(rolloverRequest, clusterState);
+            assertNotNull(e);
+        }
+        {
+            // Make sure checkBlock returns an exception when the write index has a block and we use "::data"
+            ClusterState clusterState = createDataStream(
+                dataStreamName,
+                true,
+                randomBoolean(),
+                randomBoolean(),
+                randomBoolean(),
+                randomBoolean()
+            );
+            RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::data", null);
+            ClusterBlockException e = transportRolloverAction.checkBlock(rolloverRequest, clusterState);
+            assertNotNull(e);
+        }
+    }
+
+    public void testCheckBlockForDataStreamFailureStores() {
+        final TransportRolloverAction transportRolloverAction = new TransportRolloverAction(
+            mock(TransportService.class),
+            mockClusterService,
+            mockThreadPool,
+            mockActionFilters,
+            mockIndexNameExpressionResolver,
+            rolloverService,
+            mockClient,
+            mockAllocationService,
+            mockMetadataDataStreamService,
+            dataStreamAutoShardingService
+        );
+        String dataStreamName = randomAlphaOfLength(20);
+        {
+            // Make sure checkBlock returns no exception when there is no failure store block
+            ClusterState clusterState = createDataStream(dataStreamName, randomBoolean(), randomBoolean(), true, false, false);
+            RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::failures", null);
+            assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
+        }
+        {
+            // Make sure checkBlock returns an exception when the failure store write index has a block
+            ClusterState clusterState = createDataStream(dataStreamName, randomBoolean(), randomBoolean(), true, true, randomBoolean());
+            RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::failures", null);
+            assertNotNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
+        }
+        {
+            // Make sure checkBlock returns no exception when failure store non-write indices have a block
+            ClusterState clusterState = createDataStream(dataStreamName, randomBoolean(), randomBoolean(), true, false, true);
+            RolloverRequest rolloverRequest = new RolloverRequest(dataStreamName + "::failures", null);
+            assertNull(transportRolloverAction.checkBlock(rolloverRequest, clusterState));
+        }
+    }
+
+    private ClusterState createDataStream(
+        String dataStreamName,
+        boolean blockOnWriteIndex,
+        boolean blocksOnNonWriteIndices,
+        boolean includeFailureStore,
+        boolean blockOnFailureStoreWriteIndex,
+        boolean blockOnFailureStoreNonWriteIndices
+    ) {
+        ClusterState.Builder clusterStateBuilder = ClusterState.builder(ClusterName.DEFAULT);
+        Metadata.Builder metadataBuilder = Metadata.builder();
+        ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder();
+        List<Index> indices = new ArrayList<>();
+        int totalIndices = randomIntBetween(1, 20);
+        for (int i = 0; i < totalIndices; i++) {
+            Settings.Builder settingsBuilder = settings(IndexVersion.current());
+            if ((blockOnWriteIndex && i == totalIndices - 1) || (blocksOnNonWriteIndices && i != totalIndices - 1)) {
+                settingsBuilder.put(IndexMetadata.INDEX_READ_ONLY_SETTING.getKey(), true);
+            }
+            final IndexMetadata backingIndexMetadata = IndexMetadata.builder(".ds-logs-ds-00000" + (i + 1))
+                .settings(settingsBuilder)
+                .numberOfShards(1)
+                .numberOfReplicas(1)
+                .build();
+            metadataBuilder.put(backingIndexMetadata, false);
+            indices.add(backingIndexMetadata.getIndex());
+            clusterBlocksBuilder.addBlocks(backingIndexMetadata);
+        }
+
+        DataStream.Builder dataStreamBuilder = DataStream.builder(dataStreamName, indices)
+            .setMetadata(Map.of())
+            .setIndexMode(randomFrom(IndexMode.values()));
+        if (includeFailureStore) {
+            List<Index> failureStoreIndices = new ArrayList<>();
+            int totalFailureStoreIndices = randomIntBetween(1, 20);
+            for (int i = 0; i < totalFailureStoreIndices; i++) {
+                Settings.Builder settingsBuilder = settings(IndexVersion.current());
+                if ((blockOnFailureStoreWriteIndex && i == totalFailureStoreIndices - 1)
+                    || (blockOnFailureStoreNonWriteIndices && i != totalFailureStoreIndices - 1)) {
+                    settingsBuilder.put(IndexMetadata.INDEX_READ_ONLY_SETTING.getKey(), true);
+                }
+                final IndexMetadata failureStoreIndexMetadata = IndexMetadata.builder(
+                    DataStream.getDefaultFailureStoreName(dataStreamName, i + 1, randomMillisUpToYear9999())
+                ).settings(settingsBuilder).numberOfShards(1).numberOfReplicas(1).build();
+                failureStoreIndices.add(failureStoreIndexMetadata.getIndex());
+                clusterBlocksBuilder.addBlocks(failureStoreIndexMetadata);
+            }
+            dataStreamBuilder.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStoreIndices).build());
+        }
+        clusterStateBuilder.blocks(clusterBlocksBuilder);
+        final DataStream dataStream = dataStreamBuilder.build();
+        metadataBuilder.put(dataStream);
+        return clusterStateBuilder.metadata(metadataBuilder).build();
+    }
+
     private IndicesStatsResponse createIndicesStatResponse(String indexName, long totalDocs, long primariesDocs) {
         final CommonStats primaryStats = mock(CommonStats.class);
         when(primaryStats.getDocs()).thenReturn(new DocsStats(primariesDocs, 0, between(1, 10000)));