Browse Source

Allow closing a write index of a data stream. (#70908)

Prior to this commit when attempting to close a data stream a validation error is returned indicating that it is forbidden to close a write index of a data stream. The idea behind that is to ensure that a data stream always can accept writes. For the same reason deleting a write index is not allowed (the write index can only be deleted when deleting the entire data stream).

However closing an index isn't as destructive as deleting an index (an open index request makes the write index available again) and there are other cases where a data stream can't accept writes. For example when primary shards of the write index are not available. So the original reasoning for not allowing to close a write index isn't that strong.

On top of this is that this also avoids certain administrative operations from being performed. For example restoring a snapshot containing data streams that already exist in the cluster (in place restore).

Closes #70903 #70861
Martijn van Groningen 4 years ago
parent
commit
633b66f09d

+ 0 - 12
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java

@@ -133,18 +133,6 @@ public class MetadataIndexStateService {
         if (concreteIndices == null || concreteIndices.length == 0) {
             throw new IllegalArgumentException("Index name is required");
         }
-        List<String> writeIndices = new ArrayList<>();
-        SortedMap<String, IndexAbstraction> lookup = clusterService.state().metadata().getIndicesLookup();
-        for (Index index : concreteIndices) {
-            IndexAbstraction ia = lookup.get(index.getName());
-            if (ia != null && ia.getParentDataStream() != null && ia.getParentDataStream().getWriteIndex().getIndex().equals(index)) {
-                writeIndices.add(index.getName());
-            }
-        }
-        if (writeIndices.size() > 0) {
-            throw new IllegalArgumentException("cannot close the following data stream write indices [" +
-                Strings.collectionToCommaDelimitedString(writeIndices) + "]");
-        }
 
         clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices),
             new ClusterStateUpdateTask(Priority.URGENT, request.masterNodeTimeout()) {

+ 0 - 37
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexStateServiceTests.java

@@ -9,7 +9,6 @@
 package org.elasticsearch.cluster.metadata;
 
 import org.elasticsearch.Version;
-import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
 import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
 import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.IndexResult;
 import org.elasticsearch.cluster.ClusterName;
@@ -24,11 +23,8 @@ import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
-import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
@@ -40,15 +36,12 @@ import org.elasticsearch.snapshots.SnapshotInProgressException;
 import org.elasticsearch.snapshots.SnapshotInfoTests;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.VersionUtils;
-import org.hamcrest.CoreMatchers;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -68,8 +61,6 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class MetadataIndexStateServiceTests extends ESTestCase {
 
@@ -316,34 +307,6 @@ public class MetadataIndexStateServiceTests extends ESTestCase {
         assertThat(failedIndices, equalTo(disappearedIndices));
     }
 
-    public void testCloseCurrentWriteIndexForDataStream() {
-        int numDataStreams = randomIntBetween(1, 3);
-        List<Tuple<String, Integer>> dataStreamsToCreate = new ArrayList<>();
-        List<String> writeIndices = new ArrayList<>();
-        for (int k = 0; k < numDataStreams; k++) {
-            String dataStreamName = randomAlphaOfLength(6).toLowerCase(Locale.ROOT);
-            int numBackingIndices = randomIntBetween(1, 5);
-            dataStreamsToCreate.add(new Tuple<>(dataStreamName, numBackingIndices));
-            writeIndices.add(DataStream.getDefaultBackingIndexName(dataStreamName, numBackingIndices));
-        }
-        ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(dataStreamsToCreate, List.of());
-
-        ClusterService clusterService = mock(ClusterService.class);
-        when(clusterService.state()).thenReturn(cs);
-
-        List<String> indicesToDelete = randomSubsetOf(randomIntBetween(1, numDataStreams), writeIndices);
-        Index[] indicesToDeleteArray = new Index[indicesToDelete.size()];
-        for (int k = 0; k < indicesToDelete.size(); k++) {
-            Index indexToDelete = cs.metadata().index(indicesToDelete.get(k)).getIndex();
-            indicesToDeleteArray[k] = indexToDelete;
-        }
-        MetadataIndexStateService service = new MetadataIndexStateService(clusterService, null, null, null, null, null, null);
-        CloseIndexClusterStateUpdateRequest request = new CloseIndexClusterStateUpdateRequest(0L).indices(indicesToDeleteArray);
-        Exception e = expectThrows(IllegalArgumentException.class, () -> service.closeIndices(request, null));
-        assertThat(e.getMessage(), CoreMatchers.containsString("cannot close the following data stream write indices [" +
-                Strings.collectionToCommaDelimitedString(indicesToDelete) + "]"));
-    }
-
     public static ClusterState addOpenedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
         return addIndex(state, index, numShards, numReplicas, IndexMetadata.State.OPEN, null);
     }

+ 116 - 9
x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java

@@ -12,28 +12,30 @@ import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
+import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
 import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
 import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
 import org.elasticsearch.snapshots.RestoreInfo;
 import org.elasticsearch.snapshots.SnapshotInProgressException;
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.SnapshotRestoreException;
 import org.elasticsearch.snapshots.SnapshotState;
+import org.elasticsearch.snapshots.mockstore.MockRepository;
 import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
 import org.elasticsearch.xpack.core.action.DeleteDataStreamAction;
 import org.elasticsearch.xpack.core.action.GetDataStreamAction;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.cluster.metadata.DataStream;
-import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.snapshots.mockstore.MockRepository;
 import org.elasticsearch.xpack.datastreams.DataStreamsPlugin;
 import org.hamcrest.Matchers;
 import org.junit.After;
@@ -45,6 +47,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.contains;
@@ -52,6 +55,7 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
@@ -145,6 +149,109 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
         assertEquals(DS_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
     }
 
+    public void testSnapshotAndRestoreAllDataStreamsInPlace() throws Exception {
+        CreateSnapshotResponse createSnapshotResponse = client.admin()
+            .cluster()
+            .prepareCreateSnapshot(REPO, SNAPSHOT)
+            .setWaitForCompletion(true)
+            .setIndices("ds")
+            .setIncludeGlobalState(false)
+            .get();
+
+        RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
+        assertEquals(RestStatus.OK, status);
+
+        assertEquals(Collections.singletonList(DS_BACKING_INDEX_NAME), getSnapshot(REPO, SNAPSHOT).indices());
+
+        // Close all indices:
+        CloseIndexRequest closeIndexRequest = new CloseIndexRequest("*");
+        closeIndexRequest.indicesOptions(IndicesOptions.strictExpandHidden());
+        assertAcked(client.admin().indices().close(closeIndexRequest).actionGet());
+
+        RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
+            .cluster()
+            .prepareRestoreSnapshot(REPO, SNAPSHOT)
+            .setWaitForCompletion(true)
+            .setIndices("ds")
+            .get();
+        assertEquals(1, restoreSnapshotResponse.getRestoreInfo().successfulShards());
+
+        assertEquals(DOCUMENT_SOURCE, client.prepareGet(DS_BACKING_INDEX_NAME, id).get().getSourceAsMap());
+        SearchHit[] hits = client.prepareSearch("ds").get().getHits().getHits();
+        assertEquals(1, hits.length);
+        assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap());
+
+        GetDataStreamAction.Request getDataSteamRequest = new GetDataStreamAction.Request(new String[] { "*" });
+        GetDataStreamAction.Response ds = client.execute(GetDataStreamAction.INSTANCE, getDataSteamRequest).get();
+        assertThat(
+            ds.getDataStreams().stream().map(e -> e.getDataStream().getName()).collect(Collectors.toList()),
+            contains(equalTo("ds"), equalTo("other-ds"))
+        );
+        List<Index> backingIndices = ds.getDataStreams().get(0).getDataStream().getIndices();
+        assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(DS_BACKING_INDEX_NAME));
+        backingIndices = ds.getDataStreams().get(1).getDataStream().getIndices();
+        String expectedBackingIndexName = DataStream.getDefaultBackingIndexName("other-ds", 1);
+        assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(expectedBackingIndexName));
+    }
+
+    public void testSnapshotAndRestoreInPlace() throws Exception {
+        CreateSnapshotResponse createSnapshotResponse = client.admin()
+            .cluster()
+            .prepareCreateSnapshot(REPO, SNAPSHOT)
+            .setWaitForCompletion(true)
+            .setIndices("ds")
+            .setIncludeGlobalState(false)
+            .get();
+
+        RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
+        assertEquals(RestStatus.OK, status);
+
+        assertEquals(Collections.singletonList(DS_BACKING_INDEX_NAME), getSnapshot(REPO, SNAPSHOT).indices());
+
+        // A rollover after taking snapshot. The new backing index should be a standalone index after restoring
+        // and not part of the data stream:
+        RolloverRequest rolloverRequest = new RolloverRequest("ds", null);
+        RolloverResponse rolloverResponse = client.admin().indices().rolloverIndex(rolloverRequest).actionGet();
+        assertThat(rolloverResponse.isRolledOver(), is(true));
+        assertThat(rolloverResponse.getNewIndex(), equalTo(DataStream.getDefaultBackingIndexName("ds", 2)));
+
+        // Close all backing indices of ds data stream:
+        CloseIndexRequest closeIndexRequest = new CloseIndexRequest(".ds-ds-*");
+        closeIndexRequest.indicesOptions(IndicesOptions.strictExpandHidden());
+        assertAcked(client.admin().indices().close(closeIndexRequest).actionGet());
+
+        RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
+            .cluster()
+            .prepareRestoreSnapshot(REPO, SNAPSHOT)
+            .setWaitForCompletion(true)
+            .setIndices("ds")
+            .get();
+        assertEquals(1, restoreSnapshotResponse.getRestoreInfo().successfulShards());
+
+        assertEquals(DOCUMENT_SOURCE, client.prepareGet(DS_BACKING_INDEX_NAME, id).get().getSourceAsMap());
+        SearchHit[] hits = client.prepareSearch("ds").get().getHits().getHits();
+        assertEquals(1, hits.length);
+        assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap());
+
+        GetDataStreamAction.Request getDataSteamRequest = new GetDataStreamAction.Request(new String[] { "ds" });
+        GetDataStreamAction.Response ds = client.execute(GetDataStreamAction.INSTANCE, getDataSteamRequest).actionGet();
+        assertThat(
+            ds.getDataStreams().stream().map(e -> e.getDataStream().getName()).collect(Collectors.toList()),
+            contains(equalTo("ds"))
+        );
+        List<Index> backingIndices = ds.getDataStreams().get(0).getDataStream().getIndices();
+        assertThat(ds.getDataStreams().get(0).getDataStream().getIndices(), hasSize(1));
+        assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(equalTo(DS_BACKING_INDEX_NAME)));
+
+        // The backing index created as part of rollover should still exist (but just not part of the data stream)
+        assertThat(indexExists(DataStream.getDefaultBackingIndexName("ds", 2)), is(true));
+        // An additional rollover should create a new backing index (3th generation) and leave .ds-ds-...-2 index as is:
+        rolloverRequest = new RolloverRequest("ds", null);
+        rolloverResponse = client.admin().indices().rolloverIndex(rolloverRequest).actionGet();
+        assertThat(rolloverResponse.isRolledOver(), is(true));
+        assertThat(rolloverResponse.getNewIndex(), equalTo(DataStream.getDefaultBackingIndexName("ds", 3)));
+    }
+
     public void testSnapshotAndRestoreAll() throws Exception {
         CreateSnapshotResponse createSnapshotResponse = client.admin()
             .cluster()

+ 17 - 0
x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/xpack/frozen/action/TransportFreezeIndexAction.java

@@ -22,12 +22,14 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.block.ClusterBlocks;
+import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.Index;
@@ -42,6 +44,7 @@ import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.SortedMap;
 
 public final class TransportFreezeIndexAction extends
     TransportMasterNodeAction<FreezeRequest, FreezeResponse> {
@@ -133,6 +136,20 @@ public final class TransportFreezeIndexAction extends
                 })) {
             @Override
             public ClusterState execute(ClusterState currentState) {
+                List<String> writeIndices = new ArrayList<>();
+                SortedMap<String, IndexAbstraction> lookup = currentState.metadata().getIndicesLookup();
+                for (Index index : concreteIndices) {
+                    IndexAbstraction ia = lookup.get(index.getName());
+                    if (ia != null && ia.getParentDataStream() != null &&
+                        ia.getParentDataStream().getWriteIndex().getIndex().equals(index)) {
+                        writeIndices.add(index.getName());
+                    }
+                }
+                if (writeIndices.size() > 0) {
+                    throw new IllegalArgumentException("cannot freeze the following data stream write indices [" +
+                        Strings.collectionToCommaDelimitedString(writeIndices) + "]");
+                }
+
                 final Metadata.Builder builder = Metadata.builder(currentState.metadata());
                 ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
                 for (Index index : concreteIndices) {

+ 0 - 33
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/20_unsupported_apis.yml

@@ -142,39 +142,6 @@
         name: simple-data-stream1
   - is_true: acknowledged
 
----
-"Close write index for data stream fails":
-  - skip:
-      version: " - 7.8.99"
-      reason: "data streams only supported in 7.9+"
-      features: allowed_warnings
-
-  - do:
-      allowed_warnings:
-        - "index template [my-template1] has index patterns [simple-data-stream1] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template1] will take precedence during new index creation"
-      indices.put_index_template:
-        name: my-template1
-        body:
-          index_patterns: [simple-data-stream1]
-          data_stream: {}
-
-  - do:
-      indices.create_data_stream:
-        name: simple-data-stream1
-  - is_true: acknowledged
-
-  - do:
-      catch: bad_request
-      indices.close:
-        index: ".ds-simple-data-stream1-*000001"
-      allowed_warnings:
-        - "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
-
-  - do:
-      indices.delete_data_stream:
-        name: simple-data-stream1
-  - is_true: acknowledged
-
 ---
 "Prohibit split on data stream's write index":
   - skip: