Initial implementation for DataLifecycleService (#94012)

This adds support for managing the lifecycle of data streams. It
currently supports rollover and data retention.
Andrei Dan · committed 2 years ago · commit 4760f0074f
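
For context on how a data stream comes under DLM management: a composable index template carries a DataLifecycle, and every data stream created from that template inherits it. A minimal sketch mirroring the integration test below (the template id and index pattern are illustrative, and client stands in for any org.elasticsearch.client.internal.Client):

    // Register a template whose data streams get a lifecycle with 30-day retention.
    DataLifecycle lifecycle = new DataLifecycle(TimeValue.timeValueDays(30));
    PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request("id1");
    request.indexTemplate(
        new ComposableIndexTemplate(
            List.of("metrics-foo*"),                          // index patterns
            new Template(null, null, null, lifecycle),        // the lifecycle travels in the template
            null, null, null, null,
            new ComposableIndexTemplate.DataStreamTemplate(), // marks this as a data stream template
            null
        )
    );
    client.execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();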

+ 5 - 0
docs/changelog/94012.yaml

@@ -0,0 +1,5 @@
+pr: 94012
+summary: Initial implementation for `DataLifecycleService`
+area: DLM
+type: feature
+issues: []

+ 1 - 0
modules/data-streams/src/main/java/module-info.java

@@ -14,4 +14,5 @@ module org.elasticsearch.datastreams {
     requires org.apache.lucene.core;
 
     exports org.elasticsearch.datastreams.action to org.elasticsearch.server;
+    exports org.elasticsearch.datastreams to org.elasticsearch.dlm;
 }

+ 3 - 2
modules/dlm/build.gradle

@@ -1,5 +1,3 @@
-import org.elasticsearch.gradle.internal.info.BuildParams
-
 apply plugin: 'elasticsearch.internal-es-plugin'
 apply plugin: 'elasticsearch.internal-cluster-test'
 
@@ -9,5 +7,8 @@ esplugin {
   classname 'org.elasticsearch.dlm.DataLifecyclePlugin'
 }
 archivesBaseName = 'dlm'
+dependencies {
+  testImplementation project(':modules:data-streams')
+}
 
 addQaCheckDependencies(project)

+ 180 - 0
modules/dlm/src/internalClusterTest/java/org/elasticsearch/dlm/DataLifecycleServiceIT.java

@@ -0,0 +1,180 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.dlm;
+
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
+import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.datastreams.CreateDataStreamAction;
+import org.elasticsearch.action.datastreams.GetDataStreamAction;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
+import org.elasticsearch.cluster.metadata.DataLifecycle;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.Template;
+import org.elasticsearch.common.compress.CompressedXContent;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.datastreams.DataStreamsPlugin;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.xcontent.XContentType;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
+import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.startsWith;
+
+public class DataLifecycleServiceIT extends ESIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return List.of(DataLifecyclePlugin.class, DataStreamsPlugin.class, MockTransportService.TestPlugin.class);
+    }
+
+    protected boolean ignoreExternalCluster() {
+        return true;
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+        Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
+        settings.put(DataLifecycleService.DLM_POLL_INTERVAL, "1s");
+        return settings.build();
+    }
+
+    public void testRolloverLifecycle() throws Exception {
+        // an empty lifecycle uses the default rollover conditions (and has no retention)
+        DataLifecycle lifecycle = new DataLifecycle();
+
+        putComposableIndexTemplate("id1", null, List.of("metrics-foo*"), null, null, lifecycle);
+        Iterable<DataLifecycleService> dataLifecycleServices = internalCluster().getInstances(DataLifecycleService.class);
+
+        for (DataLifecycleService dataLifecycleService : dataLifecycleServices) {
+            dataLifecycleService.setDefaultRolloverRequestSupplier((target) -> {
+                RolloverRequest rolloverRequest = new RolloverRequest(target, null);
+                rolloverRequest.addMaxIndexDocsCondition(1);
+                return rolloverRequest;
+            });
+        }
+        String dataStreamName = "metrics-foo";
+        CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
+        client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
+
+        indexDocs(dataStreamName, 1);
+
+        assertBusy(() -> {
+            GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { dataStreamName });
+            GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
+                .actionGet();
+            assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
+            assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
+            List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
+            assertThat(backingIndices.size(), equalTo(2));
+            String backingIndex = backingIndices.get(0).getName();
+            assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
+            String writeIndex = backingIndices.get(1).getName();
+            assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
+        });
+    }
+
+    public void testRolloverAndRetention() throws Exception {
+        DataLifecycle lifecycle = new DataLifecycle(TimeValue.timeValueMillis(0));
+
+        putComposableIndexTemplate("id1", null, List.of("metrics-foo*"), null, null, lifecycle);
+        Iterable<DataLifecycleService> dataLifecycleServices = internalCluster().getInstances(DataLifecycleService.class);
+
+        for (DataLifecycleService dataLifecycleService : dataLifecycleServices) {
+            dataLifecycleService.setDefaultRolloverRequestSupplier((target) -> {
+                RolloverRequest rolloverRequest = new RolloverRequest(target, null);
+                rolloverRequest.addMaxIndexDocsCondition(1);
+                return rolloverRequest;
+            });
+        }
+        String dataStreamName = "metrics-foo";
+        CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
+        client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
+
+        indexDocs(dataStreamName, 1);
+
+        assertBusy(() -> {
+            GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { dataStreamName });
+            GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
+                .actionGet();
+            assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
+            assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
+            List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
+            assertThat(backingIndices.size(), equalTo(1));
+            // we expect the data stream to have only one backing index, the write one, with generation 2
+            // as generation 1 would've been deleted by DLM given the lifecycle configuration
+            String writeIndex = backingIndices.get(0).getName();
+            assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
+        });
+    }
+
+    static void indexDocs(String dataStream, int numDocs) {
+        BulkRequest bulkRequest = new BulkRequest();
+        for (int i = 0; i < numDocs; i++) {
+            String value = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
+            bulkRequest.add(
+                new IndexRequest(dataStream).opType(DocWriteRequest.OpType.CREATE)
+                    .source(String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, value), XContentType.JSON)
+            );
+        }
+        BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
+        assertThat(bulkResponse.getItems().length, equalTo(numDocs));
+        String backingIndexPrefix = DataStream.BACKING_INDEX_PREFIX + dataStream;
+        for (BulkItemResponse itemResponse : bulkResponse) {
+            assertThat(itemResponse.getFailureMessage(), nullValue());
+            assertThat(itemResponse.status(), equalTo(RestStatus.CREATED));
+            assertThat(itemResponse.getIndex(), startsWith(backingIndexPrefix));
+        }
+        client().admin().indices().refresh(new RefreshRequest(dataStream)).actionGet();
+    }
+
+    static void putComposableIndexTemplate(
+        String id,
+        @Nullable String mappings,
+        List<String> patterns,
+        @Nullable Settings settings,
+        @Nullable Map<String, Object> metadata,
+        @Nullable DataLifecycle lifecycle
+    ) throws IOException {
+        PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(id);
+        request.indexTemplate(
+            new ComposableIndexTemplate(
+                patterns,
+                new Template(settings, mappings == null ? null : CompressedXContent.fromJSON(mappings), null, lifecycle),
+                null,
+                null,
+                null,
+                metadata,
+                new ComposableIndexTemplate.DataStreamTemplate(),
+                null
+            )
+        );
+        client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();
+    }
+
+}
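For reference, the backingIndexEqualTo assertions above match against the generated backing index names, which combine DataStream.BACKING_INDEX_PREFIX (".ds-"), the data stream name, a date, and a zero-padded generation. Roughly, with an illustrative date:

    // What backingIndexEqualTo("metrics-foo", 2) matches, modulo the date component:
    String secondGeneration = ".ds-metrics-foo-2023.03.01-000002";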

+ 2 - 0
modules/dlm/src/main/java/module-info.java

@@ -12,4 +12,6 @@ module org.elasticsearch.dlm {
     requires org.elasticsearch.xcontent;
     requires org.apache.lucene.core;
     requires org.apache.logging.log4j;
+
+    exports org.elasticsearch.dlm;
 }

+ 43 - 14
modules/dlm/src/main/java/org/elasticsearch/dlm/DataLifecyclePlugin.java

@@ -8,12 +8,18 @@
 
 package org.elasticsearch.dlm;
 
+import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.client.internal.OriginSettingClient;
+import org.elasticsearch.cluster.metadata.DataLifecycle;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.plugins.ActionPlugin;
@@ -25,17 +31,21 @@ import org.elasticsearch.tracing.Tracer;
 import org.elasticsearch.watcher.ResourceWatcherService;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 
+import java.io.IOException;
 import java.time.Clock;
 import java.util.Collection;
 import java.util.List;
 import java.util.function.Supplier;
 
+import static org.elasticsearch.cluster.metadata.DataLifecycle.DLM_ORIGIN;
+
 /**
  * Plugin encapsulating Data Lifecycle Management Service.
  */
 public class DataLifecyclePlugin extends Plugin implements ActionPlugin {
 
     private final Settings settings;
+    private final SetOnce<DataLifecycleService> dataLifecycleInitialisationService = new SetOnce<>();
 
     public DataLifecyclePlugin(Settings settings) {
         this.settings = settings;
@@ -66,20 +76,39 @@ public class DataLifecyclePlugin extends Plugin implements ActionPlugin {
         Tracer tracer,
         AllocationService allocationService
     ) {
-        return super.createComponents(
-            client,
-            clusterService,
-            threadPool,
-            resourceWatcherService,
-            scriptService,
-            xContentRegistry,
-            environment,
-            nodeEnvironment,
-            namedWriteableRegistry,
-            indexNameExpressionResolver,
-            repositoriesServiceSupplier,
-            tracer,
-            allocationService
+        if (DataLifecycle.isEnabled() == false) {
+            return List.of();
+        }
+
+        dataLifecycleInitialisationService.set(
+            new DataLifecycleService(
+                settings,
+                new OriginSettingClient(client, DLM_ORIGIN),
+                clusterService,
+                getClock(),
+                threadPool,
+                threadPool::absoluteTimeInMillis
+            )
         );
+        dataLifecycleInitialisationService.get().init();
+        return List.of(dataLifecycleInitialisationService.get());
+    }
+
+    @Override
+    public List<Setting<?>> getSettings() {
+        if (DataLifecycle.isEnabled() == false) {
+            return List.of();
+        }
+
+        return List.of(DataLifecycleService.DLM_POLL_INTERVAL_SETTING);
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            IOUtils.close(dataLifecycleInitialisationService.get());
+        } catch (IOException e) {
+            throw new ElasticsearchException("unable to close the data lifecycle service", e);
+        }
     }
 }

+ 359 - 0
modules/dlm/src/main/java/org/elasticsearch/dlm/DataLifecycleService.java

@@ -0,0 +1,359 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.dlm;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ResultDeduplicator;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
+import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
+import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.component.Lifecycle;
+import org.elasticsearch.common.scheduler.SchedulerEngine;
+import org.elasticsearch.common.scheduler.TimeValueSchedule;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.Strings;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.gateway.GatewayService;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportRequest;
+
+import java.io.Closeable;
+import java.time.Clock;
+import java.util.List;
+import java.util.Locale;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+
+/**
+ * This service implements the actions (e.g. rollover, retention) needed to manage data streams that have a DLM lifecycle configured.
+ * It runs on the master node and schedules a job according to the configured {@link DataLifecycleService#DLM_POLL_INTERVAL_SETTING}.
+ */
+public class DataLifecycleService implements ClusterStateListener, Closeable, SchedulerEngine.Listener {
+
+    public static final String DLM_POLL_INTERVAL = "indices.dlm.poll_interval";
+    public static final Setting<TimeValue> DLM_POLL_INTERVAL_SETTING = Setting.timeSetting(
+        DLM_POLL_INTERVAL,
+        TimeValue.timeValueMinutes(10),
+        TimeValue.timeValueSeconds(1),
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+    private static final Logger logger = LogManager.getLogger(DataLifecycleService.class);
+    /**
+     * Name constant for the job DLM schedules
+     */
+    private static final String DATA_LIFECYCLE_JOB_NAME = "dlm";
+
+    private final Settings settings;
+    private final Client client;
+    private final ClusterService clusterService;
+    private final ResultDeduplicator<TransportRequest, Void> transportActionsDeduplicator;
+    private final LongSupplier nowSupplier;
+    private final Clock clock;
+    private volatile boolean isMaster = false;
+    private volatile TimeValue pollInterval;
+    private SchedulerEngine.Job scheduledJob;
+    private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>();
+    // we use this rollover supplier to facilitate testing until the rollover
+    // configuration can be read from a cluster setting
+    private Function<String, RolloverRequest> defaultRolloverRequestSupplier;
+
+    public DataLifecycleService(
+        Settings settings,
+        Client client,
+        ClusterService clusterService,
+        Clock clock,
+        ThreadPool threadPool,
+        LongSupplier nowSupplier
+    ) {
+        this.settings = settings;
+        this.client = client;
+        this.clusterService = clusterService;
+        this.clock = clock;
+        this.transportActionsDeduplicator = new ResultDeduplicator<>(threadPool.getThreadContext());
+        this.nowSupplier = nowSupplier;
+        this.scheduledJob = null;
+        this.pollInterval = DLM_POLL_INTERVAL_SETTING.get(settings);
+        this.defaultRolloverRequestSupplier = this::getDefaultRolloverRequest;
+    }
+
+    /**
+     * Initializer method to avoid publishing a self-reference in the constructor.
+     */
+    public void init() {
+        clusterService.addListener(this);
+        clusterService.getClusterSettings().addSettingsUpdateConsumer(DLM_POLL_INTERVAL_SETTING, this::updatePollInterval);
+    }
+
+    @Override
+    public void clusterChanged(ClusterChangedEvent event) {
+        // wait for the cluster state to be recovered
+        if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
+            return;
+        }
+
+        final boolean prevIsMaster = this.isMaster;
+        if (prevIsMaster != event.localNodeMaster()) {
+            this.isMaster = event.localNodeMaster();
+            if (this.isMaster) {
+                // we weren't the master, and now we are
+                maybeScheduleJob();
+            } else {
+                // we were the master, and now we aren't
+                cancelJob();
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        SchedulerEngine engine = scheduler.get();
+        if (engine != null) {
+            engine.stop();
+        }
+    }
+
+    @Override
+    public void triggered(SchedulerEngine.Event event) {
+        if (event.getJobName().equals(DATA_LIFECYCLE_JOB_NAME)) {
+            if (this.isMaster) {
+                logger.trace("DLM job triggered: {}, {}, {}", event.getJobName(), event.getScheduledTime(), event.getTriggeredTime());
+                run(clusterService.state());
+            }
+        }
+    }
+
+    /**
+     * Iterates over the DLM managed data streams and executes the needed operations
+     * to satisfy the configured {@link org.elasticsearch.cluster.metadata.DataLifecycle}.
+     */
+    // default visibility for testing purposes
+    void run(ClusterState state) {
+        for (DataStream dataStream : state.metadata().dataStreams().values()) {
+            if (dataStream.getLifecycle() == null) {
+                continue;
+            }
+
+            try {
+                maybeExecuteRollover(state, dataStream);
+            } catch (Exception e) {
+                logger.error(() -> String.format(Locale.ROOT, "DLM failed to roll over data stream [%s]", dataStream.getName()), e);
+            }
+
+            try {
+                maybeExecuteRetention(state, dataStream);
+            } catch (Exception e) {
+                logger.error(
+                    () -> String.format(Locale.ROOT, "DLM failed to execute retention for data stream [%s]", dataStream.getName()),
+                    e
+                );
+            }
+        }
+    }
+
+    private void maybeExecuteRollover(ClusterState state, DataStream dataStream) {
+        IndexMetadata writeIndex = state.metadata().index(dataStream.getWriteIndex());
+        if (writeIndex != null && isManagedByDLM(dataStream, writeIndex)) {
+            RolloverRequest rolloverRequest = defaultRolloverRequestSupplier.apply(dataStream.getName());
+            transportActionsDeduplicator.executeOnce(
+                rolloverRequest,
+                ActionListener.noop(),
+                (req, reqListener) -> rolloverDataStream(rolloverRequest, reqListener)
+            );
+        }
+    }
+
+    private void maybeExecuteRetention(ClusterState state, DataStream dataStream) {
+        TimeValue retention = getRetentionConfiguration(dataStream);
+        if (retention != null) {
+            List<Index> backingIndices = dataStream.getIndices();
+            // skip the write index; we'll consider it on a later run, once it has rolled over and is no longer the write index
+            for (int i = 0; i < backingIndices.size() - 1; i++) {
+                IndexMetadata backingIndex = state.metadata().index(backingIndices.get(i));
+                if (backingIndex == null || isManagedByDLM(dataStream, backingIndex) == false) {
+                    continue;
+                }
+
+                if (isTimeToBeDeleted(dataStream.getName(), backingIndex, nowSupplier, retention)) {
+                    // there's an opportunity here to batch the delete requests (e.g. delete 100 indices per request)
+                    // let's start simple and reevaluate
+                    DeleteIndexRequest deleteRequest = new DeleteIndexRequest(backingIndex.getIndex().getName()).masterNodeTimeout(
+                        TimeValue.MAX_VALUE
+                    );
+
+                    // time to delete the index
+                    transportActionsDeduplicator.executeOnce(
+                        deleteRequest,
+                        ActionListener.noop(),
+                        (req, reqListener) -> deleteIndex(deleteRequest, reqListener)
+                    );
+                }
+            }
+        }
+    }
+
+    /**
+     * Checks if the provided index is ready to be deleted according to the configured retention.
+     */
+    static boolean isTimeToBeDeleted(
+        String dataStreamName,
+        IndexMetadata backingIndex,
+        LongSupplier nowSupplier,
+        TimeValue configuredRetention
+    ) {
+        TimeValue indexLifecycleDate = getCreationOrRolloverDate(dataStreamName, backingIndex);
+
+        long nowMillis = nowSupplier.getAsLong();
+        return nowMillis >= indexLifecycleDate.getMillis() + configuredRetention.getMillis();
+    }
+
+    private void rolloverDataStream(RolloverRequest rolloverRequest, ActionListener<Void> listener) {
+        // "saving" the rollover target name here so we don't capture the entire request
+        String rolloverTarget = rolloverRequest.getRolloverTarget();
+        logger.trace("DLM issues rollover request for data stream [{}]", rolloverTarget);
+        client.admin().indices().rolloverIndex(rolloverRequest, new ActionListener<>() {
+            @Override
+            public void onResponse(RolloverResponse rolloverResponse) {
+                logger.info(
+                    "DLM successfully rolled over data stream [{}]. The new index is [{}]",
+                    rolloverTarget,
+                    rolloverResponse.getNewIndex()
+                );
+                listener.onResponse(null);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                logger.error(() -> Strings.format("DLM rollover of [%s] failed", rolloverTarget), e);
+                listener.onFailure(e);
+            }
+        });
+    }
+
+    private void deleteIndex(DeleteIndexRequest deleteIndexRequest, ActionListener<Void> listener) {
+        assert deleteIndexRequest.indices() != null && deleteIndexRequest.indices().length == 1 : "DLM deletes one index at a time";
+        // "saving" the index name here so we don't capture the entire request
+        String targetIndex = deleteIndexRequest.indices()[0];
+        logger.trace("DLM issues delete request for index [{}]", targetIndex);
+        client.admin().indices().delete(deleteIndexRequest, new ActionListener<>() {
+            @Override
+            public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                logger.info("DLM successfully deleted index [{}]", targetIndex);
+                listener.onResponse(null);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                logger.error(() -> Strings.format("DLM request to delete [%s] failed", targetIndex), e);
+                listener.onFailure(e);
+            }
+        });
+    }
+
+    @Nullable
+    static TimeValue getRetentionConfiguration(DataStream dataStream) {
+        if (dataStream.getLifecycle() == null) {
+            return null;
+        }
+        return dataStream.getLifecycle().getDataRetention();
+    }
+
+    /**
+     * Returns the time the index was rolled over for the given rollover target or, if it was never rolled over, its creation date.
+     * The rollover target is the name of the data stream the index is a part of.
+     */
+    static TimeValue getCreationOrRolloverDate(String rolloverTarget, IndexMetadata index) {
+        RolloverInfo rolloverInfo = index.getRolloverInfos().get(rolloverTarget);
+        if (rolloverInfo != null) {
+            return TimeValue.timeValueMillis(rolloverInfo.getTime());
+        } else {
+            return TimeValue.timeValueMillis(index.getCreationDate());
+        }
+    }
+
+    /**
+     * This method is shallow for now, but it gives us a single place to modify once we introduce the
+     * index.lifecycle.prefer_ilm setting; at that point it will encapsulate the corresponding logic.
+     */
+    private static boolean isManagedByDLM(DataStream parentDataStream, IndexMetadata indexMetadata) {
+        return indexMetadata.getLifecyclePolicyName() == null && parentDataStream.getLifecycle() != null;
+    }
+
+    private RolloverRequest getDefaultRolloverRequest(String dataStream) {
+        RolloverRequest rolloverRequest = new RolloverRequest(dataStream, null).masterNodeTimeout(TimeValue.MAX_VALUE);
+
+        // TODO get rollover from cluster setting once we have it
+        rolloverRequest.addMaxIndexAgeCondition(TimeValue.timeValueDays(7));
+        rolloverRequest.addMaxPrimaryShardSizeCondition(ByteSizeValue.ofGb(50));
+        rolloverRequest.addMaxPrimaryShardDocsCondition(200_000_000);
+        // don't rollover an empty index
+        rolloverRequest.addMinIndexDocsCondition(1);
+        return rolloverRequest;
+    }
+
+    private void updatePollInterval(TimeValue newInterval) {
+        this.pollInterval = newInterval;
+        maybeScheduleJob();
+    }
+
+    private void cancelJob() {
+        if (scheduler.get() != null) {
+            scheduler.get().remove(DATA_LIFECYCLE_JOB_NAME);
+            scheduledJob = null;
+        }
+    }
+
+    private boolean isClusterServiceStoppedOrClosed() {
+        final Lifecycle.State state = clusterService.lifecycleState();
+        return state == Lifecycle.State.STOPPED || state == Lifecycle.State.CLOSED;
+    }
+
+    private void maybeScheduleJob() {
+        if (this.isMaster == false) {
+            return;
+        }
+
+        // don't schedule the job if the node is shutting down
+        if (isClusterServiceStoppedOrClosed()) {
+            logger.trace("Skipping scheduling a DLM job because the cluster lifecycle state is [{}]", clusterService.lifecycleState());
+            return;
+        }
+
+        if (scheduler.get() == null) {
+            scheduler.set(new SchedulerEngine(settings, clock));
+            scheduler.get().register(this);
+        }
+
+        assert scheduler.get() != null : "scheduler should be available";
+        scheduledJob = new SchedulerEngine.Job(DATA_LIFECYCLE_JOB_NAME, new TimeValueSchedule(pollInterval));
+        scheduler.get().add(scheduledJob);
+    }
+
+    // package visibility for testing
+    void setDefaultRolloverRequestSupplier(Function<String, RolloverRequest> defaultRolloverRequestSupplier) {
+        this.defaultRolloverRequestSupplier = defaultRolloverRequestSupplier;
+    }
+}
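To make the retention check concrete, this is the arithmetic isTimeToBeDeleted performs; the values below are illustrative rather than taken from the change:

    // An index that rolled over 4 seconds ago, with a configured retention of 5 seconds.
    long rolloverTimeMillis = 1_000_000L;                 // what getCreationOrRolloverDate returns
    long nowMillis = rolloverTimeMillis + 4_000L;         // what nowSupplier.getAsLong() returns
    TimeValue retention = TimeValue.timeValueSeconds(5);
    // Deletable once now >= (rollover-or-creation date) + retention.
    boolean deletable = nowMillis >= rolloverTimeMillis + retention.getMillis();  // false: one second to go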

+ 277 - 0
modules/dlm/src/test/java/org/elasticsearch/dlm/DataLifecycleServiceTests.java

@@ -0,0 +1,277 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.dlm;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
+import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
+import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.DataLifecycle;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.test.ClusterServiceUtils;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.client.NoOpClient;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportRequest;
+import org.junit.After;
+import org.junit.Before;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+
+public class DataLifecycleServiceTests extends ESTestCase {
+
+    private long now;
+    private ThreadPool threadPool;
+    private DataLifecycleService dataLifecycleService;
+    private List<TransportRequest> clientSeenRequests;
+    private NoOpClient client;
+
+    @Before
+    public void setupServices() {
+        threadPool = new TestThreadPool(getTestName());
+        Set<Setting<?>> builtInClusterSettings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        builtInClusterSettings.add(DataLifecycleService.DLM_POLL_INTERVAL_SETTING);
+        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, builtInClusterSettings);
+        ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, clusterSettings);
+
+        now = randomNonNegativeLong();
+        Clock clock = Clock.fixed(Instant.ofEpochMilli(now), ZoneId.of(randomFrom(ZoneId.getAvailableZoneIds())));
+        clientSeenRequests = new CopyOnWriteArrayList<>();
+
+        client = getTransportRequestsRecordingClient();
+        dataLifecycleService = new DataLifecycleService(Settings.EMPTY, client, clusterService, clock, threadPool, () -> now);
+        dataLifecycleService.init();
+    }
+
+    @After
+    public void cleanup() {
+        clientSeenRequests.clear();
+        dataLifecycleService.close();
+        threadPool.shutdownNow();
+        client.close();
+    }
+
+    public void testOperationsExecutedOnce() {
+        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        int numBackingIndices = 3;
+        Metadata.Builder builder = Metadata.builder();
+        DataStream dataStream = createDataStream(
+            builder,
+            dataStreamName,
+            numBackingIndices,
+            settings(Version.CURRENT),
+            new DataLifecycle(TimeValue.timeValueMillis(0))
+        );
+        builder.put(dataStream);
+
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build();
+
+        dataLifecycleService.run(state);
+        assertThat(clientSeenRequests.size(), is(3));
+        assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class));
+        assertThat(((RolloverRequest) clientSeenRequests.get(0)).getRolloverTarget(), is(dataStreamName));
+        List<DeleteIndexRequest> deleteRequests = clientSeenRequests.subList(1, 3)
+            .stream()
+            .map(transportRequest -> (DeleteIndexRequest) transportRequest)
+            .toList();
+        assertThat(deleteRequests.get(0).indices()[0], is(dataStream.getIndices().get(0).getName()));
+        assertThat(deleteRequests.get(1).indices()[0], is(dataStream.getIndices().get(1).getName()));
+
+        // on the second run the rollover and delete requests should not execute anymore
+        // i.e. the count should *remain* 1 for rollover and 2 for deletes
+        dataLifecycleService.run(state);
+        assertThat(clientSeenRequests.size(), is(3));
+    }
+
+    public void testRetentionNotConfigured() {
+        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        int numBackingIndices = 3;
+        Metadata.Builder builder = Metadata.builder();
+        DataStream dataStream = createDataStream(
+            builder,
+            dataStreamName,
+            numBackingIndices,
+            settings(Version.CURRENT),
+            new DataLifecycle((TimeValue) null)
+        );
+        builder.put(dataStream);
+
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build();
+        dataLifecycleService.run(state);
+        assertThat(clientSeenRequests.size(), is(1));
+        assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class));
+    }
+
+    public void testRetentionNotExecutedDueToAge() {
+        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        int numBackingIndices = 3;
+        Metadata.Builder builder = Metadata.builder();
+        DataStream dataStream = createDataStream(
+            builder,
+            dataStreamName,
+            numBackingIndices,
+            settings(Version.CURRENT),
+            new DataLifecycle(TimeValue.timeValueDays(700))
+        );
+        builder.put(dataStream);
+
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build();
+        dataLifecycleService.run(state);
+        assertThat(clientSeenRequests.size(), is(1));
+        assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class));
+    }
+
+    public void testIlmManagedIndicesAreSkipped() {
+        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        int numBackingIndices = 3;
+        Metadata.Builder builder = Metadata.builder();
+        DataStream dataStream = createDataStream(
+            builder,
+            dataStreamName,
+            numBackingIndices,
+            Settings.builder().put(IndexMetadata.LIFECYCLE_NAME, "ILM_policy").put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT),
+            new DataLifecycle(TimeValue.timeValueMillis(0))
+        );
+        builder.put(dataStream);
+
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build();
+        dataLifecycleService.run(state);
+        assertThat(clientSeenRequests.isEmpty(), is(true));
+    }
+
+    public void testDataStreamsWithoutLifecycleAreSkipped() {
+        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        int numBackingIndices = 3;
+        Metadata.Builder builder = Metadata.builder();
+        DataStream dataStream = createDataStream(
+            builder,
+            dataStreamName,
+            numBackingIndices,
+            Settings.builder().put(IndexMetadata.LIFECYCLE_NAME, "ILM_policy").put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT),
+            null
+        );
+        builder.put(dataStream);
+
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build();
+        dataLifecycleService.run(state);
+        assertThat(clientSeenRequests.isEmpty(), is(true));
+    }
+
+    public void testIsTimeToBeDeleted() {
+        String dataStreamName = "metrics-foo";
+        {
+            IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1))
+                .settings(settings(Version.CURRENT))
+                .numberOfShards(1)
+                .numberOfReplicas(1)
+                .creationDate(now - 3000L);
+            MaxAgeCondition rolloverCondition = new MaxAgeCondition(TimeValue.timeValueMillis(now - 2000L));
+            indexMetaBuilder.putRolloverInfo(new RolloverInfo(dataStreamName, List.of(rolloverCondition), now - 2000L));
+
+            IndexMetadata rolledIndex = indexMetaBuilder.build();
+            assertThat(
+                DataLifecycleService.isTimeToBeDeleted(dataStreamName, rolledIndex, () -> now, TimeValue.timeValueMillis(1000)),
+                is(true)
+            );
+
+            assertThat(
+                DataLifecycleService.isTimeToBeDeleted(dataStreamName, rolledIndex, () -> now, TimeValue.timeValueMillis(5000)),
+                is(false)
+            );
+        }
+
+        {
+            // if rollover info is missing the creation date should be used
+            IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1))
+                .settings(settings(Version.CURRENT))
+                .numberOfShards(1)
+                .numberOfReplicas(1)
+                .creationDate(now - 3000L);
+            IndexMetadata noRolloverIndex = indexMetaBuilder.build();
+
+            assertThat(
+                DataLifecycleService.isTimeToBeDeleted(dataStreamName, noRolloverIndex, () -> now, TimeValue.timeValueMillis(2000)),
+                is(true)
+            );
+
+            assertThat(
+                DataLifecycleService.isTimeToBeDeleted(dataStreamName, noRolloverIndex, () -> now, TimeValue.timeValueMillis(5000)),
+                is(false)
+            );
+        }
+    }
+
+    private DataStream createDataStream(
+        Metadata.Builder builder,
+        String dataStreamName,
+        int backingIndicesCount,
+        Settings.Builder backingIndicesSettings,
+        @Nullable DataLifecycle lifecycle
+    ) {
+        final List<Index> backingIndices = new ArrayList<>();
+        for (int k = 1; k <= backingIndicesCount; k++) {
+            IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, k))
+                .settings(backingIndicesSettings)
+                .numberOfShards(1)
+                .numberOfReplicas(1)
+                .creationDate(now - 3000L);
+            if (k < backingIndicesCount) {
+                // add rollover info only for non-write indices
+                MaxAgeCondition rolloverCondition = new MaxAgeCondition(TimeValue.timeValueMillis(now - 2000L));
+                indexMetaBuilder.putRolloverInfo(new RolloverInfo(dataStreamName, List.of(rolloverCondition), now - 2000L));
+            }
+            IndexMetadata indexMetadata = indexMetaBuilder.build();
+            builder.put(indexMetadata, false);
+            backingIndices.add(indexMetadata.getIndex());
+        }
+        return newInstance(dataStreamName, backingIndices, backingIndicesCount, null, false, lifecycle);
+    }
+
+    private NoOpClient getTransportRequestsRecordingClient() {
+        return new NoOpClient(getTestName()) {
+            @Override
+            protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
+                ActionType<Response> action,
+                Request request,
+                ActionListener<Response> listener
+            ) {
+                clientSeenRequests.add(request);
+            }
+        };
+    }
+}

+ 21 - 0
server/src/main/java/org/elasticsearch/action/admin/indices/delete/DeleteIndexRequest.java

@@ -17,6 +17,8 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.CollectionUtils;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
 
 import static org.elasticsearch.action.ValidateActions.addValidationError;
 
@@ -105,4 +107,23 @@ public class DeleteIndexRequest extends AcknowledgedRequest<DeleteIndexRequest>
         out.writeStringArray(indices);
         indicesOptions.writeIndicesOptions(out);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DeleteIndexRequest that = (DeleteIndexRequest) o;
+        return Arrays.equals(indices, that.indices) && Objects.equals(indicesOptions, that.indicesOptions);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(indicesOptions);
+        result = 31 * result + Arrays.hashCode(indices);
+        return result;
+    }
 }

+ 22 - 0
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java

@@ -30,6 +30,7 @@ import org.elasticsearch.xcontent.XContentParser;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Objects;
 
 import static org.elasticsearch.action.ValidateActions.addValidationError;
 
@@ -429,4 +430,25 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
     public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
         return new CancellableTask(id, type, action, "", parentTaskId, headers);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RolloverRequest that = (RolloverRequest) o;
+        return dryRun == that.dryRun
+            && Objects.equals(rolloverTarget, that.rolloverTarget)
+            && Objects.equals(newIndexName, that.newIndexName)
+            && Objects.equals(conditions, that.conditions)
+            && Objects.equals(createIndexRequest, that.createIndexRequest);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(rolloverTarget, newIndexName, dryRun, conditions, createIndexRequest);
+    }
 }
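These equals/hashCode overrides (here and on DeleteIndexRequest above) are what make the deduplication in DataLifecycleService work: ResultDeduplicator.executeOnce keys on the request itself, so equal requests issued on consecutive runs collapse into one in-flight action. A sketch of the interaction, reusing only the calls shown in the service:

    // Two equal rollover requests: the second send function is not invoked while the first is in flight.
    ResultDeduplicator<TransportRequest, Void> dedup = new ResultDeduplicator<>(threadPool.getThreadContext());
    RolloverRequest first = new RolloverRequest("metrics-foo", null);
    RolloverRequest second = new RolloverRequest("metrics-foo", null);  // first.equals(second) == true
    dedup.executeOnce(first, ActionListener.noop(), (request, listener) -> { /* sends the rollover */ });
    dedup.executeOnce(second, ActionListener.noop(), (request, listener) -> { /* deduplicated, never runs */ });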

+ 1 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/DataLifecycle.java

@@ -35,6 +35,7 @@ public class DataLifecycle implements SimpleDiffable<DataLifecycle>, ToXContentO
     private static final boolean FEATURE_FLAG_ENABLED;
 
     public static final DataLifecycle EMPTY = new DataLifecycle();
+    public static final String DLM_ORIGIN = "data_lifecycle";
 
     private static final ParseField DATA_RETENTION_FIELD = new ParseField("data_retention");
 

+ 13 - 1
test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

@@ -20,6 +20,7 @@ import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.CheckedFunction;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.index.Index;
@@ -100,7 +101,18 @@ public final class DataStreamTestHelper {
         Map<String, Object> metadata,
         boolean replicated
     ) {
-        return new DataStream(name, indices, generation, metadata, false, replicated, false, false, null, null);
+        return newInstance(name, indices, generation, metadata, replicated, null);
+    }
+
+    public static DataStream newInstance(
+        String name,
+        List<Index> indices,
+        long generation,
+        Map<String, Object> metadata,
+        boolean replicated,
+        @Nullable DataLifecycle lifecycle
+    ) {
+        return new DataStream(name, indices, generation, metadata, false, replicated, false, false, null, lifecycle);
     }
 
     public static String getLegacyDefaultBackingIndexName(

+ 2 - 0
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java

@@ -23,6 +23,7 @@ import java.util.function.Consumer;
 import java.util.function.Predicate;
 
 import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
+import static org.elasticsearch.cluster.metadata.DataLifecycle.DLM_ORIGIN;
 import static org.elasticsearch.ingest.IngestService.INGEST_ORIGIN;
 import static org.elasticsearch.persistent.PersistentTasksService.PERSISTENT_TASK_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
@@ -131,6 +132,7 @@ public final class AuthorizationUtils {
             case PERSISTENT_TASK_ORIGIN:
             case ROLLUP_ORIGIN:
             case INDEX_LIFECYCLE_ORIGIN:
+            case DLM_ORIGIN:
             case ENRICH_ORIGIN:
             case IDP_ORIGIN:
             case INGEST_ORIGIN:
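
Tying the security change to the plugin wiring above: DataLifecyclePlugin wraps its client in an OriginSettingClient with DLM_ORIGIN, so every rollover and delete request DLM issues carries the data_lifecycle origin, and the switch here lets those internally originated requests be authorized the same way other system origins (e.g. ILM's) are, rather than under whatever user context happens to be on the thread. A rough sketch using only names from this change:

    // From DataLifecyclePlugin.createComponents: all DLM-issued requests carry DLM_ORIGIN...
    Client dlmClient = new OriginSettingClient(client, DataLifecycle.DLM_ORIGIN);
    // ...and AuthorizationUtils (above) recognizes that origin when authorizing them.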