[[ingest]]
= Ingest pipelines

Ingest pipelines let you perform common transformations on your data before
indexing. For example, you can use pipelines to remove fields, extract values
from text, and enrich your data.

A pipeline consists of a series of configurable tasks called
<<processors,processors>>. Each processor runs sequentially, making specific
changes to incoming documents. After the processors have run, {es} adds the
transformed documents to your data stream or index.

image::images/ingest/ingest-process.svg[Ingest pipeline diagram,align="center"]

You can create and manage ingest pipelines using {kib}'s **Ingest Pipelines**
feature or the <<ingest-apis,ingest APIs>>. {es} stores pipelines in the
<<cluster-state,cluster state>>.

[discrete]
[[ingest-prerequisites]]
=== Prerequisites

* Nodes with the <<node-ingest-node,`ingest`>> node role handle pipeline
processing. To use ingest pipelines, your cluster must have at least one node
with the `ingest` role. For heavy ingest loads, we recommend creating
<<node-ingest-node,dedicated ingest nodes>>, as shown in the sketch after this
list.

* If the {es} security features are enabled, you must have the `manage_pipeline`
<<privileges-list-cluster,cluster privilege>> to manage ingest pipelines. To use
{kib}'s **Ingest Pipelines** feature, you also need the
`cluster:monitor/nodes/info` cluster privilege.

* Pipelines including the `enrich` processor require additional setup. See
<<ingest-enriching-data>>.
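
A dedicated ingest node is a node whose only role is `ingest`. As a minimal
sketch, you could configure one in `elasticsearch.yml` like this:

[source,yaml]
----
# Assign only the `ingest` role so this node handles pipeline
# processing and nothing else
node.roles: [ ingest ]
----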

[discrete]
[[create-manage-ingest-pipelines]]
=== Create and manage pipelines

In {kib}, open the main menu and click **Stack Management > Ingest
Pipelines**. From the list view, you can:

* View a list of your pipelines and drill down into details
* Edit or clone existing pipelines
* Delete pipelines

[role="screenshot"]
image::images/ingest/ingest-pipeline-list.png[Kibana's Ingest Pipelines list view,align="center"]

To create a pipeline, click **Create pipeline > New pipeline**. For an example
tutorial, see <<common-log-format-example>>.

TIP: The **New pipeline from CSV** option lets you use a CSV to create an ingest
pipeline that maps custom data to the {ecs-ref}[Elastic Common Schema (ECS)].
Mapping your custom data to ECS makes the data easier to search and lets you
reuse visualizations from other datasets. To get started, check
{ecs-ref}/ecs-converting.html[Map custom data to ECS].

You can also use the <<ingest-apis,ingest APIs>> to create and manage pipelines.
The following <<put-pipeline-api,create pipeline API>> request creates
a pipeline containing two <<set-processor,`set`>> processors followed by a
<<lowercase-processor,`lowercase`>> processor. The processors run sequentially
in the order specified.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "description": "My optional pipeline description",
  "processors": [
    {
      "set": {
        "description": "My optional processor description",
        "field": "my-long-field",
        "value": 10
      }
    },
    {
      "set": {
        "description": "Set 'my-boolean-field' to true",
        "field": "my-boolean-field",
        "value": true
      }
    },
    {
      "lowercase": {
        "field": "my-keyword-field"
      }
    }
  ]
}
----
// TESTSETUP

[discrete]
[[manage-pipeline-versions]]
=== Manage pipeline versions

When you create or update a pipeline, you can specify an optional `version`
integer. You can use this version number with the
<<put-pipeline-api-query-params,`if_version`>> parameter to conditionally
update the pipeline. When the `if_version` parameter is specified, a successful
update increments the pipeline's version.

[source,console]
----
PUT _ingest/pipeline/my-pipeline-id
{
  "version": 1,
  "processors": [ ... ]
}
----
// TEST[s/\.\.\./{"lowercase": {"field":"my-keyword-field"}}/]
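
For example, assuming `my-pipeline-id` is currently stored with `"version": 1`
as above, the following sketch only succeeds if the stored version is still
`1`. On success, {es} increments the version.

[source,console]
----
PUT _ingest/pipeline/my-pipeline-id?if_version=1
{
  "processors": [
    {
      "lowercase": {
        "field": "my-keyword-field"
      }
    }
  ]
}
----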

To unset the `version` number using the API, replace or update the pipeline
without specifying the `version` parameter.
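
For example, the following sketch replaces `my-pipeline-id` without a
`version` parameter, so the stored pipeline no longer carries a version:

[source,console]
----
PUT _ingest/pipeline/my-pipeline-id
{
  "processors": [
    {
      "lowercase": {
        "field": "my-keyword-field"
      }
    }
  ]
}
----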

[discrete]
[[test-pipeline]]
=== Test a pipeline

Before using a pipeline in production, we recommend you test it using sample
documents. When creating or editing a pipeline in {kib}, click **Add
documents**. In the **Documents** tab, provide sample documents and click **Run
the pipeline**.

[role="screenshot"]
image::images/ingest/test-a-pipeline.png[Test a pipeline in Kibana,align="center"]

You can also test pipelines using the <<simulate-pipeline-api,simulate pipeline
API>>. You can specify a configured pipeline in the request path. For example,
the following request tests `my-pipeline`.

[source,console]
----
POST _ingest/pipeline/my-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "my-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "my-keyword-field": "BAR"
      }
    }
  ]
}
----

Alternatively, you can specify a pipeline and its processors in the request
body.

[source,console]
----
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "lowercase": {
          "field": "my-keyword-field"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "my-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "my-keyword-field": "BAR"
      }
    }
  ]
}
----

The API returns transformed documents:

[source,console-result]
----
{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "my-keyword-field": "foo"
        },
        "_ingest": {
          "timestamp": "2099-03-07T11:04:03.000Z"
        }
      }
    },
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "my-keyword-field": "bar"
        },
        "_ingest": {
          "timestamp": "2099-03-07T11:04:04.000Z"
        }
      }
    }
  ]
}
----
// TESTRESPONSE[s/"2099-03-07T11:04:03.000Z"/$body.docs.0.doc._ingest.timestamp/]
// TESTRESPONSE[s/"2099-03-07T11:04:04.000Z"/$body.docs.1.doc._ingest.timestamp/]

[discrete]
[[add-pipeline-to-indexing-request]]
=== Add a pipeline to an indexing request

Use the `pipeline` query parameter to apply a pipeline to documents in
<<docs-index_,individual>> or <<docs-bulk,bulk>> indexing requests.

[source,console]
----
POST my-data-stream/_doc?pipeline=my-pipeline
{
  "@timestamp": "2099-03-07T11:04:05.000Z",
  "my-keyword-field": "foo"
}

PUT my-data-stream/_bulk?pipeline=my-pipeline
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" }
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" }
----
// TEST[setup:my_data_stream]
// TEST[teardown:data_stream_cleanup]

You can also use the `pipeline` parameter with the <<docs-update-by-query,update
by query>> or <<docs-reindex,reindex>> APIs.

[source,console]
----
POST my-data-stream/_update_by_query?pipeline=my-pipeline

POST _reindex
{
  "source": {
    "index": "my-data-stream"
  },
  "dest": {
    "index": "my-new-data-stream",
    "op_type": "create",
    "pipeline": "my-pipeline"
  }
}
----
// TEST[setup:my_data_stream]
// TEST[teardown:data_stream_cleanup]

[discrete]
[[set-default-pipeline]]
=== Set a default pipeline

Use the <<index-default-pipeline,`index.default_pipeline`>> index setting to set
a default pipeline. {es} applies this pipeline to indexing requests if no
`pipeline` parameter is specified.
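
For example, the following sketch sets `my-pipeline` as the default pipeline
for a hypothetical `my-index` index, assuming both already exist:

[source,console]
----
PUT my-index/_settings
{
  "index.default_pipeline": "my-pipeline"
}
----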

[discrete]
[[set-final-pipeline]]
=== Set a final pipeline

Use the <<index-final-pipeline,`index.final_pipeline`>> index setting to set a
final pipeline. {es} applies this pipeline after the request or default
pipeline, even if neither is specified.
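
Similarly, the following sketch sets `my-pipeline` as the final pipeline for
the same hypothetical `my-index` index:

[source,console]
----
PUT my-index/_settings
{
  "index.final_pipeline": "my-pipeline"
}
----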

[discrete]
[[pipelines-for-beats]]
=== Pipelines for {beats}

To add an ingest pipeline to an Elastic Beat, specify the `pipeline`
parameter under `output.elasticsearch` in `<BEAT_NAME>.yml`. For example,
for {filebeat}, you'd specify `pipeline` in `filebeat.yml`.

[source,yaml]
----
output.elasticsearch:
  hosts: ["localhost:9200"]
  pipeline: my-pipeline
----

[discrete]
[[pipelines-for-fleet-elastic-agent]]
=== Pipelines for {fleet} and {agent}

{agent} integrations ship with default ingest pipelines that preprocess and
enrich data before indexing. {fleet-guide}/index.html[{fleet}] applies these
pipelines using <<index-templates,index templates>> that include
<<set-default-pipeline,pipeline index settings>>. {es} matches these templates
to your {fleet} data streams based on the
{fleet-guide}/data-streams.html#data-streams-naming-scheme[stream's naming
scheme].

Each default integration pipeline calls a nonexistent, unversioned `@custom`
ingest pipeline. If unaltered, this pipeline call has no effect on your data.
However, you can modify this call to create custom pipelines for integrations
that persist across upgrades. Refer to
{fleet-guide}/data-streams-pipeline-tutorial.html[Tutorial: Transform data with
custom ingest pipelines] to learn more.
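
For example, to customize processing for a hypothetical integration data
stream, you could create the `@custom` pipeline that the integration's managed
pipeline already calls. The pipeline name below is illustrative only; the
exact `@custom` pipeline name depends on the integration and its dataset.

[source,console]
----
PUT _ingest/pipeline/logs-my_integration.access@custom
{
  "processors": [
    {
      "set": {
        "description": "Tag documents with the owning team",
        "field": "organization.name",
        "value": "my-org"
      }
    }
  ]
}
----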

{fleet} doesn't provide a default ingest pipeline for the **Custom logs**
integration, but you can specify a pipeline for this integration using an
<<pipeline-custom-logs-index-template,index template>> or a
<<pipeline-custom-logs-configuration,custom configuration>>.

[[pipeline-custom-logs-index-template]]
**Option 1: Index template**

// tag::create-name-custom-logs-pipeline[]
. <<create-manage-ingest-pipelines,Create>> and <<test-pipeline,test>> your
ingest pipeline. Name your pipeline `logs-<dataset-name>-default`. This makes
tracking the pipeline for your integration easier.
+
--
For example, the following request creates a pipeline for the `my_app` dataset.
The pipeline's name is `logs-my_app-default`.

[source,console]
----
PUT _ingest/pipeline/logs-my_app-default
{
  "description": "Pipeline for `my_app` dataset",
  "processors": [ ... ]
}
----
// TEST[s/\.\.\./{"lowercase": {"field":"my-keyword-field"}}/]
--
// end::create-name-custom-logs-pipeline[]

. Create an <<index-templates,index template>> that includes your pipeline in
the <<index-default-pipeline,`index.default_pipeline`>> or
<<index-final-pipeline,`index.final_pipeline`>> index setting. Ensure the
template is <<create-index-template,data stream enabled>>. The
template's index pattern should match `logs-<dataset-name>-*`.
+
--
You can create this template using {kib}'s <<manage-index-templates,**Index
Management**>> feature or the <<indices-put-template,create index template
API>>.

For example, the following request creates a template matching `logs-my_app-*`.
The template uses a component template that contains the
`index.default_pipeline` index setting.

[source,console]
----
# Creates a component template for index settings
PUT _component_template/logs-my_app-settings
{
  "template": {
    "settings": {
      "index.default_pipeline": "logs-my_app-default",
      "index.lifecycle.name": "logs"
    }
  }
}

# Creates an index template matching `logs-my_app-*`
PUT _index_template/logs-my_app-template
{
  "index_patterns": ["logs-my_app-*"],
  "data_stream": { },
  "priority": 500,
  "composed_of": ["logs-my_app-settings", "logs-my_app-mappings"]
}
----
// TEST[continued]
// TEST[s/, "logs-my_app-mappings"//]
--

// tag::name-custom-logs-dataset[]
. When adding or editing your **Custom logs** integration in {fleet},
click **Configure integration > Custom log file > Advanced options**.

. In **Dataset name**, specify your dataset's name. {fleet} will add new data
for the integration to the resulting `logs-<dataset-name>-default` data stream.
+
For example, if your dataset's name was `my_app`, {fleet} adds new data to the
`logs-my_app-default` data stream.
// end::name-custom-logs-dataset[]
+
[role="screenshot"]
image::images/ingest/custom-logs.png[Set up custom log integration in Fleet,align="center"]

. Use the <<indices-rollover-index,rollover API>> to roll over your data stream.
This ensures {es} applies the index template and its pipeline settings to any
new data for the integration.
+
--
////
[source,console]
----
PUT _data_stream/logs-my_app-default
----
// TEST[continued]
////

[source,console]
----
POST logs-my_app-default/_rollover/
----
// TEST[continued]

////
[source,console]
----
DELETE _data_stream/*
DELETE _index_template/*
----
// TEST[continued]
////
--

[[pipeline-custom-logs-configuration]]
**Option 2: Custom configuration**

include::ingest.asciidoc[tag=create-name-custom-logs-pipeline]
include::ingest.asciidoc[tag=name-custom-logs-dataset]

. In **Custom Configurations**, specify your pipeline in the `pipeline` policy
setting.
+
[role="screenshot"]
image::images/ingest/custom-logs-pipeline.png[Custom pipeline configuration for custom log integration,align="center"]

**{agent} standalone**

If you run {agent} standalone, you can apply pipelines using an
<<index-templates,index template>> that includes the
<<index-default-pipeline,`index.default_pipeline`>> or
<<index-final-pipeline,`index.final_pipeline`>> index setting. Alternatively,
you can specify the `pipeline` policy setting in your `elastic-agent.yml`
configuration. See {fleet-guide}/install-standalone-elastic-agent.html[Install
standalone {agent}s].
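
As an illustrative sketch only, the `pipeline` setting might sit under the
{es} output in a standalone `elastic-agent.yml`. The exact placement shown
here is an assumption; refer to the standalone {agent} documentation for the
authoritative layout.

[source,yaml]
----
outputs:
  default:
    type: elasticsearch
    hosts: ["localhost:9200"]
    # Assumed placement, for illustration: route documents through an
    # ingest pipeline named `my-pipeline`
    pipeline: my-pipeline
----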

[discrete]
[[pipelines-in-enterprise-search]]
=== Pipelines in Enterprise Search

When you create Elasticsearch indices for
{enterprise-search-ref}/index.html[Enterprise Search^] use cases, for example
using the {enterprise-search-ref}/crawler.html[web crawler^] or
{enterprise-search-ref}/connectors.html[connectors^], these indices are
automatically set up with specific ingest pipelines. These pipelines help
optimize your content for search. Refer to the
{enterprise-search-ref}/ingest-pipelines.html[Enterprise Search
documentation^] for more information.

[discrete]
[[access-source-fields]]
=== Access source fields in a processor

Processors have read and write access to an incoming document's source fields.
To access a field key in a processor, use its field name. The following `set`
processor accesses `my-long-field`.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "field": "my-long-field",
        "value": 10
      }
    }
  ]
}
----

You can also prepend the `_source` prefix.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "field": "_source.my-long-field",
        "value": 10
      }
    }
  ]
}
----

Use dot notation to access object fields.

IMPORTANT: If your document contains flattened objects, use the
<<dot-expand-processor,`dot_expander`>> processor to expand them first. Other
ingest processors cannot access flattened objects.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "dot_expander": {
        "description": "Expand 'my-object-field.my-property'",
        "field": "my-object-field.my-property"
      }
    },
    {
      "set": {
        "description": "Set 'my-object-field.my-property' to 10",
        "field": "my-object-field.my-property",
        "value": 10
      }
    }
  ]
}
----

[[template-snippets]]
Several processor parameters support https://mustache.github.io[Mustache]
template snippets. To access field values in a template snippet, enclose the
field name in triple curly brackets: `{{{field-name}}}`. You can use template
snippets to dynamically set field names.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Set dynamic '<service>' field to 'code' value",
        "field": "{{{service}}}",
        "value": "{{{code}}}"
      }
    }
  ]
}
----

[discrete]
[[access-metadata-fields]]
=== Access metadata fields in a processor

Processors can access the following metadata fields by name:

* `_index`
* `_id`
* `_routing`
* `_dynamic_templates`

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Set '_routing' to 'geoip.country_iso_code' value",
        "field": "_routing",
        "value": "{{{geoip.country_iso_code}}}"
      }
    }
  ]
}
----

Use a Mustache template snippet to access metadata field values. For example,
`{{{_routing}}}` retrieves a document's routing value.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Use geo_point dynamic template for address field",
        "field": "_dynamic_templates",
        "value": {
          "address": "geo_point"
        }
      }
    }
  ]
}
----

The preceding `set` processor tells {es} to use the dynamic template named
`geo_point` for the field `address` if this field isn't already defined in the
index's mapping. If the bulk request already defines a dynamic template for
`address`, this processor overrides it, but it has no effect on other dynamic
templates defined in the bulk request.

WARNING: If you <<create-document-ids-automatically,automatically generate>>
document IDs, you cannot use `{{{_id}}}` in a processor. {es} assigns
auto-generated `_id` values after ingest.

[discrete]
[[access-ingest-metadata]]
=== Access ingest metadata in a processor

Ingest processors can add and access ingest metadata using the `_ingest` key.

Unlike source and metadata fields, {es} does not index ingest metadata fields by
default. {es} also allows source fields that start with an `_ingest` key. If
your data includes such source fields, use `_source._ingest` to access them.

Pipelines only create the `_ingest.timestamp` ingest metadata field by default.
This field contains a timestamp of when {es} received the document's indexing
request. To index `_ingest.timestamp` or other ingest metadata fields, use the
`set` processor.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "description": "Index the ingest timestamp as 'event.ingested'",
        "field": "event.ingested",
        "value": "{{{_ingest.timestamp}}}"
      }
    }
  ]
}
----

[discrete]
[[handling-pipeline-failures]]
=== Handling pipeline failures

A pipeline's processors run sequentially. By default, pipeline processing stops
when one of these processors fails or encounters an error.

To ignore a processor failure and run the pipeline's remaining processors, set
`ignore_failure` to `true`.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "ignore_failure": true
      }
    }
  ]
}
----

Use the `on_failure` parameter to specify a list of processors to run
immediately after a processor failure. If `on_failure` is specified, {es} then
runs the pipeline's remaining processors, even if the `on_failure`
configuration is empty.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false
            }
          }
        ]
      }
    }
  ]
}
----

Nest a list of `on_failure` processors for nested error handling.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false,
              "on_failure": [
                {
                  "set": {
                    "description": "Set 'error.message.multi'",
                    "field": "error.message.multi",
                    "value": "Document encountered multiple ingest errors",
                    "override": true
                  }
                }
              ]
            }
          }
        ]
      }
    }
  ]
}
----

You can also specify `on_failure` for a pipeline. If a processor without an
`on_failure` value fails, {es} uses this pipeline-level parameter as a fallback.
{es} will not attempt to run the pipeline's remaining processors.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Index document to 'failed-<index>'",
        "field": "_index",
        "value": "failed-{{{ _index }}}"
      }
    }
  ]
}
----
// TEST[s/\.\.\./{"lowercase": {"field":"my-keyword-field"}}/]

Additional information about the pipeline failure may be available in the
document metadata fields `on_failure_message`, `on_failure_processor_type`,
`on_failure_processor_tag`, and `on_failure_pipeline`. These fields are
accessible only from within an `on_failure` block.

The following example uses the metadata fields to include information about
pipeline failures in documents.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Record error information",
        "field": "error_information",
        "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
      }
    }
  ]
}
----
// TEST[s/\.\.\./{"lowercase": {"field":"my-keyword-field"}}/]
- // TEST[s/\.\.\./{"lowercase": {"field":"my-keyword-field"}}/]
- [discrete]
- [[conditionally-run-processor]]
- === Conditionally run a processor
- Each processor supports an optional `if` condition, written as a
- {painless}/painless-guide.html[Painless script]. If provided, the processor only
- runs when the `if` condition is `true`.
- IMPORTANT: `if` condition scripts run in Painless's
- {painless}/painless-ingest-processor-context.html[ingest processor context]. In
- `if` conditions, `ctx` values are read-only.
- [source,console]
- ----
- PUT _ingest/pipeline/my-pipeline
- {
- "processors": [
- {
- "drop": {
- "description": "Drop documents with 'network.name' of 'Guest'",
- "if": "ctx?.network?.name == 'Guest'"
- }
- }
- ]
- }
- ----
- If the <<script-painless-regex-enabled,`script.painless.regex.enabled`>> cluster
- setting is enabled, you can use regular expressions in your `if` condition
- scripts. For supported syntax, see {painless}/painless-regexes.html[Painless
- regular expressions].
- TIP: If possible, avoid using regular expressions. Expensive regular expressions
- can slow indexing speeds.
- [source,console]
- ----
- PUT _ingest/pipeline/my-pipeline
- {
- "processors": [
- {
- "set": {
- "description": "If 'url.scheme' is 'http', set 'url.insecure' to true",
- "if": "ctx.url?.scheme =~ /^http[^s]/",
- "field": "url.insecure",
- "value": true
- }
- }
- ]
- }
- ----
- You must specify `if` conditions as valid JSON on a single line. However, you
- can use the {kibana-ref}/console-kibana.html#configuring-console[{kib}
- console]'s triple quote syntax to write and debug larger scripts.
- TIP: If possible, avoid using complex or expensive `if` condition scripts.
- Expensive condition scripts can slow indexing speeds.
- [source,console]
- ----
- PUT _ingest/pipeline/my-pipeline
- {
- "processors": [
- {
- "drop": {
- "description": "Drop documents that don't contain 'prod' tag",
- "if": """
- Collection tags = ctx.tags;
- if(tags != null){
- for (String tag : tags) {
- if (tag.toLowerCase().contains('prod')) {
- return false;
- }
- }
- }
- return true;
- """
- }
- }
- ]
- }
- ----

You can also specify a <<script-stored-scripts,stored script>> as the
`if` condition.

[source,console]
----
PUT _scripts/my-prod-tag-script
{
  "script": {
    "lang": "painless",
    "source": """
      Collection tags = ctx.tags;
      if(tags != null){
        for (String tag : tags) {
          if (tag.toLowerCase().contains('prod')) {
            return false;
          }
        }
      }
      return true;
    """
  }
}

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that don't contain 'prod' tag",
        "if": { "id": "my-prod-tag-script" }
      }
    }
  ]
}
----

////
[source,console]
----
DELETE _scripts/my-prod-tag-script
DELETE _ingest/pipeline/my-pipeline
----
// TEST[continued]
////

Incoming documents often contain object fields. If a processor script attempts
to access a field whose parent object does not exist, {es} returns a
`NullPointerException`. To avoid these exceptions, use
{painless}/painless-operators-reference.html#null-safe-operator[null safe
operators], such as `?.`, and write your scripts to be null safe.

For example, `ctx.network?.name.equalsIgnoreCase('Guest')` is not null safe.
`ctx.network?.name` can return null. Rewrite the script as
`'Guest'.equalsIgnoreCase(ctx.network?.name)`, which is null safe because
`Guest` is always non-null.

If you can't rewrite a script to be null safe, include an explicit null check.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "description": "Drop documents that contain 'network.name' of 'Guest'",
        "if": "ctx.network?.name != null && ctx.network.name.contains('Guest')"
      }
    }
  ]
}
----

[discrete]
[[conditionally-apply-pipelines]]
=== Conditionally apply pipelines

Combine an `if` condition with the <<pipeline-processor,`pipeline`>> processor
to apply other pipelines to documents based on your criteria. You can use this
pipeline as the <<set-default-pipeline,default pipeline>> in an
<<index-templates,index template>> used to configure multiple data streams or
indices.

[source,console]
----
PUT _ingest/pipeline/one-pipeline-to-rule-them-all
{
  "processors": [
    {
      "pipeline": {
        "description": "If 'service.name' is 'apache_httpd', use 'httpd_pipeline'",
        "if": "ctx.service?.name == 'apache_httpd'",
        "name": "httpd_pipeline"
      }
    },
    {
      "pipeline": {
        "description": "If 'service.name' is 'syslog', use 'syslog_pipeline'",
        "if": "ctx.service?.name == 'syslog'",
        "name": "syslog_pipeline"
      }
    },
    {
      "fail": {
        "description": "If 'service.name' is not 'apache_httpd' or 'syslog', return a failure message",
        "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
        "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
      }
    }
  ]
}
----

[discrete]
[[get-pipeline-usage-stats]]
=== Get pipeline usage statistics

Use the <<cluster-nodes-stats,node stats>> API to get global and per-pipeline
ingest statistics. Use these stats to determine which pipelines run most
frequently or spend the most time processing.

[source,console]
----
GET _nodes/stats/ingest?filter_path=nodes.*.ingest
----

include::ingest/common-log-format-example.asciidoc[]
include::ingest/enrich.asciidoc[]
include::ingest/processors.asciidoc[]
|