Browse Source

Prohibit closing the write index for a data stream (#57692)

Dan Hermann 5 years ago
parent
commit
c4334ee074

+ 31 - 0
rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml

@@ -116,3 +116,34 @@
   - match: { indices.index_2.closed: true }
   - match: { indices.index_3.closed: true }
 
+---
+"Close write index for data stream fails":
+  - skip:
+      version: " - 7.99.99"
+      reason: "data streams not yet released"
+      features: allowed_warnings
+
+  - do:
+      allowed_warnings:
+        - "index template [my-template1] has index patterns [simple-data-stream1] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template1] will take precedence during new index creation"
+      indices.put_index_template:
+        name: my-template1
+        body:
+          index_patterns: [simple-data-stream1]
+          data_stream:
+            timestamp_field: '@timestamp'
+
+  - do:
+      indices.create_data_stream:
+        name: simple-data-stream1
+  - is_true: acknowledged
+
+  - do:
+      catch: bad_request
+      indices.close:
+        index: "simple-data-stream1-000001"
+
+  - do:
+      indices.delete_data_stream:
+        name: simple-data-stream1
+  - is_true: acknowledged

+ 13 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java

@@ -84,6 +84,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -137,6 +138,18 @@ public class MetadataIndexStateService {
         if (concreteIndices == null || concreteIndices.length == 0) {
             throw new IllegalArgumentException("Index name is required");
         }
+        List<String> writeIndices = new ArrayList<>();
+        SortedMap<String, IndexAbstraction> lookup = clusterService.state().metadata().getIndicesLookup();
+        for (Index index : concreteIndices) {
+            IndexAbstraction ia = lookup.get(index.getName());
+            if (ia != null && ia.getParentDataStream() != null && ia.getParentDataStream().getWriteIndex().getIndex().equals(index)) {
+                writeIndices.add(index.getName());
+            }
+        }
+        if (writeIndices.size() > 0) {
+            throw new IllegalArgumentException("cannot close the following data stream write indices [" +
+                Strings.collectionToCommaDelimitedString(writeIndices) + "]");
+        }
 
         clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices),
             new ClusterStateUpdateTask(Priority.URGENT) {

+ 39 - 0
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexStateServiceTests.java

@@ -20,8 +20,10 @@
 package org.elasticsearch.cluster.metadata;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
 import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
 import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.IndexResult;
+import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamRequestTests;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.RestoreInProgress;
@@ -37,8 +39,11 @@ import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
@@ -50,11 +55,15 @@ import org.elasticsearch.snapshots.SnapshotInProgressException;
 import org.elasticsearch.snapshots.SnapshotInfoTests;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.VersionUtils;
+import org.hamcrest.CoreMatchers;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -75,6 +84,8 @@ import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class MetadataIndexStateServiceTests extends ESTestCase {
 
@@ -380,6 +391,34 @@ public class MetadataIndexStateServiceTests extends ESTestCase {
         assertThat(failedIndices, equalTo(disappearedIndices));
     }
 
+    public void testCloseCurrentWriteIndexForDataStream() {
+        int numDataStreams = randomIntBetween(1, 3);
+        List<Tuple<String, Integer>> dataStreamsToCreate = new ArrayList<>();
+        List<String> writeIndices = new ArrayList<>();
+        for (int k = 0; k < numDataStreams; k++) {
+            String dataStreamName = randomAlphaOfLength(6).toLowerCase(Locale.ROOT);
+            int numBackingIndices = randomIntBetween(1, 5);
+            dataStreamsToCreate.add(new Tuple<>(dataStreamName, numBackingIndices));
+            writeIndices.add(DataStream.getBackingIndexName(dataStreamName, numBackingIndices));
+        }
+        ClusterState cs = DeleteDataStreamRequestTests.getClusterStateWithDataStreams(dataStreamsToCreate, List.of());
+
+        ClusterService clusterService = mock(ClusterService.class);
+        when(clusterService.state()).thenReturn(cs);
+
+        List<String> indicesToDelete = randomSubsetOf(randomIntBetween(1, numDataStreams), writeIndices);
+        Index[] indicesToDeleteArray = new Index[indicesToDelete.size()];
+        for (int k = 0; k < indicesToDelete.size(); k++) {
+            Index indexToDelete = cs.metadata().index(indicesToDelete.get(k)).getIndex();
+            indicesToDeleteArray[k] = indexToDelete;
+        }
+        MetadataIndexStateService service = new MetadataIndexStateService(clusterService, null, null, null, null, null, null);
+        CloseIndexClusterStateUpdateRequest request = new CloseIndexClusterStateUpdateRequest(0L).indices(indicesToDeleteArray);
+        Exception e = expectThrows(IllegalArgumentException.class, () -> service.closeIndices(request, null));
+        assertThat(e.getMessage(), CoreMatchers.containsString("cannot close the following data stream write indices [" +
+                Strings.collectionToCommaDelimitedString(indicesToDelete) + "]"));
+    }
+
     public static ClusterState addOpenedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
         return addIndex(state, index, numShards, numReplicas, IndexMetadata.State.OPEN, null);
     }