[role="xpack"]
[[use-a-data-stream]]
== Use a data stream

After you <<set-up-a-data-stream,set up a data stream>>, you can do
the following:

* <<add-documents-to-a-data-stream>>
* <<search-a-data-stream>>
* <<get-stats-for-a-data-stream>>
* <<manually-roll-over-a-data-stream>>
* <<open-closed-backing-indices>>
* <<reindex-with-a-data-stream>>
* <<update-delete-docs-in-a-data-stream>>
* <<update-delete-docs-in-a-backing-index>>

////
[source,console]
----
PUT /_index_template/logs_data_stream
{
  "index_patterns": [ "logs*" ],
  "data_stream": { }
}

PUT /_data_stream/logs

POST /logs/_rollover/

POST /logs/_rollover/

PUT /logs/_create/bfspvnIBr7VVZlfp2lqX?refresh=wait_for
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "user": {
    "id": "yWIumJd7"
  },
  "message": "Login successful"
}

PUT /_data_stream/logs_alt
----
// TESTSETUP

[source,console]
----
DELETE /_data_stream/*

DELETE /_index_template/*
----
// TEARDOWN
////

[discrete]
[[add-documents-to-a-data-stream]]
=== Add documents to a data stream

You can add documents to a data stream using the following requests:

* An <<docs-index_,index API>> request with an
<<docs-index-api-op_type,`op_type`>> set to `create`. Specify the data
stream's name in place of an index name.
+
--
NOTE: The `op_type` parameter defaults to `create` when adding new documents.

.*Example: Index API request*
[%collapsible]
====
The following index API request adds a new document to the `logs` data
stream.

[source,console]
----
POST /logs/_doc/
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}
----
====

IMPORTANT: You cannot add new documents to a data stream using the index API's
`PUT /<target>/_doc/<_id>` request format. To specify a document ID, use the
`PUT /<target>/_create/<_id>` format instead, as shown below.
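
For example, the following sketch adds a document with an explicit ID to the
`logs` data stream using the `_create` format. The document ID `my-doc-id` is
illustrative; any ID not already in use works.

[source,console]
----
PUT /logs/_create/my-doc-id
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}
----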
--

* A <<docs-bulk,bulk API>> request using the `create` action. Specify the data
stream's name in place of an index name.
+
--
NOTE: Data streams do not support other bulk actions, such as `index`.

.*Example: Bulk API request*
[%collapsible]
====
The following bulk API request adds several new documents to
the `logs` data stream. Note that only the `create` action is used.

[source,console]
----
PUT /logs/_bulk?refresh
{"create":{ }}
{ "@timestamp": "2020-12-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
{"create":{ }}
{ "@timestamp": "2020-12-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
{"create":{ }}
{ "@timestamp": "2020-12-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }
----
====
--

You can use an <<ingest,ingest pipeline>> with these requests to pre-process
data before it's indexed.

.*Example: Ingest pipeline*
[%collapsible]
====
The following <<put-pipeline-api,put pipeline API>> request creates the
`lowercase_message_field` ingest pipeline. The pipeline uses the
<<lowercase-processor,`lowercase` ingest processor>> to change the `message`
field value to lowercase before indexing.

[source,console]
----
PUT /_ingest/pipeline/lowercase_message_field
{
  "description" : "Lowercases the message field value",
  "processors" : [
    {
      "lowercase" : {
        "field" : "message"
      }
    }
  ]
}
----
// TEST[continued]

The following index API request adds a new document to the `logs` data stream.
The request includes a `?pipeline=lowercase_message_field` query parameter.
This parameter indicates {es} should use the `lowercase_message_field` pipeline
to pre-process the document before indexing it.

During pre-processing, the pipeline changes the letter case of the document's
`message` field value from `LOGIN Successful` to `login successful`.

[source,console]
----
POST /logs/_doc?pipeline=lowercase_message_field
{
  "@timestamp": "2020-12-08T11:12:01.000Z",
  "user": {
    "id": "I1YBEOxJ"
  },
  "message": "LOGIN Successful"
}
----
// TEST[continued]

////
[source,console]
----
DELETE /_ingest/pipeline/lowercase_message_field
----
// TEST[continued]
////
====

[discrete]
[[search-a-data-stream]]
=== Search a data stream

The following search APIs support data streams:

* <<search-search, Search>>
* <<async-search, Async search>>
* <<search-multi-search, Multi search>>
* <<search-field-caps, Field capabilities>>
////
* <<eql-search-api, EQL search>>
////

.*Example*
[%collapsible]
====
The following <<search-search,search API>> request searches the `logs` data
stream for documents with a timestamp between today and yesterday that also
have a `message` value of `login successful`.

[source,console]
----
GET /logs/_search
{
  "query": {
    "bool": {
      "must": {
        "range": {
          "@timestamp": {
            "gte": "now-1d/d",
            "lt": "now/d"
          }
        }
      },
      "should": {
        "match": {
          "message": "login successful"
        }
      }
    }
  }
}
----
====
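
The other APIs in this list accept a data stream name in the same way. As a
minimal sketch, the following <<async-search,async search API>> request runs a
query against the `logs` data stream asynchronously. The query shown is
illustrative.

[source,console]
----
POST /logs/_async_search
{
  "query": {
    "match": {
      "message": "login successful"
    }
  }
}
----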

You can use a comma-separated list or wildcard (`*`) expression to search
multiple data streams, indices, and index aliases in the same request.

.*Example*
[%collapsible]
====
The following request searches the `logs` and `logs_alt` data streams, which are
specified as a comma-separated list in the request path.

[source,console]
----
GET /logs,logs_alt/_search
{
  "query": {
    "match": {
      "user.id": "8a4f500d"
    }
  }
}
----

The following request uses the `logs*` wildcard expression to search any data
stream, index, or index alias beginning with `logs`.

[source,console]
----
GET /logs*/_search
{
  "query": {
    "match": {
      "user.id": "vlb44hny"
    }
  }
}
----

The following search request omits a target in the request path. The request
searches all data streams and indices in the cluster.

[source,console]
----
GET /_search
{
  "query": {
    "match": {
      "user.id": "l7gk7f82"
    }
  }
}
----
====

[discrete]
[[get-stats-for-a-data-stream]]
=== Get statistics for a data stream

You can use the <<data-stream-stats-api,data stream stats API>> to retrieve
statistics for one or more data streams. These statistics include:

* A count of the stream's backing indices
* The total store size of all shards for the stream's backing indices
* The highest `@timestamp` value for the stream

.*Example*
[%collapsible]
====
The following data stream stats API request retrieves statistics for the
`logs` data stream.

[source,console]
----
GET /_data_stream/logs/_stats?human=true
----

The API returns the following response.

[source,console-result]
----
{
  "_shards": {
    "total": 6,
    "successful": 3,
    "failed": 0
  },
  "data_stream_count": 1,
  "backing_indices": 3,
  "total_store_size": "624b",
  "total_store_size_bytes": 624,
  "data_streams": [
    {
      "data_stream": "logs",
      "backing_indices": 3,
      "store_size": "624b",
      "store_size_bytes": 624,
      "maximum_timestamp": 1607339167000
    }
  ]
}
----
// TESTRESPONSE[s/"total_store_size": "624b"/"total_store_size": $body.total_store_size/]
// TESTRESPONSE[s/"total_store_size_bytes": 624/"total_store_size_bytes": $body.total_store_size_bytes/]
// TESTRESPONSE[s/"store_size": "624b"/"store_size": $body.data_streams.0.store_size/]
// TESTRESPONSE[s/"store_size_bytes": 624/"store_size_bytes": $body.data_streams.0.store_size_bytes/]
====

[discrete]
[[manually-roll-over-a-data-stream]]
=== Manually roll over a data stream

A rollover creates a new backing index for a data stream. This new backing
index becomes the stream's <<data-stream-write-index,write index>>, and the
stream's <<data-streams-generation,generation>> increments.

In most cases, we recommend using <<index-lifecycle-management,{ilm-init}>> to
automate rollovers for data streams. This lets you automatically roll over the
current write index when it meets specified criteria, such as a maximum age or
size.
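
For example, the following sketch creates an {ilm-init} policy whose `rollover`
action triggers when the write index reaches a maximum age or size. The policy
name and thresholds are illustrative.

[source,console]
----
PUT /_ilm/policy/logs_policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_age": "7d",
            "max_size": "50gb"
          }
        }
      }
    }
  }
}
----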

However, you can also use the <<indices-rollover-index,rollover API>> to
manually perform a rollover. This can be useful if you want to
<<data-streams-change-mappings-and-settings,apply mapping or setting changes>>
to the stream's write index after updating a data stream's template.

.*Example*
[%collapsible]
====
The following <<indices-rollover-index,rollover API>> request submits a manual
rollover request for the `logs` data stream.

[source,console]
----
POST /logs/_rollover/
----
====
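
The rollover API also accepts optional conditions. As a sketch, the following
request only rolls over the `logs` data stream if its write index meets the
specified condition. The `max_docs` threshold is illustrative.

[source,console]
----
POST /logs/_rollover
{
  "conditions": {
    "max_docs": 1000
  }
}
----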

[discrete]
[[open-closed-backing-indices]]
=== Open closed backing indices

You may <<indices-close,close>> one or more of a data stream's backing indices
as part of its {ilm-init} lifecycle or another workflow. A closed backing index
cannot be searched, even for searches targeting its data stream. You also can't
<<update-delete-docs-in-a-data-stream,update or delete documents>> in a closed
index.

You can re-open individual backing indices by sending an
<<indices-open-close,open request>> directly to the index.
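
For example, the following sketch re-opens a single closed backing index, using
this page's backing index naming pattern.

[source,console]
----
POST /.ds-logs-000001/_open/
----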

You can also conveniently re-open all closed backing indices for a data stream
by sending an open request directly to the stream.

.*Example*
[%collapsible]
====
The following <<cat-indices,cat indices>> API request retrieves the status for
the `logs` data stream's backing indices.

////
[source,console]
----
POST /.ds-logs-000001,.ds-logs-000002/_close/
----
////

[source,console]
----
GET /_cat/indices/logs?v&s=index&h=index,status
----
// TEST[continued]

The API returns the following response. The response indicates the `logs` data
stream contains two closed backing indices: `.ds-logs-000001` and
`.ds-logs-000002`.

[source,txt]
----
index           status
.ds-logs-000001 close
.ds-logs-000002 close
.ds-logs-000003 open
----
// TESTRESPONSE[non_json]

The following <<indices-open-close,open API>> request re-opens any closed
backing indices for the `logs` data stream, including `.ds-logs-000001` and
`.ds-logs-000002`.

[source,console]
----
POST /logs/_open/
----
// TEST[continued]

You can resubmit the original cat indices API request to verify the
`.ds-logs-000001` and `.ds-logs-000002` backing indices were re-opened.

[source,console]
----
GET /_cat/indices/logs?v&s=index&h=index,status
----
// TEST[continued]

The API returns the following response.

[source,txt]
----
index           status
.ds-logs-000001 open
.ds-logs-000002 open
.ds-logs-000003 open
----
// TESTRESPONSE[non_json]
====

[discrete]
[[reindex-with-a-data-stream]]
=== Reindex with a data stream

You can use the <<docs-reindex,reindex API>> to copy documents to a data stream
from an existing index, index alias, or data stream.

A reindex copies documents from a _source_ to a _destination_. The source and
destination can be any pre-existing index, index alias, or data stream. However,
the source and destination must be different. You cannot reindex a data stream
into itself.

Because data streams are <<data-streams-append-only,append-only>>, a reindex
request to a data stream destination must have an `op_type` of `create`. This
means a reindex can only add new documents to a data stream. It cannot update
existing documents in the data stream destination.

A reindex can be used to:

* Convert an existing index alias and collection of time-based indices into a
data stream.

* Apply a new or updated <<create-a-data-stream-template,index template>>
by reindexing an existing data stream into a new one. This applies mapping
and setting changes in the template to each document and backing index of the
data stream destination. See
<<data-streams-use-reindex-to-change-mappings-settings>>.

TIP: If you only want to update the mappings or settings of a data stream's
write index, we recommend you update the <<create-a-data-stream-template,data
stream's template>> and perform a <<manually-roll-over-a-data-stream,rollover>>.

.*Example*
[%collapsible]
====
The following reindex request copies documents from the `archive` index alias to
the existing `logs` data stream. Because the destination is a data stream, the
request's `op_type` is `create`.

////
[source,console]
----
PUT /_bulk?refresh=wait_for
{"create":{"_index" : "archive_1"}}
{ "@timestamp": "2020-12-08T11:04:05.000Z" }
{"create":{"_index" : "archive_2"}}
{ "@timestamp": "2020-12-08T11:06:07.000Z" }
{"create":{"_index" : "archive_2"}}
{ "@timestamp": "2020-12-09T11:07:08.000Z" }
{"create":{"_index" : "archive_2"}}
{ "@timestamp": "2020-12-09T11:07:08.000Z" }

POST /_aliases
{
  "actions" : [
    { "add" : { "index" : "archive_1", "alias" : "archive" } },
    { "add" : { "index" : "archive_2", "alias" : "archive", "is_write_index" : true} }
  ]
}
----
////

[source,console]
----
POST /_reindex
{
  "source": {
    "index": "archive"
  },
  "dest": {
    "index": "logs",
    "op_type": "create"
  }
}
----
// TEST[continued]
====

You can also reindex documents from a data stream to an index, index
alias, or data stream.

.*Example*
[%collapsible]
====
The following reindex request copies documents from the `logs` data stream
to the existing `archive` index alias. Because the destination is not a data
stream, the `op_type` does not need to be specified.

[source,console]
----
POST /_reindex
{
  "source": {
    "index": "logs"
  },
  "dest": {
    "index": "archive"
  }
}
----
// TEST[continued]
====

[discrete]
[[update-delete-docs-in-a-data-stream]]
=== Update or delete documents in a data stream

You can update or delete documents in a data stream using the following
requests:

* An <<docs-update-by-query,update by query API>> request
+
.*Example*
[%collapsible]
====
The following update by query API request updates documents in the `logs` data
stream with a `user.id` of `l7gk7f82`. The request uses a
<<modules-scripting-using,script>> to assign matching documents a new `user.id`
value of `XgdX0NoX`.

[source,console]
----
POST /logs/_update_by_query
{
  "query": {
    "match": {
      "user.id": "l7gk7f82"
    }
  },
  "script": {
    "source": "ctx._source.user.id = params.new_id",
    "params": {
      "new_id": "XgdX0NoX"
    }
  }
}
----
====

* A <<docs-delete-by-query,delete by query API>> request
+
.*Example*
[%collapsible]
====
The following delete by query API request deletes documents in the `logs` data
stream with a `user.id` of `vlb44hny`.

[source,console]
----
POST /logs/_delete_by_query
{
  "query": {
    "match": {
      "user.id": "vlb44hny"
    }
  }
}
----
====

[discrete]
[[update-delete-docs-in-a-backing-index]]
=== Update or delete documents in a backing index

Alternatively, you can update or delete documents in a data stream by sending
the update or deletion request to the backing index containing the document. To
do this, you first need to get:

* The <<mapping-id-field,document ID>>
* The name of the backing index that contains the document

If you want to update a document, you must also get its current
<<optimistic-concurrency-control,sequence number and primary term>>.

You can use a <<search-a-data-stream,search request>> to retrieve this
information.

.*Example*
[%collapsible]
====
The following search request retrieves documents in the `logs` data stream with
a `user.id` of `yWIumJd7`. By default, this search returns the document ID and
backing index for any matching documents.

The request includes a `"seq_no_primary_term": true` argument. This means the
search also returns the sequence number and primary term for any matching
documents.

[source,console]
----
GET /logs/_search
{
  "seq_no_primary_term": true,
  "query": {
    "match": {
      "user.id": "yWIumJd7"
    }
  }
}
----

The API returns the following response. The `hits.hits` property contains
information for any documents matching the search.

[source,console-result]
----
{
  "took": 20,
  "timed_out": false,
  "_shards": {
    "total": 3,
    "successful": 3,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 0.2876821,
    "hits": [
      {
        "_index": ".ds-logs-000003",      <1>
        "_id": "bfspvnIBr7VVZlfp2lqX",    <2>
        "_seq_no": 0,                     <3>
        "_primary_term": 1,               <4>
        "_score": 0.2876821,
        "_source": {
          "@timestamp": "2020-12-07T11:06:07.000Z",
          "user": {
            "id": "yWIumJd7"
          },
          "message": "Login successful"
        }
      }
    ]
  }
}
----
// TESTRESPONSE[s/"took": 20/"took": $body.took/]
// TESTRESPONSE[s/"max_score": 0.2876821/"max_score": $body.hits.max_score/]
// TESTRESPONSE[s/"_score": 0.2876821/"_score": $body.hits.hits.0._score/]

<1> Backing index containing the matching document
<2> Document ID for the document
<3> Current sequence number for the document
<4> Primary term for the document
====

You can use an <<docs-index_,index API>> request to update an individual
document. To prevent an accidental overwrite, this request must include valid
`if_seq_no` and `if_primary_term` arguments.

.*Example*
[%collapsible]
====
The following index API request updates an existing document in the `logs` data
stream. The request targets document ID `bfspvnIBr7VVZlfp2lqX` in the
`.ds-logs-000003` backing index.

The request also includes the current sequence number and primary term in the
respective `if_seq_no` and `if_primary_term` query parameters. The request body
contains a new JSON source for the document.

[source,console]
----
PUT /.ds-logs-000003/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=0&if_primary_term=1
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}
----
====

You can use the <<docs-delete,delete API>> to delete individual documents.
Deletion requests do not require a sequence number or primary term.

.*Example*
[%collapsible]
====
The following delete API request deletes an existing document in the `logs` data
stream. The request targets document ID `bfspvnIBr7VVZlfp2lqX` in the
`.ds-logs-000003` backing index.

[source,console]
----
DELETE /.ds-logs-000003/_doc/bfspvnIBr7VVZlfp2lqX
----
====

You can use the <<docs-bulk,bulk API>> to delete or update multiple documents in
one request using `delete`, `index`, or `update` actions.

If the action type is `index`, the action must include valid
<<bulk-optimistic-concurrency-control,`if_seq_no` and `if_primary_term`>>
arguments.

.*Example*
[%collapsible]
====
The following bulk API request uses an `index` action to update an existing
document in the `logs` data stream.

The `index` action targets document ID `bfspvnIBr7VVZlfp2lqX` in the
`.ds-logs-000003` backing index. The action also includes the current sequence
number and primary term in the respective `if_seq_no` and `if_primary_term`
parameters.

[source,console]
----
PUT /_bulk?refresh
{ "index": { "_index": ".ds-logs-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 0, "if_primary_term": 1 } }
{ "@timestamp": "2020-12-07T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
----
====
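
Bulk `delete` actions target a document the same way but, like the delete API,
do not require a sequence number or primary term. As a sketch, the following
request deletes the same document.

[source,console]
----
PUT /_bulk?refresh
{ "delete": { "_index": ".ds-logs-000003", "_id": "bfspvnIBr7VVZlfp2lqX" } }
----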