Bläddra i källkod

Support `if_seq_no` and `if_primary_term` for ingest (#55430)

Allow for optimistic concurrency control during ingest by checking the
sequence number and primary term. This is accomplished by defining
`_if_seq_no` and `_if_primary_term` in the pipeline, similarly to `_version`
and `_version_type`.

Closes #41255
Maria Ralli 5 år sedan
förälder
incheckning
7c0b99634c

+ 16 - 0
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java

@@ -124,6 +124,22 @@ public class SetProcessorTests extends ESTestCase {
         assertThat(ingestDocument.getFieldValue(Metadata.VERSION_TYPE.getFieldName(), String.class), Matchers.equalTo(versionType));
     }
 
+    public void testSetMetadataIfSeqNo() throws Exception {
+        long ifSeqNo = randomNonNegativeLong();
+        Processor processor = createSetProcessor(Metadata.IF_SEQ_NO.getFieldName(), ifSeqNo, true);
+        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
+        processor.execute(ingestDocument);
+        assertThat(ingestDocument.getFieldValue(Metadata.IF_SEQ_NO.getFieldName(), Long.class), Matchers.equalTo(ifSeqNo));
+    }
+
+    public void testSetMetadataIfPrimaryTerm() throws Exception {
+        long ifPrimaryTerm = randomNonNegativeLong();
+        Processor processor = createSetProcessor(Metadata.IF_PRIMARY_TERM.getFieldName(), ifPrimaryTerm, true);
+        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
+        processor.execute(ingestDocument);
+        assertThat(ingestDocument.getFieldValue(Metadata.IF_PRIMARY_TERM.getFieldName(), Long.class), Matchers.equalTo(ifPrimaryTerm));
+    }
+
     private static Processor createSetProcessor(String fieldName, Object fieldValue, boolean overrideEnabled) {
         return new SetProcessor(randomAlphaOfLength(10), new TestTemplateService.MockTemplateScript.Factory(fieldName),
                 ValueSource.wrap(fieldValue, TestTemplateService.instance()), overrideEnabled);

+ 62 - 0
modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/260_seq_no.yml

@@ -0,0 +1,62 @@
+---
+teardown:
+  - do:
+      ingest.delete_pipeline:
+        id: "my_pipeline"
+        ignore: 404
+
+---
+"Test set _if_seq_no & _if_primary_term":
+  - do:
+      cluster.health:
+          wait_for_status: green
+
+  - do:
+      ingest.put_pipeline:
+        id: "my_pipeline"
+        body:  >
+          {
+            "description": "_description",
+            "processors": [
+              {
+                "set" : {
+                  "field" : "_if_seq_no",
+                  "value": 0
+                }
+              },
+              {
+                "set" : {
+                  "field" : "_if_primary_term",
+                  "value": 1
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      catch: conflict
+      index:
+        index: test
+        id: 1
+        pipeline: "my_pipeline"
+        body: {}
+  - match: { error.root_cause.0.type: "version_conflict_engine_exception" }
+  - match: { error.root_cause.0.reason: "[1]: version conflict, required seqNo [0], primary term [1]. but no document was found" }
+
+  - do:
+      index:
+        index: test
+        id: 1
+        body: {}
+  - match: { _seq_no: 0 }
+  - match: { _primary_term: 1 }
+
+  - do:
+      index:
+        index: test
+        id: 1
+        pipeline: "my_pipeline"
+        body: {}
+  - match: { _seq_no: 1 }
+  - match: { _primary_term: 1 }

+ 8 - 0
server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java

@@ -201,6 +201,14 @@ public class SimulatePipelineRequest extends ActionRequest implements ToXContent
             }
             IngestDocument ingestDocument =
                 new IngestDocument(index, id, routing, version, versionType, document);
+            if (dataMap.containsKey(Metadata.IF_SEQ_NO.getFieldName())) {
+                Long ifSeqNo = (Long) ConfigurationUtils.readObject(null, null, dataMap, Metadata.IF_SEQ_NO.getFieldName());
+                ingestDocument.setFieldValue(Metadata.IF_SEQ_NO.getFieldName(), ifSeqNo);
+            }
+            if (dataMap.containsKey(Metadata.IF_PRIMARY_TERM.getFieldName())) {
+                Long ifPrimaryTerm = (Long) ConfigurationUtils.readObject(null, null, dataMap, Metadata.IF_PRIMARY_TERM.getFieldName());
+                ingestDocument.setFieldValue(Metadata.IF_PRIMARY_TERM.getFieldName(), ifPrimaryTerm);
+            }
             ingestDocumentList.add(ingestDocument);
         }
         return ingestDocumentList;

+ 3 - 1
server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

@@ -700,7 +700,9 @@ public final class IngestDocument {
         ID(IdFieldMapper.NAME),
         ROUTING(RoutingFieldMapper.NAME),
         VERSION(VersionFieldMapper.NAME),
-        VERSION_TYPE("_version_type");
+        VERSION_TYPE("_version_type"),
+        IF_SEQ_NO("_if_seq_no"),
+        IF_PRIMARY_TERM("_if_primary_term");
 
         private final String fieldName;
 

+ 6 - 0
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -522,6 +522,12 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                 if (metadataMap.get(IngestDocument.Metadata.VERSION_TYPE) != null) {
                     indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.Metadata.VERSION_TYPE)));
                 }
+                if (metadataMap.get(IngestDocument.Metadata.IF_SEQ_NO) != null) {
+                    indexRequest.setIfSeqNo(((Number) metadataMap.get(IngestDocument.Metadata.IF_SEQ_NO)).longValue());
+                }
+                if (metadataMap.get(IngestDocument.Metadata.IF_PRIMARY_TERM) != null) {
+                    indexRequest.setIfPrimaryTerm(((Number) metadataMap.get(IngestDocument.Metadata.IF_PRIMARY_TERM)).longValue());
+                }
                 indexRequest.source(ingestDocument.getSourceAndMetadata(), indexRequest.getContentType());
                 handler.accept(null);
             }

+ 9 - 1
server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java

@@ -46,6 +46,8 @@ import static org.elasticsearch.ingest.IngestDocument.Metadata.INDEX;
 import static org.elasticsearch.ingest.IngestDocument.Metadata.ROUTING;
 import static org.elasticsearch.ingest.IngestDocument.Metadata.VERSION;
 import static org.elasticsearch.ingest.IngestDocument.Metadata.VERSION_TYPE;
+import static org.elasticsearch.ingest.IngestDocument.Metadata.IF_SEQ_NO;
+import static org.elasticsearch.ingest.IngestDocument.Metadata.IF_PRIMARY_TERM;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.nullValue;
@@ -120,7 +122,7 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
         for (int i = 0; i < numDocs; i++) {
             Map<String, Object> doc = new HashMap<>();
             Map<String, Object> expectedDoc = new HashMap<>();
-            List<IngestDocument.Metadata> fields = Arrays.asList(INDEX, ID, ROUTING, VERSION, VERSION_TYPE);
+            List<IngestDocument.Metadata> fields = Arrays.asList(INDEX, ID, ROUTING, VERSION, VERSION_TYPE, IF_SEQ_NO, IF_PRIMARY_TERM);
             for(IngestDocument.Metadata field : fields) {
                 if (field == VERSION) {
                     Long value = randomLong();
@@ -132,6 +134,10 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
                     );
                     doc.put(field.getFieldName(), value);
                     expectedDoc.put(field.getFieldName(), value);
+                } else if (field == IF_SEQ_NO || field == IF_PRIMARY_TERM) {
+                    Long value = randomNonNegativeLong();
+                    doc.put(field.getFieldName(), value);
+                    expectedDoc.put(field.getFieldName(), value);
                 } else {
                     if (randomBoolean()) {
                         String value = randomAlphaOfLengthBetween(1, 10);
@@ -192,6 +198,8 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
             assertThat(metadataMap.get(ROUTING), equalTo(expectedDocument.get(ROUTING.getFieldName())));
             assertThat(metadataMap.get(VERSION), equalTo(expectedDocument.get(VERSION.getFieldName())));
             assertThat(metadataMap.get(VERSION_TYPE), equalTo(expectedDocument.get(VERSION_TYPE.getFieldName())));
+            assertThat(metadataMap.get(IF_SEQ_NO), equalTo(expectedDocument.get(IF_SEQ_NO.getFieldName())));
+            assertThat(metadataMap.get(IF_PRIMARY_TERM), equalTo(expectedDocument.get(IF_PRIMARY_TERM.getFieldName())));
             assertThat(ingestDocument.getSourceAndMetadata(), equalTo(expectedDocument.get(Fields.SOURCE)));
         }
 

+ 8 - 0
server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

@@ -741,6 +741,8 @@ public class IngestServiceTests extends ESTestCase {
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         final long newVersion = randomLong();
         final String versionType = randomFrom("internal", "external", "external_gt", "external_gte");
+        final long ifSeqNo = randomNonNegativeLong();
+        final long ifPrimaryTerm = randomNonNegativeLong();
         doAnswer((InvocationOnMock invocationOnMock) -> {
             IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
             for (IngestDocument.Metadata metadata : IngestDocument.Metadata.values()) {
@@ -748,6 +750,10 @@ public class IngestServiceTests extends ESTestCase {
                     ingestDocument.setFieldValue(metadata.getFieldName(), newVersion);
                 } else if (metadata == IngestDocument.Metadata.VERSION_TYPE) {
                     ingestDocument.setFieldValue(metadata.getFieldName(), versionType);
+                } else if (metadata == IngestDocument.Metadata.IF_SEQ_NO) {
+                    ingestDocument.setFieldValue(metadata.getFieldName(), ifSeqNo);
+                } else if (metadata == IngestDocument.Metadata.IF_PRIMARY_TERM) {
+                    ingestDocument.setFieldValue(metadata.getFieldName(), ifPrimaryTerm);
                 } else {
                     ingestDocument.setFieldValue(metadata.getFieldName(), "update" + metadata.getFieldName());
                 }
@@ -773,6 +779,8 @@ public class IngestServiceTests extends ESTestCase {
         assertThat(indexRequest.routing(), equalTo("update_routing"));
         assertThat(indexRequest.version(), equalTo(newVersion));
         assertThat(indexRequest.versionType(), equalTo(VersionType.fromString(versionType)));
+        assertThat(indexRequest.ifSeqNo(), equalTo(ifSeqNo));
+        assertThat(indexRequest.ifPrimaryTerm(), equalTo(ifPrimaryTerm));
     }
 
     public void testExecuteFailure() throws Exception {