
Document how to reindex a TSDS (#99476)

* Document how to reindex a TSDS

Time-series data streams require updating start and end times in the
destination index template, to avoid errors during copying of older
docs.

* Update docs/changelog/99476.yaml

* Spotless fix.

* Refresh indexes in unittest.

* Fix typo.

* Delete docs/changelog/99476.yaml

* Fix page link name.

* Update docs/reference/data-streams/tsds-reindex.asciidoc

Co-authored-by: Abdon Pijpelink <abdon.pijpelink@elastic.co>


---------

Co-authored-by: Abdon Pijpelink <abdon.pijpelink@elastic.co>
Kostas Krikellas 2 years ago
Parent
Current commit
b1da97af17

+ 292 - 0
docs/reference/data-streams/tsds-reindex.asciidoc

@@ -0,0 +1,292 @@
+[[tsds-reindex]]
+=== Reindex a time series data stream (TSDS)
+
+++++
+<titleabbrev>Reindex a TSDS</titleabbrev>
+++++
+
+[discrete]
+[[tsds-reindex-intro]]
+==== Introduction
+
+With reindexing, you can copy documents from an old time series data stream (TSDS) to a new one. Data streams support
+reindexing in general, with a few <<reindex-with-a-data-stream, restrictions>>. Still, time series data streams
+introduce additional challenges, because they tightly control the accepted timestamp range of each backing index they
+contain. Using the reindex API directly is likely to fail, because it attempts to insert documents with timestamps
+that fall outside the acceptance window of the current backing index.
+
+To avoid these limitations, use the process that is outlined below:
+
+. Create an index template for the destination data stream that will contain the re-indexed data.
+. Update the template to
+.. Set `index.time_series.start_time` and `index.time_series.end_time` index settings to
+match the lowest and highest `@timestamp` values in the old data stream.
+.. Set the `index.number_of_shards` index setting to the sum of all primary shards of all backing
+indices of the old data stream.
+.. Set `index.number_of_replicas` to zero and unset the `index.lifecycle.name` index setting.
+. Run the reindex operation to completion.
+. Revert the overridden index settings in the destination index template.
+. Invoke the `rollover` API to create a new backing index that can receive new documents.
+
+NOTE: This process only applies to time series data streams without a <<downsampling, downsampling>> configuration.
+Data streams with downsampling can only be reindexed by reindexing their backing indices individually and adding them
+to an empty destination data stream.
+
+In what follows, we elaborate on each step of the process with examples.
+
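To make the acceptance window concrete, here is a minimal Python sketch of the check that the process works around. The function name and the timestamp parsing are illustrative only, not Elasticsearch internals; each backing index accepts a document only if its `@timestamp` falls inside the index's `[start_time, end_time)` window:

```python
from datetime import datetime

def _parse(ts: str) -> datetime:
    # Accept the trailing 'Z' used in the sample settings output.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))

def accepts(doc_ts: str, start: str, end: str) -> bool:
    """True if a document's @timestamp falls inside a backing index's
    acceptance window [start_time, end_time)."""
    return _parse(start) <= _parse(doc_ts) < _parse(end)

# The first backing index in the sample below covers 06:00-10:00:
window = ("2023-09-01T06:00:00.000Z", "2023-09-01T10:00:00.000Z")
print(accepts("2023-09-01T08:30:00.000Z", *window))  # True: inside the window
print(accepts("2023-08-31T23:00:00.000Z", *window))  # False: older docs are rejected
```

Reindexing older documents directly would hit exactly the second case, which is why the destination template temporarily widens the window.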
+[discrete]
+[[tsds-reindex-create-template]]
+==== Create a TSDS template to accept old documents
+
+Consider a TSDS with the following template:
+
+[source,console]
+----
+POST /_component_template/source_template
+{
+  "template": {
+    "settings": {
+      "index": {
+        "number_of_replicas": 2,
+        "number_of_shards": 2,
+        "mode": "time_series",
+        "routing_path": [ "metricset" ]
+      }
+    },
+    "mappings": {
+      "properties": {
+        "@timestamp": { "type": "date" },
+        "metricset": {
+          "type": "keyword",
+          "time_series_dimension": true
+        },
+        "k8s": {
+          "properties": {
+            "tx": { "type": "long" },
+            "rx": { "type": "long" }
+          }
+        }
+      }
+    }
+  }
+}
+
+POST /_index_template/1
+{
+  "index_patterns": [
+    "k8s*"
+  ],
+  "composed_of": [
+    "source_template"
+  ],
+  "data_stream": {}
+}
+----
+// TEST[skip: not expected to match the sample below]
+
+A possible output of `GET /k8s/_settings` looks like:
+
+[source,console-result]
+----
+{
+  ".ds-k8s-2023.09.01-000002": {
+    "settings": {
+      "index": {
+        "mode": "time_series",
+        "routing": {
+          "allocation": {
+            "include": {
+              "_tier_preference": "data_hot"
+            }
+          }
+        },
+        "hidden": "true",
+        "number_of_shards": "2",
+        "time_series": {
+          "end_time": "2023-09-01T14:00:00.000Z",
+          "start_time": "2023-09-01T10:00:00.000Z"
+        },
+        "provided_name": ".ds-k8s-2023.09.01-000002",
+        "creation_date": "1694439857608",
+        "number_of_replicas": "2",
+        "routing_path": [
+          "metricset"
+        ],
+        ...
+      }
+    }
+  },
+  ".ds-k8s-2023.09.01-000001": {
+    "settings": {
+      "index": {
+        "mode": "time_series",
+        "routing": {
+          "allocation": {
+            "include": {
+              "_tier_preference": "data_hot"
+            }
+          }
+        },
+        "hidden": "true",
+        "number_of_shards": "2",
+        "time_series": {
+          "end_time": "2023-09-01T10:00:00.000Z",
+          "start_time": "2023-09-01T06:00:00.000Z"
+        },
+        "provided_name": ".ds-k8s-2023.09.01-000001",
+        "creation_date": "1694439837126",
+        "number_of_replicas": "2",
+        "routing_path": [
+          "metricset"
+        ],
+        ...
+      }
+    }
+  }
+}
+----
+// NOTCONSOLE
+
+To reindex this TSDS, do not reuse its index template for the destination data stream, to avoid impacting its
+functionality. Instead, clone the template of the source TSDS and apply the following modifications:
+
+* Set `index.time_series.start_time` and `index.time_series.end_time` index settings explicitly. Their values should be
+based on the lowest and highest `@timestamp` values in the data stream to reindex. This way, the initial backing index can
+load all data that is contained in the source data stream.
+* Set the `index.number_of_shards` index setting to the sum of all primary shards of all backing indices of the source
+data stream. This helps maintain the same level of search parallelism, as each shard is processed in one or more
+separate threads.
+* Unset the `index.lifecycle.name` index setting, if any. This prevents ILM from modifying the destination data stream
+during reindexing.
+* (Optional) Set `index.number_of_replicas` to zero. This helps speed up the reindex operation. Since the data gets
+copied, there is limited risk of data loss due to lack of replicas.
+
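The first two overrides can be derived mechanically from the source stream's settings. A minimal Python sketch; the helper name is hypothetical, and the input shape follows the `GET /k8s/_settings` sample above (this is not an Elasticsearch API):

```python
def destination_overrides(backing_settings: dict) -> dict:
    """Derive the temporary destination-template overrides from the
    per-backing-index settings of the source data stream."""
    starts, ends, shards = [], [], 0
    for s in backing_settings.values():
        idx = s["settings"]["index"]
        starts.append(idx["time_series"]["start_time"])
        ends.append(idx["time_series"]["end_time"])
        shards += int(idx["number_of_shards"])
    return {
        # Same-format ISO-8601 strings compare correctly lexicographically.
        "index.time_series.start_time": min(starts),
        "index.time_series.end_time": max(ends),
        "index.number_of_shards": shards,
        "index.number_of_replicas": 0,  # optional speed-up during reindexing
    }

# The two backing indices from the sample output above:
sample = {
    ".ds-k8s-2023.09.01-000001": {"settings": {"index": {
        "number_of_shards": "2",
        "time_series": {"start_time": "2023-09-01T06:00:00.000Z",
                        "end_time": "2023-09-01T10:00:00.000Z"}}}},
    ".ds-k8s-2023.09.01-000002": {"settings": {"index": {
        "number_of_shards": "2",
        "time_series": {"start_time": "2023-09-01T10:00:00.000Z",
                        "end_time": "2023-09-01T14:00:00.000Z"}}}},
}
print(destination_overrides(sample))
```

Applied to the sample, this yields the 06:00 start, 14:00 end, and four primary shards used in the destination template that follows.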
+Using the example above as source TSDS, the template for the destination TSDS would be:
+
+[source,console]
+----
+POST /_component_template/destination_template
+{
+  "template": {
+    "settings": {
+      "index": {
+        "number_of_replicas": 0,
+        "number_of_shards": 4,
+        "mode": "time_series",
+        "routing_path": [ "metricset" ],
+        "time_series": {
+          "end_time": "2023-09-01T14:00:00.000Z",
+          "start_time": "2023-09-01T06:00:00.000Z"
+        }
+      }
+    },
+    "mappings": {
+      "properties": {
+        "@timestamp": { "type": "date" },
+        "metricset": {
+          "type": "keyword",
+          "time_series_dimension": true
+        },
+        "k8s": {
+          "properties": {
+            "tx": { "type": "long" },
+            "rx": { "type": "long" }
+          }
+        }
+      }
+    }
+  }
+}
+
+POST /_index_template/2
+{
+  "index_patterns": [
+    "k8s*"
+  ],
+  "composed_of": [
+    "destination_template"
+  ],
+  "data_stream": {}
+}
+----
+// TEST[continued]
+
+[discrete]
+[[tsds-reindex-op]]
+==== Reindex
+
+Invoke the reindex API, for instance:
+
+[source,console]
+----
+POST /_reindex
+{
+  "source": {
+    "index": "k8s"
+  },
+  "dest": {
+    "index": "k9s",
+    "op_type": "create"
+  }
+}
+----
+// TEST[continued]
+
+[discrete]
+[[tsds-reindex-restore]]
+==== Restore the destination index template
+
+Once the reindexing operation completes, restore the index template for the destination TSDS as follows:
+
+* Remove the overrides for `index.time_series.start_time` and `index.time_series.end_time`.
+* Restore the values of `index.number_of_shards`, `index.number_of_replicas` and `index.lifecycle.name`, as
+applicable.
+
+Using the previous example, the destination template is modified as follows:
+
+[source,console]
+----
+POST /_component_template/destination_template
+{
+  "template": {
+    "settings": {
+      "index": {
+        "number_of_replicas": 2,
+        "number_of_shards": 2,
+        "mode": "time_series",
+        "routing_path": [ "metricset" ]
+      }
+    },
+    "mappings": {
+      "properties": {
+        "@timestamp": { "type": "date" },
+        "metricset": {
+          "type": "keyword",
+          "time_series_dimension": true
+        },
+        "k8s": {
+          "properties": {
+            "tx": { "type": "long" },
+            "rx": { "type": "long" }
+          }
+        }
+      }
+    }
+  }
+}
+----
+// TEST[continued]
+
+Next, invoke the `rollover` API on the destination data stream without any conditions set.
+
+[source,console]
+----
+POST /k9s/_rollover/
+----
+// TEST[continued]
+
+This creates a new backing index with the updated index settings. The destination data stream is now ready to accept new documents.
+
+Note that the initial backing index can still accept documents within the range of timestamps derived from the source data
+stream. If this is not desired, mark it as <<index-blocks-read-only, read-only>> explicitly.

+ 1 - 0
docs/reference/data-streams/tsds.asciidoc

@@ -340,3 +340,4 @@ include::tsds-index-settings.asciidoc[]
 include::downsampling.asciidoc[]
 include::downsampling-ilm.asciidoc[]
 include::downsampling-manual.asciidoc[]
+include::tsds-reindex.asciidoc[]

+ 168 - 0
modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java

@@ -683,6 +683,174 @@ public class TsdbDataStreamRestIT extends DisabledSecurityDataStreamTestCase {
         assertThat(endTimeFirstBackingIndex, notNullValue());
     }
 
+    public void testReindexTsdbDataStream() throws Exception {
+        var deleteRequest = new Request("DELETE", "/_index_template/1");
+        assertOK(client().performRequest(deleteRequest));
+        deleteRequest = new Request("DELETE", "/_component_template/custom_template");
+        assertOK(client().performRequest(deleteRequest));
+
+        final int SECONDS_PER_DAY = 24 * 60 * 60;
+        final String CUSTOM_TEMPLATE_WITH_START_END_TIME = """
+            {
+                "template": {
+                    "settings":{
+                        "index": {
+                            "number_of_replicas": 0,
+                            "number_of_shards": 4,
+                            "mode": "time_series",
+                            "routing_path": ["metricset", "k8s.pod.uid"],
+                            "time_series": {
+                                "start_time": "$start",
+                                "end_time": "$end"
+                            }
+                        }
+                    },
+                    "mappings":{
+                        "properties": {
+                            "@timestamp" : {
+                                "type": "date"
+                            },
+                            "metricset": {
+                                "type": "keyword",
+                                "time_series_dimension": true
+                            },
+                            "k8s": {
+                                "properties": {
+                                    "pod": {
+                                        "properties": {
+                                            "uid": {
+                                                "type": "keyword",
+                                                "time_series_dimension": true
+                                            },
+                                            "name": {
+                                                "type": "keyword"
+                                            },
+                                            "ip": {
+                                                "type": "ip"
+                                            },
+                                            "network": {
+                                                "properties": {
+                                                    "tx": {
+                                                        "type": "long"
+                                                    },
+                                                    "rx": {
+                                                        "type": "long"
+                                                    }
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            """;
+
+        // Create a data stream that's one week old.
+        var request = new Request("POST", "/_component_template/source_template");
+        request.setJsonEntity(
+            CUSTOM_TEMPLATE_WITH_START_END_TIME.replace("$start", formatInstantNanos(Instant.now().minusSeconds(8 * SECONDS_PER_DAY)))
+                .replace("$end", formatInstantNanos(Instant.now().minusSeconds(6 * SECONDS_PER_DAY)))
+        );
+        assertOK(client().performRequest(request));
+
+        request = new Request("POST", "/_index_template/1");
+        request.setJsonEntity("""
+            {
+                "index_patterns": ["k8s*"],
+                "composed_of": ["source_template"],
+                "data_stream": {
+                }
+            }""");
+        assertOK(client().performRequest(request));
+
+        // Add some docs to it.
+        var bulkRequest = new Request("POST", "/k8s/_bulk");
+        bulkRequest.setJsonEntity(BULK.replace("$now", formatInstantNanos(Instant.now().minusSeconds(7 * SECONDS_PER_DAY))));
+        bulkRequest.addParameter("refresh", "true");
+        var response = client().performRequest(bulkRequest);
+        assertOK(response);
+        var responseBody = entityAsMap(response);
+        assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), equalTo(false));
+
+        // Clone the old data stream.
+        request = new Request("POST", "/_component_template/destination_template");
+        request.setJsonEntity(
+            CUSTOM_TEMPLATE_WITH_START_END_TIME.replace("$start", formatInstantNanos(Instant.now().minusSeconds(8 * SECONDS_PER_DAY)))
+                .replace("$end", formatInstantNanos(Instant.now().minusSeconds(6 * SECONDS_PER_DAY)))
+        );
+        assertOK(client().performRequest(request));
+
+        request = new Request("POST", "/_index_template/2");
+        request.setJsonEntity("""
+            {
+                "index_patterns": ["k9s*"],
+                "composed_of": ["destination_template"],
+                "data_stream": {
+                }
+            }""");
+        assertOK(client().performRequest(request));
+
+        // Reindex.
+        request = new Request("POST", "/_reindex");
+        request.setJsonEntity("""
+            {
+                "source": {
+                    "index": "k8s"
+                  },
+                  "dest": {
+                    "index": "k9s",
+                    "op_type": "create"
+                  }
+            }
+            """);
+        assertOK(client().performRequest(request));
+
+        var getDataStreamsRequest = new Request("GET", "/_data_stream");
+        response = client().performRequest(getDataStreamsRequest);
+        assertOK(response);
+        var dataStreams = entityAsMap(response);
+        assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(2));
+        assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s"));
+        assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1));
+        assertThat(ObjectPath.evaluate(dataStreams, "data_streams.1.name"), equalTo("k9s"));
+        assertThat(ObjectPath.evaluate(dataStreams, "data_streams.1.indices"), hasSize(1));
+
+        // Update the start and end time of the new data stream.
+        request = new Request("POST", "/_component_template/destination_template");
+        request.setJsonEntity(
+            CUSTOM_TEMPLATE_WITH_START_END_TIME.replace("$start", formatInstantNanos(Instant.now().minusSeconds(SECONDS_PER_DAY)))
+                .replace("$end", formatInstantNanos(Instant.now().plusSeconds(SECONDS_PER_DAY)))
+        );
+        assertOK(client().performRequest(request));
+
+        // Rollover to create a new index with the new settings.
+        request = new Request("POST", "/k9s/_rollover");
+        client().performRequest(request);
+
+        // Insert a doc with a current timestamp.
+        request = new Request("POST", "/k9s/_doc");
+        request.setJsonEntity(DOC.replace("$time", formatInstantNanos(Instant.now())));
+        assertOK(client().performRequest(request));
+
+        request = new Request("POST", "_refresh");
+        assertOK(client().performRequest(request));
+
+        var searchRequest = new Request("GET", "k9s/_search");
+        response = client().performRequest(searchRequest);
+        assertOK(response);
+        responseBody = entityAsMap(response);
+        try {
+            assertThat(ObjectPath.evaluate(responseBody, "hits.total.value"), equalTo(9));
+            assertThat(ObjectPath.evaluate(responseBody, "hits.total.relation"), equalTo("eq"));
+        } catch (Exception | AssertionError e) {
+            logger.error("search response body causing assertion error [" + responseBody + "]", e);
+            throw e;
+        }
+    }
+
     private static Map<?, ?> getIndex(String indexName) throws IOException {
         var getIndexRequest = new Request("GET", "/" + indexName + "?human");
         var response = client().performRequest(getIndexRequest);