Pārlūkot izejas kodu

Add data stream support to the reindex api. (#57870)

This change now also copies the op_type from the reindex request's destination index request to the actual index request being used in the bulk request.

To ensure that no document already exists, the op_type create doesn't need to be copied, since Versions.MATCH_DELETED will be copied from 'mainRequest.getDestination().version()'.
The `version()` method on IndexRequest only returns Versions.MATCH_DELETED if op_type=create and no specific version has been specified.

However, in order to be able to index into a data stream, the op_type must be create. To support that, the op_type must be copied from the reindex request's destination index request to the actual index request being used in the bulk request.

Relates to #53100 and #57788
Martijn van Groningen 5 gadi atpakaļ
vecāks
revīzija
05b277552a

+ 1 - 1
docs/reference/indices/rollover-index.asciidoc

@@ -254,7 +254,7 @@ POST /my-data-stream/_rollover <2>
 --------------------------------------------------
 // TEST[continued]
 // TEST[setup:huge_twitter]
-// TEST[s/# Add > 1000 documents to my-data-stream/POST _reindex?refresh\n{"source":{"index":"twitter"},"dest":{"index":".ds-my-data-stream-000001"}}/]
+// TEST[s/# Add > 1000 documents to my-data-stream/POST _reindex?refresh\n{"source":{"index":"twitter"},"dest":{"index":"my-data-stream","op_type":"create"}}/]
 <1> Creates a data stream called `my-data-stream` with one initial backing index
 named `my-data-stream-000001`.
 <2> This request creates a new backing index, `my-data-stream-000002`, and adds

+ 2 - 2
modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexValidator.java

@@ -120,9 +120,9 @@ class ReindexValidator {
              * it. This is the same sort of dance that TransportIndexRequest
              * uses to decide to autocreate the index.
              */
-            target = indexNameExpressionResolver.concreteWriteIndex(clusterState, destination, false).getName();
+            target = indexNameExpressionResolver.concreteWriteIndex(clusterState, destination, true).getName();
         }
-        for (String sourceIndex : indexNameExpressionResolver.concreteIndexNames(clusterState, source)) {
+        for (String sourceIndex : indexNameExpressionResolver.concreteIndexNames(clusterState, source, true)) {
             if (sourceIndex.equals(target)) {
                 ActionRequestValidationException e = new ActionRequestValidationException();
                 e.addValidationError("reindex cannot write into an index its reading from [" + target + ']');

+ 4 - 1
modules/reindex/src/main/java/org/elasticsearch/index/reindex/Reindexer.java

@@ -30,6 +30,7 @@ import org.apache.http.message.BasicHeader;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.index.IndexRequest;
@@ -265,7 +266,9 @@ public class Reindexer {
              */
             index.routing(mainRequest.getDestination().routing());
             index.setPipeline(mainRequest.getDestination().getPipeline());
-            // OpType is synthesized from version so it is handled when we copy version above.
+            if (mainRequest.getDestination().opType() == DocWriteRequest.OpType.CREATE) {
+                index.opType(mainRequest.getDestination().opType());
+            }
 
             return wrap(index);
         }

+ 116 - 0
modules/reindex/src/test/resources/rest-api-spec/test/reindex/96_data_streams.yml

@@ -0,0 +1,116 @@
+---
+setup:
+  - skip:
+      features: allowed_warnings
+  - do:
+      allowed_warnings:
+        - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: generic_logs_template
+        body:
+          index_patterns: logs-*
+          data_stream:
+            timestamp_field: timestamp
+          template:
+            mappings:
+              properties:
+                timestamp:
+                  type: date
+
+---
+teardown:
+  - do:
+      indices.delete_data_stream:
+        name: '*'
+
+---
+"Reindex from data stream into another data stream":
+  - skip:
+      version: " - 7.99.99"
+      reason: "change to 7.8.99 after backport"
+      features: allowed_warnings
+
+  - do:
+      index:
+        index:  logs-foobar
+        refresh: true
+        body:   { foo: bar }
+
+  - do:
+      reindex:
+        refresh: true
+        body:
+          source:
+            index:   logs-foobar
+          dest:
+            index:   logs-barbaz
+            op_type: create
+
+  - do:
+      search:
+        index: logs-barbaz
+        body: { query: { match_all: {} } }
+  - length:   { hits.hits: 1  }
+  - match: { hits.hits.0._index: .ds-logs-barbaz-000001 }
+  - match: { hits.hits.0._source.foo: 'bar' }
+
+---
+"Reindex from index into data stream":
+  - skip:
+      version: " - 7.99.99"
+      reason: "change to 7.8.99 after backport"
+      features: allowed_warnings
+
+  - do:
+      index:
+        index: old-logs-index
+        refresh: true
+        body:   { foo: bar }
+
+  - do:
+      reindex:
+        refresh: true
+        body:
+          source:
+            index: old-logs-index
+          dest:
+            index: logs-foobar
+            op_type: create
+
+  - do:
+      search:
+        index: logs-foobar
+        body: { query: { match_all: {} } }
+  - length:   { hits.hits: 1  }
+  - match: { hits.hits.0._index: .ds-logs-foobar-000001 }
+  - match: { hits.hits.0._source.foo: 'bar' }
+
+---
+"Reindex from data source into an index":
+  - skip:
+      version: " - 7.99.99"
+      reason: "change to 7.8.99 after backport"
+      features: allowed_warnings
+
+  - do:
+      index:
+        index:  logs-foobar
+        refresh: true
+        body:   { foo: bar }
+
+  - do:
+      reindex:
+        refresh: true
+        body:
+          source:
+            index: logs-foobar
+          dest:
+            index: my-index
+
+  - do:
+      search:
+        index: my-index
+        body: { query: { match_all: {} } }
+  - length:   { hits.hits: 1  }
+  - match: { hits.hits.0._index: my-index }
+  - match: { hits.hits.0._source.foo: 'bar' }