ingest.asciidoc 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992
  1. [[ingest]]
  2. = Ingest pipelines
  3. Ingest pipelines let you perform common transformations on your data before
  4. indexing. For example, you can use pipelines to remove fields, extract values
  5. from text, and enrich your data.
  6. A pipeline consists of a series of configurable tasks called
  7. <<processors,processors>>. Each processor runs sequentially, making specific
  8. changes to incoming documents. After the processors have run, {es} adds the
  9. transformed documents to your data stream or index.
  10. image::images/ingest/ingest-process.svg[Ingest pipeline diagram,align="center"]
  11. You can create and manage ingest pipelines using {kib}'s **Ingest Pipelines**
  12. feature or the <<ingest-apis,ingest APIs>>. {es} stores pipelines in the
  13. <<cluster-state,cluster state>>.
  14. [discrete]
  15. [[ingest-prerequisites]]
  16. === Prerequisites
  17. * Nodes with the <<node-ingest-node,`ingest`>> node role handle pipeline
  18. processing. To use ingest pipelines, your cluster must have at least one node
  19. with the `ingest` role. For heavy ingest loads, we recommend creating
  20. <<node-ingest-node,dedicated ingest nodes>>.
  21. * If the {es} security features are enabled, you must have the `manage_pipeline`
  22. <<privileges-list-cluster,cluster privilege>> to manage ingest pipelines. To use
  23. {kib}'s **Ingest Pipelines** feature, you also need the
  24. `cluster:monitor/nodes/info` cluster privileges.
  25. * Pipelines including the `enrich` processor require additional setup. See
  26. <<ingest-enriching-data>>.
  27. [discrete]
  28. [[create-manage-ingest-pipelines]]
  29. === Create and manage pipelines
  30. In {kib}, open the main menu and click **Stack Management > Ingest
  31. Pipelines**. From the list view, you can:
  32. * View a list of your pipelines and drill down into details
  33. * Edit or clone existing pipelines
  34. * Delete pipelines
  35. [role="screenshot"]
  36. image::images/ingest/ingest-pipeline-list.png[Kibana's Ingest Pipelines list view,align="center"]
  37. To create a pipeline, click **Create pipeline > New pipeline**. For an example
  38. tutorial, see <<common-log-format-example>>.
  39. TIP: The **New pipeline from CSV** option lets you use a CSV to create an ingest
  40. pipeline that maps custom data to the {ecs-ref}[Elastic Common Schema (ECS)].
  41. Mapping your custom data to ECS makes the data easier to search and lets you
  42. reuse visualizations from other datasets. To get started, check
  43. {ecs-ref}/ecs-converting.html[Map custom data to ECS].
  44. You can also use the <<ingest-apis,ingest APIs>> to create and manage pipelines.
  45. The following <<put-pipeline-api,create pipeline API>> request creates
  46. a pipeline containing two <<set-processor,`set`>> processors followed by a
  47. <<lowercase-processor,`lowercase`>> processor. The processors run sequentially
  48. in the order specified.
  49. [source,console]
  50. ----
  51. PUT _ingest/pipeline/my-pipeline
  52. {
  53. "description": "My optional pipeline description",
  54. "processors": [
  55. {
  56. "set": {
  57. "description": "My optional processor description",
  58. "field": "my-long-field",
  59. "value": 10
  60. }
  61. },
  62. {
  63. "set": {
  64. "description": "Set 'my-boolean-field' to true",
  65. "field": "my-boolean-field",
  66. "value": true
  67. }
  68. },
  69. {
  70. "lowercase": {
  71. "field": "my-keyword-field"
  72. }
  73. }
  74. ]
  75. }
  76. ----
  77. // TESTSETUP
  78. [discrete]
  79. [[manage-pipeline-versions]]
  80. === Manage pipeline versions
  81. When you create or update a pipeline, you can specify an optional `version`
  82. integer. You can use this version number with the
  83. <<put-pipeline-api-query-params,`if_version`>> parameter to conditionally
  84. update the pipeline. When the `if_version` parameter is specified, a successful
  85. update increments the pipeline's version.
  86. [source,console]
  87. ----
  88. PUT _ingest/pipeline/my-pipeline-id
  89. {
  90. "version": 1,
  91. "processors": [ ... ]
  92. }
  93. ----
  94. // TEST[s/\.\.\./{"lowercase": {"field":"my-keyword-field"}}/]
  95. To unset the `version` number using the API, replace or update the pipeline
  96. without specifying the `version` parameter.
  97. [discrete]
  98. [[test-pipeline]]
  99. === Test a pipeline
  100. Before using a pipeline in production, we recommend you test it using sample
  101. documents. When creating or editing a pipeline in {kib}, click **Add
  102. documents**. In the **Documents** tab, provide sample documents and click **Run
  103. the pipeline**.
  104. [role="screenshot"]
  105. image::images/ingest/test-a-pipeline.png[Test a pipeline in Kibana,align="center"]
  106. You can also test pipelines using the <<simulate-pipeline-api,simulate pipeline
  107. API>>. You can specify a configured pipeline in the request path. For example,
  108. the following request tests `my-pipeline`.
  109. [source,console]
  110. ----
  111. POST _ingest/pipeline/my-pipeline/_simulate
  112. {
  113. "docs": [
  114. {
  115. "_source": {
  116. "my-keyword-field": "FOO"
  117. }
  118. },
  119. {
  120. "_source": {
  121. "my-keyword-field": "BAR"
  122. }
  123. }
  124. ]
  125. }
  126. ----
  127. Alternatively, you can specify a pipeline and its processors in the request
  128. body.
  129. [source,console]
  130. ----
  131. POST _ingest/pipeline/_simulate
  132. {
  133. "pipeline": {
  134. "processors": [
  135. {
  136. "lowercase": {
  137. "field": "my-keyword-field"
  138. }
  139. }
  140. ]
  141. },
  142. "docs": [
  143. {
  144. "_source": {
  145. "my-keyword-field": "FOO"
  146. }
  147. },
  148. {
  149. "_source": {
  150. "my-keyword-field": "BAR"
  151. }
  152. }
  153. ]
  154. }
  155. ----
  156. The API returns transformed documents:
  157. [source,console-result]
  158. ----
  159. {
  160. "docs": [
  161. {
  162. "doc": {
  163. "_index": "_index",
  164. "_id": "_id",
  165. "_version": "-3",
  166. "_source": {
  167. "my-keyword-field": "foo"
  168. },
  169. "_ingest": {
  170. "timestamp": "2099-03-07T11:04:03.000Z"
  171. }
  172. }
  173. },
  174. {
  175. "doc": {
  176. "_index": "_index",
  177. "_id": "_id",
  178. "_version": "-3",
  179. "_source": {
  180. "my-keyword-field": "bar"
  181. },
  182. "_ingest": {
  183. "timestamp": "2099-03-07T11:04:04.000Z"
  184. }
  185. }
  186. }
  187. ]
  188. }
  189. ----
  190. // TESTRESPONSE[s/"2099-03-07T11:04:03.000Z"/$body.docs.0.doc._ingest.timestamp/]
  191. // TESTRESPONSE[s/"2099-03-07T11:04:04.000Z"/$body.docs.1.doc._ingest.timestamp/]
  192. [discrete]
  193. [[add-pipeline-to-indexing-request]]
  194. === Add a pipeline to an indexing request
  195. Use the `pipeline` query parameter to apply a pipeline to documents in
  196. <<docs-index_,individual>> or <<docs-bulk,bulk>> indexing requests.
  197. [source,console]
  198. ----
  199. POST my-data-stream/_doc?pipeline=my-pipeline
  200. {
  201. "@timestamp": "2099-03-07T11:04:05.000Z",
  202. "my-keyword-field": "foo"
  203. }
  204. PUT my-data-stream/_bulk?pipeline=my-pipeline
  205. { "create":{ } }
  206. { "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" }
  207. { "create":{ } }
  208. { "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" }
  209. ----
  210. // TEST[setup:my_data_stream]
  211. // TEST[teardown:data_stream_cleanup]
  212. You can also use the `pipeline` parameter with the <<docs-update-by-query,update
  213. by query>> or <<docs-reindex,reindex>> APIs.
  214. [source,console]
  215. ----
  216. POST my-data-stream/_update_by_query?pipeline=my-pipeline
  217. POST _reindex
  218. {
  219. "source": {
  220. "index": "my-data-stream"
  221. },
  222. "dest": {
  223. "index": "my-new-data-stream",
  224. "op_type": "create",
  225. "pipeline": "my-pipeline"
  226. }
  227. }
  228. ----
  229. // TEST[setup:my_data_stream]
  230. // TEST[teardown:data_stream_cleanup]
  231. [discrete]
  232. [[set-default-pipeline]]
  233. === Set a default pipeline
  234. Use the <<index-default-pipeline,`index.default_pipeline`>> index setting to set
  235. a default pipeline. {es} applies this pipeline to indexing requests if no
  236. `pipeline` parameter is specified.
  237. [discrete]
  238. [[set-final-pipeline]]
  239. === Set a final pipeline
  240. Use the <<index-final-pipeline,`index.final_pipeline`>> index setting to set a
  241. final pipeline. {es} applies this pipeline after the request or default
  242. pipeline, even if neither is specified.
  243. [discrete]
  244. [[pipelines-for-beats]]
  245. === Pipelines for {beats}
  246. To add an ingest pipeline to an Elastic Beat, specify the `pipeline`
  247. parameter under `output.elasticsearch` in `<BEAT_NAME>.yml`. For example,
  248. for {filebeat}, you'd specify `pipeline` in `filebeat.yml`.
  249. [source,yaml]
  250. ----
  251. output.elasticsearch:
  252. hosts: ["localhost:9200"]
  253. pipeline: my-pipeline
  254. ----
  255. [discrete]
  256. [[pipelines-for-fleet-elastic-agent]]
  257. === Pipelines for {fleet} and {agent}
  258. {agent} integrations ship with default ingest pipelines that preprocess and enrich data before indexing.
  259. {fleet-guide}/index.html[{fleet}] applies these pipelines using <<index-templates,index
  260. templates>> that include <<set-default-pipeline,pipeline index settings>>. {es}
  261. matches these templates to your {fleet} data streams based on the
  262. {fleet-guide}/data-streams.html#data-streams-naming-scheme[stream's naming
  263. scheme].
  264. Each default integration pipeline calls a nonexistent, unversioned `@custom` ingest pipeline.
  265. If unaltered, this pipeline call has no effect on your data. However, you can modify this call to
  266. create custom pipelines for integrations that persist across upgrades.
  267. Refer to {fleet-guide}/data-streams-pipeline-tutorial.html[Tutorial: Transform data with custom ingest pipelines] to learn more.
  268. {fleet} doesn't provide a default ingest pipeline for the **Custom logs** integration,
  269. but you can specify a pipeline for this integration using an
  270. <<pipeline-custom-logs-index-template,index template>> or a
  271. <<pipeline-custom-logs-configuration,custom configuration>>.
  272. [[pipeline-custom-logs-index-template]]
  273. **Option 1: Index template**
  274. // tag::create-name-custom-logs-pipeline[]
  275. . <<create-manage-ingest-pipelines,Create>> and <<test-pipeline,test>> your
  276. ingest pipeline. Name your pipeline `logs-<dataset-name>-default`. This makes
  277. tracking the pipeline for your integration easier.
  278. +
  279. --
  280. For example, the following request creates a pipeline for the `my-app` dataset.
  281. The pipeline's name is `logs-my_app-default`.
  282. [source,console]
  283. ----
  284. PUT _ingest/pipeline/logs-my_app-default
  285. {
  286. "description": "Pipeline for `my_app` dataset",
  287. "processors": [ ... ]
  288. }
  289. ----
  290. // TEST[s/\.\.\./{"lowercase": {"field":"my-keyword-field"}}/]
  291. --
  292. // end::create-name-custom-logs-pipeline[]
  293. . Create an <<index-templates,index template>> that includes your pipeline in
  294. the <<index-default-pipeline,`index.default_pipeline`>> or
  295. <<index-final-pipeline,`index.final_pipeline`>> index setting. Ensure the
  296. template is <<create-index-template,data stream enabled>>. The
  297. template's index pattern should match `logs-<dataset-name>-*`.
  298. +
  299. --
  300. You can create this template using {kib}'s <<manage-index-templates,**Index
  301. Management**>> feature or the <<indices-put-template,create index template
  302. API>>.
  303. For example, the following request creates a template matching `logs-my_app-*`.
  304. The template uses a component template that contains the
  305. `index.default_pipeline` index setting.
  306. [source,console]
  307. ----
  308. # Creates a component template for index settings
  309. PUT _component_template/logs-my_app-settings
  310. {
  311. "template": {
  312. "settings": {
  313. "index.default_pipeline": "logs-my_app-default",
  314. "index.lifecycle.name": "logs"
  315. }
  316. }
  317. }
  318. # Creates an index template matching `logs-my_app-*`
  319. PUT _index_template/logs-my_app-template
  320. {
  321. "index_patterns": ["logs-my_app-*"],
  322. "data_stream": { },
  323. "priority": 500,
  324. "composed_of": ["logs-my_app-settings", "logs-my_app-mappings"]
  325. }
  326. ----
  327. // TEST[continued]
  328. // TEST[s/, "logs-my_app-mappings"//]
  329. --
  330. // tag::name-custom-logs-dataset[]
  331. . When adding or editing your **Custom logs** integration in {fleet},
  332. click **Configure integration > Custom log file > Advanced options**.
  333. . In **Dataset name**, specify your dataset's name. {fleet} will add new data
  334. for the integration to the resulting `logs-<dataset-name>-default` data stream.
  335. +
  336. For example, if your dataset's name was `my_app`, {fleet} adds new data to the
  337. `logs-my_app-default` data stream.
  338. // end::name-custom-logs-dataset[]
  339. +
  340. [role="screenshot"]
  341. image::images/ingest/custom-logs.png[Set up custom log integration in Fleet,align="center"]
  342. . Use the <<indices-rollover-index,rollover API>> to roll over your data stream.
  343. This ensures {es} applies the index template and its pipeline settings to any
  344. new data for the integration.
  345. +
  346. --
  347. ////
  348. [source,console]
  349. ----
  350. PUT _data_stream/logs-my_app-default
  351. ----
  352. // TEST[continued]
  353. ////
  354. [source,console]
  355. ----
  356. POST logs-my_app-default/_rollover/
  357. ----
  358. // TEST[continued]
  359. ////
  360. [source,console]
  361. ----
  362. DELETE _data_stream/*
  363. DELETE _index_template/*
  364. ----
  365. // TEST[continued]
  366. ////
  367. --
  368. [[pipeline-custom-logs-configuration]]
  369. **Option 2: Custom configuration**
  370. include::ingest.asciidoc[tag=create-name-custom-logs-pipeline]
  371. include::ingest.asciidoc[tag=name-custom-logs-dataset]
  372. . In **Custom Configurations**, specify your pipeline in the `pipeline` policy
  373. setting.
  374. +
  375. [role="screenshot"]
  376. image::images/ingest/custom-logs-pipeline.png[Custom pipeline configuration for custom log integration,align="center"]
  377. **{agent} standalone**
  378. If you run {agent} standalone, you can apply pipelines using an
  379. <<index-templates,index template>> that includes the
  380. <<index-default-pipeline,`index.default_pipeline`>> or
  381. <<index-final-pipeline,`index.final_pipeline`>> index setting. Alternatively,
  382. you can specify the `pipeline` policy setting in your `elastic-agent.yml`
  383. configuration. See {fleet-guide}/install-standalone-elastic-agent.html[Install standalone {agent}s].
  384. [discrete]
  385. [[pipelines-in-enterprise-search]]
  386. === Pipelines in Enterprise Search
  387. When you create Elasticsearch indices for {enterprise-search-ref}/index.html[Enterprise Search^] use cases, for example, using the {enterprise-search-ref}/crawler.html[web crawler^] or {enterprise-search-ref}/connectors.html[connectors^], these indices are automatically set up with specific ingest pipelines.
  388. These processors help optimize your content for search.
  389. Refer to the {enterprise-search-ref}/ingest-pipelines.html[Enterprise Search documentation^] for more information.
  390. [discrete]
  391. [[access-source-fields]]
  392. === Access source fields in a processor
  393. Processors have read and write access to an incoming document's source fields.
  394. To access a field key in a processor, use its field name. The following `set`
  395. processor accesses `my-long-field`.
  396. [source,console]
  397. ----
  398. PUT _ingest/pipeline/my-pipeline
  399. {
  400. "processors": [
  401. {
  402. "set": {
  403. "field": "my-long-field",
  404. "value": 10
  405. }
  406. }
  407. ]
  408. }
  409. ----
  410. You can also prepend the `_source` prefix.
  411. [source,console]
  412. ----
  413. PUT _ingest/pipeline/my-pipeline
  414. {
  415. "processors": [
  416. {
  417. "set": {
  418. "field": "_source.my-long-field",
  419. "value": 10
  420. }
  421. }
  422. ]
  423. }
  424. ----
  425. Use dot notation to access object fields.
  426. IMPORTANT: If your document contains flattened objects, use the
  427. <<dot-expand-processor,`dot_expander`>> processor to expand them first. Other
  428. ingest processors cannot access flattened objects.
  429. [source,console]
  430. ----
  431. PUT _ingest/pipeline/my-pipeline
  432. {
  433. "processors": [
  434. {
  435. "dot_expander": {
  436. "description": "Expand 'my-object-field.my-property'",
  437. "field": "my-object-field.my-property"
  438. }
  439. },
  440. {
  441. "set": {
  442. "description": "Set 'my-object-field.my-property' to 10",
  443. "field": "my-object-field.my-property",
  444. "value": 10
  445. }
  446. }
  447. ]
  448. }
  449. ----
  450. [[template-snippets]]
  451. Several processor parameters support https://mustache.github.io[Mustache]
  452. template snippets. To access field values in a template snippet, enclose the
  453. field name in triple curly brackets:`{{{field-name}}}`. You can use template
  454. snippets to dynamically set field names.
  455. [source,console]
  456. ----
  457. PUT _ingest/pipeline/my-pipeline
  458. {
  459. "processors": [
  460. {
  461. "set": {
  462. "description": "Set dynamic '<service>' field to 'code' value",
  463. "field": "{{{service}}}",
  464. "value": "{{{code}}}"
  465. }
  466. }
  467. ]
  468. }
  469. ----
  470. [discrete]
  471. [[access-metadata-fields]]
  472. === Access metadata fields in a processor
  473. Processors can access the following metadata fields by name:
  474. * `_index`
  475. * `_id`
  476. * `_routing`
  477. * `_dynamic_templates`
  478. [source,console]
  479. ----
  480. PUT _ingest/pipeline/my-pipeline
  481. {
  482. "processors": [
  483. {
  484. "set": {
  485. "description": "Set '_routing' to 'geoip.country_iso_code' value",
  486. "field": "_routing",
  487. "value": "{{{geoip.country_iso_code}}}"
  488. }
  489. }
  490. ]
  491. }
  492. ----
  493. Use a Mustache template snippet to access metadata field values. For example,
  494. `{{{_routing}}}` retrieves a document's routing value.
  495. [source,console]
  496. ----
  497. PUT _ingest/pipeline/my-pipeline
  498. {
  499. "processors": [
  500. {
  501. "set": {
  502. "description": "Use geo_point dynamic template for address field",
  503. "field": "_dynamic_templates",
  504. "value": {
  505. "address": "geo_point"
  506. }
  507. }
  508. }
  509. ]
  510. }
  511. ----
  512. The set processor above tells ES to use the dynamic template named `geo_point`
  513. for the field `address` if this field is not defined in the mapping of the index
  514. yet. This processor overrides the dynamic template for the field `address` if
  515. already defined in the bulk request, but has no effect on other dynamic
  516. templates defined in the bulk request.
  517. WARNING: If you <<create-document-ids-automatically,automatically generate>>
  518. document IDs, you cannot use `{{{_id}}}` in a processor. {es} assigns
  519. auto-generated `_id` values after ingest.
  520. [discrete]
  521. [[access-ingest-metadata]]
  522. === Access ingest metadata in a processor
  523. Ingest processors can add and access ingest metadata using the `_ingest` key.
  524. Unlike source and metadata fields, {es} does not index ingest metadata fields by
  525. default. {es} also allows source fields that start with an `_ingest` key. If
  526. your data includes such source fields, use `_source._ingest` to access them.
  527. Pipelines only create the `_ingest.timestamp` ingest metadata field by default.
  528. This field contains a timestamp of when {es} received the document's indexing
  529. request. To index `_ingest.timestamp` or other ingest metadata fields, use the
  530. `set` processor.
  531. [source,console]
  532. ----
  533. PUT _ingest/pipeline/my-pipeline
  534. {
  535. "processors": [
  536. {
  537. "set": {
  538. "description": "Index the ingest timestamp as 'event.ingested'",
  539. "field": "event.ingested",
  540. "value": "{{{_ingest.timestamp}}}"
  541. }
  542. }
  543. ]
  544. }
  545. ----
  546. [discrete]
  547. [[handling-pipeline-failures]]
  548. === Handling pipeline failures
  549. A pipeline's processors run sequentially. By default, pipeline processing stops
  550. when one of these processors fails or encounters an error.
  551. To ignore a processor failure and run the pipeline's remaining processors, set
  552. `ignore_failure` to `true`.
  553. [source,console]
  554. ----
  555. PUT _ingest/pipeline/my-pipeline
  556. {
  557. "processors": [
  558. {
  559. "rename": {
  560. "description": "Rename 'provider' to 'cloud.provider'",
  561. "field": "provider",
  562. "target_field": "cloud.provider",
  563. "ignore_failure": true
  564. }
  565. }
  566. ]
  567. }
  568. ----
  569. Use the `on_failure` parameter to specify a list of processors to run
  570. immediately after a processor failure. If `on_failure` is specified, {es}
  571. afterward runs the pipeline's remaining processors, even if the `on_failure`
  572. configuration is empty.
  573. [source,console]
  574. ----
  575. PUT _ingest/pipeline/my-pipeline
  576. {
  577. "processors": [
  578. {
  579. "rename": {
  580. "description": "Rename 'provider' to 'cloud.provider'",
  581. "field": "provider",
  582. "target_field": "cloud.provider",
  583. "on_failure": [
  584. {
  585. "set": {
  586. "description": "Set 'error.message'",
  587. "field": "error.message",
  588. "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
  589. "override": false
  590. }
  591. }
  592. ]
  593. }
  594. }
  595. ]
  596. }
  597. ----
  598. Nest a list of `on_failure` processors for nested error handling.
  599. [source,console]
  600. ----
  601. PUT _ingest/pipeline/my-pipeline
  602. {
  603. "processors": [
  604. {
  605. "rename": {
  606. "description": "Rename 'provider' to 'cloud.provider'",
  607. "field": "provider",
  608. "target_field": "cloud.provider",
  609. "on_failure": [
  610. {
  611. "set": {
  612. "description": "Set 'error.message'",
  613. "field": "error.message",
  614. "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
  615. "override": false,
  616. "on_failure": [
  617. {
  618. "set": {
  619. "description": "Set 'error.message.multi'",
  620. "field": "error.message.multi",
  621. "value": "Document encountered multiple ingest errors",
  622. "override": true
  623. }
  624. }
  625. ]
  626. }
  627. }
  628. ]
  629. }
  630. }
  631. ]
  632. }
  633. ----
  634. You can also specify `on_failure` for a pipeline. If a processor without an
  635. `on_failure` value fails, {es} uses this pipeline-level parameter as a fallback.
  636. {es} will not attempt to run the pipeline's remaining processors.
  637. [source,console]
  638. ----
  639. PUT _ingest/pipeline/my-pipeline
  640. {
  641. "processors": [ ... ],
  642. "on_failure": [
  643. {
  644. "set": {
  645. "description": "Index document to 'failed-<index>'",
  646. "field": "_index",
  647. "value": "failed-{{{ _index }}}"
  648. }
  649. }
  650. ]
  651. }
  652. ----
  653. // TEST[s/\.\.\./{"lowercase": {"field":"my-keyword-field"}}/]
  654. Additional information about the pipeline failure may be available in the
  655. document metadata fields `on_failure_message`, `on_failure_processor_type`,
  656. `on_failure_processor_tag`, and `on_failure_pipeline`. These fields are
  657. accessible only from within an `on_failure` block.
  658. The following example uses the metadata fields to include information about
  659. pipeline failures in documents.
  660. [source,console]
  661. ----
  662. PUT _ingest/pipeline/my-pipeline
  663. {
  664. "processors": [ ... ],
  665. "on_failure": [
  666. {
  667. "set": {
  668. "description": "Record error information",
  669. "field": "error_information",
  670. "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
  671. }
  672. }
  673. ]
  674. }
  675. ----
  676. // TEST[s/\.\.\./{"lowercase": {"field":"my-keyword-field"}}/]
  677. [discrete]
  678. [[conditionally-run-processor]]
  679. === Conditionally run a processor
  680. Each processor supports an optional `if` condition, written as a
  681. {painless}/painless-guide.html[Painless script]. If provided, the processor only
  682. runs when the `if` condition is `true`.
  683. IMPORTANT: `if` condition scripts run in Painless's
  684. {painless}/painless-ingest-processor-context.html[ingest processor context]. In
  685. `if` conditions, `ctx` values are read-only.
  686. [source,console]
  687. ----
  688. PUT _ingest/pipeline/my-pipeline
  689. {
  690. "processors": [
  691. {
  692. "drop": {
  693. "description": "Drop documents with 'network.name' of 'Guest'",
  694. "if": "ctx?.network?.name == 'Guest'"
  695. }
  696. }
  697. ]
  698. }
  699. ----
  700. If the <<script-painless-regex-enabled,`script.painless.regex.enabled`>> cluster
  701. setting is enabled, you can use regular expressions in your `if` condition
  702. scripts. For supported syntax, see {painless}/painless-regexes.html[Painless
  703. regular expressions].
  704. TIP: If possible, avoid using regular expressions. Expensive regular expressions
  705. can slow indexing speeds.
  706. [source,console]
  707. ----
  708. PUT _ingest/pipeline/my-pipeline
  709. {
  710. "processors": [
  711. {
  712. "set": {
  713. "description": "If 'url.scheme' is 'http', set 'url.insecure' to true",
  714. "if": "ctx.url?.scheme =~ /^http[^s]/",
  715. "field": "url.insecure",
  716. "value": true
  717. }
  718. }
  719. ]
  720. }
  721. ----
  722. You must specify `if` conditions as valid JSON on a single line. However, you
  723. can use the {kibana-ref}/console-kibana.html#configuring-console[{kib}
  724. console]'s triple quote syntax to write and debug larger scripts.
  725. TIP: If possible, avoid using complex or expensive `if` condition scripts.
  726. Expensive condition scripts can slow indexing speeds.
  727. [source,console]
  728. ----
  729. PUT _ingest/pipeline/my-pipeline
  730. {
  731. "processors": [
  732. {
  733. "drop": {
  734. "description": "Drop documents that don't contain 'prod' tag",
  735. "if": """
  736. Collection tags = ctx.tags;
  737. if(tags != null){
  738. for (String tag : tags) {
  739. if (tag.toLowerCase().contains('prod')) {
  740. return false;
  741. }
  742. }
  743. }
  744. return true;
  745. """
  746. }
  747. }
  748. ]
  749. }
  750. ----
  751. You can also specify a <<script-stored-scripts,stored script>> as the
  752. `if` condition.
  753. [source,console]
  754. ----
  755. PUT _scripts/my-prod-tag-script
  756. {
  757. "script": {
  758. "lang": "painless",
  759. "source": """
  760. Collection tags = ctx.tags;
  761. if(tags != null){
  762. for (String tag : tags) {
  763. if (tag.toLowerCase().contains('prod')) {
  764. return false;
  765. }
  766. }
  767. }
  768. return true;
  769. """
  770. }
  771. }
  772. PUT _ingest/pipeline/my-pipeline
  773. {
  774. "processors": [
  775. {
  776. "drop": {
  777. "description": "Drop documents that don't contain 'prod' tag",
  778. "if": { "id": "my-prod-tag-script" }
  779. }
  780. }
  781. ]
  782. }
  783. ----
  784. ////
  785. [source,console]
  786. ----
  787. DELETE _scripts/my-prod-tag-script
  788. DELETE _ingest/pipeline/my-pipeline
  789. ----
  790. // TEST[continued]
  791. ////
  792. Incoming documents often contain object fields. If a processor script attempts
  793. to access a field whose parent object does not exist, {es} returns a
  794. `NullPointerException`. To avoid these exceptions, use
  795. {painless}/painless-operators-reference.html#null-safe-operator[null safe
  796. operators], such as `?.`, and write your scripts to be null safe.
  797. For example, `ctx.network?.name.equalsIgnoreCase('Guest')` is not null safe.
  798. `ctx.network?.name` can return null. Rewrite the script as
  799. `'Guest'.equalsIgnoreCase(ctx.network?.name)`, which is null safe because
  800. `Guest` is always non-null.
  801. If you can't rewrite a script to be null safe, include an explicit null check.
  802. [source,console]
  803. ----
  804. PUT _ingest/pipeline/my-pipeline
  805. {
  806. "processors": [
  807. {
  808. "drop": {
  809. "description": "Drop documents that contain 'network.name' of 'Guest'",
  810. "if": "ctx.network?.name != null && ctx.network.name.contains('Guest')"
  811. }
  812. }
  813. ]
  814. }
  815. ----
  816. [discrete]
  817. [[conditionally-apply-pipelines]]
  818. === Conditionally apply pipelines
  819. Combine an `if` condition with the <<pipeline-processor,`pipeline`>> processor
  820. to apply other pipelines to documents based on your criteria. You can use this
  821. pipeline as the <<set-default-pipeline,default pipeline>> in an
  822. <<index-templates,index template>> used to configure multiple data streams or
  823. indices.
  824. [source,console]
  825. ----
  826. PUT _ingest/pipeline/one-pipeline-to-rule-them-all
  827. {
  828. "processors": [
  829. {
  830. "pipeline": {
  831. "description": "If 'service.name' is 'apache_httpd', use 'httpd_pipeline'",
  832. "if": "ctx.service?.name == 'apache_httpd'",
  833. "name": "httpd_pipeline"
  834. }
  835. },
  836. {
  837. "pipeline": {
  838. "description": "If 'service.name' is 'syslog', use 'syslog_pipeline'",
  839. "if": "ctx.service?.name == 'syslog'",
  840. "name": "syslog_pipeline"
  841. }
  842. },
  843. {
  844. "fail": {
  845. "description": "If 'service.name' is not 'apache_httpd' or 'syslog', return a failure message",
  846. "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
  847. "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
  848. }
  849. }
  850. ]
  851. }
  852. ----
  853. [discrete]
  854. [[get-pipeline-usage-stats]]
  855. === Get pipeline usage statistics
  856. Use the <<cluster-nodes-stats,node stats>> API to get global and per-pipeline
  857. ingest statistics. Use these stats to determine which pipelines run most
  858. frequently or spend the most time processing.
  859. [source,console]
  860. ----
  861. GET _nodes/stats/ingest?filter_path=nodes.*.ingest
  862. ----
  863. include::ingest/common-log-format-example.asciidoc[]
  864. include::ingest/enrich.asciidoc[]
  865. include::ingest/processors.asciidoc[]