Browse Source

Add ingest-attachment support for per document `indexed_chars` limit (#28977)

Today we support a global `indexed_chars` processor parameter, but in some cases users would like to set this limit per document.
This used to be supported in the mapper-attachments plugin by extracting the limit value from a meta field in the document sent for indexing.

We add an option which reads this limit value from the document itself
by adding a setting named `indexed_chars_field`.

This allows running:

```
PUT _ingest/pipeline/attachment
{
  "description" : "Extract attachment information. Used to parse pdf and office files",
  "processors" : [
    {
      "attachment" : {
        "field" : "data",
        "indexed_chars_field" : "size"
      }
    }
  ]
}
```

Then index either:

```
PUT index/doc/1?pipeline=attachment
{
  "data": "BASE64"
}
```

This will use the default value (or the one defined by `indexed_chars`).

Or

```
PUT index/doc/2?pipeline=attachment
{
  "data": "BASE64",
  "size": 1000
}
```

Closes #28942
David Pilato 7 years ago
parent
commit
87553bba16

+ 116 - 6
docs/plugins/ingest-attachment.asciidoc

@@ -25,6 +25,7 @@ include::install_remove.asciidoc[]
 | `field`                | yes       | -                | The field to get the base64 encoded field from
 | `target_field`         | no        | attachment       | The field that will hold the attachment information
 | `indexed_chars`        | no        | 100000           | The number of chars being used for extraction to prevent huge fields. Use `-1` for no limit.
+| `indexed_chars_field`  | no        | `null`           | Name of a document field whose value overrides the number of chars being used for extraction. See `indexed_chars`.
 | `properties`           | no        | all properties   | Array of properties to select to be stored. Can be `content`, `title`, `name`, `author`, `keywords`, `date`, `content_type`, `content_length`, `language`
 | `ignore_missing`       | no        | `false`          | If `true` and `field` does not exist, the processor quietly exits without modifying the document
 |======
@@ -44,11 +45,11 @@ PUT _ingest/pipeline/attachment
     }
   ]
 }
-PUT my_index/my_type/my_id?pipeline=attachment
+PUT my_index/_doc/my_id?pipeline=attachment
 {
   "data": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0="
 }
-GET my_index/my_type/my_id
+GET my_index/_doc/my_id
 --------------------------------------------------
 // CONSOLE
 
@@ -59,7 +60,7 @@ Returns this:
 {
   "found": true,
   "_index": "my_index",
-  "_type": "my_type",
+  "_type": "_doc",
   "_id": "my_id",
   "_version": 1,
   "_source": {
@@ -99,6 +100,115 @@ NOTE: Extracting contents from binary data is a resource intensive operation and
       consumes a lot of resources. It is highly recommended to run pipelines
       using this processor in a dedicated ingest node.
 
+[[ingest-attachment-extracted-chars]]
+==== Limit the number of extracted chars
+
+To prevent extracting too many chars and overloading the node memory, the number of chars being used for extraction
+is limited by default to `100000`. You can change this value by setting `indexed_chars`. Use `-1` for no limit, but
+when doing so make sure that your node has enough heap to extract the content of very big documents.
+
+You can also define this limit per document by reading it from a given field of the document. If the document
+has that field, its value overrides the `indexed_chars` setting. To specify which field holds the limit, define the
+`indexed_chars_field` setting.
+
+For example:
+
+[source,js]
+--------------------------------------------------
+PUT _ingest/pipeline/attachment
+{
+  "description" : "Extract attachment information",
+  "processors" : [
+    {
+      "attachment" : {
+        "field" : "data",
+        "indexed_chars" : 11,
+        "indexed_chars_field" : "max_size"
+      }
+    }
+  ]
+}
+PUT my_index/_doc/my_id?pipeline=attachment
+{
+  "data": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0="
+}
+GET my_index/_doc/my_id
+--------------------------------------------------
+// CONSOLE
+
+Returns this:
+
+[source,js]
+--------------------------------------------------
+{
+  "found": true,
+  "_index": "my_index",
+  "_type": "_doc",
+  "_id": "my_id",
+  "_version": 1,
+  "_source": {
+    "data": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0=",
+    "attachment": {
+      "content_type": "application/rtf",
+      "language": "sl",
+      "content": "Lorem ipsum",
+      "content_length": 11
+    }
+  }
+}
+--------------------------------------------------
+// TESTRESPONSE
+
+
+[source,js]
+--------------------------------------------------
+PUT _ingest/pipeline/attachment
+{
+  "description" : "Extract attachment information",
+  "processors" : [
+    {
+      "attachment" : {
+        "field" : "data",
+        "indexed_chars" : 11,
+        "indexed_chars_field" : "max_size"
+      }
+    }
+  ]
+}
+PUT my_index/_doc/my_id_2?pipeline=attachment
+{
+  "data": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0=",
+  "max_size": 5
+}
+GET my_index/_doc/my_id_2
+--------------------------------------------------
+// CONSOLE
+
+Returns this:
+
+[source,js]
+--------------------------------------------------
+{
+  "found": true,
+  "_index": "my_index",
+  "_type": "_doc",
+  "_id": "my_id_2",
+  "_version": 1,
+  "_source": {
+    "data": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0=",
+    "max_size": 5,
+    "attachment": {
+      "content_type": "application/rtf",
+      "language": "ro",
+      "content": "Lorem",
+      "content_length": 5
+    }
+  }
+}
+--------------------------------------------------
+// TESTRESPONSE
+
+
 [[ingest-attachment-with-arrays]]
 ==== Using the Attachment Processor with arrays
 
@@ -150,7 +260,7 @@ PUT _ingest/pipeline/attachment
     }
   ]
 }
-PUT my_index/my_type/my_id?pipeline=attachment
+PUT my_index/_doc/my_id?pipeline=attachment
 {
   "attachments" : [
     {
@@ -163,7 +273,7 @@ PUT my_index/my_type/my_id?pipeline=attachment
     }
   ]
 }
-GET my_index/my_type/my_id
+GET my_index/_doc/my_id
 --------------------------------------------------
 // CONSOLE
 
@@ -172,7 +282,7 @@ Returns this:
 --------------------------------------------------
 {
   "_index" : "my_index",
-  "_type" : "my_type",
+  "_type" : "_doc",
   "_id" : "my_id",
   "_version" : 1,
   "found" : true,

+ 20 - 5
plugins/ingest-attachment/src/main/java/org/elasticsearch/ingest/attachment/AttachmentProcessor.java

@@ -42,6 +42,7 @@ import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationExcept
 import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
 import static org.elasticsearch.ingest.ConfigurationUtils.readIntProperty;
 import static org.elasticsearch.ingest.ConfigurationUtils.readOptionalList;
+import static org.elasticsearch.ingest.ConfigurationUtils.readOptionalStringProperty;
 import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty;
 
 public final class AttachmentProcessor extends AbstractProcessor {
@@ -55,15 +56,17 @@ public final class AttachmentProcessor extends AbstractProcessor {
     private final Set<Property> properties;
     private final int indexedChars;
     private final boolean ignoreMissing;
+    private final String indexedCharsField;
 
     AttachmentProcessor(String tag, String field, String targetField, Set<Property> properties,
-                        int indexedChars, boolean ignoreMissing) throws IOException {
+                        int indexedChars, boolean ignoreMissing, String indexedCharsField) {
         super(tag);
         this.field = field;
         this.targetField = targetField;
         this.properties = properties;
         this.indexedChars = indexedChars;
         this.ignoreMissing = ignoreMissing;
+        this.indexedCharsField = indexedCharsField;
     }
 
     boolean isIgnoreMissing() {
@@ -82,6 +85,17 @@ public final class AttachmentProcessor extends AbstractProcessor {
             throw new IllegalArgumentException("field [" + field + "] is null, cannot parse.");
         }
 
+        Integer indexedChars = this.indexedChars;
+
+        if (indexedCharsField != null) {
+            // If the user provided the number of characters to be extracted as part of the document, we use it
+            indexedChars = ingestDocument.getFieldValue(indexedCharsField, Integer.class, true);
+            if (indexedChars == null) {
+                // If the field does not exist we fall back to the global limit
+                indexedChars = this.indexedChars;
+            }
+        }
+
         Metadata metadata = new Metadata();
         String parsedContent = "";
         try {
@@ -183,14 +197,15 @@ public final class AttachmentProcessor extends AbstractProcessor {
                                           Map<String, Object> config) throws Exception {
             String field = readStringProperty(TYPE, processorTag, config, "field");
             String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "attachment");
-            List<String> properyNames = readOptionalList(TYPE, processorTag, config, "properties");
+            List<String> propertyNames = readOptionalList(TYPE, processorTag, config, "properties");
             int indexedChars = readIntProperty(TYPE, processorTag, config, "indexed_chars", NUMBER_OF_CHARS_INDEXED);
             boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
+            String indexedCharsField = readOptionalStringProperty(TYPE, processorTag, config, "indexed_chars_field");
 
             final Set<Property> properties;
-            if (properyNames != null) {
+            if (propertyNames != null) {
                 properties = EnumSet.noneOf(Property.class);
-                for (String fieldName : properyNames) {
+                for (String fieldName : propertyNames) {
                     try {
                         properties.add(Property.parse(fieldName));
                     } catch (Exception e) {
@@ -202,7 +217,7 @@ public final class AttachmentProcessor extends AbstractProcessor {
                 properties = DEFAULT_PROPERTIES;
             }
 
-            return new AttachmentProcessor(processorTag, field, targetField, properties, indexedChars, ignoreMissing);
+            return new AttachmentProcessor(processorTag, field, targetField, properties, indexedChars, ignoreMissing, indexedCharsField);
         }
     }
 

+ 54 - 8
plugins/ingest-attachment/src/test/java/org/elasticsearch/ingest/attachment/AttachmentProcessorTests.java

@@ -54,9 +54,9 @@ public class AttachmentProcessorTests extends ESTestCase {
     private AttachmentProcessor processor;
 
     @Before
-    public void createStandardProcessor() throws IOException {
+    public void createStandardProcessor() {
         processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field",
-            "target_field", EnumSet.allOf(AttachmentProcessor.Property.class), 10000, false);
+            "target_field", EnumSet.allOf(AttachmentProcessor.Property.class), 10000, false, null);
     }
 
     public void testEnglishTextDocument() throws Exception {
@@ -89,7 +89,7 @@ public class AttachmentProcessorTests extends ESTestCase {
             selectedProperties.add(AttachmentProcessor.Property.DATE);
         }
         processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field",
-            "target_field", selectedProperties, 10000, false);
+            "target_field", selectedProperties, 10000, false, null);
 
         Map<String, Object> attachmentData = parseDocument("htmlWithEmptyDateMeta.html", processor);
         assertThat(attachmentData.keySet(), hasSize(selectedFieldNames.length));
@@ -242,7 +242,7 @@ public class AttachmentProcessorTests extends ESTestCase {
         IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(),
             Collections.singletonMap("source_field", null));
         IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
-        Processor processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field", "randomTarget", null, 10, true);
+        Processor processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field", "randomTarget", null, 10, true, null);
         processor.execute(ingestDocument);
         assertIngestDocument(originalIngestDocument, ingestDocument);
     }
@@ -250,7 +250,7 @@ public class AttachmentProcessorTests extends ESTestCase {
     public void testNonExistentWithIgnoreMissing() throws Exception {
         IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
         IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
-        Processor processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field", "randomTarget", null, 10, true);
+        Processor processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field", "randomTarget", null, 10, true, null);
         processor.execute(ingestDocument);
         assertIngestDocument(originalIngestDocument, ingestDocument);
     }
@@ -259,7 +259,7 @@ public class AttachmentProcessorTests extends ESTestCase {
         IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(),
             Collections.singletonMap("source_field", null));
         IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
-        Processor processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field", "randomTarget", null, 10, false);
+        Processor processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field", "randomTarget", null, 10, false, null);
         Exception exception = expectThrows(Exception.class, () -> processor.execute(ingestDocument));
         assertThat(exception.getMessage(), equalTo("field [source_field] is null, cannot parse."));
     }
@@ -267,14 +267,20 @@ public class AttachmentProcessorTests extends ESTestCase {
     public void testNonExistentWithoutIgnoreMissing() throws Exception {
         IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
         IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
-        Processor processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field", "randomTarget", null, 10, false);
+        Processor processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field", "randomTarget", null, 10, false, null);
         Exception exception = expectThrows(Exception.class, () -> processor.execute(ingestDocument));
         assertThat(exception.getMessage(), equalTo("field [source_field] not present as part of path [source_field]"));
     }
 
     private Map<String, Object> parseDocument(String file, AttachmentProcessor processor) throws Exception {
+        return parseDocument(file, processor, new HashMap<>());
+    }
+
+    private Map<String, Object> parseDocument(String file, AttachmentProcessor processor, Map<String, Object> optionalFields)
+        throws Exception {
         Map<String, Object> document = new HashMap<>();
         document.put("source_field", getAsBase64(file));
+        document.putAll(optionalFields);
 
         IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
         processor.execute(ingestDocument);
@@ -284,7 +290,47 @@ public class AttachmentProcessorTests extends ESTestCase {
         return attachmentData;
     }
 
-    protected String getAsBase64(String filename) throws Exception {
+    public void testIndexedChars() throws Exception {
+        processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field",
+            "target_field", EnumSet.allOf(AttachmentProcessor.Property.class), 19, false, null);
+
+        Map<String, Object> attachmentData = parseDocument("text-in-english.txt", processor);
+
+        assertThat(attachmentData.keySet(), containsInAnyOrder("language", "content", "content_type", "content_length"));
+        assertThat(attachmentData.get("language"), is("en"));
+        assertThat(attachmentData.get("content"), is("\"God Save the Queen"));
+        assertThat(attachmentData.get("content_type").toString(), containsString("text/plain"));
+        assertThat(attachmentData.get("content_length"), is(19L));
+
+        processor = new AttachmentProcessor(randomAlphaOfLength(10), "source_field",
+            "target_field", EnumSet.allOf(AttachmentProcessor.Property.class), 19, false, "max_length");
+
+        attachmentData = parseDocument("text-in-english.txt", processor);
+
+        assertThat(attachmentData.keySet(), containsInAnyOrder("language", "content", "content_type", "content_length"));
+        assertThat(attachmentData.get("language"), is("en"));
+        assertThat(attachmentData.get("content"), is("\"God Save the Queen"));
+        assertThat(attachmentData.get("content_type").toString(), containsString("text/plain"));
+        assertThat(attachmentData.get("content_length"), is(19L));
+
+        attachmentData = parseDocument("text-in-english.txt", processor, Collections.singletonMap("max_length", 10));
+
+        assertThat(attachmentData.keySet(), containsInAnyOrder("language", "content", "content_type", "content_length"));
+        assertThat(attachmentData.get("language"), is("sk"));
+        assertThat(attachmentData.get("content"), is("\"God Save"));
+        assertThat(attachmentData.get("content_type").toString(), containsString("text/plain"));
+        assertThat(attachmentData.get("content_length"), is(10L));
+
+        attachmentData = parseDocument("text-in-english.txt", processor, Collections.singletonMap("max_length", 100));
+
+        assertThat(attachmentData.keySet(), containsInAnyOrder("language", "content", "content_type", "content_length"));
+        assertThat(attachmentData.get("language"), is("en"));
+        assertThat(attachmentData.get("content"), is("\"God Save the Queen\" (alternatively \"God Save the King\""));
+        assertThat(attachmentData.get("content_type").toString(), containsString("text/plain"));
+        assertThat(attachmentData.get("content_length"), is(56L));
+    }
+
+    private String getAsBase64(String filename) throws Exception {
         String path = "/org/elasticsearch/ingest/attachment/test/sample-files/" + filename;
         try (InputStream is = AttachmentProcessorTests.class.getResourceAsStream(path)) {
             byte bytes[] = IOUtils.toByteArray(is);

+ 74 - 0
plugins/ingest-attachment/src/test/resources/rest-api-spec/test/ingest_attachment/20_attachment_processor.yml

@@ -112,3 +112,77 @@
   - match: { _source.attachment.content: "This is an english text to tes" }
   - match: { _source.attachment.language: "en" }
   - match: { _source.attachment.content_length: 30 }
+
+---
+"Test indexed chars are configurable per document":
+  - do:
+      ingest.put_pipeline:
+        id: "my_pipeline"
+        body:  >
+          {
+            "description": "_description",
+            "processors": [
+              {
+                "attachment" : {
+                  "field" : "field1",
+                  "indexed_chars": 30,
+                  "indexed_chars_field": "max_size"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      index:
+        index: test
+        type: test
+        id: 1
+        pipeline: "my_pipeline"
+        body: { field1: "VGhpcyBpcyBhbiBlbmdsaXNoIHRleHQgdG8gdGVzdCBpZiB0aGUgcGlwZWxpbmUgd29ya3M=" }
+
+  - do:
+      get:
+        index: test
+        type: test
+        id: 1
+  - length: { _source.attachment: 4 }
+  - match: { _source.attachment.content: "This is an english text to tes" }
+  - match: { _source.attachment.language: "en" }
+  - match: { _source.attachment.content_length: 30 }
+
+  - do:
+      index:
+        index: test
+        type: test
+        id: 2
+        pipeline: "my_pipeline"
+        body: { field1: "VGhpcyBpcyBhbiBlbmdsaXNoIHRleHQgdG8gdGVzdCBpZiB0aGUgcGlwZWxpbmUgd29ya3M=", "max_size": 18 }
+
+  - do:
+      get:
+        index: test
+        type: test
+        id: 2
+  - length: { _source.attachment: 4 }
+  - match: { _source.attachment.content: "This is an english" }
+  - match: { _source.attachment.language: "en" }
+  - match: { _source.attachment.content_length: 18 }
+
+  - do:
+      index:
+        index: test
+        type: test
+        id: 3
+        pipeline: "my_pipeline"
+        body: { field1: "VGhpcyBpcyBhbiBlbmdsaXNoIHRleHQgdG8gdGVzdCBpZiB0aGUgcGlwZWxpbmUgd29ya3M=", "max_size": 100000000 }
+
+  - do:
+      get:
+        index: test
+        type: test
+        id: 3
+  - length: { _source.attachment: 4 }
+  - match: { _source.attachment.content: "This is an english text to test if the pipeline works" }
+  - match: { _source.attachment.language: "en" }
+  - match: { _source.attachment.content_length: 54 }