Selaa lähdekoodia

Add index.look_back_time setting for tsdb data streams (#98518)

This change adds an `index.look_back_time` index setting that sets the `index.time_series.start_time` setting for the first backing index when a data stream is created.

This allows accepting older data during initial indexing without changing the `index.look_ahead_time` setting. That setting also controls the `index.time_series.end_time` setting, so changing it would affect rollovers as well.

The default for `index.look_back_time` is `2h`, which means documents with a `@timestamp` up to 2 hours before the creation of the data stream are allowed to be indexed. This is the same behavior as before this change, when `index.look_ahead_time` (which also defaults to `2h`) was used to set `index.time_series.start_time` of the first backing index.

Closes #98463
Martijn van Groningen 2 vuotta sitten
vanhempi
commit
3e3ee42589

+ 6 - 0
docs/changelog/98518.yaml

@@ -0,0 +1,6 @@
+pr: 98518
+summary: Add `index.look_back_time` setting for tsdb data streams
+area: TSDB
+type: enhancement
+issues:
+ - 98463

+ 1 - 0
docs/reference/data-streams/set-up-tsds.asciidoc

@@ -177,6 +177,7 @@ Optionally, the index settings component template for a TSDS can include:
 
 * Your lifecycle policy in the `index.lifecycle.name` index setting.
 * The <<tsds-look-ahead-time,`index.look_ahead_time`>> index setting.
+* The <<tsds-look-back-time,`index.look_back_time`>> index setting.
 * Other index settings, such as <<index-codec,`index.codec`>>, for your TSDS's
 backing indices.
 

+ 9 - 0
docs/reference/data-streams/tsds-index-settings.asciidoc

@@ -33,6 +33,15 @@ days). Only indices with an `index.mode` of `time_series` support this setting.
 For more information, refer to <<tsds-look-ahead-time>>. Additionally this setting
 can not be less than `time_series.poll_interval` cluster setting.
 
+[[index-look-back-time]]
+`index.look_back_time`::
+(<<_static_index_settings,Static>>, <<time-units,time units>>)
+Interval used to calculate the `index.time_series.start_time` for a TSDS's first
+backing index when a tsdb data stream is created. Defaults to `2h` (2 hours).
+Accepts `1m` (one minute) to `7d` (seven days). Only indices with an `index.mode`
+of `time_series` support this setting. For more information,
+refer to <<tsds-look-back-time>>.
+
 [[index-routing-path]] `index.routing_path`::
 (<<_static_index_settings,Static>>, string or array of strings) Plain `keyword`
 fields used to route documents in a TSDS to index shards. Supports wildcards

+ 16 - 0
docs/reference/data-streams/tsds.asciidoc

@@ -253,6 +253,22 @@ value borders the `index.time_series.start_time` for the new write index. This
 ensures the `@timestamp` ranges for neighboring backing indices always border
 but never overlap.
 
+[discrete]
+[[tsds-look-back-time]]
+==== Look-back time
+
+Use the <<index-look-back-time,`index.look_back_time`>> index setting to
+configure how far in the past you can add documents to an index. When you
+create a data stream for a TSDS, {es} calculates the first backing index's
+`index.time_series.start_time` value as:
+
+`now - index.look_back_time`
+
+This setting is only used when a data stream gets created and controls
+the `index.time_series.start_time` index setting of the first backing index.
+Configuring this index setting can be useful to accept documents with `@timestamp`
+field values that are older than 2 hours (the `index.look_back_time` default).
+
 [discrete]
 [[tsds-accepted-time-range]]
 ==== Accepted time range for adding data

+ 65 - 0
modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java

@@ -10,7 +10,9 @@ package org.elasticsearch.datastreams;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.common.time.DateFormatter;
+import org.elasticsearch.common.time.DateFormatters;
 import org.elasticsearch.common.time.FormatNames;
+import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.test.rest.ObjectPath;
 import org.junit.Before;
 
@@ -618,6 +620,69 @@ public class TsdbDataStreamRestIT extends DisabledSecurityDataStreamTestCase {
         client().performRequest(request);
     }
 
+    public void testLookBackTime() throws IOException {
+        // Create template that uses index.look_back_time index setting:
+        String template = """
+            {
+                "index_patterns": ["test*"],
+                "template": {
+                    "settings":{
+                        "index": {
+                            "look_back_time": "24h",
+                            "number_of_replicas": 0,
+                            "mode": "time_series"
+                        }
+                    },
+                    "mappings":{
+                        "properties": {
+                            "@timestamp" : {
+                                "type": "date"
+                            },
+                            "field": {
+                                "type": "keyword",
+                                "time_series_dimension": true
+                            }
+                        }
+                    }
+                },
+                "data_stream": {}
+            }""";
+        var putIndexTemplateRequest = new Request("PUT", "/_index_template/2");
+        putIndexTemplateRequest.setJsonEntity(template);
+        assertOK(client().performRequest(putIndexTemplateRequest));
+
+        // Create data stream:
+        var createDataStreamRequest = new Request("PUT", "/_data_stream/test123");
+        assertOK(client().performRequest(createDataStreamRequest));
+
+        // Check data stream has been created:
+        var getDataStreamsRequest = new Request("GET", "/_data_stream");
+        var response = client().performRequest(getDataStreamsRequest);
+        assertOK(response);
+        var dataStreams = entityAsMap(response);
+        assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
+        assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("test123"));
+        assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1));
+        assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo("2"));
+        assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1));
+        String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name");
+        assertThat(firstBackingIndex, backingIndexEqualTo("test123", 1));
+
+        // Check the backing index:
+        // 2023-08-15T04:35:50.000Z
+        var indices = getIndex(firstBackingIndex);
+        var escapedBackingIndex = firstBackingIndex.replace(".", "\\.");
+        assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("test123"));
+        assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), equalTo("time_series"));
+        String startTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time");
+        assertThat(startTimeFirstBackingIndex, notNullValue());
+        Instant now = Instant.now();
+        Instant startTime = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(startTimeFirstBackingIndex)).toInstant();
+        assertTrue(now.minus(24, ChronoUnit.HOURS).isAfter(startTime));
+        String endTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time");
+        assertThat(endTimeFirstBackingIndex, notNullValue());
+    }
+
     private static Map<?, ?> getIndex(String indexName) throws IOException {
         var getIndexRequest = new Request("GET", "/" + indexName + "?human");
         var response = client().performRequest(getIndexRequest);

+ 2 - 1
modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java

@@ -84,10 +84,11 @@ public class DataStreamIndexSettingsProvider implements IndexSettingProvider {
                 if (indexMode == IndexMode.TIME_SERIES) {
                     Settings.Builder builder = Settings.builder();
                     TimeValue lookAheadTime = DataStreamsPlugin.LOOK_AHEAD_TIME.get(allSettings);
+                    TimeValue lookBackTime = DataStreamsPlugin.LOOK_BACK_TIME.get(allSettings);
                     final Instant start;
                     final Instant end;
                     if (dataStream == null || migrating) {
-                        start = DataStream.getCanonicalTimestampBound(resolvedAt.minusMillis(lookAheadTime.getMillis()));
+                        start = DataStream.getCanonicalTimestampBound(resolvedAt.minusMillis(lookBackTime.getMillis()));
                         end = DataStream.getCanonicalTimestampBound(resolvedAt.plusMillis(lookAheadTime.getMillis()));
                     } else {
                         IndexMetadata currentLatestBackingIndex = metadata.index(dataStream.getWriteIndex());

+ 9 - 1
modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java

@@ -105,7 +105,14 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin {
         Setting.Property.Dynamic
     );
     public static final String LIFECYCLE_CUSTOM_INDEX_METADATA_KEY = "data_stream_lifecycle";
-
+    public static final Setting<TimeValue> LOOK_BACK_TIME = Setting.timeSetting(
+        "index.look_back_time",
+        TimeValue.timeValueHours(2),
+        TimeValue.timeValueMinutes(1),
+        TimeValue.timeValueDays(7),
+        Setting.Property.IndexScope,
+        Setting.Property.Dynamic
+    );
     // The dependency of index.look_ahead_time is a cluster setting and currently there is no clean validation approach for this:
     private final SetOnce<UpdateTimeSeriesRangeService> updateTimeSeriesRangeService = new SetOnce<>();
     private final SetOnce<DataStreamLifecycleErrorStore> errorStoreInitialisationService = new SetOnce<>();
@@ -141,6 +148,7 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin {
         List<Setting<?>> pluginSettings = new ArrayList<>();
         pluginSettings.add(TIME_SERIES_POLL_INTERVAL);
         pluginSettings.add(LOOK_AHEAD_TIME);
+        pluginSettings.add(LOOK_BACK_TIME);
         pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING);
         pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING);
         pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING);

+ 25 - 3
modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProviderTests.java

@@ -38,6 +38,7 @@ import static org.hamcrest.Matchers.equalTo;
 
 public class DataStreamIndexSettingsProviderTests extends ESTestCase {
 
+    private static final TimeValue DEFAULT_LOOK_BACK_TIME = TimeValue.timeValueHours(2); // default
     private static final TimeValue DEFAULT_LOOK_AHEAD_TIME = TimeValue.timeValueHours(2); // default
 
     DataStreamIndexSettingsProvider provider;
@@ -83,7 +84,7 @@ public class DataStreamIndexSettingsProviderTests extends ESTestCase {
             List.of(new CompressedXContent(mapping))
         );
         assertThat(result.size(), equalTo(3));
-        assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis())));
+        assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_BACK_TIME.getMillis())));
         assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis())));
         assertThat(IndexMetadata.INDEX_ROUTING_PATH.get(result), contains("field3"));
     }
@@ -235,10 +236,31 @@ public class DataStreamIndexSettingsProviderTests extends ESTestCase {
             List.of(new CompressedXContent("{}"))
         );
         assertThat(result.size(), equalTo(2));
-        assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(lookAheadTime.getMillis())));
+        assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_BACK_TIME.getMillis())));
         assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(lookAheadTime.getMillis())));
     }
 
+    public void testGetAdditionalIndexSettingsLookBackTime() throws Exception {
+        Metadata metadata = Metadata.EMPTY_METADATA;
+        String dataStreamName = "logs-app1";
+
+        Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
+        TimeValue lookBackTime = TimeValue.timeValueHours(12);
+        Settings settings = builder().put("index.mode", "time_series").put("index.look_back_time", lookBackTime.getStringRep()).build();
+        Settings result = provider.getAdditionalIndexSettings(
+            DataStream.getDefaultBackingIndexName(dataStreamName, 1),
+            dataStreamName,
+            true,
+            metadata,
+            now,
+            settings,
+            List.of(new CompressedXContent("{}"))
+        );
+        assertThat(result.size(), equalTo(2));
+        assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(lookBackTime.getMillis())));
+        assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis())));
+    }
+
     public void testGetAdditionalIndexSettingsDataStreamAlreadyCreated() throws Exception {
         String dataStreamName = "logs-app1";
         TimeValue lookAheadTime = TimeValue.timeValueHours(2);
@@ -358,7 +380,7 @@ public class DataStreamIndexSettingsProviderTests extends ESTestCase {
             List.of()
         );
         assertThat(result.size(), equalTo(2));
-        assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis())));
+        assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_BACK_TIME.getMillis())));
         assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis())));
     }
 

+ 15 - 4
modules/data-streams/src/test/java/org/elasticsearch/datastreams/LookAHeadTimeTests.java

@@ -16,7 +16,6 @@ import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESSingleNodeTestCase;
 import org.junit.After;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 
@@ -52,7 +51,7 @@ public class LookAHeadTimeTests extends ESSingleNodeTestCase {
         assertThat(e.getMessage(), equalTo("failed to parse value [11m] for setting [time_series.poll_interval], must be <= [10m]"));
     }
 
-    public void testLookAheadTimeSetting() throws IOException {
+    public void testLookAheadTimeSetting() {
         var settings = Settings.builder().put(DataStreamsPlugin.LOOK_AHEAD_TIME.getKey(), "10m").build();
         updateIndexSettings(settings);
     }
@@ -69,6 +68,18 @@ public class LookAHeadTimeTests extends ESSingleNodeTestCase {
         assertThat(e.getMessage(), equalTo("failed to parse value [8d] for setting [index.look_ahead_time], must be <= [7d]"));
     }
 
+    public void testLookBackTimeSettingToLow() {
+        var settings = Settings.builder().put(DataStreamsPlugin.LOOK_BACK_TIME.getKey(), "1s").build();
+        var e = expectThrows(IllegalArgumentException.class, () -> updateIndexSettings(settings));
+        assertThat(e.getMessage(), equalTo("failed to parse value [1s] for setting [index.look_back_time], must be >= [1m]"));
+    }
+
+    public void testLookBackTimeSettingToHigh() {
+        var settings = Settings.builder().put(DataStreamsPlugin.LOOK_BACK_TIME.getKey(), "8d").build();
+        var e = expectThrows(IllegalArgumentException.class, () -> updateIndexSettings(settings));
+        assertThat(e.getMessage(), equalTo("failed to parse value [8d] for setting [index.look_back_time], must be <= [7d]"));
+    }
+
     public void testLookAheadTimeSettingLowerThanTimeSeriesPollIntervalSetting() {
         {
             var settings = Settings.builder()
@@ -99,7 +110,7 @@ public class LookAHeadTimeTests extends ESSingleNodeTestCase {
         }
     }
 
-    public void testLookAheadTimeSettingHigherThanTimeSeriesPollIntervalSetting() throws IOException {
+    public void testLookAheadTimeSettingHigherThanTimeSeriesPollIntervalSetting() {
         var clusterSettings = Settings.builder().put(DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL.getKey(), "10m").build();
         updateClusterSettings(clusterSettings);
         var indexSettings = Settings.builder().put(DataStreamsPlugin.LOOK_AHEAD_TIME.getKey(), "100m").build();
@@ -110,7 +121,7 @@ public class LookAHeadTimeTests extends ESSingleNodeTestCase {
         clusterAdmin().updateSettings(new ClusterUpdateSettingsRequest().persistentSettings(settings)).actionGet();
     }
 
-    private void updateIndexSettings(Settings settings) throws IOException {
+    private void updateIndexSettings(Settings settings) {
         try {
             createIndex("test");
         } catch (ResourceAlreadyExistsException e) {