Browse Source

INGEST: Extend KV Processor (#31789) (#32232)

* INGEST: Extend KV Processor (#31789)

Added more capabilities supported by LS to the KV processor:
* Stripping of brackets and quotes from values (`include_brackets` in corresponding LS filter)
* Adding key prefixes
* Trimming specified chars from keys and values

Refactored the way the filter is configured to avoid conditionals during execution.
Refactored Tests a little to not have to add more redundant getters for new parameters.

Relates #31786
* Add documentation
Armin Braun 7 years ago
parent
commit
7aa8a0a927

+ 4 - 0
docs/reference/ingest/ingest-node.asciidoc

@@ -1732,6 +1732,10 @@ For example, if you have a log message which contains `ip=1.2.3.4 error=REFUSED`
 | `include_keys`   | no        | `null`   | List of keys to filter and insert into document. Defaults to including all keys
 | `exclude_keys`   | no        | `null`   | List of keys to exclude from document
 | `ignore_missing` | no        | `false`  | If `true` and `field` does not exist or is `null`, the processor quietly exits without modifying the document
+| `prefix`         | no        | `null`   | Prefix to be added to extracted keys
+| `trim_key`       | no        | `null`   | String of characters to trim from extracted keys
+| `trim_value`     | no        | `null`   | String of characters to trim from extracted values
+| `strip_brackets` | no        | `false`  | If `true` strip brackets `()`, `<>`, `[]` as well as quotes `'` and `"` from extracted values
 |======
 
 

+ 106 - 25
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/KeyValueProcessor.java

@@ -25,11 +25,14 @@ import org.elasticsearch.ingest.ConfigurationUtils;
 import org.elasticsearch.ingest.IngestDocument;
 import org.elasticsearch.ingest.Processor;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
 
 /**
  * The KeyValueProcessor parses and extracts messages of the `key=value` variety into fields with values of the keys.
@@ -38,6 +41,8 @@ public final class KeyValueProcessor extends AbstractProcessor {
 
     public static final String TYPE = "kv";
 
+    private static final Pattern STRIP_BRACKETS = Pattern.compile("(^[\\(\\[<\"'])|([\\]\\)>\"']$)");
+
     private final String field;
     private final String fieldSplit;
     private final String valueSplit;
@@ -45,9 +50,11 @@ public final class KeyValueProcessor extends AbstractProcessor {
     private final Set<String> excludeKeys;
     private final String targetField;
     private final boolean ignoreMissing;
+    private final Consumer<IngestDocument> execution;
 
     KeyValueProcessor(String tag, String field, String fieldSplit, String valueSplit, Set<String> includeKeys,
-                      Set<String> excludeKeys, String targetField, boolean ignoreMissing) {
+                      Set<String> excludeKeys, String targetField, boolean ignoreMissing,
+                      String trimKey, String trimValue, boolean stripBrackets, String prefix) {
         super(tag);
         this.field = field;
         this.targetField = targetField;
@@ -56,6 +63,92 @@ public final class KeyValueProcessor extends AbstractProcessor {
         this.includeKeys = includeKeys;
         this.excludeKeys = excludeKeys;
         this.ignoreMissing = ignoreMissing;
+        this.execution = buildExecution(
+            fieldSplit, valueSplit, field, includeKeys, excludeKeys, targetField, ignoreMissing, trimKey, trimValue,
+            stripBrackets, prefix
+        );
+    }
+
+    private static Consumer<IngestDocument> buildExecution(String fieldSplit, String valueSplit, String field,
+                                                           Set<String> includeKeys, Set<String> excludeKeys,
+                                                           String targetField, boolean ignoreMissing,
+                                                           String trimKey, String trimValue, boolean stripBrackets,
+                                                           String prefix) {
+        final Predicate<String> keyFilter;
+        if (includeKeys == null) {
+            if (excludeKeys == null) {
+                keyFilter = key -> true;
+            } else {
+                keyFilter = key -> excludeKeys.contains(key) == false;
+            }
+        } else {
+            if (excludeKeys == null) {
+                keyFilter = includeKeys::contains;
+            } else {
+                keyFilter = key -> includeKeys.contains(key) && excludeKeys.contains(key) == false;
+            }
+        }
+        final String fieldPathPrefix;
+        String keyPrefix = prefix == null ? "" : prefix;
+        if (targetField == null) {
+            fieldPathPrefix = keyPrefix;
+        } else {
+            fieldPathPrefix = targetField + "." + keyPrefix;
+        }
+        final Function<String, String> keyPrefixer;
+        if (fieldPathPrefix.isEmpty()) {
+            keyPrefixer = val -> val;
+        } else {
+            keyPrefixer = val -> fieldPathPrefix + val;
+        }
+        final Function<String, String[]> fieldSplitter = buildSplitter(fieldSplit, true);
+        Function<String, String[]> valueSplitter = buildSplitter(valueSplit, false);
+        final Function<String, String> keyTrimmer = buildTrimmer(trimKey);
+        final Function<String, String> bracketStrip;
+        if (stripBrackets) {
+            bracketStrip = val -> STRIP_BRACKETS.matcher(val).replaceAll("");
+        } else {
+            bracketStrip = val -> val;
+        }
+        final Function<String, String> valueTrimmer = buildTrimmer(trimValue);
+        return document -> {
+            String value = document.getFieldValue(field, String.class, ignoreMissing);
+            if (value == null) {
+                if (ignoreMissing) {
+                    return;
+                }
+                throw new IllegalArgumentException("field [" + field + "] is null, cannot extract key-value pairs.");
+            }
+            for (String part : fieldSplitter.apply(value)) {
+                String[] kv = valueSplitter.apply(part);
+                if (kv.length != 2) {
+                    throw new IllegalArgumentException("field [" + field + "] does not contain value_split [" + valueSplit + "]");
+                }
+                String key = keyTrimmer.apply(kv[0]);
+                if (keyFilter.test(key)) {
+                    append(document, keyPrefixer.apply(key), valueTrimmer.apply(bracketStrip.apply(kv[1])));
+                }
+            }
+        };
+    }
+
+    private static Function<String, String> buildTrimmer(String trim) {
+        if (trim == null) {
+            return val -> val;
+        } else {
+            Pattern pattern = Pattern.compile("(^([" + trim + "]+))|([" + trim + "]+$)");
+            return val -> pattern.matcher(val).replaceAll("");
+        }
+    }
+
+    private static Function<String, String[]> buildSplitter(String split, boolean fields) {
+        int limit = fields ? 0 : 2;
+        if (split.length() > 2 || split.length() == 2 && split.charAt(0) != '\\') {
+            Pattern splitPattern = Pattern.compile(split);
+            return val -> splitPattern.split(val, limit);
+        } else {
+            return val -> val.split(split, limit);
+        }
     }
 
     String getField() {
@@ -86,7 +179,7 @@ public final class KeyValueProcessor extends AbstractProcessor {
         return ignoreMissing;
     }
 
-    public void append(IngestDocument document, String targetField, String value) {
+    private static void append(IngestDocument document, String targetField, String value) {
         if (document.hasField(targetField)) {
             document.appendFieldValue(targetField, value);
         } else {
@@ -96,27 +189,7 @@ public final class KeyValueProcessor extends AbstractProcessor {
 
     @Override
     public void execute(IngestDocument document) {
-        String oldVal = document.getFieldValue(field, String.class, ignoreMissing);
-
-        if (oldVal == null && ignoreMissing) {
-            return;
-        } else if (oldVal == null) {
-            throw new IllegalArgumentException("field [" + field + "] is null, cannot extract key-value pairs.");
-        }
-
-        String fieldPathPrefix = (targetField == null) ? "" : targetField + ".";
-        Arrays.stream(oldVal.split(fieldSplit))
-            .map((f) -> {
-                String[] kv = f.split(valueSplit, 2);
-                if (kv.length != 2) {
-                    throw new IllegalArgumentException("field [" + field + "] does not contain value_split [" + valueSplit + "]");
-                }
-                return kv;
-            })
-            .filter((p) ->
-                (includeKeys == null || includeKeys.contains(p[0])) &&
-                    (excludeKeys == null || excludeKeys.contains(p[0]) == false))
-            .forEach((p) -> append(document, fieldPathPrefix + p[0], p[1]));
+        execution.accept(document);
     }
 
     @Override
@@ -132,6 +205,11 @@ public final class KeyValueProcessor extends AbstractProcessor {
             String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field");
             String fieldSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field_split");
             String valueSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "value_split");
+            String trimKey = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "trim_key");
+            String trimValue = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "trim_value");
+            String prefix = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "prefix");
+            boolean stripBrackets =
+                ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "strip_brackets", false);
             Set<String> includeKeys = null;
             Set<String> excludeKeys = null;
             List<String> includeKeysList = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "include_keys");
@@ -143,7 +221,10 @@ public final class KeyValueProcessor extends AbstractProcessor {
                 excludeKeys = Collections.unmodifiableSet(Sets.newHashSet(excludeKeysList));
             }
             boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
-            return new KeyValueProcessor(processorTag, field, fieldSplit, valueSplit, includeKeys, excludeKeys, targetField, ignoreMissing);
+            return new KeyValueProcessor(
+                processorTag, field, fieldSplit, valueSplit, includeKeys, excludeKeys, targetField, ignoreMissing,
+                trimKey, trimValue, stripBrackets, prefix
+            );
         }
     }
 }

+ 104 - 12
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/KeyValueProcessorTests.java

@@ -25,19 +25,25 @@ import org.elasticsearch.ingest.Processor;
 import org.elasticsearch.ingest.RandomDocumentPicks;
 import org.elasticsearch.test.ESTestCase;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
 import static org.hamcrest.Matchers.equalTo;
 
 public class KeyValueProcessorTests extends ESTestCase {
 
+    private static final KeyValueProcessor.Factory FACTORY = new KeyValueProcessor.Factory();
+
     public void test() throws Exception {
         IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
         String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello&second=world&second=universe");
-        Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=", null, null, "target", false);
+        Processor processor = createKvProcessor(fieldName, "&", "=", null, null, "target", false);
         processor.execute(ingestDocument);
         assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello"));
         assertThat(ingestDocument.getFieldValue("target.second", List.class), equalTo(Arrays.asList("world", "universe")));
@@ -46,7 +52,7 @@ public class KeyValueProcessorTests extends ESTestCase {
     public void testRootTarget() throws Exception {
         IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
         ingestDocument.setFieldValue("myField", "first=hello&second=world&second=universe");
-        Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "myField", "&", "=", null, null,null, false);
+        Processor processor = createKvProcessor("myField", "&", "=", null, null,null, false);
         processor.execute(ingestDocument);
         assertThat(ingestDocument.getFieldValue("first", String.class), equalTo("hello"));
         assertThat(ingestDocument.getFieldValue("second", List.class), equalTo(Arrays.asList("world", "universe")));
@@ -55,7 +61,7 @@ public class KeyValueProcessorTests extends ESTestCase {
     public void testKeySameAsSourceField() throws Exception {
         IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
         ingestDocument.setFieldValue("first", "first=hello");
-        Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "first", "&", "=", null, null,null, false);
+        Processor processor = createKvProcessor("first", "&", "=", null, null,null, false);
         processor.execute(ingestDocument);
         assertThat(ingestDocument.getFieldValue("first", List.class), equalTo(Arrays.asList("first=hello", "hello")));
     }
@@ -63,7 +69,7 @@ public class KeyValueProcessorTests extends ESTestCase {
     public void testIncludeKeys() throws Exception {
         IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
         String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello&second=world&second=universe");
-        Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=",
+        Processor processor = createKvProcessor(fieldName, "&", "=",
             Sets.newHashSet("first"), null, "target", false);
         processor.execute(ingestDocument);
         assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello"));
@@ -73,7 +79,7 @@ public class KeyValueProcessorTests extends ESTestCase {
     public void testExcludeKeys() throws Exception {
         IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
         String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello&second=world&second=universe");
-        Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=",
+        Processor processor = createKvProcessor(fieldName, "&", "=",
             null, Sets.newHashSet("second"), "target", false);
         processor.execute(ingestDocument);
         assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello"));
@@ -84,7 +90,7 @@ public class KeyValueProcessorTests extends ESTestCase {
         IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
         String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument,
             "first=hello&second=world&second=universe&third=bar");
-        Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=",
+        Processor processor = createKvProcessor(fieldName, "&", "=",
             Sets.newHashSet("first", "second"), Sets.newHashSet("first", "second"), "target", false);
         processor.execute(ingestDocument);
         assertFalse(ingestDocument.hasField("target.first"));
@@ -92,9 +98,9 @@ public class KeyValueProcessorTests extends ESTestCase {
         assertFalse(ingestDocument.hasField("target.third"));
     }
 
-    public void testMissingField() {
+    public void testMissingField() throws Exception {
         IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
-        Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "unknown", "&",
+        Processor processor = createKvProcessor("unknown", "&",
             "=", null, null, "target", false);
         IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
         assertThat(exception.getMessage(), equalTo("field [unknown] not present as part of path [unknown]"));
@@ -105,7 +111,7 @@ public class KeyValueProcessorTests extends ESTestCase {
         IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(),
             Collections.singletonMap(fieldName, null));
         IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
-        Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "", "", null, null, "target", true);
+        Processor processor = createKvProcessor(fieldName, "", "", null, null, "target", true);
         processor.execute(ingestDocument);
         assertIngestDocument(originalIngestDocument, ingestDocument);
     }
@@ -113,7 +119,7 @@ public class KeyValueProcessorTests extends ESTestCase {
     public void testNonExistentWithIgnoreMissing() throws Exception {
         IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
         IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
-        Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "unknown", "", "", null, null, "target", true);
+        Processor processor = createKvProcessor("unknown", "", "", null, null, "target", true);
         processor.execute(ingestDocument);
         assertIngestDocument(originalIngestDocument, ingestDocument);
     }
@@ -121,7 +127,7 @@ public class KeyValueProcessorTests extends ESTestCase {
     public void testFailFieldSplitMatch() throws Exception {
         IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
         String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello|second=world|second=universe");
-        Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), fieldName, "&", "=", null, null, "target", false);
+        Processor processor = createKvProcessor(fieldName, "&", "=", null, null, "target", false);
         processor.execute(ingestDocument);
         assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello|second=world|second=universe"));
         assertFalse(ingestDocument.hasField("target.second"));
@@ -129,8 +135,94 @@ public class KeyValueProcessorTests extends ESTestCase {
 
     public void testFailValueSplitMatch() throws Exception {
         IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("foo", "bar"));
-        Processor processor = new KeyValueProcessor(randomAlphaOfLength(10), "foo", "&", "=", null, null, "target", false);
+        Processor processor = createKvProcessor("foo", "&", "=", null, null, "target", false);
         Exception exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
         assertThat(exception.getMessage(), equalTo("field [foo] does not contain value_split [=]"));
     }
+
+    public void testTrimKeyAndValue() throws Exception {
+        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
+        String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first= hello &second=world& second =universe");
+        Processor processor = createKvProcessor(fieldName, "&", "=", null, null, "target", false, " ", " ", false, null);
+        processor.execute(ingestDocument);
+        assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello"));
+        assertThat(ingestDocument.getFieldValue("target.second", List.class), equalTo(Arrays.asList("world", "universe")));
+    }
+
+    public void testTrimMultiCharSequence() throws Exception {
+        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
+        String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument,
+            "to=<foo@example.com>, orig_to=<bar@example.com>, %+relay=mail.example.com[private/dovecot-lmtp]," +
+                " delay=2.2, delays=1.9/0.01/0.01/0.21, dsn=2.0.0, status=sent "
+        );
+        Processor processor = createKvProcessor(fieldName, " ", "=", null, null, "target", false, "%+", "<>,", false, null);
+        processor.execute(ingestDocument);
+        assertThat(ingestDocument.getFieldValue("target.to", String.class), equalTo("foo@example.com"));
+        assertThat(ingestDocument.getFieldValue("target.orig_to", String.class), equalTo("bar@example.com"));
+        assertThat(ingestDocument.getFieldValue("target.relay", String.class), equalTo("mail.example.com[private/dovecot-lmtp]"));
+        assertThat(ingestDocument.getFieldValue("target.delay", String.class), equalTo("2.2"));
+        assertThat(ingestDocument.getFieldValue("target.delays", String.class), equalTo("1.9/0.01/0.01/0.21"));
+        assertThat(ingestDocument.getFieldValue("target.dsn", String.class), equalTo("2.0.0"));
+        assertThat(ingestDocument.getFieldValue("target.status", String.class), equalTo("sent"));
+    }
+
+    public void testStripBrackets() throws Exception {
+        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
+        String fieldName = RandomDocumentPicks.addRandomField(
+            random(), ingestDocument, "first=<hello>&second=\"world\"&second=(universe)&third=<foo>&fourth=[bar]&fifth='last'"
+        );
+        Processor processor = createKvProcessor(fieldName, "&", "=", null, null, "target", false, null, null, true, null);
+        processor.execute(ingestDocument);
+        assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello"));
+        assertThat(ingestDocument.getFieldValue("target.second", List.class), equalTo(Arrays.asList("world", "universe")));
+        assertThat(ingestDocument.getFieldValue("target.third", String.class), equalTo("foo"));
+        assertThat(ingestDocument.getFieldValue("target.fourth", String.class), equalTo("bar"));
+        assertThat(ingestDocument.getFieldValue("target.fifth", String.class), equalTo("last"));
+    }
+
+    public void testAddPrefix() throws Exception {
+        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
+        String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello&second=world&second=universe");
+        Processor processor = createKvProcessor(fieldName, "&", "=", null, null, "target", false, null, null, false, "arg_");
+        processor.execute(ingestDocument);
+        assertThat(ingestDocument.getFieldValue("target.arg_first", String.class), equalTo("hello"));
+        assertThat(ingestDocument.getFieldValue("target.arg_second", List.class), equalTo(Arrays.asList("world", "universe")));
+    }
+
+    private static KeyValueProcessor createKvProcessor(String field, String fieldSplit, String valueSplit, Set<String> includeKeys,
+                                                       Set<String> excludeKeys, String targetField,
+                                                       boolean ignoreMissing) throws Exception {
+        return createKvProcessor(
+            field, fieldSplit, valueSplit, includeKeys, excludeKeys, targetField, ignoreMissing, null, null, false, null
+        );
+    }
+
+    private static KeyValueProcessor createKvProcessor(String field, String fieldSplit, String valueSplit, Set<String> includeKeys,
+                                                       Set<String> excludeKeys, String targetField, boolean ignoreMissing,
+                                                       String trimKey, String trimValue, boolean stripBrackets,
+                                                       String prefix) throws Exception {
+        Map<String, Object> config = new HashMap<>();
+        config.put("field", field);
+        config.put("field_split", fieldSplit);
+        config.put("value_split", valueSplit);
+        config.put("target_field", targetField);
+        if (includeKeys != null) {
+            config.put("include_keys", new ArrayList<>(includeKeys));
+        }
+        if (excludeKeys != null) {
+            config.put("exclude_keys", new ArrayList<>(excludeKeys));
+        }
+        config.put("ignore_missing", ignoreMissing);
+        if (trimKey != null) {
+            config.put("trim_key", trimKey);
+        }
+        if (trimValue != null) {
+            config.put("trim_value", trimValue);
+        }
+        config.put("strip_brackets", stripBrackets);
+        if (prefix != null) {
+            config.put("prefix", prefix);
+        }
+        return FACTORY.create(null, randomAlphaOfLength(10), config);
+    }
 }