Browse Source

Add `copy_from` option to the Append processor (#132003)

Chris Berkhout 1 month ago
parent
commit
5f7a2ed38a

+ 5 - 0
docs/changelog/132003.yaml

@@ -0,0 +1,5 @@
+pr: 132003
+summary: Add `copy_from` option to the Append processor
+area: Ingest Node
+type: enhancement
+issues: []

+ 3 - 2
docs/reference/enrich-processor/append-processor.md

@@ -14,9 +14,10 @@ $$$append-options$$$
 | Name | Required | Default | Description |
 | --- | --- | --- | --- |
 | `field` | yes | - | The field to be appended to. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). |
-| `value` | yes | - | The value to be appended. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). |
+| `value` | yes* | - | The value to be appended. Supports [template snippets](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). May specify only one of `value` or `copy_from`. |
+| `copy_from` {applies_to}`stack: ga 9.2.0` | no | - | The origin field which will be appended to `field`, cannot set `value` simultaneously. |
 | `allow_duplicates` | no | true | If `false`, the processor does not appendvalues already present in the field. |
-| `media_type` | no | `application/json` | The media type for encoding `value`. Applies only when `value` is a[template snippet](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). Must be one of `application/json`, `text/plain`, or`application/x-www-form-urlencoded`. |
+| `media_type` | no | `application/json` | The media type for encoding `value`. Applies only when `value` is a [template snippet](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#template-snippets). Must be one of `application/json`, `text/plain`, or`application/x-www-form-urlencoded`. |
 | `description` | no | - | Description of the processor. Useful for describing the purpose of the processor or its configuration. |
 | `if` | no | - | Conditionally execute the processor. See [Conditionally run a processor](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#conditionally-run-processor). |
 | `ignore_failure` | no | `false` | Ignore failures for the processor. See [Handling pipeline failures](docs-content://manage-data/ingest/transform-enrich/ingest-pipelines.md#handling-pipeline-failures). |

+ 41 - 11
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java

@@ -21,6 +21,8 @@ import org.elasticsearch.script.TemplateScript;
 
 import java.util.Map;
 
+import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
+
 /**
  * Processor that appends value or values to existing lists. If the field is not present a new list holding the
  * provided values will be added. If the field is a scalar it will be converted to a single item list and the provided
@@ -32,12 +34,21 @@ public final class AppendProcessor extends AbstractProcessor {
 
     private final TemplateScript.Factory field;
     private final ValueSource value;
+    private final String copyFrom;
     private final boolean allowDuplicates;
 
-    AppendProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, boolean allowDuplicates) {
+    AppendProcessor(
+        String tag,
+        String description,
+        TemplateScript.Factory field,
+        ValueSource value,
+        String copyFrom,
+        boolean allowDuplicates
+    ) {
         super(tag, description);
         this.field = field;
         this.value = value;
+        this.copyFrom = copyFrom;
         this.allowDuplicates = allowDuplicates;
     }
 
@@ -49,10 +60,19 @@ public final class AppendProcessor extends AbstractProcessor {
         return value;
     }
 
+    public String getCopyFrom() {
+        return copyFrom;
+    }
+
     @Override
     public IngestDocument execute(IngestDocument document) throws Exception {
         String path = document.renderTemplate(field);
-        document.appendFieldValue(path, value, allowDuplicates);
+        if (copyFrom != null) {
+            Object fieldValue = document.getFieldValue(copyFrom, Object.class);
+            document.appendFieldValue(path, IngestDocument.deepCopy(fieldValue), allowDuplicates);
+        } else {
+            document.appendFieldValue(path, value, allowDuplicates);
+        }
         return document;
     }
 
@@ -78,17 +98,27 @@ public final class AppendProcessor extends AbstractProcessor {
             ProjectId projectId
         ) throws Exception {
             String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
-            Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
+            String copyFrom = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "copy_from");
+            String mediaType = ConfigurationUtils.readMediaTypeProperty(TYPE, processorTag, config, "media_type", "application/json");
+            ValueSource valueSource = null;
+            if (copyFrom == null) {
+                Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
+                valueSource = ValueSource.wrap(value, scriptService, Map.of(Script.CONTENT_TYPE_OPTION, mediaType));
+            } else {
+                Object value = config.remove("value");
+                if (value != null) {
+                    throw newConfigurationException(
+                        TYPE,
+                        processorTag,
+                        "copy_from",
+                        "cannot set both `copy_from` and `value` in the same processor"
+                    );
+                }
+            }
             boolean allowDuplicates = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "allow_duplicates", true);
             TemplateScript.Factory compiledTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", field, scriptService);
-            String mediaType = ConfigurationUtils.readMediaTypeProperty(TYPE, processorTag, config, "media_type", "application/json");
-            return new AppendProcessor(
-                processorTag,
-                description,
-                compiledTemplate,
-                ValueSource.wrap(value, scriptService, Map.of(Script.CONTENT_TYPE_OPTION, mediaType)),
-                allowDuplicates
-            );
+
+            return new AppendProcessor(processorTag, description, compiledTemplate, valueSource, copyFrom, allowDuplicates);
         }
     }
 }

+ 24 - 0
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorFactoryTests.java

@@ -123,4 +123,28 @@ public class AppendProcessorFactoryTests extends ESTestCase {
         );
         assertThat(e.getMessage(), containsString("property does not contain a supported media type [" + expectedMediaType + "]"));
     }
+
+    public void testCreateWithCopyFrom() throws Exception {
+        Map<String, Object> config = new HashMap<>();
+        config.put("field", "field1");
+        config.put("copy_from", "field2");
+        String processorTag = randomAlphaOfLength(10);
+        AppendProcessor appendProcessor = factory.create(null, processorTag, null, config, null);
+        assertThat(appendProcessor.getTag(), equalTo(processorTag));
+        assertThat(appendProcessor.getField().newInstance(Map.of()).execute(), equalTo("field1"));
+        assertThat(appendProcessor.getCopyFrom(), equalTo("field2"));
+    }
+
+    public void testCreateWithCopyFromAndValue() throws Exception {
+        Map<String, Object> config = new HashMap<>();
+        config.put("field", "field1");
+        config.put("copy_from", "field2");
+        config.put("value", "value1");
+        String processorTag = randomAlphaOfLength(10);
+        ElasticsearchException exception = expectThrows(
+            ElasticsearchException.class,
+            () -> factory.create(null, processorTag, null, config, null)
+        );
+        assertThat(exception.getMessage(), equalTo("[copy_from] cannot set both `copy_from` and `value` in the same processor"));
+    }
 }

+ 158 - 10
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java

@@ -20,6 +20,7 @@ import org.elasticsearch.test.ESTestCase;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -51,13 +52,13 @@ public class AppendProcessorTests extends ESTestCase {
         if (randomBoolean()) {
             Object value = scalar.randomValue();
             values.add(value);
-            appendProcessor = createAppendProcessor(field, value, true);
+            appendProcessor = createAppendProcessor(field, value, null, true);
         } else {
             int valuesSize = randomIntBetween(0, 10);
             for (int i = 0; i < valuesSize; i++) {
                 values.add(scalar.randomValue());
             }
-            appendProcessor = createAppendProcessor(field, values, true);
+            appendProcessor = createAppendProcessor(field, values, null, true);
         }
         appendProcessor.execute(ingestDocument);
         Object fieldValue = ingestDocument.getFieldValue(field, Object.class);
@@ -80,13 +81,13 @@ public class AppendProcessorTests extends ESTestCase {
         if (randomBoolean()) {
             Object value = scalar.randomValue();
             values.add(value);
-            appendProcessor = createAppendProcessor(field, value, true);
+            appendProcessor = createAppendProcessor(field, value, null, true);
         } else {
             int valuesSize = randomIntBetween(0, 10);
             for (int i = 0; i < valuesSize; i++) {
                 values.add(scalar.randomValue());
             }
-            appendProcessor = createAppendProcessor(field, values, true);
+            appendProcessor = createAppendProcessor(field, values, null, true);
         }
         appendProcessor.execute(ingestDocument);
         List<?> list = ingestDocument.getFieldValue(field, List.class);
@@ -104,13 +105,13 @@ public class AppendProcessorTests extends ESTestCase {
         if (randomBoolean()) {
             Object value = scalar.randomValue();
             values.add(value);
-            appendProcessor = createAppendProcessor(field, value, true);
+            appendProcessor = createAppendProcessor(field, value, null, true);
         } else {
             int valuesSize = randomIntBetween(0, 10);
             for (int i = 0; i < valuesSize; i++) {
                 values.add(scalar.randomValue());
             }
-            appendProcessor = createAppendProcessor(field, values, true);
+            appendProcessor = createAppendProcessor(field, values, null, true);
         }
         appendProcessor.execute(ingestDocument);
         List<?> fieldValue = ingestDocument.getFieldValue(field, List.class);
@@ -128,7 +129,7 @@ public class AppendProcessorTests extends ESTestCase {
 
         List<Object> valuesToAppend = new ArrayList<>();
         valuesToAppend.add(originalValue);
-        Processor appendProcessor = createAppendProcessor(field, valuesToAppend, false);
+        Processor appendProcessor = createAppendProcessor(field, valuesToAppend, null, false);
         appendProcessor.execute(ingestDocument);
         Object fieldValue = ingestDocument.getFieldValue(field, Object.class);
         assertThat(fieldValue, not(instanceOf(List.class)));
@@ -143,7 +144,7 @@ public class AppendProcessorTests extends ESTestCase {
         List<Object> valuesToAppend = new ArrayList<>();
         String newValue = randomValueOtherThan(originalValue, () -> randomAlphaOfLengthBetween(1, 10));
         valuesToAppend.add(newValue);
-        Processor appendProcessor = createAppendProcessor(field, valuesToAppend, false);
+        Processor appendProcessor = createAppendProcessor(field, valuesToAppend, null, false);
         appendProcessor.execute(ingestDocument);
         List<?> list = ingestDocument.getFieldValue(field, List.class);
         assertThat(list.size(), equalTo(2));
@@ -172,19 +173,166 @@ public class AppendProcessorTests extends ESTestCase {
         Collections.sort(valuesToAppend);
 
         // attempt to append both new and existing values
-        Processor appendProcessor = createAppendProcessor(originalField, valuesToAppend, false);
+        Processor appendProcessor = createAppendProcessor(originalField, valuesToAppend, null, false);
         appendProcessor.execute(ingestDocument);
         List<?> fieldValue = ingestDocument.getFieldValue(originalField, List.class);
         assertThat(fieldValue, sameInstance(list));
         assertThat(fieldValue, containsInAnyOrder(expectedValues.toArray()));
     }
 
-    private static Processor createAppendProcessor(String fieldName, Object fieldValue, boolean allowDuplicates) {
+    public void testCopyFromOtherFieldSimple() throws Exception {
+        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
+        ingestDocument.setFieldValue("foo", 1);
+        ingestDocument.setFieldValue("bar", 2);
+        ingestDocument.setFieldValue("baz", new ArrayList<>(List.of(3)));
+
+        createAppendProcessor("bar", null, "foo", false).execute(ingestDocument);
+        createAppendProcessor("baz", null, "bar", false).execute(ingestDocument);
+        createAppendProcessor("quux", null, "baz", false).execute(ingestDocument);
+
+        Map<String, Object> result = ingestDocument.getCtxMap().getSource();
+        assertThat(result.get("foo"), equalTo(1));
+        assertThat(result.get("bar"), equalTo(List.of(2, 1)));
+        assertThat(result.get("baz"), equalTo(List.of(3, 2, 1)));
+        assertThat(result.get("quux"), equalTo(List.of(3, 2, 1)));
+    }
+
+    public void testCopyFromOtherField() throws Exception {
+        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
+
+        // generate values, add some to a target field, the rest to a source field
+        int size = randomIntBetween(0, 10);
+        Set<String> allValues = Stream.generate(() -> randomAlphaOfLengthBetween(1, 10)).limit(size).collect(Collectors.toSet());
+        List<String> originalValues = randomSubsetOf(allValues);
+        List<String> additionalValues = new ArrayList<>(Sets.difference(new HashSet<>(allValues), new HashSet<>(originalValues)));
+        List<String> targetFieldValue = new ArrayList<>(originalValues);
+        String targetField = RandomDocumentPicks.addRandomField(random(), ingestDocument, targetFieldValue);
+        String sourceField = RandomDocumentPicks.addRandomField(random(), ingestDocument, additionalValues);
+
+        Processor appendProcessor = createAppendProcessor(targetField, null, sourceField, false);
+        appendProcessor.execute(ingestDocument);
+        List<?> fieldValue = ingestDocument.getFieldValue(targetField, List.class);
+        assertThat(fieldValue, sameInstance(targetFieldValue));
+        assertThat(fieldValue, containsInAnyOrder(allValues.toArray()));
+    }
+
+    public void testCopyFromCopiesNonPrimitiveMutableTypes() throws Exception {
+        final String sourceField = "sourceField";
+        final String targetField = "targetField";
+        Processor processor = createAppendProcessor(targetField, null, sourceField, false);
+
+        // map types
+        Map<String, Object> document = new HashMap<>();
+        Map<String, Object> sourceMap = new HashMap<>();
+        sourceMap.put("foo", "bar");
+        document.put(sourceField, sourceMap);
+        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
+        IngestDocument output = processor.execute(ingestDocument);
+        sourceMap.put("foo", "not-bar");
+        Map<?, ?> outputMap = (Map<?, ?>) output.getFieldValue(targetField, List.class).getFirst();
+        assertThat(outputMap.get("foo"), equalTo("bar"));
+
+        // set types
+        document = new HashMap<>();
+        Set<String> sourceSet = randomUnique(() -> randomAlphaOfLength(5), 5);
+        Set<String> preservedSet = new HashSet<>(sourceSet);
+        document.put(sourceField, sourceSet);
+        ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
+        processor.execute(ingestDocument);
+        sourceSet.add(randomValueOtherThanMany(sourceSet::contains, () -> randomAlphaOfLength(5)));
+        Set<?> outputSet = (Set<?>) ingestDocument.getFieldValue(targetField, List.class).getFirst();
+        assertThat(outputSet, equalTo(preservedSet));
+
+        // list types (the outer list isn't used, but an inner list should be copied)
+        document = new HashMap<>();
+        List<String> sourceList = randomList(1, 5, () -> randomAlphaOfLength(5));
+        List<String> preservedList = new ArrayList<>(sourceList);
+        List<List<String>> wrappedSourceList = List.of(sourceList);
+        document.put(sourceField, wrappedSourceList);
+        ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
+        processor.execute(ingestDocument);
+        sourceList.add(randomValueOtherThanMany(sourceList::contains, () -> randomAlphaOfLength(5)));
+        List<?> unwrappedOutputList = (List<?>) ingestDocument.getFieldValue(targetField, List.class).getFirst();
+        assertThat(unwrappedOutputList, equalTo(preservedList));
+
+        // byte[] types
+        document = new HashMap<>();
+        byte[] sourceBytes = randomByteArrayOfLength(10);
+        byte[] preservedBytes = new byte[sourceBytes.length];
+        System.arraycopy(sourceBytes, 0, preservedBytes, 0, sourceBytes.length);
+        document.put(sourceField, sourceBytes);
+        ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
+        processor.execute(ingestDocument);
+        sourceBytes[0] = sourceBytes[0] == 0 ? (byte) 1 : (byte) 0;
+        byte[] outputBytes = (byte[]) ingestDocument.getFieldValue(targetField, List.class).getFirst();
+        assertThat(outputBytes, equalTo(preservedBytes));
+
+        // Date types
+        document = new HashMap<>();
+        Date sourceDate = new Date();
+        Date preservedDate = new Date(sourceDate.getTime());
+        document.put(sourceField, sourceDate);
+        ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
+        processor.execute(ingestDocument);
+        sourceDate.setTime(sourceDate.getTime() + 1);
+        Date outputDate = (Date) ingestDocument.getFieldValue(targetField, List.class).getFirst();
+        assertThat(outputDate, equalTo(preservedDate));
+    }
+
+    public void testCopyFromDeepCopiesNonPrimitiveMutableTypes() throws Exception {
+        final String sourceField = "sourceField";
+        final String targetField = "targetField";
+        Processor processor = createAppendProcessor(targetField, null, sourceField, false);
+        Map<String, Object> document = new HashMap<>();
+
+        // a root map with values of map, set, list, bytes, date
+        Map<String, Object> sourceMap = new HashMap<>();
+        sourceMap.put("foo", "bar");
+        Set<String> sourceSet = randomUnique(() -> randomAlphaOfLength(5), 5);
+        List<String> sourceList = randomList(1, 5, () -> randomAlphaOfLength(5));
+        byte[] sourceBytes = randomByteArrayOfLength(10);
+        Date sourceDate = new Date();
+        Map<String, Object> root = new HashMap<>();
+        root.put("foo", "bar");
+        root.put("map", sourceMap);
+        root.put("set", sourceSet);
+        root.put("list", sourceList);
+        root.put("bytes", sourceBytes);
+        root.put("date", sourceDate);
+
+        Set<String> preservedSet = new HashSet<>(sourceSet);
+        List<String> preservedList = new ArrayList<>(sourceList);
+        byte[] preservedBytes = new byte[sourceBytes.length];
+        System.arraycopy(sourceBytes, 0, preservedBytes, 0, sourceBytes.length);
+        Date preservedDate = new Date(sourceDate.getTime());
+
+        document.put(sourceField, root);
+        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
+        IngestDocument output = processor.execute(ingestDocument);
+        Map<?, ?> outputRoot = (Map<?, ?>) output.getFieldValue(targetField, List.class).getFirst();
+
+        root.put("foo", "not-bar");
+        sourceMap.put("foo", "not-bar");
+        sourceSet.add(randomValueOtherThanMany(sourceSet::contains, () -> randomAlphaOfLength(5)));
+        sourceList.add(randomValueOtherThanMany(sourceList::contains, () -> randomAlphaOfLength(5)));
+        sourceBytes[0] = sourceBytes[0] == 0 ? (byte) 1 : (byte) 0;
+        sourceDate.setTime(sourceDate.getTime() + 1);
+
+        assertThat(outputRoot.get("foo"), equalTo("bar"));
+        assertThat(((Map<?, ?>) outputRoot.get("map")).get("foo"), equalTo("bar"));
+        assertThat(((Set<?>) outputRoot.get("set")), equalTo(preservedSet));
+        assertThat(((List<?>) outputRoot.get("list")), equalTo(preservedList));
+        assertThat(((byte[]) outputRoot.get("bytes")), equalTo(preservedBytes));
+        assertThat(((Date) outputRoot.get("date")), equalTo(preservedDate));
+    }
+
+    private static Processor createAppendProcessor(String fieldName, Object fieldValue, String copyFrom, boolean allowDuplicates) {
         return new AppendProcessor(
             randomAlphaOfLength(10),
             null,
             new TestTemplateService.MockTemplateScript.Factory(fieldName),
             ValueSource.wrap(fieldValue, TestTemplateService.instance()),
+            copyFrom,
             allowDuplicates
         );
     }

+ 1 - 1
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java

@@ -207,7 +207,7 @@ public class ForEachProcessorTests extends ESTestCase {
             new CompoundProcessor(
                 false,
                 List.of(new UppercaseProcessor("_tag_upper", null, "_ingest._value", false, "_ingest._value")),
-                List.of(new AppendProcessor("_tag", null, template, (model) -> (List.of("added")), true))
+                List.of(new AppendProcessor("_tag", null, template, (model) -> (List.of("added")), null, true))
             ),
             false
         );

+ 157 - 0
modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/350_append_copy_from.yml

@@ -0,0 +1,157 @@
+---
+setup:
+  - requires:
+      cluster_features: [ "ingest.append.copy_from" ]
+      reason: "The copy_from option of the append processor is new"
+  - do:
+      ingest.put_pipeline:
+        id: "test-pipeline-1"
+        body: >
+          {
+            "processors": [
+              {
+                "append": {
+                  "field": "dest",
+                  "copy_from": "src"
+                }
+              }
+            ]
+          }
+  - do:
+      indices.create:
+        index: "test-some-index"
+
+---
+teardown:
+  - do:
+      indices.delete:
+        index: "test-some-index"
+        ignore_unavailable: true
+  - do:
+      ingest.delete_pipeline:
+        id: "test-pipeline-1"
+        ignore: 404
+
+---
+"It is not permitted to have both copy_from and value":
+
+  - do:
+      ingest.put_pipeline:
+        id: "test-pipeline-fail"
+        body: >
+          {
+            "processors": [
+              {
+                "append": {
+                  "tag": "some_tag",
+                  "field": "dest",
+                  "copy_from": "src",
+                  "value": "uh_oh"
+                }
+              }
+            ]
+          }
+      catch: bad_request
+  - match: { status: 400 }
+  - match: { error.reason: "[copy_from] cannot set both `copy_from` and `value` in the same processor" }
+  - match: { error.property_name: "copy_from" }
+  - match: { error.processor_type: "append" }
+  - match: { error.processor_tag: "some_tag" }
+
+---
+"Simple value into an absent value":
+  - do:
+      index:
+        index: test-some-index
+        id: 1
+        pipeline: test-pipeline-1
+        body: >
+          {
+            "src": 1
+          }
+
+  - do:
+      get:
+        index: test-some-index
+        id: "1"
+  - match: { _source.src: 1 }
+  - match: { _source.dest: [1] }
+
+---
+"Simple value into a present simple value":
+  - do:
+      index:
+        index: test-some-index
+        id: 1
+        pipeline: test-pipeline-1
+        body: >
+          {
+            "src": 2,
+            "dest": 1
+          }
+
+  - do:
+      get:
+        index: test-some-index
+        id: "1"
+  - match: { _source.src: 2 }
+  - match: { _source.dest: [1, 2] }
+
+---
+"Simple value into a present array":
+  - do:
+      index:
+        index: test-some-index
+        id: 1
+        pipeline: test-pipeline-1
+        body: >
+          {
+            "src": 3,
+            "dest": [1, 2]
+          }
+
+  - do:
+      get:
+        index: test-some-index
+        id: "1"
+  - match: { _source.src: 3 }
+  - match: { _source.dest: [1, 2, 3] }
+
+---
+"Array into an absent value":
+  - do:
+      index:
+        index: test-some-index
+        id: 1
+        pipeline: test-pipeline-1
+        body: >
+          {
+            "src": [1, 2]
+          }
+
+  - do:
+      get:
+        index: test-some-index
+        id: "1"
+  - match: { _source.src: [1, 2]  }
+  - match: { _source.dest: [1, 2] }
+
+---
+"Array into a present array":
+  - do:
+      index:
+        index: test-some-index
+        id: 1
+        pipeline: test-pipeline-1
+        body: >
+          {
+            "src": [2],
+            "dest": [1]
+          }
+
+  - do:
+      get:
+        index: test-some-index
+        id: "1"
+  - match: { _source.src: [2]  }
+  - match: { _source.dest: [1, 2] }

+ 2 - 1
server/src/main/java/org/elasticsearch/ingest/IngestFeatures.java

@@ -17,6 +17,7 @@ import java.util.Set;
 
 public class IngestFeatures implements FeatureSpecification {
     private static final NodeFeature SIMULATE_INGEST_400_ON_FAILURE = new NodeFeature("simulate.ingest.400_on_failure", true);
+    private static final NodeFeature INGEST_APPEND_COPY_FROM = new NodeFeature("ingest.append.copy_from", true);
 
     @Override
     public Set<NodeFeature> getFeatures() {
@@ -29,6 +30,6 @@ public class IngestFeatures implements FeatureSpecification {
 
     @Override
     public Set<NodeFeature> getTestFeatures() {
-        return Set.of(SIMULATE_INGEST_400_ON_FAILURE);
+        return Set.of(SIMULATE_INGEST_400_ON_FAILURE, INGEST_APPEND_COPY_FROM);
     }
 }