|
@@ -1,907 +0,0 @@
|
|
|
-[[pipeline]]
|
|
|
-== Pipeline Definition
|
|
|
-
|
|
|
-A pipeline is a definition of a series of <<ingest-processors, processors>> that are to be executed
|
|
|
-in the same order as they are declared. A pipeline consists of two main fields: a `description`
|
|
|
-and a list of `processors`:
|
|
|
-
|
|
|
-[source,js]
|
|
|
---------------------------------------------------
|
|
|
-{
|
|
|
- "description" : "...",
|
|
|
- "processors" : [ ... ]
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// NOTCONSOLE
|
|
|
-
|
|
|
-The `description` is a special field to store a helpful description of
|
|
|
-what the pipeline does.
|
|
|
-
|
|
|
-The `processors` parameter defines a list of processors to be executed in
|
|
|
-order.
|
|
|
-
|
|
|
-[[accessing-data-in-pipelines]]
|
|
|
-== Accessing Data in Pipelines
|
|
|
-
|
|
|
-The processors in a pipeline have read and write access to documents that pass through the pipeline.
|
|
|
-The processors can access fields in the source of a document and the document's metadata fields.
|
|
|
-
|
|
|
-[discrete]
|
|
|
-[[accessing-source-fields]]
|
|
|
-=== Accessing Fields in the Source
|
|
|
-Accessing a field in the source is straightforward. You simply refer to fields by
|
|
|
-their name. For example:
|
|
|
-
|
|
|
-[source,js]
|
|
|
---------------------------------------------------
|
|
|
-{
|
|
|
- "set": {
|
|
|
- "field": "my_field",
|
|
|
- "value": 582.1
|
|
|
- }
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// NOTCONSOLE
|
|
|
-
|
|
|
-On top of this, fields from the source are always accessible via the `_source` prefix:
|
|
|
-
|
|
|
-[source,js]
|
|
|
---------------------------------------------------
|
|
|
-{
|
|
|
- "set": {
|
|
|
- "field": "_source.my_field",
|
|
|
- "value": 582.1
|
|
|
- }
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// NOTCONSOLE
|
|
|
-
|
|
|
-[discrete]
|
|
|
-[[accessing-metadata-fields]]
|
|
|
-=== Accessing Metadata Fields
|
|
|
-You can access metadata fields in the same way that you access fields in the source. This
|
|
|
-is possible because Elasticsearch doesn't allow fields in the source that have the
|
|
|
-same name as metadata fields.
|
|
|
-
|
|
|
-The following metadata fields are accessible by a processor:
|
|
|
-
|
|
|
-* `_index`
|
|
|
-* `_id`
|
|
|
-* `_routing`
|
|
|
-
|
|
|
-The following example sets the `_id` metadata field of a document to `1`:
|
|
|
-
|
|
|
-[source,js]
|
|
|
---------------------------------------------------
|
|
|
-{
|
|
|
- "set": {
|
|
|
- "field": "_id",
|
|
|
- "value": "1"
|
|
|
- }
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// NOTCONSOLE
|
|
|
-
|
|
|
-You can access a metadata field's value by surrounding it in double
|
|
|
-curly brackets `"{{ }}"`. For example, `{{_index}}` retrieves the name of a
|
|
|
-document's index.
|
|
|
-
|
|
|
-WARNING: If you <<create-document-ids-automatically,automatically generate>>
|
|
|
-document IDs, you cannot use the `{{_id}}` value in an ingest processor. {es}
|
|
|
-assigns auto-generated `_id` values after ingest.
|
|
|
-
|
|
|
-[discrete]
|
|
|
-[[accessing-ingest-metadata]]
|
|
|
-=== Accessing Ingest Metadata Fields
|
|
|
-Beyond metadata fields and source fields, ingest also adds ingest metadata to the documents that it processes.
|
|
|
-These metadata properties are accessible under the `_ingest` key. Currently ingest adds the ingest timestamp
|
|
|
-under the `_ingest.timestamp` key of the ingest metadata. The ingest timestamp is the time when Elasticsearch
|
|
|
-received the index or bulk request to pre-process the document.
|
|
|
-
|
|
|
-Any processor can add ingest-related metadata during document processing. Ingest metadata is transient
|
|
|
-and is lost after a document has been processed by the pipeline. Therefore, ingest metadata won't be indexed.
|
|
|
-
|
|
|
-The following example adds a field with the name `received`. The value is the ingest timestamp:
|
|
|
-
|
|
|
-[source,js]
|
|
|
---------------------------------------------------
|
|
|
-{
|
|
|
- "set": {
|
|
|
- "field": "received",
|
|
|
- "value": "{{_ingest.timestamp}}"
|
|
|
- }
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// NOTCONSOLE
|
|
|
-
|
|
|
-Unlike Elasticsearch metadata fields, the ingest metadata field name `_ingest` can be used as a valid field name
|
|
|
-in the source of a document. Use `_source._ingest` to refer to the field in the source document. Otherwise, `_ingest`
|
|
|
-will be interpreted as an ingest metadata field.
|
|
|
-
|
|
|
-[discrete]
|
|
|
-[[accessing-template-fields]]
|
|
|
-=== Accessing Fields and Metafields in Templates
|
|
|
-A number of processor settings also support templating. Settings that support templating can have zero or more
|
|
|
-template snippets. A template snippet begins with `{{` and ends with `}}`.
|
|
|
-Accessing fields and metafields in templates is exactly the same as via regular processor field settings.
|
|
|
-
|
|
|
-The following example adds a field named `field_c`. Its value is a concatenation of
|
|
|
-the values of `field_a` and `field_b`.
|
|
|
-
|
|
|
-[source,js]
|
|
|
---------------------------------------------------
|
|
|
-{
|
|
|
- "set": {
|
|
|
- "field": "field_c",
|
|
|
- "value": "{{field_a}} {{field_b}}"
|
|
|
- }
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// NOTCONSOLE
|
|
|
-
|
|
|
-The following example uses the value of the `geoip.country_iso_code` field in the source
|
|
|
-to set the index that the document will be indexed into:
|
|
|
-
|
|
|
-[source,js]
|
|
|
---------------------------------------------------
|
|
|
-{
|
|
|
- "set": {
|
|
|
- "field": "_index",
|
|
|
- "value": "{{geoip.country_iso_code}}"
|
|
|
- }
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// NOTCONSOLE
|
|
|
-
|
|
|
-Dynamic field names are also supported. This example sets the field named after the
|
|
|
-value of `service` to the value of the field `code`:
|
|
|
-
|
|
|
-[source,js]
|
|
|
---------------------------------------------------
|
|
|
-{
|
|
|
- "set": {
|
|
|
- "field": "{{service}}",
|
|
|
- "value": "{{code}}"
|
|
|
- }
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// NOTCONSOLE
|
|
|
-
|
|
|
-[[ingest-conditionals]]
|
|
|
-== Conditional Execution in Pipelines
|
|
|
-
|
|
|
-Each processor allows for an optional `if` condition to determine if that
|
|
|
-processor should be executed or skipped. The value of the `if` is a
|
|
|
-<<modules-scripting-painless, Painless>> script that needs to evaluate
|
|
|
-to `true` or `false`.
|
|
|
-
|
|
|
-For example the following processor will <<drop-processor,drop>> the document
|
|
|
-(i.e. not index it) if the input document has a field named `network_name`
|
|
|
-and it is equal to `Guest`.
|
|
|
-
|
|
|
-[source,console]
|
|
|
---------------------------------------------------
|
|
|
-PUT _ingest/pipeline/drop_guests_network
|
|
|
-{
|
|
|
- "processors": [
|
|
|
- {
|
|
|
- "drop": {
|
|
|
- "if": "ctx.network_name == 'Guest'"
|
|
|
- }
|
|
|
- }
|
|
|
- ]
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-
|
|
|
-Using that pipeline for an index request:
|
|
|
-
|
|
|
-[source,console]
|
|
|
---------------------------------------------------
|
|
|
-POST test/_doc/1?pipeline=drop_guests_network
|
|
|
-{
|
|
|
- "network_name" : "Guest"
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// TEST[continued]
|
|
|
-
|
|
|
-Results in nothing indexed since the conditional evaluated to `true`.
|
|
|
-
|
|
|
-[source,console-result]
|
|
|
---------------------------------------------------
|
|
|
-{
|
|
|
- "_index": "test",
|
|
|
- "_id": "1",
|
|
|
- "_version": -3,
|
|
|
- "result": "noop",
|
|
|
- "_shards": {
|
|
|
- "total": 0,
|
|
|
- "successful": 0,
|
|
|
- "failed": 0
|
|
|
- }
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-
|
|
|
-
|
|
|
-[[ingest-conditional-nullcheck]]
|
|
|
-=== Handling Nested Fields in Conditionals
|
|
|
-
|
|
|
-Source documents often contain nested fields. Care should be taken
|
|
|
-to avoid NullPointerExceptions if the parent object does not exist
|
|
|
-in the document. For example `ctx.a.b.c` can throw an NullPointerExceptions
|
|
|
-if the source document does not have top level `a` object, or a second
|
|
|
-level `b` object.
|
|
|
-
|
|
|
-To help protect against NullPointerExceptions, null safe operations should be used.
|
|
|
-Fortunately, Painless makes {painless}/painless-operators-reference.html#null-safe-operator[null safe]
|
|
|
-operations easy with the `?.` operator.
|
|
|
-
|
|
|
-[source,console]
|
|
|
---------------------------------------------------
|
|
|
-PUT _ingest/pipeline/drop_guests_network
|
|
|
-{
|
|
|
- "processors": [
|
|
|
- {
|
|
|
- "drop": {
|
|
|
- "if": "ctx.network?.name == 'Guest'"
|
|
|
- }
|
|
|
- }
|
|
|
- ]
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-
|
|
|
-The following document will get <<drop-processor,dropped>> correctly:
|
|
|
-
|
|
|
-[source,console]
|
|
|
---------------------------------------------------
|
|
|
-POST test/_doc/1?pipeline=drop_guests_network
|
|
|
-{
|
|
|
- "network": {
|
|
|
- "name": "Guest"
|
|
|
- }
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// TEST[continued]
|
|
|
-
|
|
|
-Thanks to the `?.` operator the following document will not throw an error.
|
|
|
-If the pipeline used a `.` the following document would throw a NullPointerException
|
|
|
-since the `network` object is not part of the source document.
|
|
|
-
|
|
|
-[source,console]
|
|
|
---------------------------------------------------
|
|
|
-POST test/_doc/2?pipeline=drop_guests_network
|
|
|
-{
|
|
|
- "foo" : "bar"
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// TEST[continued]
|
|
|
-
|
|
|
-////
|
|
|
-Hidden example assertion:
|
|
|
-[source,console]
|
|
|
---------------------------------------------------
|
|
|
-GET test/_doc/2
|
|
|
---------------------------------------------------
|
|
|
-// TEST[continued]
|
|
|
-
|
|
|
-[source,console-result]
|
|
|
---------------------------------------------------
|
|
|
-{
|
|
|
- "_index": "test",
|
|
|
- "_id": "2",
|
|
|
- "_version": 1,
|
|
|
- "_seq_no": 22,
|
|
|
- "_primary_term": 1,
|
|
|
- "found": true,
|
|
|
- "_source": {
|
|
|
- "foo": "bar"
|
|
|
- }
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// TESTRESPONSE[s/"_seq_no": \d+/"_seq_no" : $body._seq_no/ s/"_primary_term": 1/"_primary_term" : $body._primary_term/]
|
|
|
-////
|
|
|
-
|
|
|
-The source document can also use dot delimited fields to represent nested fields.
|
|
|
-
|
|
|
-For example instead the source document defining the fields nested:
|
|
|
-
|
|
|
-[source,js]
|
|
|
---------------------------------------------------
|
|
|
-{
|
|
|
- "network": {
|
|
|
- "name": "Guest"
|
|
|
- }
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// NOTCONSOLE
|
|
|
-
|
|
|
-The source document may have the nested fields flattened as such:
|
|
|
-[source,js]
|
|
|
---------------------------------------------------
|
|
|
-{
|
|
|
- "network.name": "Guest"
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// NOTCONSOLE
|
|
|
-
|
|
|
-If this is the case, use the <<dot-expand-processor, Dot Expand Processor>>
|
|
|
-so that the nested fields may be used in a conditional.
|
|
|
-
|
|
|
-[source,console]
|
|
|
---------------------------------------------------
|
|
|
-PUT _ingest/pipeline/drop_guests_network
|
|
|
-{
|
|
|
- "processors": [
|
|
|
- {
|
|
|
- "dot_expander": {
|
|
|
- "field": "network.name"
|
|
|
- }
|
|
|
- },
|
|
|
- {
|
|
|
- "drop": {
|
|
|
- "if": "ctx.network?.name == 'Guest'"
|
|
|
- }
|
|
|
- }
|
|
|
- ]
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-
|
|
|
-Now the following input document can be used with a conditional in the pipeline.
|
|
|
-
|
|
|
-[source,console]
|
|
|
---------------------------------------------------
|
|
|
-POST test/_doc/3?pipeline=drop_guests_network
|
|
|
-{
|
|
|
- "network.name": "Guest"
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// TEST[continued]
|
|
|
-
|
|
|
-The `?.` operators works well for use in the `if` conditional
|
|
|
-because the {painless}/painless-operators-reference.html#null-safe-operator[null safe operator]
|
|
|
-returns null if the object is null and `==` is null safe (as well as many other
|
|
|
-{painless}/painless-operators.html[painless operators]).
|
|
|
-
|
|
|
-However, calling a method such as `.equalsIgnoreCase` is not null safe
|
|
|
-and can result in a NullPointerException.
|
|
|
-
|
|
|
-Some situations allow for the same functionality but done so in a null safe manner.
|
|
|
-For example: `'Guest'.equalsIgnoreCase(ctx.network?.name)` is null safe because
|
|
|
-`Guest` is always non null, but `ctx.network?.name.equalsIgnoreCase('Guest')` is not null safe
|
|
|
-since `ctx.network?.name` can return null.
|
|
|
-
|
|
|
-Some situations require an explicit null check. In the following example there
|
|
|
-is not null safe alternative, so an explicit null check is needed.
|
|
|
-
|
|
|
-[source,js]
|
|
|
---------------------------------------------------
|
|
|
-{
|
|
|
- "drop": {
|
|
|
- "if": "ctx.network?.name != null && ctx.network.name.contains('Guest')"
|
|
|
- }
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// NOTCONSOLE
|
|
|
-
|
|
|
-[[ingest-conditional-complex]]
|
|
|
-=== Complex Conditionals
|
|
|
-The `if` condition can be more complex than a simple equality check.
|
|
|
-The full power of the <<modules-scripting-painless, Painless Scripting Language>> is available and
|
|
|
-running in the {painless}/painless-ingest-processor-context.html[ingest processor context].
|
|
|
-
|
|
|
-IMPORTANT: The value of ctx is read-only in `if` conditions.
|
|
|
-
|
|
|
-A more complex `if` condition that drops the document (i.e. not index it)
|
|
|
-unless it has a multi-valued tag field with at least one value that contains the characters
|
|
|
-`prod` (case insensitive).
|
|
|
-
|
|
|
-[source,console]
|
|
|
---------------------------------------------------
|
|
|
-PUT _ingest/pipeline/not_prod_dropper
|
|
|
-{
|
|
|
- "processors": [
|
|
|
- {
|
|
|
- "drop": {
|
|
|
- "if": "Collection tags = ctx.tags;if(tags != null){for (String tag : tags) {if (tag.toLowerCase().contains('prod')) { return false;}}} return true;"
|
|
|
- }
|
|
|
- }
|
|
|
- ]
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-
|
|
|
-The conditional needs to be all on one line since JSON does not
|
|
|
-support new line characters. However, Kibana's console supports
|
|
|
-a triple quote syntax to help with writing and debugging
|
|
|
-scripts like these.
|
|
|
-
|
|
|
-[source,console]
|
|
|
---------------------------------------------------
|
|
|
-PUT _ingest/pipeline/not_prod_dropper
|
|
|
-{
|
|
|
- "processors": [
|
|
|
- {
|
|
|
- "drop": {
|
|
|
- "if": """
|
|
|
- Collection tags = ctx.tags;
|
|
|
- if(tags != null){
|
|
|
- for (String tag : tags) {
|
|
|
- if (tag.toLowerCase().contains('prod')) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return true;
|
|
|
- """
|
|
|
- }
|
|
|
- }
|
|
|
- ]
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// TEST[continued]
|
|
|
-
|
|
|
-or it can be built with a stored script:
|
|
|
-
|
|
|
-[source,console]
|
|
|
---------------------------------------------------
|
|
|
-PUT _scripts/not_prod
|
|
|
-{
|
|
|
- "script": {
|
|
|
- "lang": "painless",
|
|
|
- "source": """
|
|
|
- Collection tags = ctx.tags;
|
|
|
- if(tags != null){
|
|
|
- for (String tag : tags) {
|
|
|
- if (tag.toLowerCase().contains('prod')) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return true;
|
|
|
- """
|
|
|
- }
|
|
|
-}
|
|
|
-PUT _ingest/pipeline/not_prod_dropper
|
|
|
-{
|
|
|
- "processors": [
|
|
|
- {
|
|
|
- "drop": {
|
|
|
- "if": { "id": "not_prod" }
|
|
|
- }
|
|
|
- }
|
|
|
- ]
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// TEST[continued]
|
|
|
-
|
|
|
-Either way, you can run it with:
|
|
|
-
|
|
|
-[source,console]
|
|
|
---------------------------------------------------
|
|
|
-POST test/_doc/1?pipeline=not_prod_dropper
|
|
|
-{
|
|
|
- "tags": ["application:myapp", "env:Stage"]
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// TEST[continued]
|
|
|
-
|
|
|
-The document is <<drop-processor,dropped>> since `prod` (case insensitive)
|
|
|
-is not found in the tags.
|
|
|
-
|
|
|
-The following document is indexed (i.e. not dropped) since
|
|
|
-`prod` (case insensitive) is found in the tags.
|
|
|
-
|
|
|
-[source,console]
|
|
|
---------------------------------------------------
|
|
|
-POST test/_doc/2?pipeline=not_prod_dropper
|
|
|
-{
|
|
|
- "tags": ["application:myapp", "env:Production"]
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// TEST[continued]
|
|
|
-
|
|
|
-////
|
|
|
-Hidden example assertion:
|
|
|
-[source,console]
|
|
|
---------------------------------------------------
|
|
|
-GET test/_doc/2
|
|
|
---------------------------------------------------
|
|
|
-// TEST[continued]
|
|
|
-
|
|
|
-[source,console-result]
|
|
|
---------------------------------------------------
|
|
|
-{
|
|
|
- "_index": "test",
|
|
|
- "_id": "2",
|
|
|
- "_version": 1,
|
|
|
- "_seq_no": 34,
|
|
|
- "_primary_term": 1,
|
|
|
- "found": true,
|
|
|
- "_source": {
|
|
|
- "tags": [
|
|
|
- "application:myapp",
|
|
|
- "env:Production"
|
|
|
- ]
|
|
|
- }
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// TESTRESPONSE[s/"_seq_no": \d+/"_seq_no" : $body._seq_no/ s/"_primary_term" : 1/"_primary_term" : $body._primary_term/]
|
|
|
-////
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-The <<simulate-pipeline-api>> with verbose can be used to help build out
|
|
|
-complex conditionals. If the conditional evaluates to false it will be
|
|
|
-omitted from the verbose results of the simulation since the document will not change.
|
|
|
-
|
|
|
-Care should be taken to avoid overly complex or expensive conditional checks
|
|
|
-since the condition needs to be checked for each and every document.
|
|
|
-
|
|
|
-[[conditionals-with-multiple-pipelines]]
|
|
|
-=== Conditionals with the Pipeline Processor
|
|
|
-The combination of the `if` conditional and the <<pipeline-processor>> can result in a simple,
|
|
|
-yet powerful means to process heterogeneous input. For example, you can define a single pipeline
|
|
|
-that delegates to other pipelines based on some criteria.
|
|
|
-
|
|
|
-[source,console]
|
|
|
---------------------------------------------------
|
|
|
-PUT _ingest/pipeline/logs_pipeline
|
|
|
-{
|
|
|
- "description": "A pipeline of pipelines for log files",
|
|
|
- "version": 1,
|
|
|
- "processors": [
|
|
|
- {
|
|
|
- "pipeline": {
|
|
|
- "if": "ctx.service?.name == 'apache_httpd'",
|
|
|
- "name": "httpd_pipeline"
|
|
|
- }
|
|
|
- },
|
|
|
- {
|
|
|
- "pipeline": {
|
|
|
- "if": "ctx.service?.name == 'syslog'",
|
|
|
- "name": "syslog_pipeline"
|
|
|
- }
|
|
|
- },
|
|
|
- {
|
|
|
- "fail": {
|
|
|
- "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
|
|
|
- "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
|
|
|
- }
|
|
|
- }
|
|
|
- ]
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-
|
|
|
-The above example allows consumers to point to a single pipeline for all log based index requests.
|
|
|
-Based on the conditional, the correct pipeline will be called to process that type of data.
|
|
|
-
|
|
|
-This pattern works well with a <<dynamic-index-settings, default pipeline>> defined in an index mapping
|
|
|
-template for all indexes that hold data that needs pre-index processing.
|
|
|
-
|
|
|
-[[conditionals-with-regex]]
|
|
|
-=== Conditionals with the Regular Expressions
|
|
|
-The `if` conditional is implemented as a Painless script, which requires
|
|
|
-{painless}//painless-regexes.html[explicit support for regular expressions].
|
|
|
-
|
|
|
-`script.painless.regex.enabled: true` must be set in `elasticsearch.yml` to use regular
|
|
|
-expressions in the `if` condition.
|
|
|
-
|
|
|
-If regular expressions are enabled, operators such as `=~` can be used against a `/pattern/` for conditions.
|
|
|
-
|
|
|
-For example:
|
|
|
-
|
|
|
-[source,console]
|
|
|
---------------------------------------------------
|
|
|
-PUT _ingest/pipeline/check_url
|
|
|
-{
|
|
|
- "processors": [
|
|
|
- {
|
|
|
- "set": {
|
|
|
- "if": "ctx.href?.url =~ /^http[^s]/",
|
|
|
- "field": "href.insecure",
|
|
|
- "value": true
|
|
|
- }
|
|
|
- }
|
|
|
- ]
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-
|
|
|
-[source,console]
|
|
|
---------------------------------------------------
|
|
|
-POST test/_doc/1?pipeline=check_url
|
|
|
-{
|
|
|
- "href": {
|
|
|
- "url": "http://www.elastic.co/"
|
|
|
- }
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// TEST[continued]
|
|
|
-
|
|
|
-Results in:
|
|
|
-
|
|
|
-////
|
|
|
-Hidden example assertion:
|
|
|
-[source,console]
|
|
|
---------------------------------------------------
|
|
|
-GET test/_doc/1
|
|
|
---------------------------------------------------
|
|
|
-// TEST[continued]
|
|
|
-////
|
|
|
-
|
|
|
-[source,console-result]
|
|
|
---------------------------------------------------
|
|
|
-{
|
|
|
- "_index": "test",
|
|
|
- "_id": "1",
|
|
|
- "_version": 1,
|
|
|
- "_seq_no": 60,
|
|
|
- "_primary_term": 1,
|
|
|
- "found": true,
|
|
|
- "_source": {
|
|
|
- "href": {
|
|
|
- "insecure": true,
|
|
|
- "url": "http://www.elastic.co/"
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// TESTRESPONSE[s/"_seq_no": \d+/"_seq_no" : $body._seq_no/ s/"_primary_term" : 1/"_primary_term" : $body._primary_term/]
|
|
|
-
|
|
|
-
|
|
|
-Regular expressions can be expensive and should be avoided if viable
|
|
|
-alternatives exist.
|
|
|
-
|
|
|
-For example in this case `startsWith` can be used to get the same result
|
|
|
-without using a regular expression:
|
|
|
-
|
|
|
-[source,console]
|
|
|
---------------------------------------------------
|
|
|
-PUT _ingest/pipeline/check_url
|
|
|
-{
|
|
|
- "processors": [
|
|
|
- {
|
|
|
- "set": {
|
|
|
- "if": "ctx.href?.url != null && ctx.href.url.startsWith('http://')",
|
|
|
- "field": "href.insecure",
|
|
|
- "value": true
|
|
|
- }
|
|
|
- }
|
|
|
- ]
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-
|
|
|
-[[handling-failure-in-pipelines]]
|
|
|
-== Handling Failures in Pipelines
|
|
|
-
|
|
|
-In its simplest use case, a pipeline defines a list of processors that
|
|
|
-are executed sequentially, and processing halts at the first exception. This
|
|
|
-behavior may not be desirable when failures are expected. For example, you may have logs
|
|
|
-that don't match the specified grok expression. Instead of halting execution, you may
|
|
|
-want to index such documents into a separate index.
|
|
|
-
|
|
|
-To enable this behavior, you can use the `on_failure` parameter. The `on_failure` parameter
|
|
|
-defines a list of processors to be executed immediately following the failed processor.
|
|
|
-You can specify this parameter at the pipeline level, as well as at the processor
|
|
|
-level. If a processor specifies an `on_failure` configuration, whether
|
|
|
-it is empty or not, any exceptions that are thrown by the processor are caught, and the
|
|
|
-pipeline continues executing the remaining processors. Because you can define further processors
|
|
|
-within the scope of an `on_failure` statement, you can nest failure handling.
|
|
|
-
|
|
|
-The following example defines a pipeline that renames the `foo` field in
|
|
|
-the processed document to `bar`. If the document does not contain the `foo` field, the processor
|
|
|
-attaches an error message to the document for later analysis within
|
|
|
-Elasticsearch.
|
|
|
-
|
|
|
-[source,js]
|
|
|
---------------------------------------------------
|
|
|
-{
|
|
|
- "description" : "my first pipeline with handled exceptions",
|
|
|
- "processors" : [
|
|
|
- {
|
|
|
- "rename" : {
|
|
|
- "field" : "foo",
|
|
|
- "target_field" : "bar",
|
|
|
- "on_failure" : [
|
|
|
- {
|
|
|
- "set" : {
|
|
|
- "field" : "error.message",
|
|
|
- "value" : "field \"foo\" does not exist, cannot rename to \"bar\""
|
|
|
- }
|
|
|
- }
|
|
|
- ]
|
|
|
- }
|
|
|
- }
|
|
|
- ]
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// NOTCONSOLE
|
|
|
-
|
|
|
-The following example defines an `on_failure` block on a whole pipeline to change
|
|
|
-the index to which failed documents get sent.
|
|
|
-
|
|
|
-[source,js]
|
|
|
---------------------------------------------------
|
|
|
-{
|
|
|
- "description" : "my first pipeline with handled exceptions",
|
|
|
- "processors" : [ ... ],
|
|
|
- "on_failure" : [
|
|
|
- {
|
|
|
- "set" : {
|
|
|
- "field" : "_index",
|
|
|
- "value" : "failed-{{ _index }}"
|
|
|
- }
|
|
|
- }
|
|
|
- ]
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// NOTCONSOLE
|
|
|
-
|
|
|
-Alternatively instead of defining behaviour in case of processor failure, it is also possible
|
|
|
-to ignore a failure and continue with the next processor by specifying the `ignore_failure` setting.
|
|
|
-
|
|
|
-In case in the example below the field `foo` doesn't exist the failure will be caught and the pipeline
|
|
|
-continues to execute, which in this case means that the pipeline does nothing.
|
|
|
-
|
|
|
-[source,js]
|
|
|
---------------------------------------------------
|
|
|
-{
|
|
|
- "description" : "my first pipeline with handled exceptions",
|
|
|
- "processors" : [
|
|
|
- {
|
|
|
- "rename" : {
|
|
|
- "field" : "foo",
|
|
|
- "target_field" : "bar",
|
|
|
- "ignore_failure" : true
|
|
|
- }
|
|
|
- }
|
|
|
- ]
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// NOTCONSOLE
|
|
|
-
|
|
|
-The `ignore_failure` can be set on any processor and defaults to `false`.
|
|
|
-
|
|
|
-[discrete]
|
|
|
-[[accessing-error-metadata]]
|
|
|
-=== Accessing Error Metadata From Processors Handling Exceptions
|
|
|
-
|
|
|
-You may want to retrieve the actual error message that was thrown
|
|
|
-by a failed processor. To do so you can access metadata fields called
|
|
|
-`on_failure_message`, `on_failure_processor_type`, `on_failure_processor_tag` and
|
|
|
-`on_failure_pipeline` (in case an error occurred inside a pipeline processor).
|
|
|
-These fields are only accessible from within the context of an `on_failure` block.
|
|
|
-
|
|
|
-Here is an updated version of the example that you
|
|
|
-saw earlier. But instead of setting the error message manually, the example leverages the `on_failure_message`
|
|
|
-metadata field to provide the error message.
|
|
|
-
|
|
|
-[source,js]
|
|
|
---------------------------------------------------
|
|
|
-{
|
|
|
- "description" : "my first pipeline with handled exceptions",
|
|
|
- "processors" : [
|
|
|
- {
|
|
|
- "rename" : {
|
|
|
- "field" : "foo",
|
|
|
- "to" : "bar",
|
|
|
- "on_failure" : [
|
|
|
- {
|
|
|
- "set" : {
|
|
|
- "field" : "error.message",
|
|
|
- "value" : "{{ _ingest.on_failure_message }}"
|
|
|
- }
|
|
|
- }
|
|
|
- ]
|
|
|
- }
|
|
|
- }
|
|
|
- ]
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// NOTCONSOLE
|
|
|
-
|
|
|
-
|
|
|
-include::enrich.asciidoc[]
|
|
|
-
|
|
|
-
|
|
|
-[[ingest-processors]]
|
|
|
-== Processors
|
|
|
-
|
|
|
-All processors are defined in the following way within a pipeline definition:
|
|
|
-
|
|
|
-[source,js]
|
|
|
---------------------------------------------------
|
|
|
-{
|
|
|
- "PROCESSOR_NAME" : {
|
|
|
- ... processor configuration options ...
|
|
|
- }
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// NOTCONSOLE
|
|
|
-
|
|
|
-Each processor defines its own configuration parameters, but all processors have
|
|
|
-the ability to declare `tag`, `on_failure` and `if` fields. These fields are optional.
|
|
|
-
|
|
|
-A `tag` is simply a string identifier of the specific instantiation of a certain
|
|
|
-processor in a pipeline. The `tag` field does not affect the processor's behavior,
|
|
|
-but is very useful for bookkeeping and tracing errors to specific processors.
|
|
|
-
|
|
|
-The `if` field must contain a script that returns a boolean value. If the script evaluates to `true`
|
|
|
-then the processor will be executed for the given document otherwise it will be skipped.
|
|
|
-The `if` field takes an object with the script fields defined in <<script-processor, script-options>>
|
|
|
-and accesses a read only version of the document via the same `ctx` variable used by scripts in the
|
|
|
-<<script-processor>>.
|
|
|
-
|
|
|
-[source,js]
|
|
|
---------------------------------------------------
|
|
|
-{
|
|
|
- "set": {
|
|
|
- "if": "ctx.foo == 'someValue'",
|
|
|
- "field": "found",
|
|
|
- "value": true
|
|
|
- }
|
|
|
-}
|
|
|
---------------------------------------------------
|
|
|
-// NOTCONSOLE
|
|
|
-
|
|
|
-See <<ingest-conditionals>> to learn more about the `if` field and conditional execution.
|
|
|
-
|
|
|
-See <<handling-failure-in-pipelines>> to learn more about the `on_failure` field and error handling in pipelines.
|
|
|
-
|
|
|
-The <<cluster-nodes-info,node info API>> will provide a per node list of what processors are available.
|
|
|
-
|
|
|
-Custom processors must be installed on all nodes. The put pipeline API will fail if a processor specified in a pipeline
|
|
|
-doesn't exist on all nodes. If you rely on custom processor plugins make sure to mark these plugins as mandatory by adding
|
|
|
-`plugin.mandatory` setting to the `config/elasticsearch.yml` file, for example:
|
|
|
-
|
|
|
-[source,yaml]
|
|
|
---------------------------------------------------
|
|
|
-plugin.mandatory: ingest-attachment
|
|
|
---------------------------------------------------
|
|
|
-
|
|
|
-A node will not start if this plugin is not available.
|
|
|
-
|
|
|
-The <<cluster-nodes-stats,node stats API>> can be used to fetch ingest usage statistics, globally and on a per
|
|
|
-pipeline basis. Useful to find out which pipelines are used the most or spent the most time on preprocessing.
|
|
|
-
|
|
|
-[discrete]
|
|
|
-=== Ingest Processor Plugins
|
|
|
-
|
|
|
-Additional ingest processors can be implemented and installed as Elasticsearch {plugins}/intro.html[plugins].
|
|
|
-See {plugins}/ingest.html[Ingest plugins] for information about the available ingest plugins.
|
|
|
-
|
|
|
-include::processors/append.asciidoc[]
|
|
|
-include::processors/bytes.asciidoc[]
|
|
|
-include::processors/circle.asciidoc[]
|
|
|
-include::processors/community-id.asciidoc[]
|
|
|
-include::processors/convert.asciidoc[]
|
|
|
-include::processors/csv.asciidoc[]
|
|
|
-include::processors/date.asciidoc[]
|
|
|
-include::processors/date-index-name.asciidoc[]
|
|
|
-include::processors/dissect.asciidoc[]
|
|
|
-include::processors/dot-expand.asciidoc[]
|
|
|
-include::processors/drop.asciidoc[]
|
|
|
-include::processors/enrich.asciidoc[]
|
|
|
-include::processors/fail.asciidoc[]
|
|
|
-include::processors/fingerprint.asciidoc[]
|
|
|
-include::processors/foreach.asciidoc[]
|
|
|
-include::processors/geoip.asciidoc[]
|
|
|
-include::processors/grok.asciidoc[]
|
|
|
-include::processors/gsub.asciidoc[]
|
|
|
-include::processors/html_strip.asciidoc[]
|
|
|
-include::processors/inference.asciidoc[]
|
|
|
-include::processors/join.asciidoc[]
|
|
|
-include::processors/json.asciidoc[]
|
|
|
-include::processors/kv.asciidoc[]
|
|
|
-include::processors/lowercase.asciidoc[]
|
|
|
-include::processors/network-direction.asciidoc[]
|
|
|
-include::processors/pipeline.asciidoc[]
|
|
|
-include::processors/remove.asciidoc[]
|
|
|
-include::processors/rename.asciidoc[]
|
|
|
-include::processors/script.asciidoc[]
|
|
|
-include::processors/set.asciidoc[]
|
|
|
-include::processors/set-security-user.asciidoc[]
|
|
|
-include::processors/sort.asciidoc[]
|
|
|
-include::processors/split.asciidoc[]
|
|
|
-include::processors/trim.asciidoc[]
|
|
|
-include::processors/uppercase.asciidoc[]
|
|
|
-include::processors/url-decode.asciidoc[]
|
|
|
-include::processors/uri-parts.asciidoc[]
|
|
|
-include::processors/user-agent.asciidoc[]
|