Browse Source

Add template snippets support for KV ingest processor (#73758)

bellengao 4 years ago
parent
commit
42c0be4c67

+ 2 - 2
docs/reference/ingest/processors/kv.asciidoc

@@ -27,10 +27,10 @@ TIP: Using the KV Processor can result in field names that you cannot control. C
 [options="header"]
 |======
 | Name             | Required  | Default  | Description
-| `field`          | yes       | -        | The field to be parsed
+| `field`          | yes       | -        | The field to be parsed. Supports <<template-snippets,template snippets>>.
 | `field_split`    | yes       | -        | Regex pattern to use for splitting key-value pairs
 | `value_split`    | yes       | -        | Regex pattern to use for splitting the key from the value within a key-value pair
-| `target_field`   | no        | `null`   | The field to insert the extracted keys into. Defaults to the root of the document
+| `target_field`   | no        | `null`   | The field to insert the extracted keys into. Defaults to the root of the document. Supports <<template-snippets,template snippets>>.
 | `include_keys`   | no        | `null`   | List of keys to filter and insert into document. Defaults to including all keys
 | `exclude_keys`   | no        | `null`   | List of keys to exclude from document
 | `ignore_missing` | no        | `false`  | If `true` and `field` does not exist or is `null`, the processor quietly exits without modifying the document

+ 1 - 1
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

@@ -70,7 +70,7 @@ public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPl
                 entry(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService)),
                 entry(DotExpanderProcessor.TYPE, new DotExpanderProcessor.Factory()),
                 entry(JsonProcessor.TYPE, new JsonProcessor.Factory()),
-                entry(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory()),
+                entry(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory(parameters.scriptService)),
                 entry(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory()),
                 entry(BytesProcessor.TYPE, new BytesProcessor.Factory()),
                 entry(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService)),

+ 56 - 26
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/KeyValueProcessor.java

@@ -13,6 +13,8 @@ import org.elasticsearch.ingest.AbstractProcessor;
 import org.elasticsearch.ingest.ConfigurationUtils;
 import org.elasticsearch.ingest.IngestDocument;
 import org.elasticsearch.ingest.Processor;
+import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.script.TemplateScript;
 
 import java.util.Collections;
 import java.util.List;
@@ -32,17 +34,17 @@ public final class KeyValueProcessor extends AbstractProcessor {
 
     private static final Pattern STRIP_BRACKETS = Pattern.compile("(^[\\(\\[<\"'])|([\\]\\)>\"']$)");
 
-    private final String field;
+    private final TemplateScript.Factory field;
     private final String fieldSplit;
     private final String valueSplit;
     private final Set<String> includeKeys;
     private final Set<String> excludeKeys;
-    private final String targetField;
+    private final TemplateScript.Factory targetField;
     private final boolean ignoreMissing;
     private final Consumer<IngestDocument> execution;
 
-    KeyValueProcessor(String tag, String description, String field, String fieldSplit, String valueSplit, Set<String> includeKeys,
-                      Set<String> excludeKeys, String targetField, boolean ignoreMissing,
+    KeyValueProcessor(String tag, String description, TemplateScript.Factory field, String fieldSplit, String valueSplit,
+                      Set<String> includeKeys, Set<String> excludeKeys, TemplateScript.Factory targetField, boolean ignoreMissing,
                       String trimKey, String trimValue, boolean stripBrackets, String prefix) {
         super(tag, description);
         this.field = field;
@@ -58,9 +60,9 @@ public final class KeyValueProcessor extends AbstractProcessor {
         );
     }
 
-    private static Consumer<IngestDocument> buildExecution(String fieldSplit, String valueSplit, String field,
+    private static Consumer<IngestDocument> buildExecution(String fieldSplit, String valueSplit, TemplateScript.Factory field,
                                                            Set<String> includeKeys, Set<String> excludeKeys,
-                                                           String targetField, boolean ignoreMissing,
+                                                           TemplateScript.Factory targetField, boolean ignoreMissing,
                                                            String trimKey, String trimValue, boolean stripBrackets,
                                                            String prefix) {
         final Predicate<String> keyFilter;
@@ -77,19 +79,7 @@ public final class KeyValueProcessor extends AbstractProcessor {
                 keyFilter = key -> includeKeys.contains(key) && excludeKeys.contains(key) == false;
             }
         }
-        final String fieldPathPrefix;
-        String keyPrefix = prefix == null ? "" : prefix;
-        if (targetField == null) {
-            fieldPathPrefix = keyPrefix;
-        } else {
-            fieldPathPrefix = targetField + "." + keyPrefix;
-        }
-        final Function<String, String> keyPrefixer;
-        if (fieldPathPrefix.isEmpty()) {
-            keyPrefixer = val -> val;
-        } else {
-            keyPrefixer = val -> fieldPathPrefix + val;
-        }
+
         final Function<String, String[]> fieldSplitter = buildSplitter(fieldSplit, true);
         Function<String, String[]> valueSplitter = buildSplitter(valueSplit, false);
         final Function<String, String> keyTrimmer = buildTrimmer(trimKey);
@@ -101,17 +91,43 @@ public final class KeyValueProcessor extends AbstractProcessor {
         }
         final Function<String, String> valueTrimmer = buildTrimmer(trimValue);
         return document -> {
-            String value = document.getFieldValue(field, String.class, ignoreMissing);
+            String target = "";
+            if (targetField != null) {
+                target = document.renderTemplate(targetField);
+            }
+
+            final String fieldPathPrefix;
+            String keyPrefix = prefix == null ? "" : prefix;
+            if (target.isEmpty()) {
+                fieldPathPrefix = keyPrefix;
+            } else {
+                fieldPathPrefix = target + "." + keyPrefix;
+            }
+            final Function<String, String> keyPrefixer;
+            if (fieldPathPrefix.isEmpty()) {
+                keyPrefixer = val -> val;
+            } else {
+                keyPrefixer = val -> fieldPathPrefix + val;
+            }
+            String path = document.renderTemplate(field);
+            if (path.isEmpty() || document.hasField(path, true) == false) {
+                if (ignoreMissing) {
+                    return;
+                } else {
+                    throw new IllegalArgumentException("field [" + path + "] doesn't exist");
+                }
+            }
+            String value = document.getFieldValue(path, String.class, ignoreMissing);
             if (value == null) {
                 if (ignoreMissing) {
                     return;
                 }
-                throw new IllegalArgumentException("field [" + field + "] is null, cannot extract key-value pairs.");
+                throw new IllegalArgumentException("field [" + path + "] is null, cannot extract key-value pairs.");
             }
             for (String part : fieldSplitter.apply(value)) {
                 String[] kv = valueSplitter.apply(part);
                 if (kv.length != 2) {
-                    throw new IllegalArgumentException("field [" + field + "] does not contain value_split [" + valueSplit + "]");
+                    throw new IllegalArgumentException("field [" + path + "] does not contain value_split [" + valueSplit + "]");
                 }
                 String key = keyTrimmer.apply(kv[0]);
                 if (keyFilter.test(key)) {
@@ -140,7 +156,7 @@ public final class KeyValueProcessor extends AbstractProcessor {
         }
     }
 
-    String getField() {
+    TemplateScript.Factory getField() {
         return field;
     }
 
@@ -160,7 +176,7 @@ public final class KeyValueProcessor extends AbstractProcessor {
         return excludeKeys;
     }
 
-    String getTargetField() {
+    TemplateScript.Factory getTargetField() {
         return targetField;
     }
 
@@ -188,11 +204,25 @@ public final class KeyValueProcessor extends AbstractProcessor {
     }
 
     public static class Factory implements Processor.Factory {
+        private final ScriptService scriptService;
+
+        public Factory(ScriptService scriptService) {
+            this.scriptService = scriptService;
+        }
+
         @Override
         public KeyValueProcessor create(Map<String, Processor.Factory> registry, String processorTag,
                                         String description, Map<String, Object> config) throws Exception {
             String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
+            TemplateScript.Factory fieldTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag,
+                "field", field, scriptService);
             String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field");
+            TemplateScript.Factory targetFieldTemplate = null;
+            if (targetField != null) {
+                targetFieldTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag,
+                    "target_field", targetField, scriptService);
+            }
+
             String fieldSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field_split");
             String valueSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "value_split");
             String trimKey = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "trim_key");
@@ -212,8 +242,8 @@ public final class KeyValueProcessor extends AbstractProcessor {
             }
             boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
             return new KeyValueProcessor(
-                processorTag, description, field, fieldSplit, valueSplit, includeKeys, excludeKeys, targetField, ignoreMissing,
-                trimKey, trimValue, stripBrackets, prefix
+                processorTag, description, fieldTemplate, fieldSplit, valueSplit, includeKeys, excludeKeys, targetFieldTemplate,
+                ignoreMissing, trimKey, trimValue, stripBrackets, prefix
             );
         }
     }

+ 12 - 8
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/KeyValueProcessorFactoryTests.java

@@ -11,7 +11,9 @@ package org.elasticsearch.ingest.common;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.ingest.TestTemplateService;
 import org.elasticsearch.test.ESTestCase;
+import org.junit.Before;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -24,8 +26,14 @@ import static org.hamcrest.Matchers.nullValue;
 
 public class KeyValueProcessorFactoryTests extends ESTestCase {
 
+    private KeyValueProcessor.Factory factory;
+
+    @Before
+    public void init() {
+        factory = new KeyValueProcessor.Factory(TestTemplateService.instance());
+    }
+
     public void testCreateWithDefaults() throws Exception {
-        KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
         Map<String, Object> config = new HashMap<>();
         config.put("field", "field1");
         config.put("field_split", "&");
@@ -33,7 +41,7 @@ public class KeyValueProcessorFactoryTests extends ESTestCase {
         String processorTag = randomAlphaOfLength(10);
         KeyValueProcessor processor = factory.create(null, processorTag, null, config);
         assertThat(processor.getTag(), equalTo(processorTag));
-        assertThat(processor.getField(), equalTo("field1"));
+        assertThat(processor.getField().newInstance(Collections.emptyMap()).execute(), equalTo("field1"));
         assertThat(processor.getFieldSplit(), equalTo("&"));
         assertThat(processor.getValueSplit(), equalTo("="));
         assertThat(processor.getIncludeKeys(), is(nullValue()));
@@ -42,7 +50,6 @@ public class KeyValueProcessorFactoryTests extends ESTestCase {
     }
 
     public void testCreateWithAllFieldsSet() throws Exception {
-        KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
         Map<String, Object> config = new HashMap<>();
         config.put("field", "field1");
         config.put("field_split", "&");
@@ -54,17 +61,16 @@ public class KeyValueProcessorFactoryTests extends ESTestCase {
         String processorTag = randomAlphaOfLength(10);
         KeyValueProcessor processor = factory.create(null, processorTag, null, config);
         assertThat(processor.getTag(), equalTo(processorTag));
-        assertThat(processor.getField(), equalTo("field1"));
+        assertThat(processor.getField().newInstance(Collections.emptyMap()).execute(), equalTo("field1"));
         assertThat(processor.getFieldSplit(), equalTo("&"));
         assertThat(processor.getValueSplit(), equalTo("="));
         assertThat(processor.getIncludeKeys(), equalTo(Sets.newHashSet("a", "b")));
         assertThat(processor.getExcludeKeys(), equalTo(Collections.emptySet()));
-        assertThat(processor.getTargetField(), equalTo("target"));
+        assertThat(processor.getTargetField().newInstance(Collections.emptyMap()).execute(), equalTo("target"));
         assertTrue(processor.isIgnoreMissing());
     }
 
     public void testCreateWithMissingField() {
-        KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
         Map<String, Object> config = new HashMap<>();
         String processorTag = randomAlphaOfLength(10);
         ElasticsearchException exception = expectThrows(ElasticsearchParseException.class,
@@ -73,7 +79,6 @@ public class KeyValueProcessorFactoryTests extends ESTestCase {
     }
 
     public void testCreateWithMissingFieldSplit() {
-        KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
         Map<String, Object> config = new HashMap<>();
         config.put("field", "field1");
         String processorTag = randomAlphaOfLength(10);
@@ -83,7 +88,6 @@ public class KeyValueProcessorFactoryTests extends ESTestCase {
     }
 
     public void testCreateWithMissingValueSplit() {
-        KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
         Map<String, Object> config = new HashMap<>();
         config.put("field", "field1");
         config.put("field_split", "&");

+ 3 - 2
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/KeyValueProcessorTests.java

@@ -12,6 +12,7 @@ import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.ingest.IngestDocument;
 import org.elasticsearch.ingest.Processor;
 import org.elasticsearch.ingest.RandomDocumentPicks;
+import org.elasticsearch.ingest.TestTemplateService;
 import org.elasticsearch.test.ESTestCase;
 
 import java.util.ArrayList;
@@ -27,7 +28,7 @@ import static org.hamcrest.Matchers.equalTo;
 
 public class KeyValueProcessorTests extends ESTestCase {
 
-    private static final KeyValueProcessor.Factory FACTORY = new KeyValueProcessor.Factory();
+    private static final KeyValueProcessor.Factory FACTORY = new KeyValueProcessor.Factory(TestTemplateService.instance());
 
     public void test() throws Exception {
         IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
@@ -92,7 +93,7 @@ public class KeyValueProcessorTests extends ESTestCase {
         Processor processor = createKvProcessor("unknown", "&",
             "=", null, null, "target", false);
         IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
-        assertThat(exception.getMessage(), equalTo("field [unknown] not present as part of path [unknown]"));
+        assertThat(exception.getMessage(), equalTo("field [unknown] doesn't exist"));
     }
 
     public void testNullValueWithIgnoreMissing() throws Exception {

+ 55 - 0
modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/150_kv.yml

@@ -39,3 +39,58 @@ teardown:
         id: 1
   - match: { _source.goodbye: "everybody" }
   - match: { _source.hello: "world" }
+
+---
+"Test KV Processor with template snippets":
+  - do:
+      ingest.put_pipeline:
+        id: "1"
+        body:  >
+          {
+            "processors": [
+              {
+                "kv" : {
+                  "field" : "{{origin}}",
+                  "target_field" : "{{target}}",
+                  "field_split": " ",
+                  "value_split": "="
+                }
+              },
+              {
+                "kv" : {
+                  "field" : "{{origin}}",
+                  "field_split": " ",
+                  "value_split": "="
+                }
+              },
+              {
+                "kv" : {
+                  "field" : "{{origin1}}",
+                  "field_split": " ",
+                  "value_split": "=",
+                  "ignore_missing": true
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      index:
+        index: test
+        id: 1
+        pipeline: "1"
+        body: {
+          origin: "field1",
+          field1: "goodbye=everybody hello=world",
+          target: "bar"
+        }
+
+  - do:
+      get:
+        index: test
+        id: 1
+  - match: { _source.bar.goodbye: "everybody" }
+  - match: { _source.bar.hello: "world" }
+  - match: { _source.goodbye: "everybody" }
+  - match: { _source.hello: "world" }