[[pipeline]]
== Pipeline Definition

A pipeline is a definition of a series of <<ingest-processors, processors>> that are to be executed
in the same order as they are declared. A pipeline consists of two main fields: a `description`
and a list of `processors`:

[source,js]
--------------------------------------------------
{
  "description" : "...",
  "processors" : [ ... ]
}
--------------------------------------------------
// NOTCONSOLE

The `description` is a special field to store a helpful description of
what the pipeline does.

The `processors` parameter defines a list of processors to be executed in
order.

[[ingest-apis]]
== Ingest APIs

The following ingest APIs are available for managing pipelines:

* <<put-pipeline-api>> to add or update a pipeline
* <<get-pipeline-api>> to return a specific pipeline
* <<delete-pipeline-api>> to delete a pipeline
* <<simulate-pipeline-api>> to simulate a call to a pipeline

include::apis/put-pipeline.asciidoc[]
include::apis/get-pipeline.asciidoc[]
include::apis/delete-pipeline.asciidoc[]
include::apis/simulate-pipeline.asciidoc[]

[[accessing-data-in-pipelines]]
== Accessing Data in Pipelines

The processors in a pipeline have read and write access to documents that pass through the pipeline.
The processors can access fields in the source of a document and the document's metadata fields.

[float]
[[accessing-source-fields]]
=== Accessing Fields in the Source

Accessing a field in the source is straightforward.
You simply refer to fields by
their name. For example:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "my_field",
    "value": 582.1
  }
}
--------------------------------------------------
// NOTCONSOLE

On top of this, fields from the source are always accessible via the `_source` prefix:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "_source.my_field",
    "value": 582.1
  }
}
--------------------------------------------------
// NOTCONSOLE

[float]
[[accessing-metadata-fields]]
=== Accessing Metadata Fields

You can access metadata fields in the same way that you access fields in the source. This
is possible because Elasticsearch doesn't allow fields in the source that have the
same name as metadata fields.

The following example sets the `_id` metadata field of a document to `1`:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "_id",
    "value": "1"
  }
}
--------------------------------------------------
// NOTCONSOLE

The following metadata fields are accessible by a processor: `_index`, `_type`, `_id`, `_routing`.

[float]
[[accessing-ingest-metadata]]
=== Accessing Ingest Metadata Fields

Beyond metadata fields and source fields, ingest also adds ingest metadata to the documents that it processes.
These metadata properties are accessible under the `_ingest` key. Currently ingest adds the ingest timestamp
under the `_ingest.timestamp` key of the ingest metadata. The ingest timestamp is the time when Elasticsearch
received the index or bulk request to pre-process the document.

Any processor can add ingest-related metadata during document processing. Ingest metadata is transient
and is lost after a document has been processed by the pipeline. Therefore, ingest metadata won't be indexed.

The following example adds a field with the name `received`.
The value is the ingest timestamp:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "received",
    "value": "{{_ingest.timestamp}}"
  }
}
--------------------------------------------------
// NOTCONSOLE

Unlike Elasticsearch metadata fields, the ingest metadata field name `_ingest` can be used as a valid field name
in the source of a document. Use `_source._ingest` to refer to the field in the source document. Otherwise, `_ingest`
will be interpreted as an ingest metadata field.

[float]
[[accessing-template-fields]]
=== Accessing Fields and Metafields in Templates

A number of processor settings also support templating. Settings that support templating can have zero or more
template snippets. A template snippet begins with `{{` and ends with `}}`.
Accessing fields and metafields in templates is exactly the same as via regular processor field settings.

The following example adds a field named `field_c`. Its value is a concatenation of
the values of `field_a` and `field_b`, separated by a space.

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "field_c",
    "value": "{{field_a}} {{field_b}}"
  }
}
--------------------------------------------------
// NOTCONSOLE

The following example uses the value of the `geoip.country_iso_code` field in the source
to set the index that the document will be indexed into:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "_index",
    "value": "{{geoip.country_iso_code}}"
  }
}
--------------------------------------------------
// NOTCONSOLE

Dynamic field names are also supported.
This example sets the field named after the
value of `service` to the value of the field `code`:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "{{service}}",
    "value": "{{code}}"
  }
}
--------------------------------------------------
// NOTCONSOLE

[[ingest-conditionals]]
== Conditional Execution in Pipelines

Each processor allows for an optional `if` condition to determine if that
processor should be executed or skipped. The value of the `if` is a
<<modules-scripting-painless, Painless>> script that needs to evaluate
to `true` or `false`.

For example, the following processor will <<drop-processor,drop>> the document
(i.e. not index it) if the input document has a field named `network_name`
whose value is `Guest`.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/drop_guests_network
{
  "processors": [
    {
      "drop": {
        "if": "ctx.network_name == 'Guest'"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

Using that pipeline for an index request:

[source,js]
--------------------------------------------------
POST test/_doc/1?pipeline=drop_guests_network
{
  "network_name" : "Guest"
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

This results in nothing being indexed, since the conditional evaluated to `true`:

[source,js]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "1",
  "_version": -3,
  "result": "noop",
  "_shards": {
    "total": 0,
    "successful": 0,
    "failed": 0
  }
}
--------------------------------------------------
// TESTRESPONSE

[[ingest-conditional-nullcheck]]
=== Handling Nested Fields in Conditionals

Source documents often contain nested fields. Care should be taken
to avoid NullPointerExceptions if the parent object does not exist
in the document.
For example, `ctx.a.b.c` can throw a NullPointerException
if the source document does not have a top level `a` object, or a second
level `b` object.

To help protect against NullPointerExceptions, null safe operations should be used.
Fortunately, Painless makes {painless}/painless-operators-reference.html#null-safe-operator[null safe]
operations easy with the `?.` operator.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/drop_guests_network
{
  "processors": [
    {
      "drop": {
        "if": "ctx.network?.name == 'Guest'"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

The following document will get <<drop-processor,dropped>> correctly:

[source,js]
--------------------------------------------------
POST test/_doc/1?pipeline=drop_guests_network
{
  "network": {
    "name": "Guest"
  }
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

Thanks to the `?.` operator, the following document will not throw an error.
If the pipeline used a `.`, the following document would throw a NullPointerException,
since the `network` object is not part of the source document.

[source,js]
--------------------------------------------------
POST test/_doc/2?pipeline=drop_guests_network
{
  "foo" : "bar"
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

////
Hidden example assertion:
[source,js]
--------------------------------------------------
GET test/_doc/2
--------------------------------------------------
// CONSOLE
// TEST[continued]

[source,js]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "2",
  "_version": 1,
  "_seq_no": 22,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "foo": "bar"
  }
}
--------------------------------------------------
// TESTRESPONSE[s/"_seq_no": \d+/"_seq_no" : $body._seq_no/ s/"_primary_term": 1/"_primary_term" : $body._primary_term/]
////

The source document can also use dot delimited fields to represent nested fields.
For example, instead of defining the fields as nested objects:

[source,js]
--------------------------------------------------
{
  "network": {
    "name": "Guest"
  }
}
--------------------------------------------------
// NOTCONSOLE

the source document may have the nested fields flattened as such:

[source,js]
--------------------------------------------------
{
  "network.name": "Guest"
}
--------------------------------------------------
// NOTCONSOLE

If this is the case, use the <<dot-expand-processor, Dot Expand Processor>>
so that the nested fields may be used in a conditional.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/drop_guests_network
{
  "processors": [
    {
      "dot_expander": {
        "field": "network.name"
      }
    },
    {
      "drop": {
        "if": "ctx.network?.name == 'Guest'"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

Now the following input document can be used with a conditional in the pipeline.

[source,js]
--------------------------------------------------
POST test/_doc/3?pipeline=drop_guests_network
{
  "network.name": "Guest"
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

The `?.` operator works well for use in the `if` conditional
because the {painless}/painless-operators-reference.html#null-safe-operator[null safe operator]
returns null if the object is null, and `==` is null safe (as are many other
{painless}/painless-operators.html[painless operators]).

However, calling a method such as `.equalsIgnoreCase` is not null safe
and can result in a NullPointerException.

Some situations allow the same functionality to be achieved in a null safe manner.
For example, `'Guest'.equalsIgnoreCase(ctx.network?.name)` is null safe because
`'Guest'` is always non-null, but `ctx.network?.name.equalsIgnoreCase('Guest')` is not null safe,
since `ctx.network?.name` can return null.

Some situations require an explicit null check.
In the following example, there is no null safe alternative, so an explicit null check is needed.

[source,js]
--------------------------------------------------
{
  "drop": {
    "if": "ctx.network?.name != null && ctx.network.name.contains('Guest')"
  }
}
--------------------------------------------------
// NOTCONSOLE

[[ingest-conditional-complex]]
=== Complex Conditionals

The `if` condition can be more than a simple equality check.
The full power of the <<modules-scripting-painless, Painless Scripting Language>> is available,
running in the {painless}/painless-ingest-processor-context.html[ingest processor context].

IMPORTANT: The value of `ctx` is read-only in `if` conditions.

The following is a more complex `if` condition that drops the document (i.e. does not index it)
unless it has a multi-valued `tags` field with at least one value that contains the characters
`prod` (case insensitive).

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/not_prod_dropper
{
  "processors": [
    {
      "drop": {
        "if": "Collection tags = ctx.tags;if(tags != null){for (String tag : tags) {if (tag.toLowerCase().contains('prod')) { return false;}}} return true;"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

The conditional needs to be all on one line, since JSON strings cannot contain
literal new line characters.
However, Kibana's console supports
a triple quote syntax to help with writing and debugging
scripts like these.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/not_prod_dropper
{
  "processors": [
    {
      "drop": {
        "if": """
            Collection tags = ctx.tags;
            if(tags != null){
              for (String tag : tags) {
                  if (tag.toLowerCase().contains('prod')) {
                      return false;
                  }
              }
            }
            return true;
        """
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE
// TEST[continued]

[source,js]
--------------------------------------------------
POST test/_doc/1?pipeline=not_prod_dropper
{
  "tags": ["application:myapp", "env:Stage"]
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

The document is <<drop-processor,dropped>> since `prod` (case insensitive)
is not found in the tags.

The following document is indexed (i.e. not dropped) since
`prod` (case insensitive) is found in the tags.

[source,js]
--------------------------------------------------
POST test/_doc/2?pipeline=not_prod_dropper
{
  "tags": ["application:myapp", "env:Production"]
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

////
Hidden example assertion:
[source,js]
--------------------------------------------------
GET test/_doc/2
--------------------------------------------------
// CONSOLE
// TEST[continued]

[source,js]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "2",
  "_version": 1,
  "_seq_no": 34,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "tags": [
      "application:myapp",
      "env:Production"
    ]
  }
}
--------------------------------------------------
// TESTRESPONSE[s/"_seq_no": \d+/"_seq_no" : $body._seq_no/ s/"_primary_term": 1/"_primary_term" : $body._primary_term/]
////

The <<simulate-pipeline-api>> with `verbose` enabled can be used to help build out
complex conditionals. If the conditional evaluates to `false`, the processor is
omitted from the verbose results of the simulation, since the document does not change.

Care should be taken to avoid overly complex or expensive conditional checks,
since the condition needs to be checked for each and every document.

[[conditionals-with-multiple-pipelines]]
=== Conditionals with the Pipeline Processor

The combination of the `if` conditional and the <<pipeline-processor>> can provide a simple
yet powerful means to process heterogeneous input.
For example, you can define a single pipeline
that delegates to other pipelines based on some criteria.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/logs_pipeline
{
  "description": "A pipeline of pipelines for log files",
  "version": 1,
  "processors": [
    {
      "pipeline": {
        "if": "ctx.service?.name == 'apache_httpd'",
        "name": "httpd_pipeline"
      }
    },
    {
      "pipeline": {
        "if": "ctx.service?.name == 'syslog'",
        "name": "syslog_pipeline"
      }
    },
    {
      "fail": {
        "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
        "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

The above example allows consumers to point to a single pipeline for all log based index requests.
Based on the conditional, the correct pipeline will be called to process that type of data.

This pattern works well with a <<dynamic-index-settings, default pipeline>> defined in an index
template for all indexes that hold data that needs pre-index processing.

[[conditionals-with-regex]]
=== Conditionals with Regular Expressions

The `if` conditional is implemented as a Painless script, which requires
{painless}/painless-regexes.html[explicit support for regular expressions].

`script.painless.regex.enabled: true` must be set in `elasticsearch.yml` to use regular
expressions in the `if` condition.

If regular expressions are enabled, operators such as `=~` can be used against a `/pattern/` for conditions.

For example:

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/check_url
{
  "processors": [
    {
      "set": {
        "if": "ctx.href?.url =~ /^http[^s]/",
        "field": "href.insecure",
        "value": true
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

[source,js]
--------------------------------------------------
POST test/_doc/1?pipeline=check_url
{
  "href": {
    "url": "http://www.elastic.co/"
  }
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

This results in `href.insecure` being set to `true`:

////
Hidden example assertion:
[source,js]
--------------------------------------------------
GET test/_doc/1
--------------------------------------------------
// CONSOLE
// TEST[continued]
////

[source,js]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "1",
  "_version": 1,
  "_seq_no": 60,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "href": {
      "insecure": true,
      "url": "http://www.elastic.co/"
    }
  }
}
--------------------------------------------------
// TESTRESPONSE[s/"_seq_no": \d+/"_seq_no" : $body._seq_no/ s/"_primary_term": 1/"_primary_term" : $body._primary_term/]

Regular expressions can be expensive and should be avoided if viable
alternatives exist.

For example, in this case `startsWith` can be used to get the same result
without using a regular expression:

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/check_url
{
  "processors": [
    {
      "set": {
        "if": "ctx.href?.url != null && ctx.href.url.startsWith('http://')",
        "field": "href.insecure",
        "value": true
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

[[handling-failure-in-pipelines]]
== Handling Failures in Pipelines

In its simplest use case, a pipeline defines a list of processors that
are executed sequentially, and processing halts at the first exception. This
behavior may not be desirable when failures are expected. For example, you may have logs
that don't match the specified grok expression. Instead of halting execution, you may
want to index such documents into a separate index.

To enable this behavior, you can use the `on_failure` parameter. The `on_failure` parameter
defines a list of processors to be executed immediately following the failed processor.
You can specify this parameter at the pipeline level, as well as at the processor
level.
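To illustrate both levels together, here is a minimal sketch of a hypothetical pipeline; the `event_time` field and the error text are assumptions made only for this example. If `event_time` is missing or cannot be parsed, the `date` processor's own `on_failure` handler sets an `error` field and the pipeline moves on to the `rename` processor. If `foo` is missing, the `rename` processor has no handler of its own, so the pipeline-level `on_failure` block changes `_index` to send the document to a `failed-` index instead:

[source,js]
--------------------------------------------------
{
  "description" : "hypothetical pipeline with processor-level and pipeline-level failure handling",
  "processors" : [
    {
      "date" : {
        "field" : "event_time",
        "formats" : ["ISO8601"],
        "on_failure" : [
          {
            "set" : {
              "field" : "error",
              "value" : "could not parse event_time"
            }
          }
        ]
      }
    },
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar"
      }
    }
  ],
  "on_failure" : [
    {
      "set" : {
        "field" : "_index",
        "value" : "failed-{{ _index }}"
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE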
If a processor specifies an `on_failure` configuration, whether
it is empty or not, any exceptions that are thrown by the processor are caught, and the
pipeline continues executing the remaining processors. Because you can define further processors
within the scope of an `on_failure` statement, you can nest failure handling.

The following example defines a pipeline that renames the `foo` field in
the processed document to `bar`. If the document does not contain the `foo` field, the processor
attaches an error message to the document for later analysis within
Elasticsearch.

[source,js]
--------------------------------------------------
{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "on_failure" : [
          {
            "set" : {
              "field" : "error",
              "value" : "field \"foo\" does not exist, cannot rename to \"bar\""
            }
          }
        ]
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE

The following example defines an `on_failure` block on a whole pipeline to change
the index to which failed documents get sent.

[source,js]
--------------------------------------------------
{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [ ... ],
  "on_failure" : [
    {
      "set" : {
        "field" : "_index",
        "value" : "failed-{{ _index }}"
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE

Alternatively, instead of defining behavior in case of processor failure, it is also possible
to ignore a failure and continue with the next processor by specifying the `ignore_failure` setting.

In the example below, if the field `foo` doesn't exist, the failure is caught and the pipeline
continues to execute, which in this case means that the pipeline does nothing.

[source,js]
--------------------------------------------------
{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "ignore_failure" : true
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE

The `ignore_failure` setting can be set on any processor and defaults to `false`.

[float]
[[accessing-error-metadata]]
=== Accessing Error Metadata From Processors Handling Exceptions

You may want to retrieve the actual error message that was thrown
by a failed processor. To do so, you can access the metadata fields
`on_failure_message`, `on_failure_processor_type`, and `on_failure_processor_tag`, which live under
the `_ingest` key (for example, `_ingest.on_failure_message`). These fields are only accessible
from within the context of an `on_failure` block.

Here is an updated version of the example that you
saw earlier.
Instead of setting the error message manually, this version uses the `on_failure_message`
metadata field to provide the error message.

[source,js]
--------------------------------------------------
{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "on_failure" : [
          {
            "set" : {
              "field" : "error",
              "value" : "{{ _ingest.on_failure_message }}"
            }
          }
        ]
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE

[[ingest-processors]]
== Processors

All processors are defined in the following way within a pipeline definition:

[source,js]
--------------------------------------------------
{
  "PROCESSOR_NAME" : {
    ... processor configuration options ...
  }
}
--------------------------------------------------
// NOTCONSOLE

Each processor defines its own configuration parameters, but all processors have
the ability to declare `tag`, `on_failure` and `if` fields. These fields are optional.

A `tag` is simply a string identifier of the specific instantiation of a certain
processor in a pipeline. The `tag` field does not affect the processor's behavior,
but is very useful for bookkeeping and tracing errors to specific processors.

The `if` field must contain a script that returns a boolean value.
If the script evaluates to `true`, the processor is executed for the given document; otherwise, it is skipped.
The `if` field takes either a script string, as in the example below, or an object with the script fields
defined in <<script-processor, script-options>>. In both cases, the script accesses a read-only version of the
document via the same `ctx` variable used by scripts in the <<script-processor>>.

[source,js]
--------------------------------------------------
{
  "set": {
    "if": "ctx.foo == 'someValue'",
    "field": "found",
    "value": true
  }
}
--------------------------------------------------
// NOTCONSOLE

See <<ingest-conditionals>> to learn more about the `if` field and conditional execution.

See <<handling-failure-in-pipelines>> to learn more about the `on_failure` field and error handling in pipelines.

The <<ingest-info,node info API>> can be used to figure out what processors are available in a cluster;
it provides a per-node list of the available processors.

Custom processors must be installed on all nodes. The put pipeline API will fail if a processor specified in a pipeline
doesn't exist on all nodes. If you rely on custom processor plugins, make sure to mark these plugins as mandatory by adding
the `plugin.mandatory` setting to the `config/elasticsearch.yml` file, for example:

[source,yaml]
--------------------------------------------------
plugin.mandatory: ingest-attachment
--------------------------------------------------

A node will not start if this plugin is not available.

The <<ingest-stats,node stats API>> can be used to fetch ingest usage statistics, globally and on a per-pipeline basis.
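As a rough sketch, the following requests use the `ingest` metric of the node info and node stats APIs. The first lists the processors available on each node, and the second fetches ingest statistics, including per-pipeline counts and timings. The exact shape of the responses depends on the Elasticsearch version:

[source,js]
--------------------------------------------------
GET _nodes/ingest
GET _nodes/stats/ingest
--------------------------------------------------
// CONSOLE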
These statistics are useful for finding out which pipelines are used the most or spend the most time on preprocessing.

[float]
=== Ingest Processor Plugins

Additional ingest processors can be implemented and installed as Elasticsearch {plugins}/intro.html[plugins].
See {plugins}/ingest.html[Ingest plugins] for information about the available ingest plugins.

include::processors/append.asciidoc[]
include::processors/bytes.asciidoc[]
include::processors/convert.asciidoc[]
include::processors/date.asciidoc[]
include::processors/date-index-name.asciidoc[]
include::processors/dissect.asciidoc[]
include::processors/dot-expand.asciidoc[]
include::processors/drop.asciidoc[]
include::processors/fail.asciidoc[]
include::processors/foreach.asciidoc[]
include::processors/geoip.asciidoc[]
include::processors/grok.asciidoc[]
include::processors/gsub.asciidoc[]
include::processors/html_strip.asciidoc[]
include::processors/join.asciidoc[]
include::processors/json.asciidoc[]
include::processors/kv.asciidoc[]
include::processors/lowercase.asciidoc[]
include::processors/pipeline.asciidoc[]
include::processors/remove.asciidoc[]
include::processors/rename.asciidoc[]
include::processors/script.asciidoc[]
include::processors/set.asciidoc[]
include::processors/set-security-user.asciidoc[]
include::processors/split.asciidoc[]
include::processors/sort.asciidoc[]
include::processors/trim.asciidoc[]
include::processors/uppercase.asciidoc[]
include::processors/url-decode.asciidoc[]
include::processors/user-agent.asciidoc[]