Browse Source

introduce KV Processor in Ingest Node (#22272)

Now you can parse field values of the `key=value` variety and have
`key` be inserted as a field name in an ingest document.

Closes #22222.
Tal Levy 8 years ago
parent
commit
c53b2ee9cd

+ 32 - 0
docs/reference/ingest/ingest-node.asciidoc

@@ -1515,6 +1515,38 @@ Converts a JSON string into a structured JSON object.
 }
 --------------------------------------------------
 
+[[kv-processor]]
+=== KV Processor
+This processor helps automatically parse messages (or specific event fields) which are of the foo=bar variety.
+
+For example, if you have a log message which contains `ip=1.2.3.4 error=REFUSED`, you can parse those automatically by configuring:
+
+
+[source,js]
+--------------------------------------------------
+{
+  "kv": {
+    "field": "message",
+    "field_split": " ",
+    "value_split": "="
+  }
+}
+--------------------------------------------------
+
+[[kv-options]]
+.Kv Options
+[options="header"]
+|======
+| Name             | Required  | Default  | Description
+| `field`          | yes       | -        | The field to be parsed
+| `field_split`    | yes       | -        | Regex pattern to use for splitting key-value pairs
+| `value_split`    | yes       | -        | Regex pattern to use for splitting the key from the value within a key-value pair
+| `target_field`   | no        | `null`   | The field to insert the extracted keys into. Defaults to the root of the document
+| `include_keys`   | no        | `null`   | List of keys to filter and insert into document. Defaults to including all keys
+| `ignore_missing` | no        | `false`  | If `true` and `field` does not exist or is `null`, the processor quietly exits without modifying the document
+|======
+
+
 [[lowercase-processor]]
 === Lowercase Processor
 Converts a string to its lowercase equivalent.

+ 1 - 0
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

@@ -63,6 +63,7 @@ public class IngestCommonPlugin extends Plugin implements IngestPlugin {
         processors.put(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService));
         processors.put(DotExpanderProcessor.TYPE, new DotExpanderProcessor.Factory());
         processors.put(JsonProcessor.TYPE, new JsonProcessor.Factory());
+        processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory());
         return Collections.unmodifiableMap(processors);
     }
 

+ 127 - 0
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/KeyValueProcessor.java

@@ -0,0 +1,127 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.ingest.common;
+
+import org.elasticsearch.ingest.AbstractProcessor;
+import org.elasticsearch.ingest.ConfigurationUtils;
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Processor;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The KeyValueProcessor parses and extracts messages of the `key=value` variety into fields with values of the keys.
+ */
+public final class KeyValueProcessor extends AbstractProcessor {
+
+    public static final String TYPE = "kv";
+
+    private final String field;
+    private final String fieldSplit;
+    private final String valueSplit;
+    private final List<String> includeKeys;
+    private final String targetField;
+    private final boolean ignoreMissing;
+
+    KeyValueProcessor(String tag, String field, String fieldSplit, String valueSplit, List<String> includeKeys,
+                      String targetField, boolean ignoreMissing) {
+        super(tag);
+        this.field = field;
+        this.targetField = targetField;
+        this.fieldSplit = fieldSplit;
+        this.valueSplit = valueSplit;
+        this.includeKeys = includeKeys;
+        this.ignoreMissing = ignoreMissing;
+    }
+
+    String getField() {
+        return field;
+    }
+
+    String getFieldSplit() {
+        return fieldSplit;
+    }
+
+    String getValueSplit() {
+        return valueSplit;
+    }
+
+    List<String> getIncludeKeys() {
+        return includeKeys;
+    }
+
+    String getTargetField() {
+        return targetField;
+    }
+
+    boolean isIgnoreMissing() {
+        return ignoreMissing;
+    }
+
+    public void append(IngestDocument document, String targetField, String value) {
+        if (document.hasField(targetField)) {
+            document.appendFieldValue(targetField, value);
+        } else {
+            document.setFieldValue(targetField, value);
+        }
+    }
+
+    @Override
+    public void execute(IngestDocument document) {
+        String oldVal = document.getFieldValue(field, String.class, ignoreMissing);
+
+        if (oldVal == null && ignoreMissing) {
+            return;
+        } else if (oldVal == null) {
+            throw new IllegalArgumentException("field [" + field + "] is null, cannot extract key-value pairs.");
+        }
+
+        String fieldPathPrefix = (targetField == null) ? "" : targetField + ".";
+        Arrays.stream(oldVal.split(fieldSplit))
+            .map((f) -> f.split(valueSplit, 2))
+            .filter((p) -> includeKeys == null || includeKeys.contains(p[0]))
+            .forEach((p) -> append(document, fieldPathPrefix + p[0], p[1]));
+    }
+
+    @Override
+    public String getType() {
+        return TYPE;
+    }
+
+    public static class Factory implements Processor.Factory {
+        @Override
+        public KeyValueProcessor create(Map<String, Processor.Factory> registry, String processorTag,
+                                        Map<String, Object> config) throws Exception {
+            String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
+            String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field");
+            String fieldSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field_split");
+            String valueSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "value_split");
+            List<String> includeKeys = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "include_keys");
+            if (includeKeys != null) {
+                includeKeys = Collections.unmodifiableList(includeKeys);
+            }
+            boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
+            return new KeyValueProcessor(processorTag, field, fieldSplit, valueSplit, includeKeys, targetField, ignoreMissing);
+        }
+    }
+}

+ 102 - 0
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/KeyValueProcessorFactoryTests.java

@@ -0,0 +1,102 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.ingest.common;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+
+public class KeyValueProcessorFactoryTests extends ESTestCase {
+
+    public void testCreateWithDefaults() throws Exception {
+        KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
+        Map<String, Object> config = new HashMap<>();
+        config.put("field", "field1");
+        config.put("field_split", "&");
+        config.put("value_split", "=");
+        String processorTag = randomAsciiOfLength(10);
+        KeyValueProcessor processor = factory.create(null, processorTag, config);
+        assertThat(processor.getTag(), equalTo(processorTag));
+        assertThat(processor.getField(), equalTo("field1"));
+        assertThat(processor.getFieldSplit(), equalTo("&"));
+        assertThat(processor.getValueSplit(), equalTo("="));
+        assertThat(processor.getIncludeKeys(), is(nullValue()));
+        assertThat(processor.getTargetField(), is(nullValue()));
+        assertFalse(processor.isIgnoreMissing());
+    }
+
+    public void testCreateWithAllFieldsSet() throws Exception {
+        KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
+        Map<String, Object> config = new HashMap<>();
+        config.put("field", "field1");
+        config.put("field_split", "&");
+        config.put("value_split", "=");
+        config.put("target_field", "target");
+        config.put("include_keys", Arrays.asList("a", "b"));
+        config.put("ignore_missing", true);
+        String processorTag = randomAsciiOfLength(10);
+        KeyValueProcessor processor = factory.create(null, processorTag, config);
+        assertThat(processor.getTag(), equalTo(processorTag));
+        assertThat(processor.getField(), equalTo("field1"));
+        assertThat(processor.getFieldSplit(), equalTo("&"));
+        assertThat(processor.getValueSplit(), equalTo("="));
+        assertThat(processor.getIncludeKeys(), equalTo(Arrays.asList("a", "b")));
+        assertThat(processor.getTargetField(), equalTo("target"));
+        assertTrue(processor.isIgnoreMissing());
+    }
+
+    public void testCreateWithMissingField() {
+        KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
+        Map<String, Object> config = new HashMap<>();
+        String processorTag = randomAsciiOfLength(10);
+        ElasticsearchException exception = expectThrows(ElasticsearchParseException.class,
+            () -> factory.create(null, processorTag, config));
+        assertThat(exception.getMessage(), equalTo("[field] required property is missing"));
+    }
+
+    public void testCreateWithMissingFieldSplit() {
+        KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
+        Map<String, Object> config = new HashMap<>();
+        config.put("field", "field1");
+        String processorTag = randomAsciiOfLength(10);
+        ElasticsearchException exception = expectThrows(ElasticsearchParseException.class,
+            () -> factory.create(null, processorTag, config));
+        assertThat(exception.getMessage(), equalTo("[field_split] required property is missing"));
+    }
+
+    public void testCreateWithMissingValueSplit() {
+        KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
+        Map<String, Object> config = new HashMap<>();
+        config.put("field", "field1");
+        config.put("field_split", "&");
+        String processorTag = randomAsciiOfLength(10);
+        ElasticsearchException exception = expectThrows(ElasticsearchParseException.class,
+            () -> factory.create(null, processorTag, config));
+        assertThat(exception.getMessage(), equalTo("[value_split] required property is missing"));
+    }
+}

+ 96 - 0
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/KeyValueProcessorTests.java

@@ -0,0 +1,96 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.ingest.common;
+
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.Processor;
+import org.elasticsearch.ingest.RandomDocumentPicks;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
+import static org.hamcrest.Matchers.equalTo;
+
+public class KeyValueProcessorTests extends ESTestCase {
+
+    public void test() throws Exception {
+        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
+        String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello&second=world&second=universe");
+        Processor processor = new KeyValueProcessor(randomAsciiOfLength(10), fieldName, "&", "=", null, "target", false);
+        processor.execute(ingestDocument);
+        assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello"));
+        assertThat(ingestDocument.getFieldValue("target.second", List.class), equalTo(Arrays.asList("world", "universe")));
+    }
+
+    public void testRootTarget() throws Exception {
+        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
+        ingestDocument.setFieldValue("myField", "first=hello&second=world&second=universe");
+        Processor processor = new KeyValueProcessor(randomAsciiOfLength(10), "myField", "&", "=", null, null, false);
+        processor.execute(ingestDocument);
+        assertThat(ingestDocument.getFieldValue("first", String.class), equalTo("hello"));
+        assertThat(ingestDocument.getFieldValue("second", List.class), equalTo(Arrays.asList("world", "universe")));
+    }
+
+    public void testKeySameAsSourceField() throws Exception {
+        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
+        ingestDocument.setFieldValue("first", "first=hello");
+        Processor processor = new KeyValueProcessor(randomAsciiOfLength(10), "first", "&", "=", null, null, false);
+        processor.execute(ingestDocument);
+        assertThat(ingestDocument.getFieldValue("first", List.class), equalTo(Arrays.asList("first=hello", "hello")));
+    }
+
+    public void testIncludeKeys() throws Exception {
+        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
+        String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, "first=hello&second=world&second=universe");
+        Processor processor = new KeyValueProcessor(randomAsciiOfLength(10), fieldName, "&", "=",
+            Collections.singletonList("first"), "target", false);
+        processor.execute(ingestDocument);
+        assertThat(ingestDocument.getFieldValue("target.first", String.class), equalTo("hello"));
+        assertFalse(ingestDocument.hasField("target.second"));
+    }
+
+    public void testMissingField() {
+        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
+        Processor processor = new KeyValueProcessor(randomAsciiOfLength(10), "unknown", "&", "=", null, "target", false);
+        IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
+        assertThat(exception.getMessage(), equalTo("field [unknown] not present as part of path [unknown]"));
+    }
+
+    public void testNullValueWithIgnoreMissing() throws Exception {
+        String fieldName = RandomDocumentPicks.randomFieldName(random());
+        IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(),
+            Collections.singletonMap(fieldName, null));
+        IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
+        Processor processor = new KeyValueProcessor(randomAsciiOfLength(10), fieldName, "", "", null, "target", true);
+        processor.execute(ingestDocument);
+        assertIngestDocument(originalIngestDocument, ingestDocument);
+    }
+
+    public void testNonExistentWithIgnoreMissing() throws Exception {
+        IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
+        IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
+        Processor processor = new KeyValueProcessor(randomAsciiOfLength(10), "unknown", "", "", null, "target", true);
+        processor.execute(ingestDocument);
+        assertIngestDocument(originalIngestDocument, ingestDocument);
+    }
+}

+ 10 - 9
modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/10_basic.yaml

@@ -20,12 +20,13 @@
     - match:  { nodes.$master.ingest.processors.8.type: gsub }
     - match:  { nodes.$master.ingest.processors.9.type: join }
     - match:  { nodes.$master.ingest.processors.10.type: json }
-    - match:  { nodes.$master.ingest.processors.11.type: lowercase }
-    - match:  { nodes.$master.ingest.processors.12.type: remove }
-    - match:  { nodes.$master.ingest.processors.13.type: rename }
-    - match:  { nodes.$master.ingest.processors.14.type: script }
-    - match:  { nodes.$master.ingest.processors.15.type: set }
-    - match:  { nodes.$master.ingest.processors.16.type: sort }
-    - match:  { nodes.$master.ingest.processors.17.type: split }
-    - match:  { nodes.$master.ingest.processors.18.type: trim }
-    - match:  { nodes.$master.ingest.processors.19.type: uppercase }
+    - match:  { nodes.$master.ingest.processors.11.type: kv }
+    - match:  { nodes.$master.ingest.processors.12.type: lowercase }
+    - match:  { nodes.$master.ingest.processors.13.type: remove }
+    - match:  { nodes.$master.ingest.processors.14.type: rename }
+    - match:  { nodes.$master.ingest.processors.15.type: script }
+    - match:  { nodes.$master.ingest.processors.16.type: set }
+    - match:  { nodes.$master.ingest.processors.17.type: sort }
+    - match:  { nodes.$master.ingest.processors.18.type: split }
+    - match:  { nodes.$master.ingest.processors.19.type: trim }
+    - match:  { nodes.$master.ingest.processors.20.type: uppercase }

+ 43 - 0
modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/150_kv.yaml

@@ -0,0 +1,43 @@
+---
+teardown:
+  - do:
+      ingest.delete_pipeline:
+        id: "1"
+        ignore: 404
+
+---
+"Test KV Processor":
+  - do:
+      ingest.put_pipeline:
+        id: "1"
+        body:  >
+          {
+            "processors": [
+              {
+                "kv" : {
+                  "field" : "foo",
+                  "field_split": " ",
+                  "value_split": "="
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      index:
+        index: test
+        type: test
+        id: 1
+        pipeline: "1"
+        body: {
+          foo: "goodbye=everybody hello=world"
+        }
+
+  - do:
+      get:
+        index: test
+        type: test
+        id: 1
+  - match: { _source.goodbye: "everybody" }
+  - match: { _source.hello: "world" }