فهرست منبع

Prohibit append-only writes targeting backing indices directly. (#57788)

Append-only writes can only target the corresponding data stream.

Relates to #53100
Martijn van Groningen 5 سال پیش
والد
کامیت
01b70b4068

+ 56 - 0
rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml

@@ -200,3 +200,59 @@ setup:
       catch: missing
       indices.get:
         index: ".ds-simple-data-stream1-000001"
+
+---
+"append-only writes to backing indices prohobited":
+  - skip:
+      version: " - 7.9.99"
+      reason: "enable in 7.9+ when backported"
+      features: allowed_warnings
+
+  - do:
+      allowed_warnings:
+        - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: generic_logs_template
+        body:
+          index_patterns: logs-*
+          data_stream:
+            timestamp_field: timestamp
+
+  - do:
+      index:
+        index:  logs-foobar
+        body:   { foo: bar }
+  - match: { _index: .ds-logs-foobar-000001 }
+
+  - do:
+      catch: bad_request
+      index:
+        index:  .ds-logs-foobar-000001
+        body:   { foo: bar }
+
+  - do:
+      bulk:
+        body:
+          - create:
+              _index: .ds-logs-foobar-000001
+          - foo: bar
+          - index:
+              _index: .ds-logs-foobar-000001
+          - foo: bar
+          - create:
+              _index: logs-foobar
+          - foo: bar
+  - match: { errors: true }
+  - match: { items.0.create.status: 400 }
+  - match: { items.0.create.error.type: illegal_argument_exception }
+  - match: { items.0.create.error.reason: "index request with op_type=create targeting backing indices is disallowed, target corresponding data stream [logs-foobar] instead" }
+  - match: { items.1.index.status: 400 }
+  - match: { items.1.index.error.type: illegal_argument_exception }
+  - match: { items.1.index.error.reason: "index request with op_type=index and no if_primary_term and if_seq_no set targeting backing indices is disallowed, target corresponding data stream [logs-foobar] instead" }
+  - match: { items.2.create.result: created }
+  - match: { items.2.create._index: .ds-logs-foobar-000001 }
+
+  - do:
+      indices.delete_data_stream:
+        name: logs-foobar
+  - is_true: acknowledged

+ 38 - 0
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -49,6 +49,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateObserver;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -90,6 +91,8 @@ import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyMap;
+import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
+import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 
 /**
  * Groups bulk request items by shard, optionally creating non-existent indices and
@@ -271,6 +274,40 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         }
     }
 
+    static void prohibitAppendWritesInBackingIndices(DocWriteRequest<?> writeRequest, Metadata metadata) {
+        IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(writeRequest.index());
+        if (indexAbstraction == null) {
+            return;
+        }
+        if (indexAbstraction.getType() != IndexAbstraction.Type.CONCRETE_INDEX) {
+            return;
+        }
+        if (indexAbstraction.getParentDataStream() == null) {
+            return;
+        }
+
+        DataStream dataStream = indexAbstraction.getParentDataStream().getDataStream();
+
+        // At this point with write op is targeting a backing index of a data stream directly,
+        // so checking if write op is append-only and if so fail.
+        // (Updates and deletes are allowed to target a backing index)
+
+        DocWriteRequest.OpType opType = writeRequest.opType();
+        // CREATE op_type is considered append-only and
+        // INDEX op_type is considered append-only when no if_primary_term and if_seq_no is specified.
+        // (the latter maybe an update, but at this stage we can't determine that. In order to determine
+        // that an engine level change is needed and for now this check is sufficient.)
+        if (opType == DocWriteRequest.OpType.CREATE) {
+            throw new IllegalArgumentException("index request with op_type=create targeting backing indices is disallowed, " +
+                "target corresponding data stream [" + dataStream.getName() + "] instead");
+        }
+        if (opType == DocWriteRequest.OpType.INDEX && writeRequest.ifPrimaryTerm() == UNASSIGNED_PRIMARY_TERM &&
+            writeRequest.ifSeqNo() == UNASSIGNED_SEQ_NO) {
+            throw new IllegalArgumentException("index request with op_type=index and no if_primary_term and if_seq_no set " +
+                "targeting backing indices is disallowed, target corresponding data stream [" + dataStream.getName() + "] instead");
+        }
+    }
+
     static boolean resolvePipelines(final DocWriteRequest<?> originalRequest, final IndexRequest indexRequest, final Metadata metadata) {
         if (indexRequest.isPipelineResolved() == false) {
             final String requestPipeline = indexRequest.getPipeline();
@@ -454,6 +491,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                     switch (docWriteRequest.opType()) {
                         case CREATE:
                         case INDEX:
+                            prohibitAppendWritesInBackingIndices(docWriteRequest, metadata);
                             IndexRequest indexRequest = (IndexRequest) docWriteRequest;
                             final IndexMetadata indexMetadata = metadata.index(concreteIndex);
                             MappingMetadata mappingMd = indexMetadata.mapping();

+ 52 - 0
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java

@@ -21,6 +21,7 @@ package org.elasticsearch.action.bulk;
 
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.bulk.TransportBulkActionTookTests.Resolver;
 import org.elasticsearch.action.delete.DeleteRequest;
@@ -29,7 +30,9 @@ import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.action.support.AutoCreateIndex;
 import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
@@ -55,6 +58,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamServiceTests.createDataStream;
 import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -292,4 +296,52 @@ public class TransportBulkActionTests extends ESTestCase {
         }
     }
 
+    public void testProhibitAppendWritesInBackingIndices() throws Exception {
+        String dataStreamName = "logs-foobar";
+        ClusterState clusterState = createDataStream(dataStreamName);
+        Metadata metadata = clusterState.metadata();
+
+        // Testing create op against backing index fails:
+        String backingIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
+        IndexRequest invalidRequest1 = new IndexRequest(backingIndexName).opType(DocWriteRequest.OpType.CREATE);
+        Exception e = expectThrows(IllegalArgumentException.class,
+            () -> TransportBulkAction.prohibitAppendWritesInBackingIndices(invalidRequest1, metadata));
+        assertThat(e.getMessage(), equalTo("index request with op_type=create targeting backing indices is disallowed, " +
+            "target corresponding data stream [logs-foobar] instead"));
+
+        // Testing index op against backing index fails:
+        IndexRequest invalidRequest2 = new IndexRequest(backingIndexName).opType(DocWriteRequest.OpType.INDEX);
+        e = expectThrows(IllegalArgumentException.class,
+            () -> TransportBulkAction.prohibitAppendWritesInBackingIndices(invalidRequest2, metadata));
+        assertThat(e.getMessage(), equalTo("index request with op_type=index and no if_primary_term and if_seq_no set " +
+            "targeting backing indices is disallowed, target corresponding data stream [logs-foobar] instead"));
+
+        // Testing valid writes ops against a backing index:
+        DocWriteRequest<?> validRequest = new IndexRequest(backingIndexName).opType(DocWriteRequest.OpType.INDEX)
+            .setIfSeqNo(1).setIfPrimaryTerm(1);
+        TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata);
+        validRequest = new DeleteRequest(backingIndexName);
+        TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata);
+        validRequest = new UpdateRequest(backingIndexName, "_id");
+        TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata);
+
+        // Testing append only write via ds name
+        validRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE);
+        TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata);
+
+        validRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.INDEX);
+        TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata);
+
+        // Append only for a backing index that doesn't exist is allowed:
+        validRequest = new IndexRequest(DataStream.getDefaultBackingIndexName("logs-barbaz", 1))
+            .opType(DocWriteRequest.OpType.CREATE);
+        TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata);
+
+        // Some other index names:
+        validRequest = new IndexRequest("my-index").opType(DocWriteRequest.OpType.CREATE);
+        TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata);
+        validRequest = new IndexRequest("foobar").opType(DocWriteRequest.OpType.CREATE);
+        TransportBulkAction.prohibitAppendWritesInBackingIndices(validRequest, metadata);
+    }
+
 }

+ 12 - 0
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java

@@ -134,6 +134,18 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
             equalTo("matching index template [template] for data stream [my-data-stream] has no data stream template"));
     }
 
+    public static ClusterState createDataStream(final String dataStreamName) throws Exception {
+        final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
+        ComposableIndexTemplate template = new ComposableIndexTemplate(List.of(dataStreamName + "*"), null, null, null, null, null,
+            new ComposableIndexTemplate.DataStreamTemplate("@timestamp"));
+        ClusterState cs = ClusterState.builder(new ClusterName("_name"))
+            .metadata(Metadata.builder().put("template", template).build())
+            .build();
+        MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest req =
+            new MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, TimeValue.ZERO);
+        return MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req);
+    }
+
     private static MetadataCreateIndexService getMetadataCreateIndexService() throws Exception {
         MetadataCreateIndexService s = mock(MetadataCreateIndexService.class);
         when(s.applyCreateIndexRequest(any(ClusterState.class), any(CreateIndexClusterStateUpdateRequest.class), anyBoolean()))