Browse Source

Introduce lazy rollover for mapping updates in data streams (#103309)

In this PR we implement the idea to introduce a flag, that a data stream needs to be rolloved over before the next document is indexed.
Mary Gouseti 1 year ago
parent
commit
046cdeae23
35 changed files with 868 additions and 168 deletions
  1. 6 0
      docs/changelog/103309.yaml
  2. 2 1
      docs/reference/data-streams/change-mappings-and-settings.asciidoc
  3. 1 0
      docs/reference/data-streams/downsampling-ilm.asciidoc
  4. 1 0
      docs/reference/data-streams/downsampling-manual.asciidoc
  5. 8 4
      docs/reference/data-streams/lifecycle/tutorial-migrate-data-stream-from-ilm-to-dsl.asciidoc
  6. 13 1
      docs/reference/data-streams/use-a-data-stream.asciidoc
  7. 9 2
      docs/reference/indices/get-data-stream.asciidoc
  8. 24 0
      docs/reference/indices/rollover-index.asciidoc
  9. 183 0
      modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LazyRolloverDataStreamIT.java
  10. 1 1
      qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/BlockedSearcherRestCancellationTestCase.java
  11. 10 1
      qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/RolloverRestCancellationIT.java
  12. 5 0
      rest-api-spec/src/main/resources/rest-api-spec/api/indices.rollover.json
  13. 29 0
      server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/rollover/RolloverIT.java
  14. 1 0
      server/src/main/java/org/elasticsearch/TransportVersions.java
  15. 3 0
      server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java
  16. 7 0
      server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverConditions.java
  17. 23 1
      server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java
  18. 5 0
      server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequestBuilder.java
  19. 27 4
      server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponse.java
  20. 60 17
      server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java
  21. 60 10
      server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
  22. 1 0
      server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java
  23. 2 0
      server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java
  24. 69 13
      server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
  25. 113 5
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java
  26. 1 0
      server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java
  27. 2 0
      server/src/test/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequestTests.java
  28. 29 82
      server/src/test/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponseTests.java
  29. 163 21
      server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java
  30. 2 0
      server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java
  31. 2 1
      server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java
  32. 2 1
      test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java
  33. 1 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RolloverStepTests.java
  34. 1 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForRolloverReadyStepTests.java
  35. 2 1
      x-pack/plugin/profiling/src/test/java/org/elasticsearch/xpack/profiling/ProfilingDataStreamManagerTests.java

+ 6 - 0
docs/changelog/103309.yaml

@@ -0,0 +1,6 @@
+pr: 103309
+summary: Introduce lazy rollover for mapping updates in data streams
+area: Data streams
+type: enhancement
+issues:
+  - 89346

+ 2 - 1
docs/reference/data-streams/change-mappings-and-settings.asciidoc

@@ -592,7 +592,8 @@ stream's oldest backing index.
       "hidden": false,
       "system": false,
       "allow_custom_routing": false,
-      "replicated": false
+      "replicated": false,
+      "rollover_on_write": false
     }
   ]
 }

+ 1 - 0
docs/reference/data-streams/downsampling-ilm.asciidoc

@@ -326,6 +326,7 @@ following. Note the original `index_name`: `.ds-datastream-<timestamp>-000001`.
       "system": false,
       "allow_custom_routing": false,
       "replicated": false,
+      "rollover_on_write": false,
       "time_series": {
         "temporal_ranges": [
           {

+ 1 - 0
docs/reference/data-streams/downsampling-manual.asciidoc

@@ -372,6 +372,7 @@ This returns:
       "system": false,
       "allow_custom_routing": false,
       "replicated": false,
+      "rollover_on_write": false,
       "time_series": {
         "temporal_ranges": [
           {

+ 8 - 4
docs/reference/data-streams/lifecycle/tutorial-migrate-data-stream-from-ilm-to-dsl.asciidoc

@@ -139,7 +139,8 @@ and that the next generation index will also be managed by {ilm-init}:
       "hidden": false,
       "system": false,
       "allow_custom_routing": false,
-      "replicated": false
+      "replicated": false,
+      "rollover_on_write": false
     }
   ]
 }
@@ -275,7 +276,8 @@ GET _data_stream/dsl-data-stream
       "hidden": false,
       "system": false,
       "allow_custom_routing": false,
-      "replicated": false
+      "replicated": false,
+      "rollover_on_write": false
     }
   ]
 }
@@ -352,7 +354,8 @@ GET _data_stream/dsl-data-stream
       "hidden": false,
       "system": false,
       "allow_custom_routing": false,
-      "replicated": false
+      "replicated": false,
+      "rollover_on_write": false
     }
   ]
 }
@@ -449,7 +452,8 @@ GET _data_stream/dsl-data-stream
       "hidden": false,
       "system": false,
       "allow_custom_routing": false,
-      "replicated": false
+      "replicated": false,
+      "rollover_on_write": false
     }
   ]
 }

+ 13 - 1
docs/reference/data-streams/use-a-data-stream.asciidoc

@@ -117,12 +117,24 @@ GET /_data_stream/my-data-stream/_stats?human=true
 === Manually roll over a data stream
 
 Use the <<indices-rollover-index,rollover API>> to manually
-<<data-streams-rollover,roll over>> a data stream:
+<<data-streams-rollover,roll over>> a data stream. You have
+two options when manually rolling over:
 
+1. To immediately trigger a rollover:
++
 [source,console]
 ----
 POST /my-data-stream/_rollover/
 ----
+2. Or to postpone the rollover until the next indexing event occurs:
++
+[source,console]
+----
+POST /my-data-stream/_rollover?lazy
+----
++
+Use the second to avoid having empty backing indices in data streams
+that do not get updated often.
 
 [discrete]
 [[open-closed-backing-indices]]

+ 9 - 2
docs/reference/indices/get-data-stream.asciidoc

@@ -262,6 +262,11 @@ The conditions which will trigger the rollover of a backing index as configured
 `cluster.lifecycle.default.rollover`. This property is an implementation detail and it will only be retrieved when the query
 param `include_defaults` is set to `true`. The contents of this field are subject to change.
 =====
+
+`rollover_on_write`::
+(Boolean)
+If `true`, the next write to this data stream will trigger a rollover first and the document will be
+indexed in the new backing index. If the rollover fails the indexing request will fail too.
 ====
 
 [[get-data-stream-api-example]]
@@ -311,7 +316,8 @@ The API returns the following response:
       "hidden": false,
       "system": false,
       "allow_custom_routing": false,
-      "replicated": false
+      "replicated": false,
+      "rollover_on_write": false
     },
     {
       "name": "my-data-stream-two",
@@ -339,7 +345,8 @@ The API returns the following response:
       "hidden": false,
       "system": false,
       "allow_custom_routing": false,
-      "replicated": false
+      "replicated": false,
+      "rollover_on_write": false
     }
   ]
 }

+ 24 - 0
docs/reference/indices/rollover-index.asciidoc

@@ -114,6 +114,11 @@ include::{es-repo-dir}/indices/create-index.asciidoc[tag=index-name-reqs]
 If `true`, checks whether the current index satisfies the specified
 `conditions` but does not perform a rollover. Defaults to `false`.
 
+`lazy`::
+(Optional, Boolean)
+If `true`, signals that the data stream will be rolled over when the next
+indexing operation occurs. Applies only to data streams. Defaults to `false`.
+
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=wait_for_active_shards]
 
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=timeoutparms]
@@ -204,6 +209,11 @@ conditions were specified, this is an empty object.
 index met the condition.
 ====
 
+`lazy`::
+(Boolean)
+If `true`, {es} did not perform the rollover, but successfully marked the data stream to be rolled
+over at the next indexing event.
+
 [[rollover-index-api-example]]
 ==== {api-examples-title}
 
@@ -218,6 +228,17 @@ POST my-data-stream/_rollover
 ----
 // TEST[setup:my_data_stream]
 
+The following request rolls over a data stream lazily, meaning that the data stream
+will roll over at the next indexing event. This ensures that mapping and setting changes
+will be applied to the coming data, but it will avoid creating extra backing indices for
+data streams with slow ingestion.
+
+[source,console]
+----
+POST my-data-stream/_rollover?lazy
+----
+// TEST[continued]
+
 :target: data stream
 :index: write index
 
@@ -257,6 +278,7 @@ The API returns:
   "new_index": ".ds-my-data-stream-2099.05.07-000002",
   "rolled_over": true,
   "dry_run": false,
+  "lazy": false,
   "conditions": {
     "[max_age: 7d]": false,
     "[max_docs: 1000]": true,
@@ -328,6 +350,7 @@ The API returns:
   "new_index": "my-index-2099.05.07-000002",
   "rolled_over": true,
   "dry_run": false,
+  "lazy": false,
   "conditions": {
     "[max_age: 7d]": false,
     "[max_docs: 1000]": true,
@@ -399,6 +422,7 @@ The API returns:
   "new_index": "my-index-2099.05.07-000002",
   "rolled_over": true,
   "dry_run": false,
+  "lazy": false,
   "conditions": {
     "[max_age: 7d]": false,
     "[max_docs: 1000]": true,

+ 183 - 0
modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LazyRolloverDataStreamIT.java

@@ -0,0 +1,183 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.datastreams;
+
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseException;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.endsWith;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.startsWith;
+
+public class LazyRolloverDataStreamIT extends DisabledSecurityDataStreamTestCase {
+
+    @SuppressWarnings("unchecked")
+    public void testLazyRollover() throws Exception {
+        Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/lazy-ds-template");
+        putComposableIndexTemplateRequest.setJsonEntity("""
+            {
+              "index_patterns": ["lazy-ds*"],
+              "data_stream": {}
+            }
+            """);
+        assertOK(client().performRequest(putComposableIndexTemplateRequest));
+
+        String dataStreamName = "lazy-ds";
+
+        Request createDocRequest = new Request("POST", "/" + dataStreamName + "/_doc?refresh=true");
+        createDocRequest.setJsonEntity("{ \"@timestamp\": \"2020-10-22\", \"a\": 1 }");
+        assertOK(client().performRequest(createDocRequest));
+
+        final Response rolloverResponse = client().performRequest(new Request("POST", "/" + dataStreamName + "/_rollover?lazy"));
+        Map<String, Object> rolloverResponseMap = entityAsMap(rolloverResponse);
+        assertThat((String) rolloverResponseMap.get("old_index"), startsWith(".ds-lazy-ds-"));
+        assertThat((String) rolloverResponseMap.get("old_index"), endsWith("-000001"));
+        assertThat((String) rolloverResponseMap.get("new_index"), startsWith(".ds-lazy-ds-"));
+        assertThat((String) rolloverResponseMap.get("new_index"), endsWith("-000002"));
+        assertThat(rolloverResponseMap.get("lazy"), equalTo(true));
+        assertThat(rolloverResponseMap.get("dry_run"), equalTo(false));
+        assertThat(rolloverResponseMap.get("acknowledged"), equalTo(true));
+        assertThat(rolloverResponseMap.get("rolled_over"), equalTo(false));
+        assertThat(rolloverResponseMap.get("conditions"), equalTo(Map.of()));
+
+        {
+            final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + dataStreamName));
+            List<Object> dataStreams = (List<Object>) entityAsMap(dataStreamResponse).get("data_streams");
+            assertThat(dataStreams.size(), is(1));
+            Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
+            assertThat(dataStream.get("name"), equalTo(dataStreamName));
+            assertThat(dataStream.get("rollover_on_write"), is(true));
+            assertThat(((List<Object>) dataStream.get("indices")).size(), is(1));
+        }
+
+        createDocRequest = new Request("POST", "/" + dataStreamName + "/_doc?refresh=true");
+        createDocRequest.setJsonEntity("{ \"@timestamp\": \"2020-10-23\", \"a\": 2 }");
+        assertOK(client().performRequest(createDocRequest));
+
+        {
+            final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + dataStreamName));
+            List<Object> dataStreams = (List<Object>) entityAsMap(dataStreamResponse).get("data_streams");
+            assertThat(dataStreams.size(), is(1));
+            Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
+            assertThat(dataStream.get("name"), equalTo(dataStreamName));
+            assertThat(dataStream.get("rollover_on_write"), is(false));
+            assertThat(((List<Object>) dataStream.get("indices")).size(), is(2));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testLazyRolloverFailsIndexing() throws Exception {
+        Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/lazy-ds-template");
+        putComposableIndexTemplateRequest.setJsonEntity("""
+            {
+              "index_patterns": ["lazy-ds*"],
+              "data_stream": {}
+            }
+            """);
+        assertOK(client().performRequest(putComposableIndexTemplateRequest));
+
+        String dataStreamName = "lazy-ds";
+
+        Request createDocRequest = new Request("POST", "/" + dataStreamName + "/_doc?refresh=true");
+        createDocRequest.setJsonEntity("{ \"@timestamp\": \"2020-10-22\", \"a\": 1 }");
+        assertOK(client().performRequest(createDocRequest));
+
+        Request updateClusterSettingsRequest = new Request("PUT", "_cluster/settings");
+        updateClusterSettingsRequest.setJsonEntity("""
+            {
+              "persistent": {
+                "cluster.max_shards_per_node": 1
+              }
+            }""");
+        assertAcknowledged(client().performRequest(updateClusterSettingsRequest));
+
+        final Response rolloverResponse = client().performRequest(new Request("POST", "/" + dataStreamName + "/_rollover?lazy"));
+        Map<String, Object> rolloverResponseMap = entityAsMap(rolloverResponse);
+        assertThat((String) rolloverResponseMap.get("old_index"), startsWith(".ds-lazy-ds-"));
+        assertThat((String) rolloverResponseMap.get("old_index"), endsWith("-000001"));
+        assertThat((String) rolloverResponseMap.get("new_index"), startsWith(".ds-lazy-ds-"));
+        assertThat((String) rolloverResponseMap.get("new_index"), endsWith("-000002"));
+        assertThat(rolloverResponseMap.get("lazy"), equalTo(true));
+        assertThat(rolloverResponseMap.get("dry_run"), equalTo(false));
+        assertThat(rolloverResponseMap.get("acknowledged"), equalTo(true));
+        assertThat(rolloverResponseMap.get("rolled_over"), equalTo(false));
+        assertThat(rolloverResponseMap.get("conditions"), equalTo(Map.of()));
+
+        {
+            final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + dataStreamName));
+            List<Object> dataStreams = (List<Object>) entityAsMap(dataStreamResponse).get("data_streams");
+            assertThat(dataStreams.size(), is(1));
+            Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
+            assertThat(dataStream.get("name"), equalTo(dataStreamName));
+            assertThat(dataStream.get("rollover_on_write"), is(true));
+            assertThat(((List<Object>) dataStream.get("indices")).size(), is(1));
+        }
+
+        try {
+            createDocRequest = new Request("POST", "/" + dataStreamName + "/_doc?refresh=true");
+            createDocRequest.setJsonEntity("{ \"@timestamp\": \"2020-10-23\", \"a\": 2 }");
+            client().performRequest(createDocRequest);
+            fail("Indexing should have failed.");
+        } catch (ResponseException responseException) {
+            assertThat(responseException.getMessage(), containsString("this action would add [2] shards"));
+        }
+
+        updateClusterSettingsRequest = new Request("PUT", "_cluster/settings");
+        updateClusterSettingsRequest.setJsonEntity("""
+            {
+              "persistent": {
+                "cluster.max_shards_per_node": null
+              }
+            }""");
+        assertAcknowledged(client().performRequest(updateClusterSettingsRequest));
+        createDocRequest = new Request("POST", "/" + dataStreamName + "/_doc?refresh=true");
+        createDocRequest.setJsonEntity("{ \"@timestamp\": \"2020-10-23\", \"a\": 2 }");
+        assertOK(client().performRequest(createDocRequest));
+        {
+            final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + dataStreamName));
+            List<Object> dataStreams = (List<Object>) entityAsMap(dataStreamResponse).get("data_streams");
+            assertThat(dataStreams.size(), is(1));
+            Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
+            assertThat(dataStream.get("name"), equalTo(dataStreamName));
+            assertThat(dataStream.get("rollover_on_write"), is(false));
+            assertThat(((List<Object>) dataStream.get("indices")).size(), is(2));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testLazyRolloverWithConditions() throws Exception {
+        Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/lazy-ds-template");
+        putComposableIndexTemplateRequest.setJsonEntity("""
+            {
+              "index_patterns": ["lazy-ds*"],
+              "data_stream": {}
+            }
+            """);
+        assertOK(client().performRequest(putComposableIndexTemplateRequest));
+
+        String dataStreamName = "lazy-ds";
+
+        Request createDocRequest = new Request("POST", "/" + dataStreamName + "/_doc?refresh=true");
+        createDocRequest.setJsonEntity("{ \"@timestamp\": \"2020-10-22\", \"a\": 1 }");
+
+        assertOK(client().performRequest(createDocRequest));
+
+        Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover?lazy");
+        rolloverRequest.setJsonEntity("{\"conditions\": {\"max_docs\": 1}}");
+        ResponseException responseError = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest));
+        assertThat(responseError.getResponse().getStatusLine().getStatusCode(), is(400));
+        assertThat(responseError.getMessage(), containsString("only without any conditions"));
+    }
+}

+ 1 - 1
qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/BlockedSearcherRestCancellationTestCase.java

@@ -55,7 +55,7 @@ import static org.hamcrest.Matchers.not;
  */
 public abstract class BlockedSearcherRestCancellationTestCase extends HttpSmokeTestCase {
 
-    private static final Setting<Boolean> BLOCK_SEARCHER_SETTING = Setting.boolSetting(
+    protected static final Setting<Boolean> BLOCK_SEARCHER_SETTING = Setting.boolSetting(
         "index.block_searcher",
         false,
         Setting.Property.IndexScope

+ 10 - 1
qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/RolloverRestCancellationIT.java

@@ -9,12 +9,21 @@
 package org.elasticsearch.http;
 
 import org.apache.http.client.methods.HttpPost;
+import org.elasticsearch.action.admin.indices.alias.Alias;
 import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
 import org.elasticsearch.client.Request;
+import org.elasticsearch.common.settings.Settings;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 
 public class RolloverRestCancellationIT extends BlockedSearcherRestCancellationTestCase {
 
     public void testRolloverRestCancellation() throws Exception {
-        runTest(new Request(HttpPost.METHOD_NAME, "test/_rollover"), RolloverAction.NAME);
+        assertAcked(
+            prepareCreate("test-000001").addAlias(new Alias("test-alias").writeIndex(true))
+                .setSettings(Settings.builder().put(BLOCK_SEARCHER_SETTING.getKey(), true))
+        );
+        ensureGreen("test-000001");
+        runTest(new Request(HttpPost.METHOD_NAME, "test-alias/_rollover"), RolloverAction.NAME);
     }
 }

+ 5 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/indices.rollover.json

@@ -58,6 +58,11 @@
       "wait_for_active_shards":{
         "type":"string",
         "description":"Set the number of active shards to wait for on the newly created rollover index before the operation returns."
+      },
+      "lazy":{
+        "type":"boolean",
+        "default":"false",
+        "description":"If set to true, the rollover action will only mark a data stream to signal that it needs to be rolled over at the next write. Only allowed on data streams."
       }
     },
     "body":{

+ 29 - 0
server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/rollover/RolloverIT.java

@@ -53,6 +53,7 @@ import java.util.stream.IntStream;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.is;
@@ -272,6 +273,34 @@ public class RolloverIT extends ESIntegTestCase {
         assertNull(newIndex);
     }
 
+    public void testRolloverLazy() throws Exception {
+        if (randomBoolean()) {
+            PutIndexTemplateRequestBuilder putTemplate = indicesAdmin().preparePutTemplate("test_index")
+                .setPatterns(List.of("test_index-*"))
+                .setOrder(-1)
+                .setSettings(Settings.builder().put(AutoExpandReplicas.SETTING.getKey(), "0-all"));
+            assertAcked(putTemplate.get());
+        }
+        assertAcked(prepareCreate("test_index-1").addAlias(new Alias("test_alias")).get());
+        indexDoc("test_index-1", "1", "field", "value");
+        flush("test_index-1");
+        ensureGreen();
+
+        IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> {
+            RolloverConditions.Builder rolloverConditionsBuilder = RolloverConditions.newBuilder();
+            if (randomBoolean()) {
+                rolloverConditionsBuilder.addMaxIndexDocsCondition(1L);
+            }
+            indicesAdmin().prepareRolloverIndex("test_alias")
+                .dryRun(randomBoolean())
+                .lazy(true)
+                .setConditions(rolloverConditionsBuilder)
+                .get();
+        });
+        assertThat(exception.getMessage(), containsString("can be applied only on a data stream"));
+
+    }
+
     public void testRolloverConditionsNotMet() throws Exception {
         boolean explicitWriteIndex = randomBoolean();
         Alias testAlias = new Alias("test_alias");

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -178,6 +178,7 @@ public class TransportVersions {
     public static final TransportVersion SNAPSHOTS_IN_PROGRESS_TRACKING_REMOVING_NODES_ADDED = def(8_566_00_0);
     public static final TransportVersion SMALLER_RELOAD_SECURE_SETTINGS_REQUEST = def(8_567_00_0);
     public static final TransportVersion UPDATE_API_KEY_EXPIRATION_TIME_ADDED = def(8_568_00_0);
+    public static final TransportVersion LAZY_ROLLOVER_ADDED = def(8_569_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 3 - 0
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java

@@ -23,6 +23,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
+import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
 import org.elasticsearch.cluster.metadata.MetadataIndexAliasesService;
 import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
 import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
@@ -290,6 +291,7 @@ public class MetadataRolloverService {
         createIndexClusterStateRequest.setMatchingTemplate(templateV2);
         assert createIndexClusterStateRequest.performReroute() == false
             : "rerouteCompletionIsNotRequired() assumes reroute is not called by underlying service";
+
         ClusterState newState = createIndexService.applyCreateIndexRequest(
             currentState,
             createIndexClusterStateRequest,
@@ -312,6 +314,7 @@ public class MetadataRolloverService {
         metadataBuilder = withShardSizeForecastForWriteIndex(dataStreamName, metadataBuilder);
 
         newState = ClusterState.builder(newState).metadata(metadataBuilder).build();
+        newState = MetadataDataStreamsService.setRolloverOnWrite(newState, dataStreamName, false);
 
         return new RolloverResult(newWriteIndexName, originalWriteIndex.getName(), newState);
     }

+ 7 - 0
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverConditions.java

@@ -113,6 +113,13 @@ public class RolloverConditions implements Writeable, ToXContentObject {
         return conditions.values().stream().anyMatch(c -> Condition.Type.MIN == c.type());
     }
 
+    /**
+     * Returns true if there is at least one condition of any type
+     */
+    public boolean hasConditions() {
+        return conditions.isEmpty() == false;
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeNamedWriteableCollection(

+ 23 - 1
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java

@@ -7,6 +7,7 @@
  */
 package org.elasticsearch.action.admin.indices.rollover;
 
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@@ -96,6 +97,7 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
     private String rolloverTarget;
     private String newIndexName;
     private boolean dryRun;
+    private boolean lazy;
     private RolloverConditions conditions = new RolloverConditions();
     // the index name "_na_" is never read back, what matters are settings, mappings and aliases
     private CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_");
@@ -107,6 +109,11 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
         dryRun = in.readBoolean();
         conditions = new RolloverConditions(in);
         createIndexRequest = new CreateIndexRequest(in);
+        if (in.getTransportVersion().onOrAfter(TransportVersions.LAZY_ROLLOVER_ADDED)) {
+            lazy = in.readBoolean();
+        } else {
+            lazy = false;
+        }
     }
 
     RolloverRequest() {}
@@ -142,6 +149,9 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
         out.writeBoolean(dryRun);
         conditions.writeTo(out);
         createIndexRequest.writeTo(out);
+        if (out.getTransportVersion().onOrAfter(TransportVersions.LAZY_ROLLOVER_ADDED)) {
+            out.writeBoolean(lazy);
+        }
     }
 
     @Override
@@ -194,6 +204,13 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
         this.conditions = conditions;
     }
 
+    /**
+     * Sets if an unconditional rollover should wait for a document to come before it gets executed
+     */
+    public void lazy(boolean lazy) {
+        this.lazy = lazy;
+    }
+
     public boolean isDryRun() {
         return dryRun;
     }
@@ -214,6 +231,10 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
         return newIndexName;
     }
 
+    public boolean isLazy() {
+        return lazy;
+    }
+
     /**
      * Given the results of evaluating each individual condition, determine whether the rollover request should proceed -- that is,
      * whether the conditions are met.
@@ -257,6 +278,7 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
         }
         RolloverRequest that = (RolloverRequest) o;
         return dryRun == that.dryRun
+            && lazy == that.lazy
             && Objects.equals(rolloverTarget, that.rolloverTarget)
             && Objects.equals(newIndexName, that.newIndexName)
             && Objects.equals(conditions, that.conditions)
@@ -265,6 +287,6 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
 
     @Override
     public int hashCode() {
-        return Objects.hash(rolloverTarget, newIndexName, dryRun, conditions, createIndexRequest);
+        return Objects.hash(rolloverTarget, newIndexName, dryRun, conditions, createIndexRequest, lazy);
     }
 }

+ 5 - 0
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequestBuilder.java

@@ -43,6 +43,11 @@ public class RolloverRequestBuilder extends MasterNodeOperationRequestBuilder<Ro
         return this;
     }
 
+    public RolloverRequestBuilder lazy(boolean lazy) {
+        this.request.lazy(lazy);
+        return this;
+    }
+
     public RolloverRequestBuilder settings(Settings settings) {
         this.request.getCreateIndexRequest().settings(settings);
         return this;

+ 27 - 4
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponse.java

@@ -20,6 +20,8 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.elasticsearch.TransportVersions.LAZY_ROLLOVER_ADDED;
+
 /**
  * Response object for {@link RolloverRequest} API
  *
@@ -32,6 +34,7 @@ public final class RolloverResponse extends ShardsAcknowledgedResponse implement
     private static final ParseField OLD_INDEX = new ParseField("old_index");
     private static final ParseField DRY_RUN = new ParseField("dry_run");
     private static final ParseField ROLLED_OVER = new ParseField("rolled_over");
+    private static final ParseField LAZY = new ParseField("lazy");
     private static final ParseField CONDITIONS = new ParseField("conditions");
 
     private final String oldIndex;
@@ -39,9 +42,10 @@ public final class RolloverResponse extends ShardsAcknowledgedResponse implement
     private final Map<String, Boolean> conditionStatus;
     private final boolean dryRun;
     private final boolean rolledOver;
-    // Needs to be duplicated, because shardsAcknowledged gets (de)serailized as last field whereas
-    // in other subclasses of ShardsAcknowledgedResponse this field (de)serailized as first field.
+    // Needs to be duplicated, because shardsAcknowledged gets (de)serialized as last field whereas
+    // in other subclasses of ShardsAcknowledgedResponse this field (de)serialized as first field.
     private final boolean shardsAcknowledged;
+    private final boolean lazy;
 
     RolloverResponse(StreamInput in) throws IOException {
         super(in, false);
@@ -55,6 +59,11 @@ public final class RolloverResponse extends ShardsAcknowledgedResponse implement
         dryRun = in.readBoolean();
         rolledOver = in.readBoolean();
         shardsAcknowledged = in.readBoolean();
+        if (in.getTransportVersion().onOrAfter(LAZY_ROLLOVER_ADDED)) {
+            lazy = in.readBoolean();
+        } else {
+            lazy = false;
+        }
     }
 
     public RolloverResponse(
@@ -64,7 +73,8 @@ public final class RolloverResponse extends ShardsAcknowledgedResponse implement
         boolean dryRun,
         boolean rolledOver,
         boolean acknowledged,
-        boolean shardsAcknowledged
+        boolean shardsAcknowledged,
+        boolean lazy
     ) {
         super(acknowledged, shardsAcknowledged);
         this.oldIndex = oldIndex;
@@ -73,6 +83,7 @@ public final class RolloverResponse extends ShardsAcknowledgedResponse implement
         this.rolledOver = rolledOver;
         this.conditionStatus = conditionResults;
         this.shardsAcknowledged = shardsAcknowledged;
+        this.lazy = lazy;
     }
 
     /**
@@ -115,6 +126,13 @@ public final class RolloverResponse extends ShardsAcknowledgedResponse implement
         return shardsAcknowledged;
     }
 
+    /**
+     * Returns true if the rollover has been lazily applied, meaning the target will rollover when the next document will get indexed.
+     */
+    public boolean isLazy() {
+        return lazy;
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
@@ -124,6 +142,9 @@ public final class RolloverResponse extends ShardsAcknowledgedResponse implement
         out.writeBoolean(dryRun);
         out.writeBoolean(rolledOver);
         out.writeBoolean(shardsAcknowledged);
+        if (out.getTransportVersion().onOrAfter(LAZY_ROLLOVER_ADDED)) {
+            out.writeBoolean(lazy);
+        }
     }
 
     @Override
@@ -133,6 +154,7 @@ public final class RolloverResponse extends ShardsAcknowledgedResponse implement
         builder.field(NEW_INDEX.getPreferredName(), newIndex);
         builder.field(ROLLED_OVER.getPreferredName(), rolledOver);
         builder.field(DRY_RUN.getPreferredName(), dryRun);
+        builder.field(LAZY.getPreferredName(), lazy);
         builder.startObject(CONDITIONS.getPreferredName());
         for (Map.Entry<String, Boolean> entry : conditionStatus.entrySet()) {
             builder.field(entry.getKey(), entry.getValue());
@@ -146,6 +168,7 @@ public final class RolloverResponse extends ShardsAcknowledgedResponse implement
             RolloverResponse that = (RolloverResponse) o;
             return dryRun == that.dryRun
                 && rolledOver == that.rolledOver
+                && lazy == that.lazy
                 && Objects.equals(oldIndex, that.oldIndex)
                 && Objects.equals(newIndex, that.newIndex)
                 && Objects.equals(conditionStatus, that.conditionStatus);
@@ -155,6 +178,6 @@ public final class RolloverResponse extends ShardsAcknowledgedResponse implement
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), oldIndex, newIndex, conditionStatus, dryRun, rolledOver);
+        return Objects.hash(super.hashCode(), oldIndex, newIndex, conditionStatus, dryRun, rolledOver, lazy);
     }
 }

+ 60 - 17
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java

@@ -31,6 +31,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexMetadataStats;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.routing.allocation.allocator.AllocationActionMultiListener;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -67,6 +68,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
 
     private final Client client;
     private final MasterServiceTaskQueue<RolloverTask> rolloverTaskQueue;
+    private final MetadataDataStreamsService metadataDataStreamsService;
 
     @Inject
     public TransportRolloverAction(
@@ -77,7 +79,8 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
         IndexNameExpressionResolver indexNameExpressionResolver,
         MetadataRolloverService rolloverService,
         Client client,
-        AllocationService allocationService
+        AllocationService allocationService,
+        MetadataDataStreamsService metadataDataStreamsService
     ) {
         super(
             RolloverAction.NAME,
@@ -96,6 +99,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
             Priority.NORMAL,
             new RolloverExecutor(clusterService, allocationService, rolloverService, threadPool)
         );
+        this.metadataDataStreamsService = metadataDataStreamsService;
     }
 
     @Override
@@ -118,12 +122,61 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
     protected void masterOperation(
         Task task,
         final RolloverRequest rolloverRequest,
-        final ClusterState oldState,
+        final ClusterState clusterState,
         final ActionListener<RolloverResponse> listener
     ) throws Exception {
 
         assert task instanceof CancellableTask;
-        Metadata metadata = oldState.metadata();
+        Metadata metadata = clusterState.metadata();
+        // We evaluate the names of the index for which we should evaluate conditions, as well as what our newly created index *would* be.
+        final MetadataRolloverService.NameResolution trialRolloverNames = MetadataRolloverService.resolveRolloverNames(
+            clusterState,
+            rolloverRequest.getRolloverTarget(),
+            rolloverRequest.getNewIndexName(),
+            rolloverRequest.getCreateIndexRequest()
+        );
+        final String trialSourceIndexName = trialRolloverNames.sourceName();
+        final String trialRolloverIndexName = trialRolloverNames.rolloverName();
+        MetadataRolloverService.validateIndexName(clusterState, trialRolloverIndexName);
+
+        boolean isDataStream = metadata.dataStreams().containsKey(rolloverRequest.getRolloverTarget());
+        if (rolloverRequest.isLazy()) {
+            if (isDataStream == false || rolloverRequest.getConditions().hasConditions()) {
+                String message;
+                if (isDataStream) {
+                    message = "Lazy rollover can be used only without any conditions."
+                        + " Please remove the conditions from the request body or the query parameter 'lazy'.";
+                } else if (rolloverRequest.getConditions().hasConditions() == false) {
+                    message = "Lazy rollover can be applied only on a data stream." + " Please remove the query parameter 'lazy'.";
+                } else {
+                    message = "Lazy rollover can be applied only on a data stream with no conditions."
+                        + " Please remove the query parameter 'lazy'.";
+                }
+                listener.onFailure(new IllegalArgumentException(message));
+                return;
+            }
+            if (rolloverRequest.isDryRun() == false) {
+                metadataDataStreamsService.setRolloverOnWrite(
+                    rolloverRequest.getRolloverTarget(),
+                    true,
+                    rolloverRequest.ackTimeout(),
+                    rolloverRequest.masterNodeTimeout(),
+                    listener.map(
+                        response -> new RolloverResponse(
+                            trialSourceIndexName,
+                            trialRolloverIndexName,
+                            Map.of(),
+                            false,
+                            false,
+                            response.isAcknowledged(),
+                            false,
+                            response.isAcknowledged()
+                        )
+                    )
+                );
+                return;
+            }
+        }
 
         IndicesStatsRequest statsRequest = new IndicesStatsRequest().indices(rolloverRequest.getRolloverTarget())
             .clear()
@@ -140,18 +193,6 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
             statsRequest,
 
             listener.delegateFailureAndWrap((delegate, statsResponse) -> {
-                // Now that we have the stats for the cluster, we need to know the names of the index for which we should evaluate
-                // conditions, as well as what our newly created index *would* be.
-                final MetadataRolloverService.NameResolution trialRolloverNames = MetadataRolloverService.resolveRolloverNames(
-                    oldState,
-                    rolloverRequest.getRolloverTarget(),
-                    rolloverRequest.getNewIndexName(),
-                    rolloverRequest.getCreateIndexRequest()
-                );
-                final String trialSourceIndexName = trialRolloverNames.sourceName();
-                final String trialRolloverIndexName = trialRolloverNames.rolloverName();
-
-                MetadataRolloverService.validateIndexName(oldState, trialRolloverIndexName);
 
                 // Evaluate the conditions, so that we can tell without a cluster state update whether a rollover would occur.
                 final Map<String, Boolean> trialConditionResults = evaluateConditions(
@@ -166,7 +207,8 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
                     rolloverRequest.isDryRun(),
                     false,
                     false,
-                    false
+                    false,
+                    rolloverRequest.isLazy()
                 );
 
                 // If this is a dry run, return with the results without invoking a cluster state update
@@ -366,7 +408,8 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
                                     false,
                                     true,
                                     true,
-                                    isShardsAcknowledged
+                                    isShardsAcknowledged,
+                                    false
                                 )
                             )
                     );

+ 60 - 10
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -25,6 +25,9 @@ import org.elasticsearch.action.RoutingMissingException;
 import org.elasticsearch.action.admin.indices.create.AutoCreateAction;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
+import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
+import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.ingest.IngestActionForwarder;
 import org.elasticsearch.action.support.ActionFilters;
@@ -345,7 +348,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         // Step 1: collect all the indices in the request
         final Map<String, Boolean> indices = bulkRequest.requests.stream()
             // delete requests should not attempt to create the index (if the index does not
-            // exists), unless an external versioning is used
+            // exist), unless an external versioning is used
             .filter(
                 request -> request.opType() != DocWriteRequest.OpType.DELETE
                     || request.versionType() == VersionType.EXTERNAL
@@ -366,20 +369,28 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
             }
         }
 
-        // Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
+        // Step 3: Collect all the data streams that need to be rolled over before writing
+        Set<String> dataStreamsToBeRolledOver = indices.keySet().stream().filter(target -> {
+            DataStream dataStream = state.metadata().dataStreams().get(target);
+            return dataStream != null && dataStream.rolloverOnWrite();
+        }).collect(Collectors.toSet());
+
+        // Step 4: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
         createMissingIndicesAndIndexData(
             task,
             bulkRequest,
             executorName,
             listener,
             autoCreateIndices,
+            dataStreamsToBeRolledOver,
             indicesThatCannotBeCreated,
             startTime
         );
     }
 
     /*
-     * This method is responsible for creating any missing indices and indexing the data in the BulkRequest
+     * This method is responsible for creating any missing indices, rolling over a data stream when needed and then
+     *  indexing the data in the BulkRequest
      */
     protected void createMissingIndicesAndIndexData(
         Task task,
@@ -387,12 +398,13 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         String executorName,
         ActionListener<BulkResponse> listener,
         Set<String> autoCreateIndices,
+        Set<String> dataStreamsToBeRolledOver,
         Map<String, IndexNotFoundException> indicesThatCannotBeCreated,
         long startTime
     ) {
         final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
         // Optimizing when there are no prerequisite actions
-        if (autoCreateIndices.isEmpty()) {
+        if (autoCreateIndices.isEmpty() && dataStreamsToBeRolledOver.isEmpty()) {
             executeBulk(task, bulkRequest, startTime, listener, executorName, responses, indicesThatCannotBeCreated);
             return;
         }
@@ -417,16 +429,48 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                             }
                         } else if ((cause instanceof ResourceAlreadyExistsException) == false) {
                             // fail all requests involving this index, if create didn't work
-                            for (int i = 0; i < bulkRequest.requests.size(); i++) {
-                                DocWriteRequest<?> request = bulkRequest.requests.get(i);
-                                if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
-                                    bulkRequest.requests.set(i, null);
-                                }
-                            }
+                            failRequestsWhenPrerequisiteActionFailed(index, bulkRequest, responses, e);
                         }
                     }
                 }, refs.acquire()));
             }
+            for (String dataStream : dataStreamsToBeRolledOver) {
+                rolloverDataStream(dataStream, bulkRequest.timeout(), ActionListener.releaseAfter(new ActionListener<>() {
+
+                    @Override
+                    public void onResponse(RolloverResponse result) {
+                        // A successful response has rolled_over false when in the following cases:
+                        // - A request had the parameter lazy or dry_run enabled
+                        // - A request had conditions that were not met
+                        // Since none of the above apply, getting a response with rolled_over false is considered a bug
+                        // that should be caught here and inform the developer.
+                        assert result.isRolledOver()
+                            : "An successful unconditional rollover should always result in a rolled over data stream";
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        failRequestsWhenPrerequisiteActionFailed(dataStream, bulkRequest, responses, e);
+                    }
+                }, refs.acquire()));
+            }
+        }
+    }
+
+    /**
+     * Fails all requests involving this index or data stream because the prerequisite action failed too.
+     */
+    private static void failRequestsWhenPrerequisiteActionFailed(
+        String target,
+        BulkRequest bulkRequest,
+        AtomicArray<BulkItemResponse> responses,
+        Exception error
+    ) {
+        for (int i = 0; i < bulkRequest.requests.size(); i++) {
+            DocWriteRequest<?> request = bulkRequest.requests.get(i);
+            if (request != null && setResponseFailureIfIndexMatches(responses, i, request, target, error)) {
+                bulkRequest.requests.set(i, null);
+            }
         }
     }
 
@@ -528,6 +572,12 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         client.execute(AutoCreateAction.INSTANCE, createIndexRequest, listener);
     }
 
+    void rolloverDataStream(String dataStream, TimeValue timeout, ActionListener<RolloverResponse> listener) {
+        RolloverRequest rolloverRequest = new RolloverRequest(dataStream, null);
+        rolloverRequest.masterNodeTimeout(timeout);
+        client.execute(RolloverAction.INSTANCE, rolloverRequest, listener);
+    }
+
     private static boolean setResponseFailureIfIndexMatches(
         AtomicArray<BulkItemResponse> responses,
         int idx,

+ 1 - 0
server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java

@@ -70,6 +70,7 @@ public class TransportSimulateBulkAction extends TransportBulkAction {
         String executorName,
         ActionListener<BulkResponse> listener,
         Set<String> autoCreateIndices,
+        Set<String> dataStreamsToRollover,
         Map<String, IndexNotFoundException> indicesThatCannotBeCreated,
         long startTime
     ) {

+ 2 - 0
server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java

@@ -174,6 +174,7 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
             public static final ParseField SYSTEM_FIELD = new ParseField("system");
             public static final ParseField ALLOW_CUSTOM_ROUTING = new ParseField("allow_custom_routing");
             public static final ParseField REPLICATED = new ParseField("replicated");
+            public static final ParseField ROLLOVER_ON_WRITE = new ParseField("rollover_on_write");
             public static final ParseField TIME_SERIES = new ParseField("time_series");
             public static final ParseField TEMPORAL_RANGES = new ParseField("temporal_ranges");
             public static final ParseField TEMPORAL_RANGE_START = new ParseField("start");
@@ -345,6 +346,7 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
                 builder.field(SYSTEM_FIELD.getPreferredName(), dataStream.isSystem());
                 builder.field(ALLOW_CUSTOM_ROUTING.getPreferredName(), dataStream.isAllowCustomRouting());
                 builder.field(REPLICATED.getPreferredName(), dataStream.isReplicated());
+                builder.field(ROLLOVER_ON_WRITE.getPreferredName(), dataStream.rolloverOnWrite());
                 if (DataStream.isFailureStoreEnabled()) {
                     builder.field(DataStream.FAILURE_STORE_FIELD.getPreferredName(), dataStream.isFailureStore());
                 }

+ 69 - 13
server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

@@ -110,6 +110,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
     private final IndexMode indexMode;
     @Nullable
     private final DataStreamLifecycle lifecycle;
+    private final boolean rolloverOnWrite;
     private final boolean failureStore;
     private final List<Index> failureIndices;
 
@@ -140,7 +141,41 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             indexMode,
             lifecycle,
             failureStore,
-            failureIndices
+            failureIndices,
+            false
+        );
+    }
+
+    public DataStream(
+        String name,
+        List<Index> indices,
+        long generation,
+        Map<String, Object> metadata,
+        boolean hidden,
+        boolean replicated,
+        boolean system,
+        boolean allowCustomRouting,
+        IndexMode indexMode,
+        DataStreamLifecycle lifecycle,
+        boolean failureStore,
+        List<Index> failureIndices,
+        boolean rolloverOnWrite
+    ) {
+        this(
+            name,
+            indices,
+            generation,
+            metadata,
+            hidden,
+            replicated,
+            system,
+            System::currentTimeMillis,
+            allowCustomRouting,
+            indexMode,
+            lifecycle,
+            failureStore,
+            failureIndices,
+            rolloverOnWrite
         );
     }
 
@@ -158,7 +193,8 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
         IndexMode indexMode,
         DataStreamLifecycle lifecycle,
         boolean failureStore,
-        List<Index> failureIndices
+        List<Index> failureIndices,
+        boolean rolloverOnWrite
     ) {
         this.name = name;
         this.indices = List.copyOf(indices);
@@ -176,6 +212,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
         this.failureStore = failureStore;
         this.failureIndices = failureIndices;
         assert assertConsistent(this.indices);
+        this.rolloverOnWrite = rolloverOnWrite;
     }
 
     // mainly available for testing
@@ -236,6 +273,10 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
         return indices.get(indices.size() - 1);
     }
 
+    public boolean rolloverOnWrite() {
+        return rolloverOnWrite;
+    }
+
     /**
      * @param timestamp The timestamp used to select a backing index based on its start and end time.
      * @param metadata  The metadata that is used to fetch the start and end times for backing indices of this data stream.
@@ -616,7 +657,8 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             indexMode,
             lifecycle,
             failureStore,
-            failureIndices
+            failureIndices,
+            rolloverOnWrite
         );
     }
 
@@ -867,7 +909,8 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             in.getTransportVersion().onOrAfter(TransportVersions.V_8_1_0) ? in.readOptionalEnum(IndexMode.class) : null,
             in.getTransportVersion().onOrAfter(TransportVersions.V_8_500_020) ? in.readOptionalWriteable(DataStreamLifecycle::new) : null,
             in.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION) ? in.readBoolean() : false,
-            in.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION) ? readIndices(in) : List.of()
+            in.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION) ? readIndices(in) : List.of(),
+            in.getTransportVersion().onOrAfter(TransportVersions.LAZY_ROLLOVER_ADDED) ? in.readBoolean() : false
         );
     }
 
@@ -908,6 +951,9 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             out.writeBoolean(failureStore);
             out.writeCollection(failureIndices);
         }
+        if (out.getTransportVersion().onOrAfter(TransportVersions.LAZY_ROLLOVER_ADDED)) {
+            out.writeBoolean(rolloverOnWrite);
+        }
     }
 
     public static final ParseField NAME_FIELD = new ParseField("name");
@@ -923,11 +969,15 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
     public static final ParseField LIFECYCLE = new ParseField("lifecycle");
     public static final ParseField FAILURE_STORE_FIELD = new ParseField("failure_store");
     public static final ParseField FAILURE_INDICES_FIELD = new ParseField("failure_indices");
+    public static final ParseField ROLLOVER_ON_WRITE_FIELD = new ParseField("rollover_on_write");
 
     @SuppressWarnings("unchecked")
-    private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>(
-        "data_stream",
-        args -> new DataStream(
+    private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream", args -> {
+        // Fields behind a feature flag need to be parsed last otherwise the parser will fail when the feature flag is disabled.
+        // Until the feature flag is removed we keep them separately to be mindful of this.
+        boolean failureStoreEnabled = DataStream.isFailureStoreEnabled() && args[11] != null && (boolean) args[11];
+        List<Index> failureStoreIndices = DataStream.isFailureStoreEnabled() && args[12] != null ? (List<Index>) args[12] : List.of();
+        return new DataStream(
             (String) args[0],
             (List<Index>) args[1],
             (Long) args[2],
@@ -938,10 +988,11 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             args[7] != null && (boolean) args[7],
             args[8] != null ? IndexMode.fromString((String) args[8]) : null,
             (DataStreamLifecycle) args[9],
-            DataStream.isFailureStoreEnabled() && args[10] != null && (boolean) args[10],
-            DataStream.isFailureStoreEnabled() && args[11] != null ? (List<Index>) args[11] : List.of()
-        )
-    );
+            failureStoreEnabled,
+            failureStoreIndices,
+            args[10] != null && (boolean) args[10]
+        );
+    });
 
     static {
         PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD);
@@ -962,6 +1013,8 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
         PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ALLOW_CUSTOM_ROUTING);
         PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), INDEX_MODE);
         PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> DataStreamLifecycle.fromXContent(p), LIFECYCLE);
+        PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ROLLOVER_ON_WRITE_FIELD);
+        // The fields behind the feature flag should always be last.
         if (DataStream.isFailureStoreEnabled()) {
             PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), FAILURE_STORE_FIELD);
             PARSER.declareObjectArray(
@@ -1014,6 +1067,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             builder.field(LIFECYCLE.getPreferredName());
             lifecycle.toXContent(builder, params, rolloverConfiguration);
         }
+        builder.field(ROLLOVER_ON_WRITE_FIELD.getPreferredName(), rolloverOnWrite);
         builder.endObject();
         return builder;
     }
@@ -1034,7 +1088,8 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             && indexMode == that.indexMode
             && Objects.equals(lifecycle, that.lifecycle)
             && failureStore == that.failureStore
-            && failureIndices.equals(that.failureIndices);
+            && failureIndices.equals(that.failureIndices)
+            && rolloverOnWrite == that.rolloverOnWrite;
     }
 
     @Override
@@ -1051,7 +1106,8 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             indexMode,
             lifecycle,
             failureStore,
-            failureIndices
+            failureIndices,
+            rolloverOnWrite
         );
     }
 

+ 113 - 5
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java

@@ -41,12 +41,13 @@ public class MetadataDataStreamsService {
 
     private final ClusterService clusterService;
     private final IndicesService indicesService;
-    private final MasterServiceTaskQueue<UpdateLifecycleTask> taskQueue;
+    private final MasterServiceTaskQueue<UpdateLifecycleTask> updateLifecycleTaskQueue;
+    private final MasterServiceTaskQueue<SetRolloverOnWriteTask> setRolloverOnWriteTaskQueue;
 
     public MetadataDataStreamsService(ClusterService clusterService, IndicesService indicesService) {
         this.clusterService = clusterService;
         this.indicesService = indicesService;
-        ClusterStateTaskExecutor<UpdateLifecycleTask> executor = new SimpleBatchedAckListenerTaskExecutor<>() {
+        ClusterStateTaskExecutor<UpdateLifecycleTask> updateLifecycleExecutor = new SimpleBatchedAckListenerTaskExecutor<>() {
 
             @Override
             public Tuple<ClusterState, ClusterStateAckListener> executeTask(
@@ -61,7 +62,25 @@ public class MetadataDataStreamsService {
         };
         // We chose priority high because changing the lifecycle is changing the retention of a backing index, so processing it quickly
         // can either free space when the retention is shortened, or prevent an index to be deleted when the retention is extended.
-        this.taskQueue = clusterService.createTaskQueue("modify-lifecycle", Priority.HIGH, executor);
+        this.updateLifecycleTaskQueue = clusterService.createTaskQueue("modify-lifecycle", Priority.HIGH, updateLifecycleExecutor);
+        ClusterStateTaskExecutor<SetRolloverOnWriteTask> rolloverOnWriteExecutor = new SimpleBatchedAckListenerTaskExecutor<>() {
+
+            @Override
+            public Tuple<ClusterState, ClusterStateAckListener> executeTask(
+                SetRolloverOnWriteTask setRolloverOnWriteTask,
+                ClusterState clusterState
+            ) {
+                return new Tuple<>(
+                    setRolloverOnWrite(clusterState, setRolloverOnWriteTask.getDataStreamName(), setRolloverOnWriteTask.rolloverOnWrite()),
+                    setRolloverOnWriteTask
+                );
+            }
+        };
+        this.setRolloverOnWriteTaskQueue = clusterService.createTaskQueue(
+            "data-stream-rollover-on-write",
+            Priority.NORMAL,
+            rolloverOnWriteExecutor
+        );
     }
 
     public void modifyDataStream(final ModifyDataStreamsAction.Request request, final ActionListener<AcknowledgedResponse> listener) {
@@ -93,7 +112,11 @@ public class MetadataDataStreamsService {
         TimeValue masterTimeout,
         final ActionListener<AcknowledgedResponse> listener
     ) {
-        taskQueue.submitTask("set-lifecycle", new UpdateLifecycleTask(dataStreamNames, lifecycle, ackTimeout, listener), masterTimeout);
+        updateLifecycleTaskQueue.submitTask(
+            "set-lifecycle",
+            new UpdateLifecycleTask(dataStreamNames, lifecycle, ackTimeout, listener),
+            masterTimeout
+        );
     }
 
     /**
@@ -105,7 +128,11 @@ public class MetadataDataStreamsService {
         TimeValue masterTimeout,
         ActionListener<AcknowledgedResponse> listener
     ) {
-        taskQueue.submitTask("delete-lifecycle", new UpdateLifecycleTask(dataStreamNames, null, ackTimeout, listener), masterTimeout);
+        updateLifecycleTaskQueue.submitTask(
+            "delete-lifecycle",
+            new UpdateLifecycleTask(dataStreamNames, null, ackTimeout, listener),
+            masterTimeout
+        );
     }
 
     @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
@@ -113,6 +140,23 @@ public class MetadataDataStreamsService {
         clusterService.submitUnbatchedStateUpdateTask(source, task);
     }
 
+    /**
+     * Submits the task to signal that the next time this data stream receives a document, it will be rolled over.
+     */
+    public void setRolloverOnWrite(
+        String dataStreamName,
+        boolean rolloverOnWrite,
+        TimeValue ackTimeout,
+        TimeValue masterTimeout,
+        ActionListener<AcknowledgedResponse> listener
+    ) {
+        setRolloverOnWriteTaskQueue.submitTask(
+            "set-rollover-on-write",
+            new SetRolloverOnWriteTask(dataStreamName, rolloverOnWrite, ackTimeout, listener),
+            masterTimeout
+        );
+    }
+
     /**
      * Computes the resulting cluster state after applying all requested data stream modifications in order.
      *
@@ -175,6 +219,42 @@ public class MetadataDataStreamsService {
         return ClusterState.builder(currentState).metadata(builder.build()).build();
     }
 
+    /**
+     * Creates an updated cluster state in which the requested data stream has the flag {@link DataStream#rolloverOnWrite()}
+     * set to the value of the parameter rolloverOnWrite
+     *
+     * @param currentState the initial cluster state
+     * @param dataStreamName the name of the data stream to be updated
+     * @param rolloverOnWrite the value of the flag
+     * @return the updated cluster state
+     */
+    public static ClusterState setRolloverOnWrite(ClusterState currentState, String dataStreamName, boolean rolloverOnWrite) {
+        Metadata metadata = currentState.metadata();
+        var dataStream = validateDataStream(metadata, dataStreamName);
+        if (dataStream.rolloverOnWrite() == rolloverOnWrite) {
+            return currentState;
+        }
+        Metadata.Builder builder = Metadata.builder(metadata);
+        builder.put(
+            new DataStream(
+                dataStream.getName(),
+                dataStream.getIndices(),
+                dataStream.getGeneration(),
+                dataStream.getMetadata(),
+                dataStream.isHidden(),
+                dataStream.isReplicated(),
+                dataStream.isSystem(),
+                dataStream.isAllowCustomRouting(),
+                dataStream.getIndexMode(),
+                dataStream.getLifecycle(),
+                dataStream.isFailureStore(),
+                dataStream.getFailureIndices(),
+                rolloverOnWrite
+            )
+        );
+        return ClusterState.builder(currentState).metadata(builder.build()).build();
+    }
+
     private static void addBackingIndex(
         Metadata metadata,
         Metadata.Builder builder,
@@ -270,4 +350,32 @@ public class MetadataDataStreamsService {
             return lifecycle;
         }
     }
+
+    /**
+     * A cluster state update task that consists of the cluster state request and the listeners that need to be notified upon completion.
+     */
+    static class SetRolloverOnWriteTask extends AckedBatchedClusterStateUpdateTask {
+
+        private final String dataStreamName;
+        private final boolean rolloverOnWrite;
+
+        SetRolloverOnWriteTask(
+            String dataStreamName,
+            boolean rolloverOnWrite,
+            TimeValue ackTimeout,
+            ActionListener<AcknowledgedResponse> listener
+        ) {
+            super(ackTimeout, listener);
+            this.dataStreamName = dataStreamName;
+            this.rolloverOnWrite = rolloverOnWrite;
+        }
+
+        public String getDataStreamName() {
+            return dataStreamName;
+        }
+
+        public boolean rolloverOnWrite() {
+            return rolloverOnWrite;
+        }
+    }
 }

+ 1 - 0
server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java

@@ -48,6 +48,7 @@ public class RestRolloverIndexAction extends BaseRestHandler {
         RolloverRequest rolloverIndexRequest = new RolloverRequest(request.param("index"), request.param("new_index"));
         request.applyContentParser(parser -> rolloverIndexRequest.fromXContent(includeTypeName, parser));
         rolloverIndexRequest.dryRun(request.paramAsBoolean("dry_run", false));
+        rolloverIndexRequest.lazy(request.paramAsBoolean("lazy", false));
         rolloverIndexRequest.timeout(request.paramAsTime("timeout", rolloverIndexRequest.timeout()));
         rolloverIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", rolloverIndexRequest.masterNodeTimeout()));
         rolloverIndexRequest.getCreateIndexRequest()

+ 2 - 0
server/src/test/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequestTests.java

@@ -174,6 +174,7 @@ public class RolloverRequestTests extends ESTestCase {
                 .addMinPrimaryShardDocsCondition(randomNonNegativeLong())
                 .build()
         );
+        originalRequest.lazy(randomBoolean());
         try (BytesStreamOutput out = new BytesStreamOutput()) {
             originalRequest.writeTo(out);
             BytesReference bytes = out.bytes();
@@ -181,6 +182,7 @@ public class RolloverRequestTests extends ESTestCase {
                 RolloverRequest cloneRequest = new RolloverRequest(in);
                 assertThat(cloneRequest.getNewIndexName(), equalTo(originalRequest.getNewIndexName()));
                 assertThat(cloneRequest.getRolloverTarget(), equalTo(originalRequest.getRolloverTarget()));
+                assertThat(cloneRequest.isLazy(), equalTo(originalRequest.isLazy()));
                 for (Map.Entry<String, Condition<?>> entry : cloneRequest.getConditions().getConditions().entrySet()) {
                     Condition<?> condition = originalRequest.getConditions().getConditions().get(entry.getKey());
                     // here we compare the string representation as there is some information loss when serializing

+ 29 - 82
server/src/test/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponseTests.java

@@ -33,7 +33,8 @@ public class RolloverResponseTests extends AbstractWireSerializingTestCase<Rollo
             randomBoolean(),
             randomBoolean(),
             acknowledged,
-            shardsAcknowledged
+            shardsAcknowledged,
+            randomBoolean()
         );
     }
 
@@ -69,100 +70,46 @@ public class RolloverResponseTests extends AbstractWireSerializingTestCase<Rollo
 
     @Override
     protected RolloverResponse mutateInstance(RolloverResponse response) {
-        int i = randomIntBetween(0, 6);
+        var oldIndex = response.getOldIndex();
+        var newIndex = response.getNewIndex();
+        var conditionStatus = response.getConditionStatus();
+        var dryRun = response.isDryRun();
+        var rolledOver = response.isRolledOver();
+        var acknowledged = response.isAcknowledged();
+        var shardsAcknowledged = response.isShardsAcknowledged();
+        var lazy = response.isLazy();
+        int i = randomIntBetween(0, 7);
         switch (i) {
-            case 0:
-                return new RolloverResponse(
-                    response.getOldIndex() + randomAlphaOfLengthBetween(2, 5),
-                    response.getNewIndex(),
-                    response.getConditionStatus(),
-                    response.isDryRun(),
-                    response.isRolledOver(),
-                    response.isAcknowledged(),
-                    response.isShardsAcknowledged()
-                );
-            case 1:
-                return new RolloverResponse(
-                    response.getOldIndex(),
-                    response.getNewIndex() + randomAlphaOfLengthBetween(2, 5),
-                    response.getConditionStatus(),
-                    response.isDryRun(),
-                    response.isRolledOver(),
-                    response.isAcknowledged(),
-                    response.isShardsAcknowledged()
-                );
-            case 2:
-                Map<String, Boolean> results;
+            case 0 -> oldIndex = oldIndex + randomAlphaOfLengthBetween(2, 5);
+            case 1 -> newIndex = newIndex + randomAlphaOfLengthBetween(2, 5);
+            case 2 -> {
                 if (response.getConditionStatus().isEmpty()) {
-                    results = randomResults(false);
+                    conditionStatus = randomResults(false);
                 } else {
-                    results = Maps.newMapWithExpectedSize(response.getConditionStatus().size());
+                    conditionStatus = Maps.newMapWithExpectedSize(response.getConditionStatus().size());
                     List<String> keys = randomSubsetOf(
                         randomIntBetween(1, response.getConditionStatus().size()),
                         response.getConditionStatus().keySet()
                     );
                     for (Map.Entry<String, Boolean> entry : response.getConditionStatus().entrySet()) {
                         boolean value = keys.contains(entry.getKey()) ? entry.getValue() == false : entry.getValue();
-                        results.put(entry.getKey(), value);
+                        conditionStatus.put(entry.getKey(), value);
                     }
                 }
-                return new RolloverResponse(
-                    response.getOldIndex(),
-                    response.getNewIndex(),
-                    results,
-                    response.isDryRun(),
-                    response.isRolledOver(),
-                    response.isAcknowledged(),
-                    response.isShardsAcknowledged()
-                );
-            case 3:
-                return new RolloverResponse(
-                    response.getOldIndex(),
-                    response.getNewIndex(),
-                    response.getConditionStatus(),
-                    response.isDryRun() == false,
-                    response.isRolledOver(),
-                    response.isAcknowledged(),
-                    response.isShardsAcknowledged()
-                );
-            case 4:
-                return new RolloverResponse(
-                    response.getOldIndex(),
-                    response.getNewIndex(),
-                    response.getConditionStatus(),
-                    response.isDryRun(),
-                    response.isRolledOver() == false,
-                    response.isAcknowledged(),
-                    response.isShardsAcknowledged()
-                );
-            case 5: {
-                boolean acknowledged = response.isAcknowledged() == false;
-                boolean shardsAcknowledged = acknowledged && response.isShardsAcknowledged();
-                return new RolloverResponse(
-                    response.getOldIndex(),
-                    response.getNewIndex(),
-                    response.getConditionStatus(),
-                    response.isDryRun(),
-                    response.isRolledOver(),
-                    acknowledged,
-                    shardsAcknowledged
-                );
             }
-            case 6: {
-                boolean shardsAcknowledged = response.isShardsAcknowledged() == false;
-                boolean acknowledged = shardsAcknowledged || response.isAcknowledged();
-                return new RolloverResponse(
-                    response.getOldIndex(),
-                    response.getNewIndex(),
-                    response.getConditionStatus(),
-                    response.isDryRun(),
-                    response.isRolledOver(),
-                    acknowledged,
-                    shardsAcknowledged
-                );
+            case 3 -> dryRun = dryRun == false;
+            case 4 -> rolledOver = rolledOver == false;
+            case 5 -> {
+                acknowledged = response.isAcknowledged() == false;
+                shardsAcknowledged = acknowledged && response.isShardsAcknowledged();
             }
-            default:
-                throw new UnsupportedOperationException();
+            case 6 -> {
+                shardsAcknowledged = response.isShardsAcknowledged() == false;
+                acknowledged = shardsAcknowledged || response.isAcknowledged();
+            }
+            case 7 -> lazy = lazy == false;
+            default -> throw new UnsupportedOperationException();
         }
+        return new RolloverResponse(oldIndex, newIndex, conditionStatus, dryRun, rolledOver, acknowledged, shardsAcknowledged, lazy);
     }
 }

+ 163 - 21
server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java

@@ -18,14 +18,17 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsTests;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
+import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
 import org.elasticsearch.cluster.metadata.MetadataIndexAliasesService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RecoverySource;
@@ -38,6 +41,7 @@ import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.cache.query.QueryCacheStats;
 import org.elasticsearch.index.cache.request.RequestCacheStats;
@@ -61,6 +65,8 @@ import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
+import org.hamcrest.Matchers;
+import org.junit.Before;
 import org.mockito.ArgumentCaptor;
 
 import java.nio.file.Path;
@@ -74,6 +80,7 @@ import static java.util.Collections.emptyList;
 import static org.elasticsearch.action.admin.indices.rollover.TransportRolloverAction.buildStats;
 import static org.elasticsearch.action.admin.indices.rollover.TransportRolloverAction.evaluateConditions;
 import static org.elasticsearch.test.ActionListenerUtils.anyActionListener;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.mockito.ArgumentMatchers.any;
@@ -86,6 +93,30 @@ import static org.mockito.Mockito.when;
 
 public class TransportRolloverActionTests extends ESTestCase {
 
+    final ClusterService mockClusterService = mock(ClusterService.class);
+    final DiscoveryNode mockNode = mock(DiscoveryNode.class);
+    final ThreadPool mockThreadPool = mock(ThreadPool.class);
+    final MetadataCreateIndexService mockCreateIndexService = mock(MetadataCreateIndexService.class);
+    final IndexNameExpressionResolver mockIndexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
+    final ActionFilters mockActionFilters = mock(ActionFilters.class);
+    final MetadataIndexAliasesService mdIndexAliasesService = mock(MetadataIndexAliasesService.class);
+    final MetadataDataStreamsService mockMetadataDataStreamService = mock(MetadataDataStreamsService.class);
+    final Client mockClient = mock(Client.class);
+    final AllocationService mockAllocationService = mock(AllocationService.class);
+    final MetadataRolloverService rolloverService = new MetadataRolloverService(
+        mockThreadPool,
+        mockCreateIndexService,
+        mdIndexAliasesService,
+        EmptySystemIndices.INSTANCE,
+        WriteLoadForecaster.DEFAULT
+    );
+
+    @Before
+    public void setUpMocks() {
+        when(mockNode.getId()).thenReturn("mocknode");
+        when(mockClusterService.localNode()).thenReturn(mockNode);
+    }
+
     public void testDocStatsSelectionFromPrimariesOnly() {
         long docsInPrimaryShards = 100;
         long docsInShards = 200;
@@ -300,19 +331,6 @@ public class TransportRolloverActionTests extends ESTestCase {
     }
 
     public void testConditionEvaluationWhenAliasToWriteAndReadIndicesConsidersOnlyPrimariesFromWriteIndex() throws Exception {
-        final ClusterService mockClusterService = mock(ClusterService.class);
-        final DiscoveryNode mockNode = mock(DiscoveryNode.class);
-        when(mockNode.getId()).thenReturn("mocknode");
-        when(mockClusterService.localNode()).thenReturn(mockNode);
-        final ThreadPool mockThreadPool = mock(ThreadPool.class);
-        final MetadataCreateIndexService mockCreateIndexService = mock(MetadataCreateIndexService.class);
-        final IndexNameExpressionResolver mockIndexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
-        final ActionFilters mockActionFilters = mock(ActionFilters.class);
-        final MetadataIndexAliasesService mdIndexAliasesService = mock(MetadataIndexAliasesService.class);
-
-        final Client mockClient = mock(Client.class);
-        final AllocationService mockAllocationService = mock(AllocationService.class);
-
         final Map<String, IndexStats> indexStats = new HashMap<>();
         int total = randomIntBetween(500, 1000);
         indexStats.put("logs-index-000001", createIndexStats(200L, total));
@@ -346,13 +364,6 @@ public class TransportRolloverActionTests extends ESTestCase {
 
         when(mockCreateIndexService.applyCreateIndexRequest(any(), any(), anyBoolean(), any())).thenReturn(stateBefore);
         when(mdIndexAliasesService.applyAliasActions(any(), any())).thenReturn(stateBefore);
-        MetadataRolloverService rolloverService = new MetadataRolloverService(
-            mockThreadPool,
-            mockCreateIndexService,
-            mdIndexAliasesService,
-            EmptySystemIndices.INSTANCE,
-            WriteLoadForecaster.DEFAULT
-        );
 
         final TransportRolloverAction transportRolloverAction = new TransportRolloverAction(
             mock(TransportService.class),
@@ -362,7 +373,8 @@ public class TransportRolloverActionTests extends ESTestCase {
             mockIndexNameExpressionResolver,
             rolloverService,
             mockClient,
-            mockAllocationService
+            mockAllocationService,
+            mockMetadataDataStreamService
         );
 
         // For given alias, verify that condition evaluation fails when the condition doc count is greater than the primaries doc count
@@ -398,6 +410,136 @@ public class TransportRolloverActionTests extends ESTestCase {
         assertThat(response.getConditionStatus().get("[max_docs: 300]"), is(true));
     }
 
+    public void testLazyRollover() throws Exception {
+        final IndexMetadata backingIndexMetadata = IndexMetadata.builder(".ds-logs-ds-000001")
+            .settings(settings(IndexVersion.current()))
+            .numberOfShards(1)
+            .numberOfReplicas(1)
+            .build();
+        final DataStream dataStream = new DataStream(
+            "logs-ds",
+            List.of(backingIndexMetadata.getIndex()),
+            1,
+            Map.of(),
+            false,
+            false,
+            false,
+            false,
+            IndexMode.STANDARD
+        );
+        final ClusterState stateBefore = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(Metadata.builder().put(backingIndexMetadata, false).put(dataStream))
+            .build();
+
+        doAnswer(invocation -> {
+            Object[] args = invocation.getArguments();
+            assert args.length == 5;
+            @SuppressWarnings("unchecked")
+            ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) args[4];
+            listener.onResponse(AcknowledgedResponse.TRUE);
+            return null;
+        }).when(mockMetadataDataStreamService).setRolloverOnWrite(eq(dataStream.getName()), eq(true), any(), any(), anyActionListener());
+
+        final TransportRolloverAction transportRolloverAction = new TransportRolloverAction(
+            mock(TransportService.class),
+            mockClusterService,
+            mockThreadPool,
+            mockActionFilters,
+            mockIndexNameExpressionResolver,
+            rolloverService,
+            mockClient,
+            mockAllocationService,
+            mockMetadataDataStreamService
+        );
+        final PlainActionFuture<RolloverResponse> future = new PlainActionFuture<>();
+        RolloverRequest rolloverRequest = new RolloverRequest("logs-ds", null);
+        rolloverRequest.lazy(true);
+        transportRolloverAction.masterOperation(mock(CancellableTask.class), rolloverRequest, stateBefore, future);
+        RolloverResponse rolloverResponse = future.actionGet();
+        assertThat(rolloverResponse.getOldIndex(), equalTo(".ds-logs-ds-000001"));
+        assertThat(rolloverResponse.getNewIndex(), Matchers.startsWith(".ds-logs-ds-"));
+        assertThat(rolloverResponse.getNewIndex(), Matchers.endsWith("-000002"));
+        assertThat(rolloverResponse.isLazy(), equalTo(true));
+        assertThat(rolloverResponse.isDryRun(), equalTo(false));
+        assertThat(rolloverResponse.isRolledOver(), equalTo(false));
+        assertThat(rolloverResponse.getConditionStatus().size(), equalTo(0));
+        assertThat(rolloverResponse.isAcknowledged(), is(true));
+    }
+
+    public void testLazyRolloverFails() throws Exception {
+        final IndexMetadata.Builder indexMetadata = IndexMetadata.builder("logs-index-000001")
+            .putAlias(AliasMetadata.builder("logs-alias").writeIndex(true).build())
+            .settings(settings(IndexVersion.current()))
+            .numberOfShards(1)
+            .numberOfReplicas(1);
+        final IndexMetadata backingIndexMetadata = IndexMetadata.builder(".ds-logs-ds-000001")
+            .settings(settings(IndexVersion.current()))
+            .numberOfShards(1)
+            .numberOfReplicas(1)
+            .build();
+        final DataStream dataStream = new DataStream(
+            "logs-ds",
+            List.of(backingIndexMetadata.getIndex()),
+            randomIntBetween(1, 10),
+            Map.of(),
+            false,
+            false,
+            false,
+            false,
+            IndexMode.STANDARD
+        );
+        final ClusterState stateBefore = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(Metadata.builder().put(indexMetadata).put(backingIndexMetadata, false).put(dataStream))
+            .build();
+
+        final TransportRolloverAction transportRolloverAction = new TransportRolloverAction(
+            mock(TransportService.class),
+            mockClusterService,
+            mockThreadPool,
+            mockActionFilters,
+            mockIndexNameExpressionResolver,
+            rolloverService,
+            mockClient,
+            mockAllocationService,
+            mockMetadataDataStreamService
+        );
+
+        // Lazy rollover fails on a concrete index
+        {
+            final PlainActionFuture<RolloverResponse> future = new PlainActionFuture<>();
+            RolloverRequest rolloverRequest = new RolloverRequest("logs-alias", null);
+            rolloverRequest.lazy(true);
+            transportRolloverAction.masterOperation(mock(CancellableTask.class), rolloverRequest, stateBefore, future);
+            IllegalArgumentException illegalArgumentException = expectThrows(IllegalArgumentException.class, future::actionGet);
+            assertThat(illegalArgumentException.getMessage(), containsString("Lazy rollover can be applied only on a data stream."));
+        }
+
+        // Lazy rollover fails when used with conditions
+        {
+            final PlainActionFuture<RolloverResponse> future = new PlainActionFuture<>();
+            RolloverRequest rolloverRequest = new RolloverRequest("logs-ds", null);
+            rolloverRequest.setConditions(RolloverConditions.newBuilder().addMaxIndexAgeCondition(TimeValue.timeValueDays(1)).build());
+            rolloverRequest.lazy(true);
+            transportRolloverAction.masterOperation(mock(CancellableTask.class), rolloverRequest, stateBefore, future);
+            IllegalArgumentException illegalArgumentException = expectThrows(IllegalArgumentException.class, future::actionGet);
+            assertThat(illegalArgumentException.getMessage(), containsString("Lazy rollover can be used only without any conditions."));
+        }
+
+        // Lazy rollover fails on concrete index with conditions
+        {
+            final PlainActionFuture<RolloverResponse> future = new PlainActionFuture<>();
+            RolloverRequest rolloverRequest = new RolloverRequest("logs-alias", null);
+            rolloverRequest.setConditions(RolloverConditions.newBuilder().addMaxIndexAgeCondition(TimeValue.timeValueDays(1)).build());
+            rolloverRequest.lazy(true);
+            transportRolloverAction.masterOperation(mock(CancellableTask.class), rolloverRequest, stateBefore, future);
+            IllegalArgumentException illegalArgumentException = expectThrows(IllegalArgumentException.class, future::actionGet);
+            assertThat(
+                illegalArgumentException.getMessage(),
+                containsString("Lazy rollover can be applied only on a data stream with no conditions.")
+            );
+        }
+    }
+
     private IndicesStatsResponse createIndicesStatResponse(String indexName, long totalDocs, long primariesDocs) {
         final CommonStats primaryStats = mock(CommonStats.class);
         when(primaryStats.getDocs()).thenReturn(new DocsStats(primariesDocs, 0, between(1, 10000)));

+ 2 - 0
server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java

@@ -190,6 +190,7 @@ public class TransportSimulateBulkActionTests extends ESTestCase {
             }
         };
         Set<String> autoCreateIndices = Set.of(); // unused
+        Set<String> dataStreamsToRollover = Set.of(); // unused
         Map<String, IndexNotFoundException> indicesThatCannotBeCreated = Map.of(); // unused
         long startTime = 0;
         bulkAction.createMissingIndicesAndIndexData(
@@ -198,6 +199,7 @@ public class TransportSimulateBulkActionTests extends ESTestCase {
             randomAlphaOfLength(10),
             listener,
             autoCreateIndices,
+            dataStreamsToRollover,
             indicesThatCannotBeCreated,
             startTime
         );

+ 2 - 1
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

@@ -1653,7 +1653,8 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             randomBoolean() ? IndexMode.STANDARD : null, // IndexMode.TIME_SERIES triggers validation that many unit tests doesn't pass
             lifecycle,
             failureStore,
-            failureIndices
+            failureIndices,
+            false
         );
 
         try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) {

+ 2 - 1
test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

@@ -261,7 +261,8 @@ public final class DataStreamTestHelper {
             randomBoolean() ? IndexMode.STANDARD : null, // IndexMode.TIME_SERIES triggers validation that many unit tests doesn't pass
             randomBoolean() ? DataStreamLifecycle.newBuilder().dataRetention(randomMillisUpToYear9999()).build() : null,
             failureStore,
-            failureIndices
+            failureIndices,
+            randomBoolean()
         );
     }
 

+ 1 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RolloverStepTests.java

@@ -152,7 +152,7 @@ public class RolloverStepTests extends AbstractStepTestCase<RolloverStep> {
             @SuppressWarnings("unchecked")
             ActionListener<RolloverResponse> listener = (ActionListener<RolloverResponse>) invocation.getArguments()[1];
             assertRolloverIndexRequest(request, rolloverTarget);
-            listener.onResponse(new RolloverResponse(null, null, Collections.emptyMap(), request.isDryRun(), true, true, true));
+            listener.onResponse(new RolloverResponse(null, null, Collections.emptyMap(), request.isDryRun(), true, true, true, false));
             return null;
         }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());
     }

+ 1 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/WaitForRolloverReadyStepTests.java

@@ -349,7 +349,7 @@ public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForR
             assertRolloverIndexRequest(request, rolloverTarget, expectedConditions);
             Map<String, Boolean> conditionResults = expectedConditions.stream()
                 .collect(Collectors.toMap(Condition::toString, condition -> conditionResult));
-            listener.onResponse(new RolloverResponse(null, null, conditionResults, request.isDryRun(), false, false, false));
+            listener.onResponse(new RolloverResponse(null, null, conditionResults, request.isDryRun(), false, false, false, false));
             return null;
         }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());
     }

+ 2 - 1
x-pack/plugin/profiling/src/test/java/org/elasticsearch/xpack/profiling/ProfilingDataStreamManagerTests.java

@@ -404,7 +404,8 @@ public class ProfilingDataStreamManagerTests extends ESTestCase {
                 false,
                 true,
                 true,
-                true
+                true,
+                false
             );
         } else {
             fail("client called with unexpected request:" + request.toString());