
[[pipeline]]
== Pipeline Definition

A pipeline is a definition of a series of <<ingest-processors, processors>> that are to be executed
in the same order as they are declared. A pipeline consists of two main fields: a `description`
and a list of `processors`:

[source,js]
--------------------------------------------------
{
  "description" : "...",
  "processors" : [ ... ]
}
--------------------------------------------------
// NOTCONSOLE

The `description` is a special field to store a helpful description of
what the pipeline does.

The `processors` parameter defines a list of processors to be executed in
order.

[[ingest-apis]]
== Ingest APIs

The following ingest APIs are available for managing pipelines:

* <<put-pipeline-api>> to add or update a pipeline
* <<get-pipeline-api>> to return a specific pipeline
* <<delete-pipeline-api>> to delete a pipeline
* <<simulate-pipeline-api>> to simulate a call to a pipeline

[[put-pipeline-api]]
=== Put Pipeline API

The put pipeline API adds pipelines and updates existing pipelines in the cluster.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/my-pipeline-id
{
  "description" : "describe pipeline",
  "processors" : [
    {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

NOTE: The put pipeline API also instructs all ingest nodes to reload their in-memory representation of pipelines, so that
pipeline changes take effect immediately.

[[get-pipeline-api]]
=== Get Pipeline API

The get pipeline API returns pipelines based on ID. This API always returns a local reference of the pipeline.

[source,js]
--------------------------------------------------
GET _ingest/pipeline/my-pipeline-id
--------------------------------------------------
// CONSOLE
// TEST[continued]

Example response:

[source,js]
--------------------------------------------------
{
  "my-pipeline-id" : {
    "description" : "describe pipeline",
    "processors" : [
      {
        "set" : {
          "field" : "foo",
          "value" : "bar"
        }
      }
    ]
  }
}
--------------------------------------------------
// TESTRESPONSE

For each returned pipeline, the source and the version are returned.
The version is useful for knowing which version of the pipeline the node has.
You can specify multiple IDs to return more than one pipeline. Wildcards are also supported.
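For example, a sketch of a request for several pipelines at once (the second pipeline ID is purely
illustrative and assumed to exist):

[source,js]
--------------------------------------------------
GET _ingest/pipeline/my-pipeline-id,my-other-pipeline*
--------------------------------------------------
// NOTCONSOLE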
[float]
[[versioning-pipelines]]
==== Pipeline Versioning

Pipelines can optionally add a `version` number, which can be any integer value,
in order to simplify pipeline management by external systems. The `version`
field is completely optional and it is meant solely for external management of
pipelines. To unset a `version`, simply replace the pipeline without specifying
one.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/my-pipeline-id
{
  "description" : "describe pipeline",
  "version" : 123,
  "processors" : [
    {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

To check for the `version`, you can
<<common-options-response-filtering, filter responses>>
using `filter_path` to limit the response to just the `version`:

[source,js]
--------------------------------------------------
GET /_ingest/pipeline/my-pipeline-id?filter_path=*.version
--------------------------------------------------
// CONSOLE
// TEST[continued]

This should give a small response that makes it both easy and inexpensive to parse:

[source,js]
--------------------------------------------------
{
  "my-pipeline-id" : {
    "version" : 123
  }
}
--------------------------------------------------
// TESTRESPONSE
[[delete-pipeline-api]]
=== Delete Pipeline API

The delete pipeline API deletes pipelines by ID or wildcard match (`my-*`, `*`).

[source,js]
--------------------------------------------------
DELETE _ingest/pipeline/my-pipeline-id
--------------------------------------------------
// CONSOLE
// TEST[continued]

////
Hidden setup for wildcard test:

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/wild-one
{
  "description" : "first pipeline to be wildcard deleted",
  "processors" : [ ]
}

PUT _ingest/pipeline/wild-two
{
  "description" : "second pipeline to be wildcard deleted",
  "processors" : [ ]
}

DELETE _ingest/pipeline/*
--------------------------------------------------
// CONSOLE

Hidden expected response:

[source,js]
--------------------------------------------------
{
  "acknowledged": true
}
--------------------------------------------------
// TESTRESPONSE
////
[[simulate-pipeline-api]]
=== Simulate Pipeline API

The simulate pipeline API executes a specific pipeline against
the set of documents provided in the body of the request.

You can either specify an existing pipeline to execute
against the provided documents, or supply a pipeline definition in
the body of the request.

Here is the structure of a simulate request with a pipeline definition provided
in the body of the request:

[source,js]
--------------------------------------------------
POST _ingest/pipeline/_simulate
{
  "pipeline" : {
    // pipeline definition here
  },
  "docs" : [
    { "_source": {/** first document **/} },
    { "_source": {/** second document **/} },
    // ...
  ]
}
--------------------------------------------------
// NOTCONSOLE

Here is the structure of a simulate request against an existing pipeline:

[source,js]
--------------------------------------------------
POST _ingest/pipeline/my-pipeline-id/_simulate
{
  "docs" : [
    { "_source": {/** first document **/} },
    { "_source": {/** second document **/} },
    // ...
  ]
}
--------------------------------------------------
// NOTCONSOLE

Here is an example of a simulate request with a pipeline defined in the request
and its response:

[source,js]
--------------------------------------------------
POST _ingest/pipeline/_simulate
{
  "pipeline" :
  {
    "description": "_description",
    "processors": [
      {
        "set" : {
          "field" : "field2",
          "value" : "_value"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "_doc",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_type": "_doc",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

Response:

[source,js]
--------------------------------------------------
{
  "docs": [
    {
      "doc": {
        "_id": "id",
        "_index": "index",
        "_type": "_doc",
        "_source": {
          "field2": "_value",
          "foo": "bar"
        },
        "_ingest": {
          "timestamp": "2017-05-04T22:30:03.187Z"
        }
      }
    },
    {
      "doc": {
        "_id": "id",
        "_index": "index",
        "_type": "_doc",
        "_source": {
          "field2": "_value",
          "foo": "rab"
        },
        "_ingest": {
          "timestamp": "2017-05-04T22:30:03.188Z"
        }
      }
    }
  ]
}
--------------------------------------------------
// TESTRESPONSE[s/"2017-05-04T22:30:03.187Z"/$body.docs.0.doc._ingest.timestamp/]
// TESTRESPONSE[s/"2017-05-04T22:30:03.188Z"/$body.docs.1.doc._ingest.timestamp/]
[[ingest-verbose-param]]
==== Viewing Verbose Results

You can use the simulate pipeline API to see how each processor affects the ingest document
as it passes through the pipeline. To see the intermediate results of
each processor in the simulate request, you can add the `verbose` parameter
to the request.

Here is an example of a verbose request and its response:

[source,js]
--------------------------------------------------
POST _ingest/pipeline/_simulate?verbose
{
  "pipeline" :
  {
    "description": "_description",
    "processors": [
      {
        "set" : {
          "field" : "field2",
          "value" : "_value2"
        }
      },
      {
        "set" : {
          "field" : "field3",
          "value" : "_value3"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "_doc",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_type": "_doc",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

Response:

[source,js]
--------------------------------------------------
{
  "docs": [
    {
      "processor_results": [
        {
          "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
              "field2": "_value2",
              "foo": "bar"
            },
            "_ingest": {
              "timestamp": "2017-05-04T22:46:09.674Z"
            }
          }
        },
        {
          "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
              "field3": "_value3",
              "field2": "_value2",
              "foo": "bar"
            },
            "_ingest": {
              "timestamp": "2017-05-04T22:46:09.675Z"
            }
          }
        }
      ]
    },
    {
      "processor_results": [
        {
          "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
              "field2": "_value2",
              "foo": "rab"
            },
            "_ingest": {
              "timestamp": "2017-05-04T22:46:09.676Z"
            }
          }
        },
        {
          "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
              "field3": "_value3",
              "field2": "_value2",
              "foo": "rab"
            },
            "_ingest": {
              "timestamp": "2017-05-04T22:46:09.677Z"
            }
          }
        }
      ]
    }
  ]
}
--------------------------------------------------
// TESTRESPONSE[s/"2017-05-04T22:46:09.674Z"/$body.docs.0.processor_results.0.doc._ingest.timestamp/]
// TESTRESPONSE[s/"2017-05-04T22:46:09.675Z"/$body.docs.0.processor_results.1.doc._ingest.timestamp/]
// TESTRESPONSE[s/"2017-05-04T22:46:09.676Z"/$body.docs.1.processor_results.0.doc._ingest.timestamp/]
// TESTRESPONSE[s/"2017-05-04T22:46:09.677Z"/$body.docs.1.processor_results.1.doc._ingest.timestamp/]
[[accessing-data-in-pipelines]]
== Accessing Data in Pipelines

The processors in a pipeline have read and write access to documents that pass through the pipeline.
The processors can access fields in the source of a document and the document's metadata fields.

[float]
[[accessing-source-fields]]
=== Accessing Fields in the Source

Accessing a field in the source is straightforward. You simply refer to fields by
their name. For example:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "my_field",
    "value": 582.1
  }
}
--------------------------------------------------
// NOTCONSOLE

On top of this, fields from the source are always accessible via the `_source` prefix:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "_source.my_field",
    "value": 582.1
  }
}
--------------------------------------------------
// NOTCONSOLE

[float]
[[accessing-metadata-fields]]
=== Accessing Metadata Fields

You can access metadata fields in the same way that you access fields in the source. This
is possible because Elasticsearch doesn't allow fields in the source that have the
same name as metadata fields.

The following example sets the `_id` metadata field of a document to `1`:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "_id",
    "value": "1"
  }
}
--------------------------------------------------
// NOTCONSOLE

The following metadata fields are accessible by a processor: `_index`, `_type`, `_id`, `_routing`.
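For example, a sketch of a processor that sets the `_routing` metadata field in the same way
(the routing value `user_1` is purely illustrative):

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "_routing",
    "value": "user_1"
  }
}
--------------------------------------------------
// NOTCONSOLE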
[float]
[[accessing-ingest-metadata]]
=== Accessing Ingest Metadata Fields

Beyond metadata fields and source fields, ingest also adds ingest metadata to the documents that it processes.
These metadata properties are accessible under the `_ingest` key. Currently ingest adds the ingest timestamp
under the `_ingest.timestamp` key of the ingest metadata. The ingest timestamp is the time when Elasticsearch
received the index or bulk request to pre-process the document.

Any processor can add ingest-related metadata during document processing. Ingest metadata is transient
and is lost after a document has been processed by the pipeline. Therefore, ingest metadata won't be indexed.

The following example adds a field with the name `received`. The value is the ingest timestamp:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "received",
    "value": "{{_ingest.timestamp}}"
  }
}
--------------------------------------------------
// NOTCONSOLE

Unlike Elasticsearch metadata fields, the ingest metadata field name `_ingest` can be used as a valid field name
in the source of a document. Use `_source._ingest` to refer to the field in the source document. Otherwise, `_ingest`
will be interpreted as an ingest metadata field.
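For example, a sketch of a processor that writes to an ordinary source field named `_ingest.message`
(the field name and value are purely illustrative) by using the `_source` prefix:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "_source._ingest.message",
    "value": "stored in the document source, not in ingest metadata"
  }
}
--------------------------------------------------
// NOTCONSOLE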
[float]
[[accessing-template-fields]]
=== Accessing Fields and Metafields in Templates

A number of processor settings also support templating. Settings that support templating can have zero or more
template snippets. A template snippet begins with `{{` and ends with `}}`.
Accessing fields and metafields in templates is exactly the same as via regular processor field settings.

The following example adds a field named `field_c`. Its value is a concatenation of
the values of `field_a` and `field_b`.

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "field_c",
    "value": "{{field_a}} {{field_b}}"
  }
}
--------------------------------------------------
// NOTCONSOLE

The following example uses the value of the `geoip.country_iso_code` field in the source
to set the index that the document will be indexed into:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "_index",
    "value": "{{geoip.country_iso_code}}"
  }
}
--------------------------------------------------
// NOTCONSOLE

Dynamic field names are also supported. This example sets the field named after the
value of `service` to the value of the field `code`:

[source,js]
--------------------------------------------------
{
  "set": {
    "field": "{{service}}",
    "value": "{{code}}"
  }
}
--------------------------------------------------
// NOTCONSOLE
[[ingest-conditionals]]
== Conditional Execution in Pipelines

Each processor allows for an optional `if` condition to determine if that
processor should be executed or skipped. The value of the `if` is a
<<modules-scripting-painless, Painless>> script that needs to evaluate
to `true` or `false`.

For example, the following processor will <<drop-processor,drop>> the document
(i.e. not index it) if the input document has a field named `network_name`
whose value is equal to `Guest`:
[source,js]
--------------------------------------------------
PUT _ingest/pipeline/drop_guests_network
{
  "processors": [
    {
      "drop": {
        "if": "ctx.network_name == 'Guest'"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

Using that pipeline for an index request:

[source,js]
--------------------------------------------------
POST test/_doc/1?pipeline=drop_guests_network
{
  "network_name" : "Guest"
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

Results in nothing indexed since the conditional evaluated to `true`.

[source,js]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "1",
  "_version": -3,
  "result": "noop",
  "_shards": {
    "total": 0,
    "successful": 0,
    "failed": 0
  }
}
--------------------------------------------------
// TESTRESPONSE
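Conversely, a document whose `network_name` has any other value (the value `Corporate` below is
purely illustrative) would not match the condition and would be indexed as usual:

[source,js]
--------------------------------------------------
POST test/_doc/2?pipeline=drop_guests_network
{
  "network_name" : "Corporate"
}
--------------------------------------------------
// NOTCONSOLE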
[[ingest-conditional-nullcheck]]
=== Handling Nested Fields in Conditionals

Source documents often contain nested fields. Care should be taken
to avoid NullPointerExceptions if the parent object does not exist
in the document. For example `ctx.a.b.c` can throw a NullPointerException
if the source document does not have a top-level `a` object, or a second-level
`b` object.

To help protect against NullPointerExceptions, null safe operations should be used.
Fortunately, Painless makes {painless}/painless-operators-reference.html#null-safe-operator[null safe]
operations easy with the `?.` operator.
[source,js]
--------------------------------------------------
PUT _ingest/pipeline/drop_guests_network
{
  "processors": [
    {
      "drop": {
        "if": "ctx.network?.name == 'Guest'"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

The following document will get <<drop-processor,dropped>> correctly:

[source,js]
--------------------------------------------------
POST test/_doc/1?pipeline=drop_guests_network
{
  "network": {
    "name": "Guest"
  }
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

////
Hidden example assertion:

[source,js]
--------------------------------------------------
GET test/_doc/1
--------------------------------------------------
// CONSOLE
// TEST[continued]
// TEST[catch:missing]

[source,js]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "1",
  "found": false
}
--------------------------------------------------
// TESTRESPONSE
////
Thanks to the `?.` operator, the following document will not throw an error.
If the pipeline had used `.` instead, the following document would throw a NullPointerException
since the `network` object is not part of the source document.
[source,js]
--------------------------------------------------
POST test/_doc/2?pipeline=drop_guests_network
{
  "foo" : "bar"
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

////
Hidden example assertion:

[source,js]
--------------------------------------------------
GET test/_doc/2
--------------------------------------------------
// CONSOLE
// TEST[continued]

[source,js]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "2",
  "_version": 1,
  "_seq_no": 22,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "foo": "bar"
  }
}
--------------------------------------------------
// TESTRESPONSE[s/"_seq_no": \d+/"_seq_no" : $body._seq_no/ s/"_primary_term": 1/"_primary_term" : $body._primary_term/]
////
The source document can also use dot delimited fields to represent nested fields.
For example, instead of the source document defining the fields nested:
[source,js]
--------------------------------------------------
{
  "network": {
    "name": "Guest"
  }
}
--------------------------------------------------
// NOTCONSOLE

The source document may have the nested fields flattened as such:

[source,js]
--------------------------------------------------
{
  "network.name": "Guest"
}
--------------------------------------------------
// NOTCONSOLE

If this is the case, use the <<dot-expand-processor, Dot Expand Processor>>
so that the nested fields may be used in a conditional.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/drop_guests_network
{
  "processors": [
    {
      "dot_expander": {
        "field": "network.name"
      }
    },
    {
      "drop": {
        "if": "ctx.network?.name == 'Guest'"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

Now the following input document can be used with a conditional in the pipeline.

[source,js]
--------------------------------------------------
POST test/_doc/3?pipeline=drop_guests_network
{
  "network.name": "Guest"
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

////
Hidden example assertion:

[source,js]
--------------------------------------------------
GET test/_doc/3
--------------------------------------------------
// CONSOLE
// TEST[continued]
// TEST[catch:missing]

[source,js]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "3",
  "found": false
}
--------------------------------------------------
// TESTRESPONSE
////
The `?.` operator works well for use in the `if` conditional
because the {painless}/painless-operators-reference.html#null-safe-operator[null safe operator]
returns null if the object is null and `==` is null safe (as well as many other
{painless}/painless-operators.html[painless operators]).

However, calling a method such as `.equalsIgnoreCase` is not null safe
and can result in a NullPointerException.
Some situations allow for the same functionality in a null safe manner.
For example: `'Guest'.equalsIgnoreCase(ctx.network?.name)` is null safe because
`Guest` is always non-null, but `ctx.network?.name.equalsIgnoreCase('Guest')` is not null safe
since `ctx.network?.name` can return null.
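As a sketch, that null safe form can be used directly as a drop processor's condition:

[source,js]
--------------------------------------------------
{
  "drop": {
    "if": "'Guest'.equalsIgnoreCase(ctx.network?.name)"
  }
}
--------------------------------------------------
// NOTCONSOLE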
Some situations require an explicit null check. In the following example there
is no null safe alternative, so an explicit null check is needed.
[source,js]
--------------------------------------------------
{
  "drop": {
    "if": "ctx.network?.name != null && ctx.network.name.contains('Guest')"
  }
}
--------------------------------------------------
// NOTCONSOLE
[[ingest-conditional-complex]]
=== Complex Conditionals

The `if` condition can be more than a simple equality check.
The full power of the <<modules-scripting-painless, Painless Scripting Language>> is available and
running in the {painless}/painless-ingest-processor-context.html[ingest processor context].

IMPORTANT: The value of ctx is read-only in `if` conditions.

A more complex `if` condition that drops the document (i.e. does not index it)
unless it has a multi-valued tag field with at least one value that contains the characters
`prod` (case insensitive):
[source,js]
--------------------------------------------------
PUT _ingest/pipeline/not_prod_dropper
{
  "processors": [
    {
      "drop": {
        "if": "Collection tags = ctx.tags;if(tags != null){for (String tag : tags) {if (tag.toLowerCase().contains('prod')) { return false;}}} return true;"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

The conditional needs to be all on one line since JSON does not
support new line characters. However, Kibana's console supports
a triple quote syntax to help with writing and debugging
scripts like these.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/not_prod_dropper
{
  "processors": [
    {
      "drop": {
        "if": """
            Collection tags = ctx.tags;
            if(tags != null){
              for (String tag : tags) {
                if (tag.toLowerCase().contains('prod')) {
                  return false;
                }
              }
            }
            return true;
        """
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE
[source,js]
--------------------------------------------------
POST test/_doc/1?pipeline=not_prod_dropper
{
  "tags": ["application:myapp", "env:Stage"]
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

The document is <<drop-processor,dropped>> since `prod` (case insensitive)
is not found in the tags.

////
Hidden example assertion:

[source,js]
--------------------------------------------------
GET test/_doc/1
--------------------------------------------------
// CONSOLE
// TEST[continued]
// TEST[catch:missing]

[source,js]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "1",
  "found": false
}
--------------------------------------------------
// TESTRESPONSE
////

The following document is indexed (i.e. not dropped) since
`prod` (case insensitive) is found in the tags.

[source,js]
--------------------------------------------------
POST test/_doc/2?pipeline=not_prod_dropper
{
  "tags": ["application:myapp", "env:Production"]
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

////
Hidden example assertion:

[source,js]
--------------------------------------------------
GET test/_doc/2
--------------------------------------------------
// CONSOLE
// TEST[continued]

[source,js]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "2",
  "_version": 1,
  "_seq_no": 34,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "tags": [
      "application:myapp",
      "env:Production"
    ]
  }
}
--------------------------------------------------
// TESTRESPONSE[s/"_seq_no": \d+/"_seq_no" : $body._seq_no/ s/"_primary_term" : 1/"_primary_term" : $body._primary_term/]
////
The <<simulate-pipeline-api>> with the `verbose` parameter can be used to help build out
complex conditionals. If the conditional evaluates to false it will be
omitted from the verbose results of the simulation since the document will not change.

Care should be taken to avoid overly complex or expensive conditional checks
since the condition needs to be checked for each and every document.
[[conditionals-with-multiple-pipelines]]
=== Conditionals with the Pipeline Processor

The combination of the `if` conditional and the <<pipeline-processor>> can result in a simple,
yet powerful means to process heterogeneous input. For example, you can define a single pipeline
that delegates to other pipelines based on some criteria.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/logs_pipeline
{
  "description": "A pipeline of pipelines for log files",
  "version": 1,
  "processors": [
    {
      "pipeline": {
        "if": "ctx.service?.name == 'apache_httpd'",
        "name": "httpd_pipeline"
      }
    },
    {
      "pipeline": {
        "if": "ctx.service?.name == 'syslog'",
        "name": "syslog_pipeline"
      }
    },
    {
      "fail": {
        "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE
The above example allows consumers to point to a single pipeline for all log-based index requests.
Based on the conditional, the correct pipeline will be called to process that type of data.

This pattern works well with a <<dynamic-index-settings, default pipeline>> defined in an index
template for all indexes that hold data that needs pre-index processing.
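As a sketch, assuming a legacy index template named `logs_template` covering indexes named
`logs-*` (both names are purely illustrative), the pipeline above could be applied by default
via the `index.default_pipeline` setting:

[source,js]
--------------------------------------------------
PUT _template/logs_template
{
  "index_patterns": ["logs-*"],
  "settings": {
    "index.default_pipeline": "logs_pipeline"
  }
}
--------------------------------------------------
// NOTCONSOLE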
[[conditionals-with-regex]]
=== Conditionals with Regular Expressions

The `if` conditional is implemented as a Painless script, which requires
{painless}/painless-examples.html#modules-scripting-painless-regex[explicit support for regular expressions].

`script.painless.regex.enabled: true` must be set in `elasticsearch.yml` to use regular
expressions in the `if` condition.
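For example, in `elasticsearch.yml`:

[source,yaml]
--------------------------------------------------
script.painless.regex.enabled: true
--------------------------------------------------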
If regular expressions are enabled, operators such as `=~` can be used against a `/pattern/` for conditions.

For example:

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/check_url
{
  "processors": [
    {
      "set": {
        "if": "ctx.href?.url =~ /^http[^s]/",
        "field": "href.insecure",
        "value": true
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE

[source,js]
--------------------------------------------------
POST test/_doc/1?pipeline=check_url
{
  "href": {
    "url": "http://www.elastic.co/"
  }
}
--------------------------------------------------
// CONSOLE
// TEST[continued]

Results in:

////
Hidden example assertion:

[source,js]
--------------------------------------------------
GET test/_doc/1
--------------------------------------------------
// CONSOLE
// TEST[continued]
////

[source,js]
--------------------------------------------------
{
  "_index": "test",
  "_type": "_doc",
  "_id": "1",
  "_version": 1,
  "_seq_no": 60,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "href": {
      "insecure": true,
      "url": "http://www.elastic.co/"
    }
  }
}
--------------------------------------------------
// TESTRESPONSE[s/"_seq_no": \d+/"_seq_no" : $body._seq_no/ s/"_primary_term" : 1/"_primary_term" : $body._primary_term/]
Regular expressions can be expensive and should be avoided if viable
alternatives exist.

For example, in this case `startsWith` can be used to get the same result
without using a regular expression:
[source,js]
--------------------------------------------------
PUT _ingest/pipeline/check_url
{
  "processors": [
    {
      "set": {
        "if": "ctx.href?.url != null && ctx.href.url.startsWith('http://')",
        "field": "href.insecure",
        "value": true
      }
    }
  ]
}
--------------------------------------------------
// CONSOLE
[[handling-failure-in-pipelines]]
== Handling Failures in Pipelines

In its simplest use case, a pipeline defines a list of processors that
are executed sequentially, and processing halts at the first exception. This
behavior may not be desirable when failures are expected. For example, you may have logs
that don't match the specified grok expression. Instead of halting execution, you may
want to index such documents into a separate index.

To enable this behavior, you can use the `on_failure` parameter. The `on_failure` parameter
defines a list of processors to be executed immediately following the failed processor.
You can specify this parameter at the pipeline level, as well as at the processor
level. If a processor specifies an `on_failure` configuration, whether
it is empty or not, any exceptions that are thrown by the processor are caught, and the
pipeline continues executing the remaining processors. Because you can define further processors
within the scope of an `on_failure` statement, you can nest failure handling.

The following example defines a pipeline that renames the `foo` field in
the processed document to `bar`. If the document does not contain the `foo` field, the processor
attaches an error message to the document for later analysis within
Elasticsearch.

[source,js]
--------------------------------------------------
{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "on_failure" : [
          {
            "set" : {
              "field" : "error",
              "value" : "field \"foo\" does not exist, cannot rename to \"bar\""
            }
          }
        ]
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE

The following example defines an `on_failure` block on a whole pipeline to change
the index to which failed documents get sent.

[source,js]
--------------------------------------------------
{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [ ... ],
  "on_failure" : [
    {
      "set" : {
        "field" : "_index",
        "value" : "failed-{{ _index }}"
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE
Alternatively, instead of defining behaviour in case of processor failure, it is also possible
to ignore a failure and continue with the next processor by specifying the `ignore_failure` setting.

If the field `foo` doesn't exist in the example below, the failure is caught and the pipeline
continues to execute, which in this case means that the pipeline does nothing.
[source,js]
--------------------------------------------------
{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "ignore_failure" : true
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE
The `ignore_failure` setting can be set on any processor and defaults to `false`.
[float]
[[accessing-error-metadata]]
=== Accessing Error Metadata From Processors Handling Exceptions

You may want to retrieve the actual error message that was thrown
by a failed processor. To do so you can access metadata fields called
`on_failure_message`, `on_failure_processor_type`, and `on_failure_processor_tag`. These fields are only accessible
from within the context of an `on_failure` block.
Here is an updated version of the example that you saw earlier. Instead of setting
the error message manually, the example leverages the `on_failure_message`
metadata field to provide the error message.
[source,js]
--------------------------------------------------
{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "on_failure" : [
          {
            "set" : {
              "field" : "error",
              "value" : "{{ _ingest.on_failure_message }}"
            }
          }
        ]
      }
    }
  ]
}
--------------------------------------------------
// NOTCONSOLE
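Similarly, a sketch of an `on_failure` block that records all three metadata fields
(the `error.*` target field names are purely illustrative):

[source,js]
--------------------------------------------------
{
  "rename" : {
    "field" : "foo",
    "target_field" : "bar",
    "on_failure" : [
      { "set" : { "field" : "error.message", "value" : "{{ _ingest.on_failure_message }}" } },
      { "set" : { "field" : "error.processor_type", "value" : "{{ _ingest.on_failure_processor_type }}" } },
      { "set" : { "field" : "error.processor_tag", "value" : "{{ _ingest.on_failure_processor_tag }}" } }
    ]
  }
}
--------------------------------------------------
// NOTCONSOLE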
[[ingest-processors]]
== Processors

All processors are defined in the following way within a pipeline definition:

[source,js]
--------------------------------------------------
{
  "PROCESSOR_NAME" : {
    ... processor configuration options ...
  }
}
--------------------------------------------------
// NOTCONSOLE

Each processor defines its own configuration parameters, but all processors have
the ability to declare `tag`, `on_failure` and `if` fields. These fields are optional.

A `tag` is simply a string identifier of the specific instantiation of a certain
processor in a pipeline. The `tag` field does not affect the processor's behavior,
but is very useful for bookkeeping and tracing errors to specific processors.
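For example, a sketch of a tagged processor (the tag, field, and value are purely illustrative):

[source,js]
--------------------------------------------------
{
  "set": {
    "tag": "set_environment_field",
    "field": "environment",
    "value": "production"
  }
}
--------------------------------------------------
// NOTCONSOLE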
The `if` field must contain a script that returns a boolean value. If the script evaluates to `true`
then the processor will be executed for the given document; otherwise it will be skipped.
The `if` field takes an object with the script fields defined in <<script-processor, script-options>>
and accesses a read-only version of the document via the same `ctx` variable used by scripts in the
<<script-processor>>.
[source,js]
--------------------------------------------------
{
  "set": {
    "if": "ctx.foo == 'someValue'",
    "field": "found",
    "value": true
  }
}
--------------------------------------------------
// NOTCONSOLE

See <<ingest-conditionals>> to learn more about the `if` field and conditional execution.

See <<handling-failure-in-pipelines>> to learn more about the `on_failure` field and error handling in pipelines.
The <<ingest-info,node info API>> can be used to figure out what processors are available in a cluster;
it provides a per-node list of the available processors.

Custom processors must be installed on all nodes. The put pipeline API will fail if a processor specified in a pipeline
doesn't exist on all nodes. If you rely on custom processor plugins, make sure to mark these plugins as mandatory by adding
the `plugin.mandatory` setting to the `config/elasticsearch.yml` file, for example:
[source,yaml]
--------------------------------------------------
plugin.mandatory: ingest-attachment
--------------------------------------------------

A node will not start if this plugin is not available.
The <<ingest-stats,node stats API>> can be used to fetch ingest usage statistics, globally and on a per
pipeline basis. This is useful for finding out which pipelines are used the most or spend the most time on preprocessing.
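For example, a sketch of a request for just the ingest section of the node stats:

[source,js]
--------------------------------------------------
GET _nodes/stats/ingest
--------------------------------------------------
// NOTCONSOLE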
[float]
=== Ingest Processor Plugins

Additional ingest processors can be implemented and installed as Elasticsearch {plugins}/intro.html[plugins].
See {plugins}/ingest.html[Ingest plugins] for information about the available ingest plugins.

include::processors/append.asciidoc[]
include::processors/bytes.asciidoc[]
include::processors/convert.asciidoc[]
include::processors/date.asciidoc[]
include::processors/date-index-name.asciidoc[]
include::processors/dissect.asciidoc[]
include::processors/dot-expand.asciidoc[]
include::processors/drop.asciidoc[]
include::processors/fail.asciidoc[]
include::processors/foreach.asciidoc[]
include::processors/geoip.asciidoc[]
include::processors/grok.asciidoc[]
include::processors/gsub.asciidoc[]
include::processors/join.asciidoc[]
include::processors/json.asciidoc[]
include::processors/kv.asciidoc[]
include::processors/pipeline.asciidoc[]
include::processors/remove.asciidoc[]
include::processors/rename.asciidoc[]
include::processors/script.asciidoc[]
include::processors/set.asciidoc[]
include::processors/set-security-user.asciidoc[]
include::processors/split.asciidoc[]
include::processors/sort.asciidoc[]
include::processors/trim.asciidoc[]
include::processors/uppercase.asciidoc[]
include::processors/url-decode.asciidoc[]
include::processors/user-agent.asciidoc[]