- [role="xpack"]
- [[use-a-data-stream]]
- == Use a data stream
- After you <<set-up-a-data-stream,set up a data stream>>, you can do
- the following:
- * <<add-documents-to-a-data-stream>>
- * <<search-a-data-stream>>
- * <<get-stats-for-a-data-stream>>
- * <<manually-roll-over-a-data-stream>>
- * <<open-closed-backing-indices>>
- * <<reindex-with-a-data-stream>>
- * <<update-docs-in-a-data-stream-by-query>>
- * <<delete-docs-in-a-data-stream-by-query>>
- * <<update-delete-docs-in-a-backing-index>>
- ////
- [source,console]
- ----
- PUT /_index_template/logs_data_stream
- {
- "index_patterns": [ "logs*" ],
- "data_stream": { }
- }
- PUT /_data_stream/logs
- POST /logs/_rollover/
- POST /logs/_rollover/
- PUT /logs/_create/bfspvnIBr7VVZlfp2lqX?refresh=wait_for
- {
- "@timestamp": "2020-12-07T11:06:07.000Z",
- "user": {
- "id": "yWIumJd7"
- },
- "message": "Login successful"
- }
- PUT /_data_stream/logs_alt
- ----
- // TESTSETUP
- [source,console]
- ----
- DELETE /_data_stream/*
- DELETE /_index_template/*
- ----
- // TEARDOWN
- ////
- [discrete]
- [[add-documents-to-a-data-stream]]
- === Add documents to a data stream
- You can add documents to a data stream using two types of indexing requests:
- * <<data-streams-individual-indexing-requests>>
- * <<data-streams-bulk-indexing-requests>>
- Adding a document to a data stream adds the document to the stream's current
- <<data-stream-write-index,write index>>.
- You cannot add new documents to a stream's other backing indices, even by
- sending requests directly to the index. This means you cannot submit the
- following requests directly to any backing index except the write index:
- * An <<docs-index_,index API>> request with an
- <<docs-index-api-op_type,`op_type`>> of `create`. The `op_type` parameter
- defaults to `create` when adding new documents.
- * A <<docs-bulk,bulk API>> request using a `create` action.
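- Based on this restriction, a create request aimed directly at a non-write
- backing index is rejected. The following sketch assumes a non-write backing
- index named `.ds-logs-000001`:
- [source,console]
- ----
- POST /.ds-logs-000001/_doc?op_type=create
- {
- "@timestamp": "2020-12-07T11:06:07.000Z",
- "message": "Rejected because .ds-logs-000001 is not the write index"
- }
- ----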
- [discrete]
- [[data-streams-individual-indexing-requests]]
- ==== Individual indexing requests
- You can use an <<docs-index_,index API>> request with an
- <<docs-index-api-op_type,`op_type`>> of `create` to add individual documents
- to a data stream.
- NOTE: The `op_type` parameter defaults to `create` when adding new documents.
- The following index API request adds a new document to the `logs` data
- stream.
- [source,console]
- ----
- POST /logs/_doc/
- {
- "@timestamp": "2020-12-07T11:06:07.000Z",
- "user": {
- "id": "8a4f500d"
- },
- "message": "Login successful"
- }
- ----
- IMPORTANT: You cannot add new documents to a data stream using the index API's
- `PUT /<target>/_doc/<_id>` request format. To specify a document ID, use the
- `PUT /<target>/_create/<_id>` format instead.
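- For example, the following sketch adds a document with an explicit ID. The ID
- `my-doc-id` is illustrative:
- [source,console]
- ----
- PUT /logs/_create/my-doc-id
- {
- "@timestamp": "2020-12-07T11:06:07.000Z",
- "user": {
- "id": "8a4f500d"
- },
- "message": "Login successful"
- }
- ----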
- [discrete]
- [[data-streams-bulk-indexing-requests]]
- ==== Bulk indexing requests
- You can use the <<docs-bulk,bulk API>> to add multiple documents to a data
- stream in a single request. Each action in the bulk request must use the
- `create` action.
- NOTE: Data streams do not support other bulk actions, such as `index`.
- The following bulk API request adds several new documents to
- the `logs` data stream. Note that only the `create` action is used.
- [source,console]
- ----
- PUT /logs/_bulk?refresh
- {"create":{ }}
- { "@timestamp": "2020-12-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
- {"create":{ }}
- { "@timestamp": "2020-12-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
- {"create":{ }}
- { "@timestamp": "2020-12-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }
- ----
- [discrete]
- [[data-streams-index-with-an-ingest-pipeline]]
- ==== Index with an ingest pipeline
- You can use an <<ingest,ingest pipeline>> with an indexing request to
- pre-process data before it's indexed to a data stream.
- The following <<put-pipeline-api,put pipeline API>> request creates the
- `lowercase_message_field` ingest pipeline. The pipeline uses the
- <<lowercase-processor,`lowercase` ingest processor>> to change the `message`
- field value to lowercase before indexing.
- [source,console]
- ----
- PUT /_ingest/pipeline/lowercase_message_field
- {
- "description" : "Lowercases the message field value",
- "processors" : [
- {
- "lowercase" : {
- "field" : "message"
- }
- }
- ]
- }
- ----
- // TEST[continued]
- The following index API request adds a new document to the `logs` data stream.
- The request includes a `?pipeline=lowercase_message_field` query parameter.
- This parameter indicates {es} should use the `lowercase_message_field` pipeline
- to pre-process the document before indexing it.
- During pre-processing, the pipeline changes the letter case of the document's
- `message` field value from `LOGIN Successful` to `login successful`.
- [source,console]
- ----
- POST /logs/_doc?pipeline=lowercase_message_field
- {
- "@timestamp": "2020-12-08T11:12:01.000Z",
- "user": {
- "id": "I1YBEOxJ"
- },
- "message": "LOGIN Successful"
- }
- ----
- // TEST[continued]
- ////
- [source,console]
- ----
- DELETE /_ingest/pipeline/lowercase_message_field
- ----
- // TEST[continued]
- ////
- [discrete]
- [[search-a-data-stream]]
- === Search a data stream
- The following search APIs support data streams:
- * <<search-search, Search>>
- * <<async-search, Async search>>
- * <<search-multi-search, Multi search>>
- * <<search-field-caps, Field capabilities>>
- ////
- * <<eql-search-api, EQL search>>
- ////
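- For example, a minimal field capabilities request against the `logs` data
- stream might look like the following sketch. The `fields` value is
- illustrative:
- [source,console]
- ----
- GET /logs/_field_caps?fields=message
- ----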
- The following <<search-search,search API>> request searches the `logs` data
- stream for documents with a timestamp between today and yesterday that also have
- a `message` value of `login successful`.
- [source,console]
- ----
- GET /logs/_search
- {
- "query": {
- "bool": {
- "must": {
- "range": {
- "@timestamp": {
- "gte": "now-1d/d",
- "lt": "now/d"
- }
- }
- },
- "should": {
- "match": {
- "message": "login successful"
- }
- }
- }
- }
- }
- ----
- You can use a comma-separated list or wildcard (`*`) expression to search
- multiple data streams, indices, and index aliases in the same request.
- The following request searches the `logs` and `logs_alt` data streams, which are
- specified as a comma-separated list in the request path.
- [source,console]
- ----
- GET /logs,logs_alt/_search
- {
- "query": {
- "match": {
- "user.id": "8a4f500d"
- }
- }
- }
- ----
- The following request uses the `logs*` wildcard expression to search any data
- stream, index, or index alias beginning with `logs`.
- [source,console]
- ----
- GET /logs*/_search
- {
- "query": {
- "match": {
- "user.id": "vlb44hny"
- }
- }
- }
- ----
- The following search request omits a target in the request path. The request
- searches all data streams and indices in the cluster.
- [source,console]
- ----
- GET /_search
- {
- "query": {
- "match": {
- "user.id": "l7gk7f82"
- }
- }
- }
- ----
- [discrete]
- [[get-stats-for-a-data-stream]]
- === Get statistics for a data stream
- You can use the <<data-stream-stats-api,data stream stats API>> to retrieve
- statistics for one or more data streams. These statistics include:
- * A count of the stream's backing indices
- * The total store size of all shards for the stream's backing indices
- * The highest `@timestamp` value for the stream
- .*Example*
- [%collapsible]
- ====
- The following data stream stats API request retrieves statistics for the
- `logs` data stream.
- [source,console]
- ----
- GET /_data_stream/logs/_stats?human=true
- ----
- The API returns the following response.
- [source,console-result]
- ----
- {
- "_shards": {
- "total": 6,
- "successful": 3,
- "failed": 0
- },
- "data_stream_count": 1,
- "backing_indices": 3,
- "total_store_size": "624b",
- "total_store_size_bytes": 624,
- "data_streams": [
- {
- "data_stream": "logs",
- "backing_indices": 3,
- "store_size": "624b",
- "store_size_bytes": 624,
- "maximum_timestamp": 1607339167000
- }
- ]
- }
- ----
- // TESTRESPONSE[s/"total_store_size": "624b"/"total_store_size": $body.total_store_size/]
- // TESTRESPONSE[s/"total_store_size_bytes": 624/"total_store_size_bytes": $body.total_store_size_bytes/]
- // TESTRESPONSE[s/"store_size": "624b"/"store_size": $body.data_streams.0.store_size/]
- // TESTRESPONSE[s/"store_size_bytes": 624/"store_size_bytes": $body.data_streams.0.store_size_bytes/]
- ====
- [discrete]
- [[manually-roll-over-a-data-stream]]
- === Manually roll over a data stream
- A rollover creates a new backing index for a data stream. The new backing index
- becomes the stream's <<data-stream-write-index,write index>>, and the stream's
- <<data-streams-generation,generation>> increments.
- In most cases, we recommend using <<index-lifecycle-management,{ilm-init}>> to
- automate rollovers for data streams. This lets you automatically roll over the
- current write index when it meets specified criteria, such as a maximum age or
- size.
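- For example, a minimal sketch of an {ilm-init} policy that rolls over the
- write index at a maximum age or size. The policy name and threshold values
- are illustrative:
- [source,console]
- ----
- PUT /_ilm/policy/logs_policy
- {
- "policy": {
- "phases": {
- "hot": {
- "actions": {
- "rollover": {
- "max_age": "7d",
- "max_size": "50gb"
- }
- }
- }
- }
- }
- }
- ----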
- However, you can also use the <<indices-rollover-index,rollover API>> to
- manually perform a rollover. This can be useful if you want to
- <<data-streams-change-mappings-and-settings,apply mapping or setting changes>>
- to the stream's write index after updating a data stream's template.
- The following <<indices-rollover-index,rollover API>> request submits a manual
- rollover request for the `logs` data stream.
- [source,console]
- ----
- POST /logs/_rollover/
- ----
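- To verify the rollover, you can retrieve the data stream and check that its
- `generation` was incremented. A minimal sketch:
- [source,console]
- ----
- GET /_data_stream/logs
- ----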
- [discrete]
- [[open-closed-backing-indices]]
- === Open closed backing indices
- You may <<indices-close,close>> one or more of a data stream's backing indices
- as part of its {ilm-init} lifecycle or another workflow. A closed backing index
- cannot be searched, even for searches targeting its data stream. You also can't
- <<update-docs-in-a-data-stream-by-query,update>> or
- <<delete-docs-in-a-data-stream-by-query,delete>> documents in a closed index.
- You can re-open individual backing indices by sending an
- <<indices-open-close,open request>> directly to the index.
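- For example, the following sketch re-opens a single backing index. The index
- name `.ds-logs-000001` is illustrative:
- [source,console]
- ----
- POST /.ds-logs-000001/_open/
- ----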
- You can also conveniently re-open all of a data stream's closed backing
- indices by sending an open request directly to the stream.
- The following <<cat-indices,cat indices>> API request retrieves the status for
- the `logs` data stream's backing indices.
- ////
- [source,console]
- ----
- POST /.ds-logs-000001,.ds-logs-000002/_close/
- ----
- ////
- [source,console]
- ----
- GET /_cat/indices/logs?v&s=index&h=index,status
- ----
- // TEST[continued]
- The API returns the following response. The response indicates the `logs` data
- stream contains two closed backing indices: `.ds-logs-000001` and
- `.ds-logs-000002`.
- [source,txt]
- ----
- index status
- .ds-logs-000001 close
- .ds-logs-000002 close
- .ds-logs-000003 open
- ----
- // TESTRESPONSE[non_json]
- The following <<indices-open-close,open API>> request re-opens any closed
- backing indices for the `logs` data stream, including `.ds-logs-000001` and
- `.ds-logs-000002`.
- [source,console]
- ----
- POST /logs/_open/
- ----
- // TEST[continued]
- You can resubmit the original cat indices API request to verify the
- `.ds-logs-000001` and `.ds-logs-000002` backing indices were re-opened.
- [source,console]
- ----
- GET /_cat/indices/logs?v&s=index&h=index,status
- ----
- // TEST[continued]
- The API returns the following response.
- [source,txt]
- ----
- index status
- .ds-logs-000001 open
- .ds-logs-000002 open
- .ds-logs-000003 open
- ----
- // TESTRESPONSE[non_json]
- [discrete]
- [[reindex-with-a-data-stream]]
- === Reindex with a data stream
- You can use the <<docs-reindex,reindex API>> to copy documents to a data stream
- from an existing index, index alias, or data stream.
- A reindex copies documents from a _source_ to a _destination_. The source and
- destination can be any pre-existing index, index alias, or data stream. However,
- the source and destination must be different. You cannot reindex a data stream
- into itself.
- Because data streams are <<data-streams-append-only,append-only>>, a reindex
- request to a data stream destination must have an `op_type` of `create`. This
- means a reindex can only add new documents to a data stream. It cannot update
- existing documents in the data stream destination.
- A reindex can be used to:
- * Convert an existing index alias and collection of time-based indices into a
- data stream.
- * Apply a new or updated <<create-a-data-stream-template,index template>>
- by reindexing an existing data stream into a new one. This applies mapping
- and setting changes in the template to each document and backing index of the
- data stream destination. See
- <<data-streams-use-reindex-to-change-mappings-settings>>.
- TIP: If you only want to update the mappings or settings of a data stream's
- write index, we recommend you update the <<create-a-data-stream-template,data
- stream's template>> and perform a <<manually-roll-over-a-data-stream,rollover>>.
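- For example, the following sketch adds a mapping for a new field to the
- `logs_data_stream` template and then rolls the stream over so the change
- applies to the new write index. The `user.name` field is illustrative:
- [source,console]
- ----
- PUT /_index_template/logs_data_stream
- {
- "index_patterns": [ "logs*" ],
- "data_stream": { },
- "template": {
- "mappings": {
- "properties": {
- "user": {
- "properties": {
- "name": { "type": "keyword" }
- }
- }
- }
- }
- }
- }
- POST /logs/_rollover/
- ----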
- The following reindex request copies documents from the `archive` index alias to
- the existing `logs` data stream. Because the destination is a data stream, the
- request's `op_type` is `create`.
- ////
- [source,console]
- ----
- PUT /_bulk?refresh=wait_for
- {"create":{"_index" : "archive_1"}}
- { "@timestamp": "2020-12-08T11:04:05.000Z" }
- {"create":{"_index" : "archive_2"}}
- { "@timestamp": "2020-12-08T11:06:07.000Z" }
- {"create":{"_index" : "archive_2"}}
- { "@timestamp": "2020-12-09T11:07:08.000Z" }
- {"create":{"_index" : "archive_2"}}
- { "@timestamp": "2020-12-09T11:07:08.000Z" }
- POST /_aliases
- {
- "actions" : [
- { "add" : { "index" : "archive_1", "alias" : "archive" } },
- { "add" : { "index" : "archive_2", "alias" : "archive", "is_write_index" : true} }
- ]
- }
- ----
- ////
- [source,console]
- ----
- POST /_reindex
- {
- "source": {
- "index": "archive"
- },
- "dest": {
- "index": "logs",
- "op_type": "create"
- }
- }
- ----
- // TEST[continued]
- You can also reindex documents from a data stream to an index, index
- alias, or data stream.
- The following reindex request copies documents from the `logs` data stream
- to the existing `archive` index alias. Because the destination is not a data
- stream, the `op_type` does not need to be specified.
- [source,console]
- ----
- POST /_reindex
- {
- "source": {
- "index": "logs"
- },
- "dest": {
- "index": "archive"
- }
- }
- ----
- // TEST[continued]
- [discrete]
- [[update-docs-in-a-data-stream-by-query]]
- === Update documents in a data stream by query
- You cannot send indexing or update requests for existing documents directly to a
- data stream. These prohibited requests include:
- * An <<docs-index_,index API>> request with an
- <<docs-index-api-op_type,`op_type`>> of `index`. The `op_type` parameter
- defaults to `index` for existing documents.
- * A <<docs-bulk,bulk API>> request using the `index` or `update`
- action.
- Instead, you can use the <<docs-update-by-query,update by query API>> to update
- documents in a data stream that match a provided query.
- The following update by query request updates documents in the `logs` data
- stream with a `user.id` of `l7gk7f82`. The request uses a
- <<modules-scripting-using,script>> to assign matching documents a new `user.id`
- value of `XgdX0NoX`.
- [source,console]
- ----
- POST /logs/_update_by_query
- {
- "query": {
- "match": {
- "user.id": "l7gk7f82"
- }
- },
- "script": {
- "source": "ctx._source.user.id = params.new_id",
- "params": {
- "new_id": "XgdX0NoX"
- }
- }
- }
- ----
- [discrete]
- [[delete-docs-in-a-data-stream-by-query]]
- === Delete documents in a data stream by query
- You cannot send document deletion requests directly to a data stream. These
- prohibited requests include:
- * A <<docs-delete,delete API>> request.
- * A <<docs-bulk,bulk API>> request using the `delete` action.
- Instead, you can use the <<docs-delete-by-query,delete by query API>> to delete
- documents in a data stream that match a provided query.
- The following delete by query request deletes documents in the `logs` data
- stream with a `user.id` of `vlb44hny`.
- [source,console]
- ----
- POST /logs/_delete_by_query
- {
- "query": {
- "match": {
- "user.id": "vlb44hny"
- }
- }
- }
- ----
- [discrete]
- [[update-delete-docs-in-a-backing-index]]
- === Update or delete documents in a backing index
- Alternatively, you can update or delete documents in a data stream by sending
- the update or deletion request to the backing index containing the document. To
- do this, you first need to get:
- * The <<mapping-id-field,document ID>>
- * The name of the backing index that contains the document
- If you want to update a document, you must also get its current
- <<optimistic-concurrency-control,sequence number and primary term>>.
- You can use a <<search-a-data-stream,search request>> to retrieve this
- information.
- The following search request retrieves documents in the `logs` data stream with
- a `user.id` of `yWIumJd7`. By default, this search returns the document ID and
- backing index for any matching documents.
- The request includes a `"seq_no_primary_term": true` argument. This means the
- search also returns the sequence number and primary term for any matching
- documents.
- [source,console]
- ----
- GET /logs/_search
- {
- "seq_no_primary_term": true,
- "query": {
- "match": {
- "user.id": "yWIumJd7"
- }
- }
- }
- ----
- The API returns the following response. The `hits.hits` property contains
- information for any documents matching the search.
- [source,console-result]
- ----
- {
- "took": 20,
- "timed_out": false,
- "_shards": {
- "total": 3,
- "successful": 3,
- "skipped": 0,
- "failed": 0
- },
- "hits": {
- "total": {
- "value": 1,
- "relation": "eq"
- },
- "max_score": 0.2876821,
- "hits": [
- {
- "_index": ".ds-logs-000003", <1>
- "_id": "bfspvnIBr7VVZlfp2lqX", <2>
- "_seq_no": 0, <3>
- "_primary_term": 1, <4>
- "_score": 0.2876821,
- "_source": {
- "@timestamp": "2020-12-07T11:06:07.000Z",
- "user": {
- "id": "yWIumJd7"
- },
- "message": "Login successful"
- }
- }
- ]
- }
- }
- ----
- // TESTRESPONSE[s/"took": 20/"took": $body.took/]
- // TESTRESPONSE[s/"max_score": 0.2876821/"max_score": $body.hits.max_score/]
- // TESTRESPONSE[s/"_score": 0.2876821/"_score": $body.hits.hits.0._score/]
- <1> Backing index containing the matching document
- <2> Document ID for the document
- <3> Current sequence number for the document
- <4> Primary term for the document
- You can use an <<docs-index_,index API>> request to update an individual
- document. To prevent an accidental overwrite, this request must include valid
- `if_seq_no` and `if_primary_term` arguments.
- The following index API request updates an existing document in the `logs` data
- stream. The request targets document ID `bfspvnIBr7VVZlfp2lqX` in the
- `.ds-logs-000003` backing index.
- The request also includes the current sequence number and primary term in the
- respective `if_seq_no` and `if_primary_term` query parameters. The request body
- contains a new JSON source for the document.
- [source,console]
- ----
- PUT /.ds-logs-000003/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=0&if_primary_term=1
- {
- "@timestamp": "2020-12-07T11:06:07.000Z",
- "user": {
- "id": "8a4f500d"
- },
- "message": "Login successful"
- }
- ----
- You can use the <<docs-delete,delete API>> to delete individual documents. Deletion
- requests do not require a sequence number or primary term.
- The following delete API request deletes an existing document in the `logs` data
- stream. The request targets document ID `bfspvnIBr7VVZlfp2lqX` in the
- `.ds-logs-000003` backing index.
- [source,console]
- ----
- DELETE /.ds-logs-000003/_doc/bfspvnIBr7VVZlfp2lqX
- ----
- You can use the <<docs-bulk,bulk API>> to delete or update multiple documents in
- one request using `delete`, `index`, or `update` actions.
- If the action type is `index`, the action must include valid
- <<bulk-optimistic-concurrency-control,`if_seq_no` and `if_primary_term`>>
- arguments.
- The following bulk API request uses an `index` action to update an existing
- document in the `logs` data stream.
- The `index` action targets document ID `bfspvnIBr7VVZlfp2lqX` in the
- `.ds-logs-000003` backing index. The action also includes the current sequence
- number and primary term in the respective `if_seq_no` and `if_primary_term`
- parameters.
- [source,console]
- ----
- PUT /_bulk?refresh
- { "index": { "_index": ".ds-logs-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 0, "if_primary_term": 1 } }
- { "@timestamp": "2020-12-07T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
- ----
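- The following sketch instead uses a `delete` action to remove the same
- document by ID. Bulk `delete` actions do not require a sequence number or
- primary term:
- [source,console]
- ----
- PUT /_bulk?refresh
- { "delete": { "_index": ".ds-logs-000003", "_id": "bfspvnIBr7VVZlfp2lqX" } }
- ----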