ingest.asciidoc 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682
  1. [[ingest]]
  2. = Ingest pipelines
  3. Ingest pipelines let you perform common transformations on your data before
  4. indexing. For example, you can use pipelines to remove fields, extract values
  5. from text, and enrich your data.
  6. A pipeline consists of a series of configurable tasks called
  7. <<processors,processors>>. Each processor runs sequentially, making specific
  8. changes to incoming documents. After the processors have run, {es} adds the
  9. transformed documents to your data stream or index.
  10. image::images/ingest/ingest-process.svg[Ingest pipeline diagram,align="center"]
  11. You can create and manage ingest pipelines using {kib}'s **Ingest Node
  12. Pipelines** feature or the <<ingest-apis,ingest APIs>>. {es} stores pipelines in
  13. the <<cluster-state,cluster state>>.
  14. [discrete]
  15. [[ingest-prerequisites]]
  16. === Prerequisites
  17. * Nodes with the <<node-ingest-node,`ingest`>> node role handle pipeline
  18. processing. To use ingest pipelines, your cluster must have at least one node
  19. with the `ingest` role. For heavy ingest loads, we recommend creating
  20. <<node-ingest-node,dedicated ingest nodes>>.
  21. * If the {es} security features are enabled, you must have the `manage_pipeline`
  22. <<privileges-list-cluster,cluster privilege>> to manage ingest pipelines. To use
  23. {kib}'s **Ingest Node Pipelines** feature, you also need the
  24. `cluster:monitor/nodes/info` cluster privileges.
  25. * Pipelines including the `enrich` processor require additional setup. See
  26. <<ingest-enriching-data>>.
  27. [discrete]
  28. [[create-manage-ingest-pipelines]]
  29. == Create and manage pipelines
  30. In {kib}, open the main menu and click **Stack Management** > **Ingest Node
  31. Pipelines**. From the list view, you can:
  32. * View a list of your pipelines and drill down into details
  33. * Edit or clone existing pipelines
  34. * Delete pipelines
  35. To create a new pipeline, click **Create a pipeline**. For an example tutorial,
  36. see <<common-log-format-example>>.
  37. [role="screenshot"]
  38. image::images/ingest/ingest-pipeline-list.png[Kibana's Ingest Node Pipelines list view,align="center"]
  39. You can also use the <<ingest-apis,ingest APIs>> to create and manage pipelines.
  40. The following <<put-pipeline-api,create pipeline API>> request creates
  41. a pipeline containing two <<set-processor,`set`>> processors followed by a
  42. <<lowercase-processor,`lowercase`>> processor. The processors run sequentially
  43. in the order specified.
  44. [source,console]
  45. ----
  46. PUT _ingest/pipeline/my-pipeline
  47. {
  48. "description": "My pipeline description",
  49. "processors": [
  50. {
  51. "set": {
  52. "field": "my-long-field",
  53. "value": 10
  54. }
  55. },
  56. {
  57. "set": {
  58. "field": "my-boolean-field",
  59. "value": true
  60. }
  61. },
  62. {
  63. "lowercase": {
  64. "field": "my-keyword-field"
  65. }
  66. }
  67. ]
  68. }
  69. ----
  70. // TESTSETUP
  71. [discrete]
  72. [[test-pipeline]]
  73. === Test a pipeline
  74. Before using a pipeline in production, we recommend you test it using sample
  75. documents. When creating or editing a pipeline in {kib}, click **Add
  76. documents**. In the **Documents** tab, provide sample documents and click **Run
  77. the pipeline**.
  78. [role="screenshot"]
  79. image::images/ingest/test-a-pipeline.png[Test a pipeline in Kibana,align="center"]
  80. You can also test pipelines using the <<simulate-pipeline-api,simulate pipeline
  81. API>>.
  82. [source,console]
  83. ----
  84. POST _ingest/pipeline/my-pipeline/_simulate
  85. {
  86. "docs": [
  87. {
  88. "_source": {
  89. "my-keyword-field": "FOO"
  90. }
  91. },
  92. {
  93. "_source": {
  94. "my-keyword-field": "BAR"
  95. }
  96. }
  97. ]
  98. }
  99. ----
  100. The API returns transformed documents:
  101. [source,console-result]
  102. ----
  103. {
  104. "docs": [
  105. {
  106. "doc": {
  107. "_index": "_index",
  108. "_id": "_id",
  109. "_source": {
  110. "my-long-field": 10,
  111. "my-boolean-field": true,
  112. "my-keyword-field": "foo"
  113. },
  114. "_ingest": {
  115. "timestamp": "2099-02-30T22:30:03.187Z"
  116. }
  117. }
  118. },
  119. {
  120. "doc": {
  121. "_index": "_index",
  122. "_id": "_id",
  123. "_source": {
  124. "my-long-field": 10,
  125. "my-boolean-field": true,
  126. "my-keyword-field": "bar"
  127. },
  128. "_ingest": {
  129. "timestamp": "2099-02-30T22:30:03.188Z"
  130. }
  131. }
  132. }
  133. ]
  134. }
  135. ----
  136. // TESTRESPONSE[s/"2099-02-30T22:30:03.187Z"/$body.docs.0.doc._ingest.timestamp/]
  137. // TESTRESPONSE[s/"2099-02-30T22:30:03.188Z"/$body.docs.1.doc._ingest.timestamp/]
  138. [discrete]
  139. [[add-pipeline-to-indexing-request]]
  140. === Add a pipeline to an indexing request
  141. Use the `pipeline` query parameter to apply a pipeline to documents in
  142. <<docs-index_,individual>> or <<docs-bulk,bulk>> indexing requests.
  143. [source,console]
  144. ----
  145. POST my-data-stream/_doc?pipeline=my-pipeline
  146. {
  147. "@timestamp": "2099-03-07T11:04:05.000Z",
  148. "my-keyword-field": "foo"
  149. }
  150. PUT my-data-stream/_bulk?pipeline=my-pipeline
  151. { "create":{ } }
  152. { "@timestamp": "2099-03-08T11:04:05.000Z", "my-keyword-field" : "foo" }
  153. { "create":{ } }
  154. { "@timestamp": "2099-03-08T11:06:07.000Z", "my-keyword-field" : "bar" }
  155. ----
  156. You can also use the `pipeline` parameter with the <<docs-update-by-query,update
  157. by query>> or <<docs-reindex,reindex>> APIs.
  158. [source,console]
  159. ----
  160. POST my-data-stream/_update_by_query?pipeline=my-pipeline
  161. POST _reindex
  162. {
  163. "source": {
  164. "index": "my-data-stream"
  165. },
  166. "dest": {
  167. "index": "my-new-data-stream",
  168. "op_type": "create",
  169. "pipeline": "my-pipeline"
  170. }
  171. }
  172. ----
  173. // TEST[continued]
  174. [discrete]
  175. [[set-default-pipeline]]
  176. === Set a default pipeline
  177. Use the <<index-default-pipeline,`index.default_pipeline`>> index setting to set
  178. a default pipeline. {es} applies this pipeline if no `pipeline` parameter
  179. is specified.
  180. [discrete]
  181. [[set-final-pipeline]]
  182. === Set a final pipeline
  183. Use the <<index-final-pipeline,`index.final_pipeline`>> index setting to set a
  184. final pipeline. {es} applies this pipeline after the request or default
  185. pipeline, even if neither is specified.
  186. [discrete]
  187. [[access-source-fields]]
  188. === Access source fields in a processor
  189. Processors have read and write access to an incoming document's source fields.
  190. To access a field key in a processor, use its field name. The following `set`
  191. processor accesses `my-long-field`.
  192. [source,console]
  193. ----
  194. PUT _ingest/pipeline/my-pipeline
  195. {
  196. "processors": [
  197. {
  198. "set": {
  199. "field": "my-long-field",
  200. "value": 10
  201. }
  202. }
  203. ]
  204. }
  205. ----
  206. You can also prepend the `_source` prefix.
  207. [source,console]
  208. ----
  209. PUT _ingest/pipeline/my-pipeline
  210. {
  211. "processors": [
  212. {
  213. "set": {
  214. "field": "_source.my-long-field",
  215. "value": 10
  216. }
  217. }
  218. ]
  219. }
  220. ----
  221. Use dot notation to access object fields.
  222. IMPORTANT: If your document contains flattened objects, use the
  223. <<dot-expand-processor,`dot_expander`>> processor to expand them first. Other
  224. ingest processors cannot access flattened objects.
  225. [source,console]
  226. ----
  227. PUT _ingest/pipeline/my-pipeline
  228. {
  229. "processors": [
  230. {
  231. "dot_expander": {
  232. "field": "my-object-field.my-property"
  233. }
  234. },
  235. {
  236. "set": {
  237. "field": "my-object-field.my-property",
  238. "value": 10
  239. }
  240. }
  241. ]
  242. }
  243. ----
  244. [[template-snippets]]
  245. To access field values, enclose the field name in double curly brackets `{{ }}`
  246. to create a https://mustache.github.io[Mustache] template snippet. You can use
  247. template snippets to dynamically set field names. The following processor sets a
  248. field name as the `service` field value.
  249. [source,console]
  250. ----
  251. PUT _ingest/pipeline/my-pipeline
  252. {
  253. "processors": [
  254. {
  255. "set": {
  256. "field": "{{service}}",
  257. "value": "{{code}}"
  258. }
  259. }
  260. ]
  261. }
  262. ----
  263. [discrete]
  264. [[access-metadata-fields]]
  265. === Access metadata fields in a processor
  266. Processors can access the following metadata fields by name:
  267. * `_index`
  268. * `_id`
  269. * `_routing`
  270. For example, the following `set` processor sets the document's routing value as
  271. the `geoip.country_iso_code` field value.
  272. [source,console]
  273. ----
  274. PUT _ingest/pipeline/my-pipeline
  275. {
  276. "processors" : [
  277. {
  278. "set" : {
  279. "field": "_routing",
  280. "value": "{{geoip.country_iso_code}}"
  281. }
  282. }
  283. ]
  284. }
  285. ----
  286. Use a Mustache template snippet to access metadata field values. For example,
  287. `{{_routing}}` retrieves a document's routing value.
  288. WARNING: If you <<create-document-ids-automatically,automatically generate>>
  289. document IDs, you cannot use `{{_id}}` in a processor. {es} assigns
  290. auto-generated `_id` values after ingest.
  291. [discrete]
  292. [[access-ingest-metadata]]
  293. === Access ingest metadata in a processor
  294. Ingest processors can add and access ingest metadata using the `_ingest` key.
  295. Unlike source and metadata fields, {es} does not index ingest metadata fields by
  296. default. {es} also allows source fields that start with an `_ingest` key. If
  297. your data includes such source fields, use `_source._ingest` to access them.
  298. Pipelines only create the `_ingest.timestamp` ingest metadata field by default.
  299. This field contains a timestamp of when {es} received the document's indexing
  300. request. To index `_ingest.timestamp` or other ingest metadata fields, use the
  301. `set` processor.
  302. [source,console]
  303. ----
  304. PUT _ingest/pipeline/my-pipeline
  305. {
  306. "processors": [
  307. {
  308. "set": {
  309. "field": "received",
  310. "value": "{{_ingest.timestamp}}"
  311. }
  312. }
  313. ]
  314. }
  315. ----
  316. [discrete]
  317. [[handling-pipeline-failures]]
  318. === Handing pipeline failures
  319. A pipeline's processors run sequentially. By default, pipeline processing stops
  320. when one of these processors fails or encounters an error.
  321. To ignore a processor failure and run the pipeline's remaining processors, set
  322. `ignore_failure` to `true`.
  323. [source,console]
  324. ----
  325. PUT _ingest/pipeline/my-pipeline
  326. {
  327. "processors": [
  328. {
  329. "rename": {
  330. "field": "foo",
  331. "target_field": "bar",
  332. "ignore_failure": true
  333. }
  334. }
  335. ]
  336. }
  337. ----
  338. Use the `on_failure` parameter to specify a list of processors to run
  339. immediately after a processor failure. If `on_failure` is specified, {es}
  340. afterward runs the pipeline's remaining processors , even if the `on_failure`
  341. configuration is empty.
  342. [source,console]
  343. ----
  344. PUT _ingest/pipeline/my-pipeline
  345. {
  346. "processors": [
  347. {
  348. "rename": {
  349. "field": "foo",
  350. "target_field": "bar",
  351. "on_failure": [
  352. {
  353. "set": {
  354. "field": "error.message",
  355. "value": "field \"foo\" does not exist, cannot rename to \"bar\"",
  356. "override": false
  357. }
  358. }
  359. ]
  360. }
  361. }
  362. ]
  363. }
  364. ----
  365. Nest a list of `on_failure` processors for nested error handling.
  366. [source,console]
  367. ----
  368. PUT _ingest/pipeline/my-pipeline
  369. {
  370. "processors": [
  371. {
  372. "rename": {
  373. "field": "foo",
  374. "target_field": "bar",
  375. "on_failure": [
  376. {
  377. "set": {
  378. "field": "error.message",
  379. "value": "field \"foo\" does not exist, cannot rename to \"bar\"",
  380. "override": false,
  381. "on_failure": [
  382. {
  383. "set": {
  384. "field": "error.message.multi",
  385. "value": "Document encountered multiple ingest errors",
  386. "override": true
  387. }
  388. }
  389. ]
  390. }
  391. }
  392. ]
  393. }
  394. }
  395. ]
  396. }
  397. ----
  398. You can also specify `on_failure` for a pipeline.
  399. [source,console]
  400. ----
  401. PUT _ingest/pipeline/my-pipeline
  402. {
  403. "processors": [ ... ],
  404. "on_failure": [
  405. {
  406. "set": {
  407. "field": "_index",
  408. "value": "failed-{{ _index }}"
  409. }
  410. }
  411. ]
  412. }
  413. ----
  414. // TEST[s/\.\.\./{"lowercase": {"field":"my-keyword-field"}}/]
  415. [discrete]
  416. [[conditionally-run-processor]]
  417. === Conditionally run a processor
  418. Each processor supports an optional `if` condition, written as a
  419. {painless}/painless-guide.html[Painless script]. If provided, the processor only
  420. runs when the `if` condition is `true`.
  421. IMPORTANT: `if` condition scripts run in Painless's
  422. {painless}/painless-ingest-processor-context.html[ingest processor context]. In
  423. `if` conditions, `ctx` values are read-only.
  424. The following <<drop-processor,`drop`>> processor uses an `if` condition to drop
  425. documents with a `network_name` of `Guest`.
  426. [source,console]
  427. ----
  428. PUT _ingest/pipeline/my-pipeline
  429. {
  430. "processors": [
  431. {
  432. "drop": {
  433. "if": "ctx?.network_name == 'Guest'"
  434. }
  435. }
  436. ]
  437. }
  438. ----
  439. If the static `script.painless.regex.enabled` cluster setting is enabled, you
  440. can use regular expressions in your `if` condition scripts. For supported
  441. syntax, see the {painless}/painless-regexes.html[Painless regexes]
  442. documentation.
  443. TIP: If possible, avoid using regular expressions. Expensive regular expressions
  444. can slow indexing speeds.
  445. [source,console]
  446. ----
  447. PUT _ingest/pipeline/my-pipeline
  448. {
  449. "processors": [
  450. {
  451. "set": {
  452. "if": "ctx.href?.url =~ /^http[^s]/",
  453. "field": "href.insecure",
  454. "value": true
  455. }
  456. }
  457. ]
  458. }
  459. ----
  460. You must specify `if` conditions as valid JSON on a single line. However, you
  461. can use the {kibana-ref}/console-kibana.html#configuring-console[{kib}
  462. console]'s triple quote syntax to write and debug larger scripts.
  463. TIP: If possible, avoid using complex or expensive `if` condition scripts.
  464. Expensive condition scripts can slow indexing speeds.
  465. [source,console]
  466. ----
  467. PUT _ingest/pipeline/my-pipeline
  468. {
  469. "processors": [
  470. {
  471. "drop": {
  472. "if": """
  473. Collection tags = ctx.tags;
  474. if(tags != null){
  475. for (String tag : tags) {
  476. if (tag.toLowerCase().contains('prod')) {
  477. return false;
  478. }
  479. }
  480. }
  481. return true;
  482. """
  483. }
  484. }
  485. ]
  486. }
  487. ----
  488. You can also specify a <<modules-scripting-stored-scripts,stored script>> as the
  489. `if` condition.
  490. [source,console]
  491. ----
  492. PUT _scripts/my-stored-script
  493. {
  494. "script": {
  495. "lang": "painless",
  496. "source": """
  497. Collection tags = ctx.tags;
  498. if(tags != null){
  499. for (String tag : tags) {
  500. if (tag.toLowerCase().contains('prod')) {
  501. return false;
  502. }
  503. }
  504. }
  505. return true;
  506. """
  507. }
  508. }
  509. PUT _ingest/pipeline/my-pipeline
  510. {
  511. "processors": [
  512. {
  513. "drop": {
  514. "if": { "id": "my-stored-script" }
  515. }
  516. }
  517. ]
  518. }
  519. ----
  520. Incoming documents often contain object fields. If a processor script attempts
  521. to access a field whose parent object does not exist, {es} returns a
  522. `NullPointerException`. To avoid these exceptions, use
  523. {painless}/painless-operators-reference.html#null-safe-operator[null safe
  524. operators], such as `?.`, and write your scripts to be null safe.
  525. For example, `ctx.network?.name.equalsIgnoreCase('Guest')` is not null safe.
  526. `ctx.network?.name` can return null. Rewrite the script as
  527. `'Guest'.equalsIgnoreCase(ctx.network?.name)`, which is null safe because
  528. `Guest` is always non-null.
  529. If you can't rewrite a script to be null safe, include an explicit null check.
  530. [source,console]
  531. ----
  532. PUT _ingest/pipeline/my-pipeline
  533. {
  534. "processors": [
  535. {
  536. "drop": {
  537. "if": "ctx.network?.name != null && ctx.network.name.contains('Guest')"
  538. }
  539. }
  540. ]
  541. }
  542. ----
  543. [discrete]
  544. [[conditionally-apply-pipelines]]
  545. === Conditionally apply pipelines
  546. Combine an `if` condition with the <<pipeline-processor,`pipeline`>> processor
  547. to apply other pipelines to documents based on your criteria. You can use this
  548. pipeline as the <<set-default-pipeline,default pipeline>> in an
  549. <<index-templates,index template>> used to configure multiple data streams or
  550. indices.
  551. The following pipeline applies different pipelines to incoming documents based
  552. on the `service.name` field value.
  553. [source,console]
  554. ----
  555. PUT _ingest/pipeline/one-pipeline-to-rule-them-all
  556. {
  557. "processors": [
  558. {
  559. "pipeline": {
  560. "if": "ctx.service?.name == 'apache_httpd'",
  561. "name": "httpd_pipeline"
  562. }
  563. },
  564. {
  565. "pipeline": {
  566. "if": "ctx.service?.name == 'syslog'",
  567. "name": "syslog_pipeline"
  568. }
  569. },
  570. {
  571. "fail": {
  572. "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
  573. "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
  574. }
  575. }
  576. ]
  577. }
  578. ----
  579. [discrete]
  580. [[get-pipeline-usage-stats]]
  581. === Get pipeline usage statistics
  582. Use the <<cluster-nodes-stats,node stats>> API to get global and per-pipeline
  583. ingest statistics. Use these stats to determine which pipelines run most
  584. frequently or spend the most time processing.
  585. [source,console]
  586. ----
  587. GET _nodes/stats/ingest?filter_path=nodes.*.ingest
  588. ----
  589. include::ingest/common-log-format-example.asciidoc[]
  590. include::ingest/enrich.asciidoc[]
  591. include::ingest/processors.asciidoc[]