Browse Source

Json processor: add_to_root_conflict_strategy option (#74967)

Felix Barnsteiner 4 years ago
parent
commit
404e9865b6

+ 6 - 6
docs/reference/ingest/processors/json.asciidoc

@@ -10,12 +10,12 @@ Converts a JSON string into a structured JSON object.
 .Json Options
 [options="header"]
 |======
-| Name                         | Required  | Default  | Description
-| `field`                      | yes       | -        | The field to be parsed.
-| `target_field`               | no        | `field`  | The field that the converted structured object will be written into. Any existing content in this field will be overwritten.
-| `add_to_root`                | no        | false    | Flag that forces the serialized json to be injected into the top level of the document. `target_field` must not be set when this option is chosen.
-| `allow_duplicate_keys`       | no        | false    | When set to `true`, the JSON parser will not fail if the JSON contains duplicate keys.
-                                                        Instead, the latest value wins. Allowing duplicate keys also improves execution time.
+| Name                            | Required  | Default   | Description
+| `field`                         | yes       | -         | The field to be parsed.
+| `target_field`                  | no        | `field`   | The field that the converted structured object will be written into. Any existing content in this field will be overwritten.
+| `add_to_root`                   | no        | false     | Flag that forces the parsed JSON to be added at the top level of the document. `target_field` must not be set when this option is chosen.
+| `add_to_root_conflict_strategy` | no        | `replace` | When set to `replace`, root fields that conflict with fields from the parsed JSON will be overridden. When set to `merge`, conflicting fields will be merged. Only applicable if `add_to_root` is set to `true`.
+| `allow_duplicate_keys`          | no        | false     | When set to `true`, the JSON parser will not fail if the JSON contains duplicate keys. Instead, the last encountered value for any duplicate key wins.
 include::common-options.asciidoc[]
 |======
 

+ 71 - 6
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JsonProcessor.java

@@ -21,6 +21,7 @@ import org.elasticsearch.ingest.Processor;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Locale;
 import java.util.Map;
 
 import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
@@ -36,13 +37,16 @@ public final class JsonProcessor extends AbstractProcessor {
     private final String field;
     private final String targetField;
     private final boolean addToRoot;
+    private final ConflictStrategy addToRootConflictStrategy;
     private final boolean allowDuplicateKeys;
 
-    JsonProcessor(String tag, String description, String field, String targetField, boolean addToRoot, boolean allowDuplicateKeys) {
+    JsonProcessor(String tag, String description, String field, String targetField, boolean addToRoot,
+                  ConflictStrategy addToRootConflictStrategy, boolean allowDuplicateKeys) {
         super(tag, description);
         this.field = field;
         this.targetField = targetField;
         this.addToRoot = addToRoot;
+        this.addToRootConflictStrategy = addToRootConflictStrategy;
         this.allowDuplicateKeys = allowDuplicateKeys;
     }
 
@@ -58,6 +62,10 @@ public final class JsonProcessor extends AbstractProcessor {
         return addToRoot;
     }
 
+    public ConflictStrategy getAddToRootConflictStrategy() {
+        return addToRootConflictStrategy;
+    }
+
     public static Object apply(Object fieldValue, boolean allowDuplicateKeys) {
         BytesReference bytesRef = fieldValue == null ? new BytesArray("null") : new BytesArray(fieldValue.toString());
         try (InputStream stream = bytesRef.streamInput();
@@ -87,21 +95,45 @@ public final class JsonProcessor extends AbstractProcessor {
         }
     }
 
-    public static void apply(Map<String, Object> ctx, String fieldName, boolean allowDuplicateKeys) {
+    public static void apply(Map<String, Object> ctx, String fieldName, boolean allowDuplicateKeys, ConflictStrategy conflictStrategy) {
         Object value = apply(ctx.get(fieldName), allowDuplicateKeys);
         if (value instanceof Map) {
             @SuppressWarnings("unchecked")
-                Map<String, Object> map = (Map<String, Object>) value;
-            ctx.putAll(map);
+            Map<String, Object> map = (Map<String, Object>) value;
+            if (conflictStrategy == ConflictStrategy.MERGE) {
+                recursiveMerge(ctx, map);
+            } else {
+                ctx.putAll(map);
+            }
         } else {
             throw new IllegalArgumentException("cannot add non-map fields to root of document");
         }
     }
 
+    public static void recursiveMerge(Map<String, Object> target, Map<String, Object> from) {
+        for (String key : from.keySet()) {
+            if (target.containsKey(key)) {
+                Object targetValue = target.get(key);
+                Object fromValue = from.get(key);
+                if (targetValue instanceof Map && fromValue instanceof Map) {
+                    @SuppressWarnings("unchecked")
+                    Map<String, Object> targetMap = (Map<String, Object>) targetValue;
+                    @SuppressWarnings("unchecked")
+                    Map<String, Object> fromMap = (Map<String, Object>) fromValue;
+                    recursiveMerge(targetMap, fromMap);
+                } else {
+                    target.put(key, fromValue);
+                }
+            } else {
+                target.put(key, from.get(key));
+            }
+        }
+    }
+
     @Override
     public IngestDocument execute(IngestDocument document) throws Exception {
         if (addToRoot) {
-            apply(document.getSourceAndMetadata(), field, allowDuplicateKeys);
+            apply(document.getSourceAndMetadata(), field, allowDuplicateKeys, addToRootConflictStrategy);
         } else {
             document.setFieldValue(targetField, apply(document.getFieldValue(field, Object.class), allowDuplicateKeys));
         }
@@ -113,7 +145,22 @@ public final class JsonProcessor extends AbstractProcessor {
         return TYPE;
     }
 
+    public enum ConflictStrategy {
+        REPLACE,
+        MERGE;
+
+        @Override
+        public String toString() {
+            return name().toLowerCase(Locale.ROOT);
+        }
+
+        public static ConflictStrategy fromString(String conflictStrategy) {
+            return ConflictStrategy.valueOf(conflictStrategy.toUpperCase(Locale.ROOT));
+        }
+    }
+
     public static final class Factory implements Processor.Factory {
+
         @Override
         public JsonProcessor create(Map<String, Processor.Factory> registry, String processorTag,
                                     String description, Map<String, Object> config) throws Exception {
@@ -121,17 +168,35 @@ public final class JsonProcessor extends AbstractProcessor {
             String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field");
             boolean addToRoot = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "add_to_root", false);
             boolean allowDuplicateKeys = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "allow_duplicate_keys", false);
+            String conflictStrategyString = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config,
+                "add_to_root_conflict_strategy");
+            boolean hasConflictStrategy = conflictStrategyString != null;
+            if (conflictStrategyString == null) {
+                conflictStrategyString = ConflictStrategy.REPLACE.name();
+            }
+            ConflictStrategy addToRootConflictStrategy;
+            try {
+                addToRootConflictStrategy = ConflictStrategy.fromString(conflictStrategyString);
+            } catch (IllegalArgumentException e) {
+                throw newConfigurationException(TYPE, processorTag, "add_to_root_conflict_strategy", "conflict strategy [" +
+                    conflictStrategyString + "] not supported, cannot convert field.");
+            }
 
             if (addToRoot && targetField != null) {
                 throw newConfigurationException(TYPE, processorTag, "target_field",
                     "Cannot set a target field while also setting `add_to_root` to true");
             }
+            if (addToRoot == false && hasConflictStrategy) {
+                throw newConfigurationException(TYPE, processorTag, "add_to_root_conflict_strategy",
+                    "Cannot set `add_to_root_conflict_strategy` if `add_to_root` is false");
+            }
 
             if (targetField == null) {
                 targetField = field;
             }
 
-            return new JsonProcessor(processorTag, description, field, targetField, addToRoot, allowDuplicateKeys);
+            return new JsonProcessor(processorTag, description, field, targetField, addToRoot, addToRootConflictStrategy,
+                allowDuplicateKeys);
         }
     }
 }

+ 1 - 1
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/Processors.java

@@ -72,7 +72,7 @@ public final class Processors {
      *             contains the JSON string
      */
     public static void json(Map<String, Object> map, String field) {
-        JsonProcessor.apply(map, field, false);
+        JsonProcessor.apply(map, field, false, JsonProcessor.ConflictStrategy.REPLACE);
     }
 
     /**

+ 38 - 0
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/JsonProcessorFactoryTests.java

@@ -77,4 +77,42 @@ public class JsonProcessorFactoryTests extends ESTestCase {
             () -> FACTORY.create(null, randomAlphaOfLength(10), null, config));
         assertThat(exception.getMessage(), equalTo("[target_field] Cannot set a target field while also setting `add_to_root` to true"));
     }
+
+    public void testReplaceMergeStrategy() throws Exception {
+        JsonProcessor jsonProcessor = getJsonProcessorWithMergeStrategy(null, true);
+        assertThat(jsonProcessor.getAddToRootConflictStrategy(), equalTo(JsonProcessor.ConflictStrategy.REPLACE));
+
+        jsonProcessor = getJsonProcessorWithMergeStrategy("replace", true);
+        assertThat(jsonProcessor.getAddToRootConflictStrategy(), equalTo(JsonProcessor.ConflictStrategy.REPLACE));
+    }
+
+    public void testRecursiveMergeStrategy() throws Exception {
+        JsonProcessor jsonProcessor = getJsonProcessorWithMergeStrategy("merge", true);
+        assertThat(jsonProcessor.getAddToRootConflictStrategy(), equalTo(JsonProcessor.ConflictStrategy.MERGE));
+    }
+
+    public void testMergeStrategyWithoutAddToRoot() throws Exception {
+        ElasticsearchException exception = expectThrows(ElasticsearchParseException.class,
+            () -> getJsonProcessorWithMergeStrategy("replace", false));
+        assertThat(exception.getMessage(),
+            equalTo("[add_to_root_conflict_strategy] Cannot set `add_to_root_conflict_strategy` if `add_to_root` is false"));
+    }
+
+    public void testUnknownMergeStrategy() throws Exception {
+        ElasticsearchException exception = expectThrows(ElasticsearchParseException.class,
+            () -> getJsonProcessorWithMergeStrategy("foo", true));
+        assertThat(exception.getMessage(),
+            equalTo("[add_to_root_conflict_strategy] conflict strategy [foo] not supported, cannot convert field."));
+    }
+
+    private JsonProcessor getJsonProcessorWithMergeStrategy(String mergeStrategy, boolean addToRoot) throws Exception {
+        String randomField = randomAlphaOfLength(10);
+        Map<String, Object> config = new HashMap<>();
+        config.put("field", randomField);
+        config.put("add_to_root", addToRoot);
+        if (mergeStrategy != null) {
+            config.put("add_to_root_conflict_strategy", mergeStrategy);
+        }
+        return FACTORY.create(null, randomAlphaOfLength(10), null, config);
+    }
 }

+ 54 - 14
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/JsonProcessorTests.java

@@ -22,6 +22,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.elasticsearch.ingest.common.JsonProcessor.ConflictStrategy.MERGE;
+import static org.elasticsearch.ingest.common.JsonProcessor.ConflictStrategy.REPLACE;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 
@@ -32,7 +34,7 @@ public class JsonProcessorTests extends ESTestCase {
         String processorTag = randomAlphaOfLength(3);
         String randomField = randomAlphaOfLength(3);
         String randomTargetField = randomAlphaOfLength(2);
-        JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, randomField, randomTargetField, false, false);
+        JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, randomField, randomTargetField, false, REPLACE, false);
         Map<String, Object> document = new HashMap<>();
 
         Map<String, Object> randomJsonMap = RandomDocumentPicks.randomSource(random());
@@ -47,7 +49,7 @@ public class JsonProcessorTests extends ESTestCase {
     }
 
     public void testInvalidValue() {
-        JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
+        JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, REPLACE, false);
         Map<String, Object> document = new HashMap<>();
         document.put("field", "blah blah");
         IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
@@ -58,7 +60,7 @@ public class JsonProcessorTests extends ESTestCase {
     }
 
     public void testByteArray() {
-        JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
+        JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, REPLACE, false);
         Map<String, Object> document = new HashMap<>();
         document.put("field", new byte[] { 0, 1 });
         IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
@@ -73,7 +75,7 @@ public class JsonProcessorTests extends ESTestCase {
     }
 
     public void testNull() throws Exception {
-        JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
+        JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, REPLACE, false);
         Map<String, Object> document = new HashMap<>();
         document.put("field", null);
         IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
@@ -82,7 +84,7 @@ public class JsonProcessorTests extends ESTestCase {
     }
 
     public void testBoolean() throws Exception {
-        JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
+        JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, REPLACE, false);
         Map<String, Object> document = new HashMap<>();
         boolean value = true;
         document.put("field", value);
@@ -92,7 +94,7 @@ public class JsonProcessorTests extends ESTestCase {
     }
 
     public void testInteger() throws Exception {
-        JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
+        JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, REPLACE, false);
         Map<String, Object> document = new HashMap<>();
         int value = 3;
         document.put("field", value);
@@ -102,7 +104,7 @@ public class JsonProcessorTests extends ESTestCase {
     }
 
     public void testDouble() throws Exception {
-        JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
+        JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, REPLACE, false);
         Map<String, Object> document = new HashMap<>();
         double value = 3.0;
         document.put("field", value);
@@ -112,7 +114,7 @@ public class JsonProcessorTests extends ESTestCase {
     }
 
     public void testString() throws Exception {
-        JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
+        JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, REPLACE, false);
         Map<String, Object> document = new HashMap<>();
         String value = "hello world";
         document.put("field", "\"" + value + "\"");
@@ -122,7 +124,7 @@ public class JsonProcessorTests extends ESTestCase {
     }
 
     public void testArray() throws Exception {
-        JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
+        JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, REPLACE, false);
         Map<String, Object> document = new HashMap<>();
         List<Boolean> value = Arrays.asList(true, true, false);
         document.put("field", value.toString());
@@ -132,7 +134,7 @@ public class JsonProcessorTests extends ESTestCase {
     }
 
     public void testFieldMissing() {
-        JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
+        JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, REPLACE, false);
         Map<String, Object> document = new HashMap<>();
         IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
 
@@ -143,7 +145,7 @@ public class JsonProcessorTests extends ESTestCase {
     public void testAddToRoot() throws Exception {
         String processorTag = randomAlphaOfLength(3);
         String randomTargetField = randomAlphaOfLength(2);
-        JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, "a", randomTargetField, true, false);
+        JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, "a", randomTargetField, true, REPLACE, false);
         Map<String, Object> document = new HashMap<>();
 
         String json = "{\"a\": 1, \"b\": 2}";
@@ -161,7 +163,7 @@ public class JsonProcessorTests extends ESTestCase {
 
     public void testDuplicateKeys() throws Exception {
         String processorTag = randomAlphaOfLength(3);
-        JsonProcessor lenientJsonProcessor = new JsonProcessor(processorTag, null, "a", null, true, true);
+        JsonProcessor lenientJsonProcessor = new JsonProcessor(processorTag, null, "a", null, true, REPLACE, true);
 
         Map<String, Object> document = new HashMap<>();
         String json = "{\"a\": 1, \"a\": 2}";
@@ -175,14 +177,52 @@ public class JsonProcessorTests extends ESTestCase {
         assertEquals(2, sourceAndMetadata.get("a"));
         assertEquals("see", sourceAndMetadata.get("c"));
 
-        JsonProcessor strictJsonProcessor = new JsonProcessor(processorTag, null, "a", null, true, false);
+        JsonProcessor strictJsonProcessor = new JsonProcessor(processorTag, null, "a", null, true, REPLACE, false);
         Exception exception = expectThrows(IllegalArgumentException.class, () ->
             strictJsonProcessor.execute(RandomDocumentPicks.randomIngestDocument(random(), document)));
         assertThat(exception.getMessage(), containsString("Duplicate field 'a'"));
     }
 
+    public void testAddToRootRecursiveMerge() throws Exception {
+        String processorTag = randomAlphaOfLength(3);
+        JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, "json", null, true, MERGE, false);
+
+        Map<String, Object> document = new HashMap<>();
+        String json = "{\"foo\": {\"bar\": \"baz\"}}";
+        document.put("json", json);
+        Map<String, Object> inner = new HashMap<>();
+        inner.put("bar", "override_me");
+        inner.put("qux", "quux");
+        document.put("foo", inner);
+
+        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
+        jsonProcessor.execute(ingestDocument);
+
+        assertEquals("baz", ingestDocument.getFieldValue("foo.bar", String.class));
+        assertEquals("quux", ingestDocument.getFieldValue("foo.qux", String.class));
+    }
+
+    public void testAddToRootNonRecursiveMerge() throws Exception {
+        String processorTag = randomAlphaOfLength(3);
+        JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, "json", null, true, REPLACE, false);
+
+        Map<String, Object> document = new HashMap<>();
+        String json = "{\"foo\": {\"bar\": \"baz\"}}";
+        document.put("json", json);
+        Map<String, Object> inner = new HashMap<>();
+        inner.put("bar", "override_me");
+        inner.put("qux", "quux");
+        document.put("foo", inner);
+
+        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
+        jsonProcessor.execute(ingestDocument);
+
+        assertEquals("baz", ingestDocument.getFieldValue("foo.bar", String.class));
+        assertFalse(ingestDocument.hasField("foo.qux"));
+    }
+
     public void testAddBoolToRoot() {
-        JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", true, false);
+        JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", true, REPLACE, false);
         Map<String, Object> document = new HashMap<>();
         document.put("field", true);
         IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);

+ 42 - 0
modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/140_json.yml

@@ -8,6 +8,10 @@ teardown:
       ingest.delete_pipeline:
         id: "2"
         ignore: 404
+  - do:
+      ingest.delete_pipeline:
+        id: "3"
+        ignore: 404
 
 ---
 "Test JSON Processor":
@@ -108,3 +112,41 @@ teardown:
         index: test
         id: 2
   - match: { _source.dupe: 2 }
+
+---
+"Test JSON Processor recursive merge strategy":
+  - do:
+      ingest.put_pipeline:
+        id: "3"
+        body: {
+            "processors": [
+              {
+                "json" : {
+                  "field" : "json",
+                  "add_to_root": true,
+                  "add_to_root_conflict_strategy": "merge"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      index:
+        index: test
+        id: 3
+        pipeline: "3"
+        body: {
+          json: "{\"foo\": {\"bar\": \"baz\"} }",
+          foo: {
+            bar: "override_me",
+            qux: "quux"
+          }
+        }
+
+  - do:
+      get:
+        index: test
+        id: 3
+  - match: { _source.foo.bar: "baz" }
+  - match: { _source.foo.qux: "quux" }

+ 1 - 1
server/src/test/java/org/elasticsearch/common/xcontent/BaseXContentTestCase.java

@@ -1157,7 +1157,7 @@ public abstract class BaseXContentTestCase extends ESTestCase {
                 .endObject();
         try (XContentParser xParser = createParser(builder)) {
             xParser.allowDuplicateKeys(true);
-            assertThat(xParser.map(), equalTo(Map.of("key", 2)));
+            assertThat(xParser.map(), equalTo(Collections.singletonMap("key", 2)));
         }
     }