Browse Source

Store write load in IndexMetadata during data streams rollovers (#91019)

This commit stores the index write load of the current data stream write index
into its IndexMetadata during rollover.

Closes #91046
Francisco Fernández Castaño 2 years ago
parent
commit
72f3578f2b
19 changed files with 749 additions and 38 deletions
  1. 5 0
      docs/changelog/91019.yaml
  2. 135 1
      modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java
  3. 1 1
      modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java
  4. 13 3
      modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java
  5. 13 4
      server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java
  6. 16 3
      server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java
  7. 71 7
      server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
  8. 200 0
      server/src/main/java/org/elasticsearch/index/shard/IndexWriteLoad.java
  9. 24 10
      server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java
  10. 3 2
      server/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java
  11. 1 0
      server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java
  12. 8 4
      server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java
  13. 22 0
      server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java
  14. 14 0
      server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java
  15. 60 0
      server/src/test/java/org/elasticsearch/index/shard/IndexWriteLoadSerializationTests.java
  16. 147 0
      server/src/test/java/org/elasticsearch/index/shard/IndexWriteLoadTests.java
  17. 1 1
      x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java
  18. 1 1
      x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java
  19. 14 1
      x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java

+ 5 - 0
docs/changelog/91019.yaml

@@ -0,0 +1,5 @@
+pr: 91019
+summary: Store write load in the `IndexMetadata` during data streams rollovers
+area: Allocation
+type: enhancement
+issues: []

+ 135 - 1
modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java

@@ -27,7 +27,11 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
 import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
 import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
+import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.admin.indices.template.delete.DeleteComposableIndexTemplateAction;
 import org.elasticsearch.action.admin.indices.template.get.GetComposableIndexTemplateAction;
 import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
@@ -61,6 +65,8 @@ import org.elasticsearch.cluster.metadata.DataStreamAlias;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.Template;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.compress.CompressedXContent;
@@ -72,6 +78,8 @@ import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.MapperParsingException;
 import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.index.shard.IndexWriteLoad;
+import org.elasticsearch.index.shard.IndexingStats;
 import org.elasticsearch.indices.InvalidAliasNameException;
 import org.elasticsearch.indices.InvalidIndexNameException;
 import org.elasticsearch.plugins.Plugin;
@@ -80,6 +88,8 @@ import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.fetch.subphase.FieldAndFormat;
 import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xcontent.ObjectPath;
 import org.elasticsearch.xcontent.XContentType;
 
@@ -115,6 +125,8 @@ import static org.hamcrest.Matchers.arrayWithSize;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasItemInArray;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
@@ -127,7 +139,7 @@ public class DataStreamIT extends ESIntegTestCase {
 
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return List.of(DataStreamsPlugin.class);
+        return List.of(DataStreamsPlugin.class, MockTransportService.TestPlugin.class);
     }
 
     public void testBasicScenario() throws Exception {
@@ -1998,6 +2010,128 @@ public class DataStreamIT extends ESIntegTestCase {
         assertEquals(searchResponse.getTotalShards(), 4);
     }
 
+    public void testWriteIndexWriteLoadIsStoredAfterRollover() throws Exception {
+        final String dataStreamName = "logs-es";
+        final int numberOfShards = randomIntBetween(1, 5);
+        final int numberOfReplicas = randomIntBetween(0, 1);
+        final var indexSettings = Settings.builder()
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
+            .build();
+        DataStreamIT.putComposableIndexTemplate("my-template", null, List.of("logs-*"), indexSettings, null);
+        final var request = new CreateDataStreamAction.Request(dataStreamName);
+        assertAcked(client().execute(CreateDataStreamAction.INSTANCE, request).actionGet());
+
+        assertBusy(() -> {
+            for (int i = 0; i < 10; i++) {
+                indexDocs(dataStreamName, randomIntBetween(100, 200));
+            }
+
+            final ClusterState clusterState = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
+            final DataStream dataStream = clusterState.getMetadata().dataStreams().get(dataStreamName);
+            final String writeIndex = dataStream.getWriteIndex().getName();
+            final IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(writeIndex).get();
+            for (IndexShardStats indexShardStats : indicesStatsResponse.getIndex(writeIndex).getIndexShards().values()) {
+                for (ShardStats shard : indexShardStats.getShards()) {
+                    final IndexingStats.Stats shardIndexingStats = shard.getStats().getIndexing().getTotal();
+                    // Ensure that we have enough clock granularity before rolling over to ensure that we capture _some_ write load
+                    assertThat(shardIndexingStats.getTotalActiveTimeInMillis(), is(greaterThan(0L)));
+                    assertThat(shardIndexingStats.getWriteLoad(), is(greaterThan(0.0)));
+                }
+            }
+        });
+
+        assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet());
+        final ClusterState clusterState = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
+        final DataStream dataStream = clusterState.getMetadata().dataStreams().get(dataStreamName);
+
+        for (Index index : dataStream.getIndices()) {
+            final IndexMetadata indexMetadata = clusterState.metadata().index(index);
+            final IndexWriteLoad indexWriteLoad = indexMetadata.getWriteLoad();
+
+            if (index.equals(dataStream.getWriteIndex()) == false) {
+                assertThat(indexWriteLoad, is(notNullValue()));
+                for (int shardId = 0; shardId < numberOfShards; shardId++) {
+                    assertThat(indexWriteLoad.getWriteLoadForShard(shardId).getAsDouble(), is(greaterThanOrEqualTo(0.0)));
+                    assertThat(indexWriteLoad.getUptimeInMillisForShard(shardId).getAsLong(), is(greaterThan(0L)));
+                }
+            } else {
+                assertThat(indexWriteLoad, is(nullValue()));
+            }
+        }
+    }
+
+    public void testWriteLoadIsStoredInABestEffort() throws Exception {
+        // This test simulates the scenario where some nodes fail to respond
+        // to the IndicesStatsRequest and therefore only a partial view of the
+        // write-index write-load is stored during rollover.
+        // In this test we simulate the following scenario:
+        // - The DataStream template is configured to have 2 shards and 1 replica
+        // - There's an allocation rule that restricts the data stream's shards to 4 particular data-only nodes
+        // - We want to simulate two possible cases here:
+        // - All the assigned nodes for shard 0 will fail to respond to the IndicesStatsRequest
+        // - Only the shard 1 replica will respond successfully to the IndicesStatsRequest ensuring that we fall back in that case
+
+        final List<String> dataOnlyNodes = internalCluster().startDataOnlyNodes(4);
+        final String dataStreamName = "logs-es";
+
+        final var indexSettings = Settings.builder()
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+            .put("index.routing.allocation.include._name", String.join(",", dataOnlyNodes))
+            .build();
+        DataStreamIT.putComposableIndexTemplate("my-template", null, List.of("logs-*"), indexSettings, null);
+        final var createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
+        assertAcked(client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).actionGet());
+
+        for (int i = 0; i < 10; i++) {
+            indexDocs(dataStreamName, randomIntBetween(100, 200));
+        }
+
+        final ClusterState clusterStateBeforeRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
+        final DataStream dataStreamBeforeRollover = clusterStateBeforeRollover.getMetadata().dataStreams().get(dataStreamName);
+        final IndexRoutingTable currentDataStreamWriteIndexRoutingTable = clusterStateBeforeRollover.routingTable()
+            .index(dataStreamBeforeRollover.getWriteIndex());
+
+        final List<String> failingIndicesStatsNodeIds = new ArrayList<>();
+        for (ShardRouting shardRouting : currentDataStreamWriteIndexRoutingTable.shard(0).assignedShards()) {
+            failingIndicesStatsNodeIds.add(shardRouting.currentNodeId());
+        }
+        failingIndicesStatsNodeIds.add(currentDataStreamWriteIndexRoutingTable.shard(1).primaryShard().currentNodeId());
+
+        for (String nodeId : failingIndicesStatsNodeIds) {
+            String nodeName = clusterStateBeforeRollover.nodes().resolveNode(nodeId).getName();
+            MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeName);
+            transportService.addRequestHandlingBehavior(
+                IndicesStatsAction.NAME + "[n]",
+                (handler, request, channel, task) -> channel.sendResponse(new RuntimeException("Unable to get stats"))
+            );
+        }
+        assertThat(failingIndicesStatsNodeIds.size(), is(equalTo(3)));
+
+        assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet());
+        final ClusterState clusterState = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
+        final DataStream dataStream = clusterState.getMetadata().dataStreams().get(dataStreamName);
+
+        for (Index index : dataStream.getIndices()) {
+            final IndexMetadata indexMetadata = clusterState.metadata().index(index);
+            final IndexWriteLoad indexWriteLoad = indexMetadata.getWriteLoad();
+
+            if (index.equals(dataStream.getWriteIndex()) == false) {
+                assertThat(indexWriteLoad, is(notNullValue()));
+                // All stats requests performed against the nodes holding shard 0 failed
+                assertThat(indexWriteLoad.getWriteLoadForShard(0).isPresent(), is(false));
+                assertThat(indexWriteLoad.getUptimeInMillisForShard(0).isPresent(), is(false));
+
+                // At least one of the shard 1 copies responded with stats
+                assertThat(indexWriteLoad.getWriteLoadForShard(1).getAsDouble(), is(greaterThanOrEqualTo(0.0)));
+                assertThat(indexWriteLoad.getUptimeInMillisForShard(1).getAsLong(), is(greaterThan(0L)));
+            } else {
+                assertThat(indexWriteLoad, is(nullValue()));
+            }
+        }
+    }
+
     static void putComposableIndexTemplate(
         String id,
         @Nullable String mappings,

+ 1 - 1
modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamGetWriteIndexTests.java

@@ -313,7 +313,7 @@ public class DataStreamGetWriteIndexTests extends ESTestCase {
         MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
         List<Condition<?>> metConditions = Collections.singletonList(condition);
         CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_");
-        return rolloverService.rolloverClusterState(state, name, null, createIndexRequest, metConditions, time, false, false);
+        return rolloverService.rolloverClusterState(state, name, null, createIndexRequest, metConditions, time, false, false, null);
     }
 
     private Index getWriteIndex(ClusterState state, String name, String timestamp) {

+ 13 - 3
modules/data-streams/src/test/java/org/elasticsearch/datastreams/MetadataDataStreamRolloverServiceTests.java

@@ -27,6 +27,7 @@ import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.MapperTestUtils;
+import org.elasticsearch.index.shard.IndexWriteLoad;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -47,6 +48,7 @@ import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 
 public class MetadataDataStreamRolloverServiceTests extends ESTestCase {
 
@@ -100,6 +102,7 @@ public class MetadataDataStreamRolloverServiceTests extends ESTestCase {
             MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
             List<Condition<?>> metConditions = Collections.singletonList(condition);
             CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_");
+            IndexWriteLoad indexWriteLoad = IndexWriteLoad.builder(1).build();
 
             long before = testThreadPool.absoluteTimeInMillis();
             MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState(
@@ -110,7 +113,8 @@ public class MetadataDataStreamRolloverServiceTests extends ESTestCase {
                 metConditions,
                 now,
                 randomBoolean(),
-                false
+                false,
+                indexWriteLoad
             );
             long after = testThreadPool.absoluteTimeInMillis();
 
@@ -138,12 +142,16 @@ public class MetadataDataStreamRolloverServiceTests extends ESTestCase {
             IndexMetadata im = rolloverMetadata.index(rolloverMetadata.dataStreams().get(dataStreamName).getIndices().get(0));
             Instant startTime1 = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings());
             Instant endTime1 = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
+            IndexWriteLoad indexWriteLoad1 = im.getWriteLoad();
             im = rolloverMetadata.index(rolloverMetadata.dataStreams().get(dataStreamName).getIndices().get(1));
             Instant startTime2 = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings());
             Instant endTime2 = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
+            IndexWriteLoad indexWriteLoad2 = im.getWriteLoad();
             assertThat(startTime1.isBefore(endTime1), is(true));
             assertThat(endTime1, equalTo(startTime2));
             assertThat(endTime2.isAfter(endTime1), is(true));
+            assertThat(indexWriteLoad1, is(equalTo(indexWriteLoad)));
+            assertThat(indexWriteLoad2, is(nullValue()));
         } finally {
             testThreadPool.shutdown();
         }
@@ -204,7 +212,8 @@ public class MetadataDataStreamRolloverServiceTests extends ESTestCase {
                 metConditions,
                 now,
                 randomBoolean(),
-                false
+                false,
+                null
             );
 
             String sourceIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration());
@@ -295,7 +304,8 @@ public class MetadataDataStreamRolloverServiceTests extends ESTestCase {
                 metConditions,
                 now,
                 randomBoolean(),
-                false
+                false,
+                null
             );
 
             String sourceIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration());

+ 13 - 4
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java

@@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.shard.IndexWriteLoad;
 import org.elasticsearch.indices.SystemDataStreamDescriptor;
 import org.elasticsearch.indices.SystemIndices;
 import org.elasticsearch.snapshots.SnapshotInProgressException;
@@ -98,7 +99,8 @@ public class MetadataRolloverService {
         List<Condition<?>> metConditions,
         Instant now,
         boolean silent,
-        boolean onlyValidate
+        boolean onlyValidate,
+        @Nullable IndexWriteLoad sourceIndexWriteLoad
     ) throws Exception {
         validate(currentState.metadata(), rolloverTarget, newIndexName, createIndexRequest);
         final IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(rolloverTarget);
@@ -121,7 +123,8 @@ public class MetadataRolloverService {
                 metConditions,
                 now,
                 silent,
-                onlyValidate
+                onlyValidate,
+                sourceIndexWriteLoad
             );
             default ->
                 // the validate method above prevents this case
@@ -228,7 +231,8 @@ public class MetadataRolloverService {
         List<Condition<?>> metConditions,
         Instant now,
         boolean silent,
-        boolean onlyValidate
+        boolean onlyValidate,
+        @Nullable IndexWriteLoad sourceIndexWriteLoad
     ) throws Exception {
 
         if (SnapshotsService.snapshottingDataStreams(currentState, Collections.singleton(dataStream.getName())).isEmpty() == false) {
@@ -284,10 +288,15 @@ public class MetadataRolloverService {
         );
 
         RolloverInfo rolloverInfo = new RolloverInfo(dataStreamName, metConditions, threadPool.absoluteTimeInMillis());
+
         newState = ClusterState.builder(newState)
             .metadata(
                 Metadata.builder(newState.metadata())
-                    .put(IndexMetadata.builder(newState.metadata().index(originalWriteIndex)).putRolloverInfo(rolloverInfo))
+                    .put(
+                        IndexMetadata.builder(newState.metadata().index(originalWriteIndex))
+                            .indexWriteLoad(sourceIndexWriteLoad)
+                            .putRolloverInfo(rolloverInfo)
+                    )
             )
             .build();
 

+ 16 - 3
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java

@@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor;
 import org.elasticsearch.cluster.ClusterStateTaskListener;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.Metadata;
@@ -38,6 +39,7 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.shard.DocsStats;
+import org.elasticsearch.index.shard.IndexWriteLoad;
 import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -119,7 +121,8 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
         IndicesStatsRequest statsRequest = new IndicesStatsRequest().indices(rolloverRequest.getRolloverTarget())
             .clear()
             .indicesOptions(IndicesOptions.fromOptions(true, false, true, true))
-            .docs(true);
+            .docs(true)
+            .indexing(true);
         statsRequest.setParentTask(clusterService.localNode().getId(), task.getId());
         // Rollover can sometimes happen concurrently, to handle these cases, we treat rollover in the same way we would treat a
         // "synchronized" block, in that we have a "before" world, where we calculate naming and condition matching, we then enter our
@@ -292,9 +295,10 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
             );
 
             // Re-evaluate the conditions, now with our final source index name
+            IndexMetadata rolloverSourceIndex = currentState.metadata().index(rolloverNames.sourceName());
             final Map<String, Boolean> postConditionResults = evaluateConditions(
                 rolloverRequest.getConditions().values(),
-                buildStats(currentState.metadata().index(rolloverNames.sourceName()), rolloverTask.statsResponse())
+                buildStats(rolloverSourceIndex, rolloverTask.statsResponse())
             );
 
             if (rolloverRequest.areConditionsMet(postConditionResults)) {
@@ -304,6 +308,14 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
                     .filter(condition -> postConditionResults.get(condition.toString()))
                     .toList();
 
+                final IndexAbstraction rolloverTargetAbstraction = currentState.metadata()
+                    .getIndicesLookup()
+                    .get(rolloverRequest.getRolloverTarget());
+
+                final IndexWriteLoad sourceIndexWriteLoad = rolloverTargetAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM
+                    ? IndexWriteLoad.fromStats(rolloverSourceIndex, rolloverTask.statsResponse())
+                    : null;
+
                 // Perform the actual rollover
                 final var rolloverResult = rolloverService.rolloverClusterState(
                     currentState,
@@ -313,7 +325,8 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
                     metConditions,
                     Instant.now(),
                     false,
-                    false
+                    false,
+                    sourceIndexWriteLoad
                 );
                 results.add(rolloverResult);
                 logger.trace("rollover result [{}]", rolloverResult);

+ 71 - 7
server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java

@@ -44,6 +44,7 @@ import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexLongFieldRange;
+import org.elasticsearch.index.shard.IndexWriteLoad;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardLongFieldRange;
 import org.elasticsearch.rest.RestStatus;
@@ -521,11 +522,14 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
     static final String KEY_SYSTEM = "system";
     static final String KEY_TIMESTAMP_RANGE = "timestamp_range";
     public static final String KEY_PRIMARY_TERMS = "primary_terms";
+    public static final String KEY_WRITE_LOAD = "write_load";
 
     public static final String INDEX_STATE_FILE_PREFIX = "state-";
 
     static final Version SYSTEM_INDEX_FLAG_ADDED = Version.V_7_10_0;
 
+    static final Version WRITE_LOAD_ADDED = Version.V_8_6_0;
+
     private final int routingNumShards;
     private final int routingFactor;
     private final int routingPartitionSize;
@@ -603,6 +607,8 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
     private final Instant timeSeriesStart;
     @Nullable
     private final Instant timeSeriesEnd;
+    @Nullable
+    private final IndexWriteLoad writeLoad;
 
     private IndexMetadata(
         final Index index,
@@ -645,7 +651,8 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
         @Nullable final IndexMode indexMode,
         @Nullable final Instant timeSeriesStart,
         @Nullable final Instant timeSeriesEnd,
-        final Version indexCompatibilityVersion
+        final Version indexCompatibilityVersion,
+        @Nullable final IndexWriteLoad writeLoad
     ) {
         this.index = index;
         this.version = version;
@@ -696,6 +703,7 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
         this.indexMode = indexMode;
         this.timeSeriesStart = timeSeriesStart;
         this.timeSeriesEnd = timeSeriesEnd;
+        this.writeLoad = writeLoad;
         assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
     }
 
@@ -744,7 +752,8 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
             this.indexMode,
             this.timeSeriesStart,
             this.timeSeriesEnd,
-            this.indexCompatibilityVersion
+            this.indexCompatibilityVersion,
+            this.writeLoad
         );
     }
 
@@ -799,7 +808,8 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
             this.indexMode,
             this.timeSeriesStart,
             this.timeSeriesEnd,
-            this.indexCompatibilityVersion
+            this.indexCompatibilityVersion,
+            this.writeLoad
         );
     }
 
@@ -852,7 +862,8 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
             this.indexMode,
             this.timeSeriesStart,
             this.timeSeriesEnd,
-            this.indexCompatibilityVersion
+            this.indexCompatibilityVersion,
+            this.writeLoad
         );
     }
 
@@ -905,7 +916,8 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
             this.indexMode,
             this.timeSeriesStart,
             this.timeSeriesEnd,
-            this.indexCompatibilityVersion
+            this.indexCompatibilityVersion,
+            this.writeLoad
         );
     }
 
@@ -954,7 +966,8 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
             this.indexMode,
             this.timeSeriesStart,
             this.timeSeriesEnd,
-            this.indexCompatibilityVersion
+            this.indexCompatibilityVersion,
+            this.writeLoad
         );
     }
 
@@ -1145,6 +1158,11 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
         return mapping;
     }
 
+    @Nullable
+    public IndexWriteLoad getWriteLoad() {
+        return writeLoad;
+    }
+
     public static final String INDEX_RESIZE_SOURCE_UUID_KEY = "index.resize.source.uuid";
     public static final String INDEX_RESIZE_SOURCE_NAME_KEY = "index.resize.source.name";
     public static final Setting<String> INDEX_RESIZE_SOURCE_UUID = Setting.simpleString(INDEX_RESIZE_SOURCE_UUID_KEY);
@@ -1379,6 +1397,7 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
         private final Diff<ImmutableOpenMap<String, RolloverInfo>> rolloverInfos;
         private final boolean isSystem;
         private final IndexLongFieldRange timestampRange;
+        private final IndexWriteLoad indexWriteLoad;
 
         IndexMetadataDiff(IndexMetadata before, IndexMetadata after) {
             index = after.index.getName();
@@ -1412,6 +1431,7 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
             rolloverInfos = DiffableUtils.diff(before.rolloverInfos, after.rolloverInfos, DiffableUtils.getStringKeySerializer());
             isSystem = after.isSystem;
             timestampRange = after.timestampRange;
+            indexWriteLoad = after.writeLoad;
         }
 
         private static final DiffableUtils.DiffableValueReader<String, AliasMetadata> ALIAS_METADATA_DIFF_VALUE_READER =
@@ -1462,6 +1482,11 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
                 isSystem = false;
             }
             timestampRange = IndexLongFieldRange.readFrom(in);
+            if (in.getVersion().onOrAfter(WRITE_LOAD_ADDED)) {
+                indexWriteLoad = in.readOptionalWriteable(IndexWriteLoad::new);
+            } else {
+                indexWriteLoad = null;
+            }
         }
 
         @Override
@@ -1492,6 +1517,9 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
                 out.writeBoolean(isSystem);
             }
             timestampRange.writeTo(out);
+            if (out.getVersion().onOrAfter(WRITE_LOAD_ADDED)) {
+                out.writeOptionalWriteable(indexWriteLoad);
+            }
         }
 
         @Override
@@ -1518,6 +1546,7 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
             builder.rolloverInfos.putAllFromMap(rolloverInfos.apply(part.rolloverInfos));
             builder.system(isSystem);
             builder.timestampRange(timestampRange);
+            builder.indexWriteLoad(indexWriteLoad);
             return builder.build();
         }
     }
@@ -1579,6 +1608,10 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
             builder.system(in.readBoolean());
         }
         builder.timestampRange(IndexLongFieldRange.readFrom(in));
+
+        if (in.getVersion().onOrAfter(WRITE_LOAD_ADDED)) {
+            builder.indexWriteLoad(in.readOptionalWriteable(IndexWriteLoad::new));
+        }
         return builder.build();
     }
 
@@ -1620,6 +1653,9 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
             out.writeBoolean(isSystem);
         }
         timestampRange.writeTo(out);
+        if (out.getVersion().onOrAfter(WRITE_LOAD_ADDED)) {
+            out.writeOptionalWriteable(writeLoad);
+        }
     }
 
     @Override
@@ -1666,6 +1702,7 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
         private boolean isSystem;
         private IndexLongFieldRange timestampRange = IndexLongFieldRange.NO_SHARDS;
         private LifecycleExecutionState lifecycleExecutionState = LifecycleExecutionState.EMPTY_STATE;
+        private IndexWriteLoad indexWriteLoad = null;
 
         public Builder(String index) {
             this.index = index;
@@ -1694,6 +1731,7 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
             this.isSystem = indexMetadata.isSystem;
             this.timestampRange = indexMetadata.timestampRange;
             this.lifecycleExecutionState = indexMetadata.lifecycleExecutionState;
+            this.indexWriteLoad = indexMetadata.writeLoad;
         }
 
         public Builder index(String index) {
@@ -1908,6 +1946,11 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
             return this;
         }
 
+        /**
+         * Sets the per-shard write load to attach to the built {@link IndexMetadata}.
+         *
+         * @param indexWriteLoad the write load to store; may be {@code null} (the builder's default) when no write load is known.
+         *                       A non-null value must describe exactly as many shards as the index has, otherwise
+         *                       {@link #build()} fails with an {@link IllegalArgumentException}.
+         * @return this builder
+         */
+        public Builder indexWriteLoad(IndexWriteLoad indexWriteLoad) {
+            this.indexWriteLoad = indexWriteLoad;
+            return this;
+        }
+
         public IndexMetadata build() {
             /*
              * We expect that the metadata has been properly built to set the number of shards and the number of replicas, and do not rely
@@ -2023,6 +2066,17 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
                 lifecycleExecutionState = LifecycleExecutionState.EMPTY_STATE;
             }
 
+            if (indexWriteLoad != null && indexWriteLoad.numberOfShards() != numberOfShards) {
+                assert false;
+                throw new IllegalArgumentException(
+                    "The number of write load shards ["
+                        + indexWriteLoad.numberOfShards()
+                        + "] is different than the number of index shards ["
+                        + numberOfShards
+                        + "]"
+                );
+            }
+
             final boolean isSearchableSnapshot = SearchableSnapshotsSettings.isSearchableSnapshotStore(settings);
             final String indexMode = settings.get(IndexSettings.MODE.getKey());
             final boolean isTsdb = indexMode != null && IndexMode.TIME_SERIES.getName().equals(indexMode.toLowerCase(Locale.ROOT));
@@ -2067,7 +2121,8 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
                 isTsdb ? IndexMode.TIME_SERIES : null,
                 isTsdb ? IndexSettings.TIME_SERIES_START_TIME.get(settings) : null,
                 isTsdb ? IndexSettings.TIME_SERIES_END_TIME.get(settings) : null,
-                SETTING_INDEX_VERSION_COMPATIBILITY.get(settings)
+                SETTING_INDEX_VERSION_COMPATIBILITY.get(settings),
+                indexWriteLoad
             );
         }
 
@@ -2179,6 +2234,12 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
             indexMetadata.timestampRange.toXContent(builder, params);
             builder.endObject();
 
+            if (indexMetadata.writeLoad != null) {
+                builder.startObject(KEY_WRITE_LOAD);
+                indexMetadata.writeLoad.toXContent(builder, params);
+                builder.endObject();
+            }
+
             builder.endObject();
         }
 
@@ -2253,6 +2314,9 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
                         case KEY_TIMESTAMP_RANGE:
                             builder.timestampRange(IndexLongFieldRange.fromXContent(parser));
                             break;
+                        case KEY_WRITE_LOAD:
+                            builder.indexWriteLoad(IndexWriteLoad.fromXContent(parser));
+                            break;
                         default:
                             // assume it's custom index metadata
                             builder.putCustom(currentFieldName, parser.mapStrings());

+ 200 - 0
server/src/main/java/org/elasticsearch/index/shard/IndexWriteLoad.java

@@ -0,0 +1,200 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.index.shard;
+
+import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
+import org.elasticsearch.action.admin.indices.stats.IndexStats;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.xcontent.ConstructingObjectParser;
+import org.elasticsearch.xcontent.ParseField;
+import org.elasticsearch.xcontent.ToXContentFragment;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.OptionalDouble;
+import java.util.OptionalLong;
+
+/**
+ * Per-shard indexing write load and uptime of an index, kept as two parallel arrays indexed by shard id.
+ * <p>
+ * Shards for which no value was recorded hold sentinel values ({@code -1.0} for the load and {@code -1} for the
+ * uptime); the accessors translate those sentinels into empty optionals.
+ */
+public class IndexWriteLoad implements Writeable, ToXContentFragment {
+    public static final ParseField SHARDS_WRITE_LOAD_FIELD = new ParseField("loads");
+    public static final ParseField SHARDS_UPTIME_IN_MILLIS = new ParseField("uptimes");
+    // Sentinels marking a shard without a recorded value. Kept primitive so that comparisons never depend on unboxing.
+    private static final double UNKNOWN_LOAD = -1.0;
+    private static final long UNKNOWN_UPTIME = -1;
+
+    @SuppressWarnings("unchecked")
+    private static final ConstructingObjectParser<IndexWriteLoad, Void> PARSER = new ConstructingObjectParser<>(
+        "index_write_load_parser",
+        false,
+        (args, unused) -> IndexWriteLoad.create((List<Double>) args[0], (List<Long>) args[1])
+    );
+
+    static {
+        PARSER.declareDoubleArray(ConstructingObjectParser.constructorArg(), SHARDS_WRITE_LOAD_FIELD);
+        PARSER.declareLongArray(ConstructingObjectParser.constructorArg(), SHARDS_UPTIME_IN_MILLIS);
+    }
+
+    /**
+     * Creates an instance from per-shard lists, where the list position is the shard id.
+     *
+     * @param shardsWriteLoad      write load of every shard
+     * @param shardsUptimeInMillis uptime in milliseconds of every shard
+     * @return the index write load holding a copy of the provided values
+     * @throws IllegalArgumentException if the lists differ in size or are empty (this also trips an assertion when
+     *                                  assertions are enabled)
+     */
+    public static IndexWriteLoad create(List<Double> shardsWriteLoad, List<Long> shardsUptimeInMillis) {
+        if (shardsWriteLoad.size() != shardsUptimeInMillis.size()) {
+            assert false;
+            throw new IllegalArgumentException(
+                "The same number of shard write loads and shard uptimes should be provided, but "
+                    + shardsWriteLoad
+                    + " "
+                    + shardsUptimeInMillis
+                    + " were provided"
+            );
+        }
+
+        if (shardsWriteLoad.isEmpty()) {
+            assert false;
+            throw new IllegalArgumentException("At least one shard write load and uptime should be provided, but none was provided");
+        }
+
+        return new IndexWriteLoad(
+            shardsWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray(),
+            shardsUptimeInMillis.stream().mapToLong(shardUptime -> shardUptime).toArray()
+        );
+    }
+
+    /**
+     * Builds the write load of an index out of an indices stats response.
+     * <p>
+     * For every shard present in the response the stats of the primary copy are used, falling back to the first
+     * reported copy when no primary is found. Shards missing from the response keep the "unknown" sentinels.
+     *
+     * @param indexMetadata        metadata of the index; provides the index name and the number of shards
+     * @param indicesStatsResponse the stats response, may be {@code null}
+     * @return the write load, or {@code null} if the response is {@code null} or holds no stats for the index
+     */
+    @Nullable
+    public static IndexWriteLoad fromStats(IndexMetadata indexMetadata, @Nullable IndicesStatsResponse indicesStatsResponse) {
+        if (indicesStatsResponse == null) {
+            return null;
+        }
+
+        final IndexStats indexStats = indicesStatsResponse.getIndex(indexMetadata.getIndex().getName());
+        if (indexStats == null) {
+            return null;
+        }
+
+        final int numberOfShards = indexMetadata.getNumberOfShards();
+        final var indexWriteLoadBuilder = IndexWriteLoad.builder(numberOfShards);
+        final var indexShards = indexStats.getIndexShards();
+        for (IndexShardStats indexShardsStats : indexShards.values()) {
+            final var shardStats = Arrays.stream(indexShardsStats.getShards())
+                .filter(stats -> stats.getShardRouting().primary())
+                .findFirst()
+                // Fallback to a replica if for some reason we couldn't find the primary stats
+                .orElse(indexShardsStats.getAt(0));
+            final var indexingShardStats = shardStats.getStats().getIndexing().getTotal();
+            indexWriteLoadBuilder.withShardWriteLoad(
+                shardStats.getShardRouting().id(),
+                indexingShardStats.getWriteLoad(),
+                indexingShardStats.getTotalActiveTimeInMillis()
+            );
+        }
+        return indexWriteLoadBuilder.build();
+    }
+
+    private final double[] shardWriteLoad;
+    private final long[] shardUptimeInMillis;
+
+    // Takes ownership of the given arrays; callers must not keep mutable references to them.
+    private IndexWriteLoad(double[] shardWriteLoad, long[] shardUptimeInMillis) {
+        assert shardWriteLoad.length == shardUptimeInMillis.length;
+        this.shardWriteLoad = shardWriteLoad;
+        this.shardUptimeInMillis = shardUptimeInMillis;
+    }
+
+    /**
+     * Reads an instance from the wire: the write load array followed by the uptime array, mirroring
+     * {@link #writeTo(StreamOutput)}.
+     */
+    public IndexWriteLoad(StreamInput in) throws IOException {
+        this(in.readDoubleArray(), in.readLongArray());
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeDoubleArray(shardWriteLoad);
+        out.writeLongArray(shardUptimeInMillis);
+    }
+
+    /**
+     * Emits the {@code loads} and {@code uptimes} fields; the enclosing object must be started and ended by the caller.
+     */
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.field(SHARDS_WRITE_LOAD_FIELD.getPreferredName(), shardWriteLoad);
+        builder.field(SHARDS_UPTIME_IN_MILLIS.getPreferredName(), shardUptimeInMillis);
+        return builder;
+    }
+
+    public static IndexWriteLoad fromXContent(XContentParser parser) throws IOException {
+        return PARSER.parse(parser, null);
+    }
+
+    /**
+     * Returns the write load recorded for the given shard, or an empty optional if none was recorded.
+     * The shard id bounds are only asserted; an out-of-range id fails with an array index error when assertions are disabled.
+     */
+    public OptionalDouble getWriteLoadForShard(int shardId) {
+        assertShardInBounds(shardId);
+
+        double load = shardWriteLoad[shardId];
+        return load != UNKNOWN_LOAD ? OptionalDouble.of(load) : OptionalDouble.empty();
+    }
+
+    /**
+     * Returns the uptime in milliseconds recorded for the given shard, or an empty optional if none was recorded.
+     * The shard id bounds are only asserted; an out-of-range id fails with an array index error when assertions are disabled.
+     */
+    public OptionalLong getUptimeInMillisForShard(int shardId) {
+        assertShardInBounds(shardId);
+
+        long uptime = shardUptimeInMillis[shardId];
+        return uptime != UNKNOWN_UPTIME ? OptionalLong.of(uptime) : OptionalLong.empty();
+    }
+
+    /**
+     * Returns the number of shards this write load holds entries for, including shards with unknown values.
+     */
+    public int numberOfShards() {
+        return shardWriteLoad.length;
+    }
+
+    private void assertShardInBounds(int shardId) {
+        assert shardId >= 0 : "Unexpected shard id " + shardId;
+        assert shardId < shardWriteLoad.length : "Unexpected shard id " + shardId + ", expected < " + shardWriteLoad.length;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        IndexWriteLoad that = (IndexWriteLoad) o;
+        return Arrays.equals(shardWriteLoad, that.shardWriteLoad) && Arrays.equals(shardUptimeInMillis, that.shardUptimeInMillis);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Arrays.hashCode(shardWriteLoad);
+        result = 31 * result + Arrays.hashCode(shardUptimeInMillis);
+        return result;
+    }
+
+    /**
+     * Creates a builder for an index with the given number of shards; every shard starts out as "unknown".
+     */
+    public static Builder builder(int numShards) {
+        assert numShards > 0 : "A positive number of shards should be provided";
+        return new Builder(numShards);
+    }
+
+    /**
+     * Mutable accumulator of per-shard values. Shards that are never populated keep the "unknown" sentinels.
+     */
+    public static class Builder {
+        final double[] shardWriteLoad;
+        final long[] uptimeInMillis;
+
+        private Builder(int numShards) {
+            this.shardWriteLoad = new double[numShards];
+            this.uptimeInMillis = new long[numShards];
+            Arrays.fill(shardWriteLoad, UNKNOWN_LOAD);
+            Arrays.fill(uptimeInMillis, UNKNOWN_UPTIME);
+        }
+
+        /**
+         * Records the write load and uptime of a shard, replacing any value previously recorded for it.
+         *
+         * @throws IllegalArgumentException if {@code shardId} is negative or not smaller than the number of shards
+         */
+        public void withShardWriteLoad(int shardId, double load, long uptimeInMillis) {
+            if (shardId < 0 || shardId >= this.shardWriteLoad.length) {
+                throw new IllegalArgumentException(
+                    "Unexpected shard id [" + shardId + "], expected a value in [0, " + this.shardWriteLoad.length + ")"
+                );
+            }
+
+            this.shardWriteLoad[shardId] = load;
+            this.uptimeInMillis[shardId] = uptimeInMillis;
+        }
+
+        /**
+         * Builds the write load from a snapshot of the recorded values, so further use of this builder cannot
+         * alter an instance that was already built.
+         */
+        public IndexWriteLoad build() {
+            return new IndexWriteLoad(shardWriteLoad.clone(), uptimeInMillis.clone());
+        }
+    }
+}

+ 24 - 10
server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java

@@ -22,6 +22,7 @@ import org.elasticsearch.xcontent.XContentBuilder;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
 public class IndexingStats implements Writeable, ToXContentFragment {
 
@@ -38,7 +39,8 @@ public class IndexingStats implements Writeable, ToXContentFragment {
         private long noopUpdateCount;
         private long throttleTimeInMillis;
         private boolean isThrottled;
-        private double writeLoad;
+        private long totalIndexingTimeSinceShardStartedInNanos;
+        private long totalActiveTimeInNanos;
 
         Stats() {}
 
@@ -54,7 +56,8 @@ public class IndexingStats implements Writeable, ToXContentFragment {
             isThrottled = in.readBoolean();
             throttleTimeInMillis = in.readLong();
             if (in.getVersion().onOrAfter(WRITE_LOAD_AVG_SUPPORTED_VERSION)) {
-                writeLoad = in.readDouble();
+                totalIndexingTimeSinceShardStartedInNanos = in.readLong();
+                totalActiveTimeInNanos = in.readLong();
             }
         }
 
@@ -69,7 +72,8 @@ public class IndexingStats implements Writeable, ToXContentFragment {
             long noopUpdateCount,
             boolean isThrottled,
             long throttleTimeInMillis,
-            double writeLoad
+            long totalIndexingTimeSinceShardStartedInNanos,
+            long totalActiveTimeInNanos
         ) {
             this.indexCount = indexCount;
             this.indexTimeInMillis = indexTimeInMillis;
@@ -81,7 +85,9 @@ public class IndexingStats implements Writeable, ToXContentFragment {
             this.noopUpdateCount = noopUpdateCount;
             this.isThrottled = isThrottled;
             this.throttleTimeInMillis = throttleTimeInMillis;
-            this.writeLoad = writeLoad;
+            // We store the raw write-load values in order to avoid losing precision when we combine the shard stats
+            this.totalIndexingTimeSinceShardStartedInNanos = totalIndexingTimeSinceShardStartedInNanos;
+            this.totalActiveTimeInNanos = totalActiveTimeInNanos;
         }
 
         public void add(Stats stats) {
@@ -99,7 +105,8 @@ public class IndexingStats implements Writeable, ToXContentFragment {
             if (isThrottled != stats.isThrottled) {
                 isThrottled = true; // When combining if one is throttled set result to throttled.
             }
-            writeLoad += stats.writeLoad;
+            totalIndexingTimeSinceShardStartedInNanos += stats.totalIndexingTimeSinceShardStartedInNanos;
+            totalActiveTimeInNanos += stats.totalActiveTimeInNanos;
         }
 
         /**
@@ -170,7 +177,11 @@ public class IndexingStats implements Writeable, ToXContentFragment {
         }
 
         public double getWriteLoad() {
-            return writeLoad;
+            return totalActiveTimeInNanos > 0 ? (double) totalIndexingTimeSinceShardStartedInNanos / totalActiveTimeInNanos : 0;
+        }
+
+        /**
+         * Returns the total active time, which is tracked in nanoseconds, converted to whole milliseconds.
+         */
+        public long getTotalActiveTimeInMillis() {
+            return TimeUnit.NANOSECONDS.toMillis(totalActiveTimeInNanos);
         }
 
         @Override
@@ -186,7 +197,8 @@ public class IndexingStats implements Writeable, ToXContentFragment {
             out.writeBoolean(isThrottled);
             out.writeLong(throttleTimeInMillis);
             if (out.getVersion().onOrAfter(WRITE_LOAD_AVG_SUPPORTED_VERSION)) {
-                out.writeDouble(writeLoad);
+                out.writeLong(totalIndexingTimeSinceShardStartedInNanos);
+                out.writeLong(totalActiveTimeInNanos);
             }
         }
 
@@ -206,7 +218,7 @@ public class IndexingStats implements Writeable, ToXContentFragment {
             builder.field(Fields.IS_THROTTLED, isThrottled);
             builder.humanReadableField(Fields.THROTTLED_TIME_IN_MILLIS, Fields.THROTTLED_TIME, getThrottleTime());
 
-            builder.field(Fields.WRITE_LOAD, writeLoad);
+            builder.field(Fields.WRITE_LOAD, getWriteLoad());
             return builder;
         }
 
@@ -225,7 +237,8 @@ public class IndexingStats implements Writeable, ToXContentFragment {
                 && noopUpdateCount == that.noopUpdateCount
                 && isThrottled == that.isThrottled
                 && throttleTimeInMillis == that.throttleTimeInMillis
-                && writeLoad == that.writeLoad;
+                && totalIndexingTimeSinceShardStartedInNanos == that.totalIndexingTimeSinceShardStartedInNanos
+                && totalActiveTimeInNanos == that.totalActiveTimeInNanos;
         }
 
         @Override
@@ -241,7 +254,8 @@ public class IndexingStats implements Writeable, ToXContentFragment {
                 noopUpdateCount,
                 isThrottled,
                 throttleTimeInMillis,
-                writeLoad
+                totalIndexingTimeSinceShardStartedInNanos,
+                totalActiveTimeInNanos
             );
         }
     }

+ 3 - 2
server/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java

@@ -133,7 +133,7 @@ final class InternalIndexingStats implements IndexingOperationListener {
             long timeSinceShardStartedInNanos
         ) {
             final long totalIndexingTimeInNanos = indexMetric.sum();
-            final long totalIndexingTimeSinceShardStarted = totalIndexingTimeInNanos - indexingTimeBeforeShardStartedInNanos;
+            final long totalIndexingTimeSinceShardStartedInNanos = totalIndexingTimeInNanos - indexingTimeBeforeShardStartedInNanos;
             return new IndexingStats.Stats(
                 indexMetric.count(),
                 TimeUnit.NANOSECONDS.toMillis(totalIndexingTimeInNanos),
@@ -145,7 +145,8 @@ final class InternalIndexingStats implements IndexingOperationListener {
                 noopUpdates.count(),
                 isThrottled,
                 TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis),
-                timeSinceShardStartedInNanos > 0 ? (double) totalIndexingTimeSinceShardStarted / timeSinceShardStartedInNanos : 0
+                totalIndexingTimeSinceShardStartedInNanos,
+                timeSinceShardStartedInNanos
             );
         }
     }

+ 1 - 0
server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java

@@ -575,6 +575,7 @@ public class NodeStatsTests extends ESTestCase {
             ++iota,
             false,
             ++iota,
+            ++iota,
             ++iota
         );
         indicesCommonStats.getIndexing().add(new IndexingStats(indexingStats));

+ 8 - 4
server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java

@@ -546,7 +546,8 @@ public class MetadataRolloverServiceTests extends ESTestCase {
                 metConditions,
                 Instant.now(),
                 randomBoolean(),
-                false
+                false,
+                null
             );
             long after = testThreadPool.absoluteTimeInMillis();
 
@@ -612,7 +613,8 @@ public class MetadataRolloverServiceTests extends ESTestCase {
                 metConditions,
                 Instant.now(),
                 randomBoolean(),
-                false
+                false,
+                null
             );
             long after = testThreadPool.absoluteTimeInMillis();
 
@@ -695,7 +697,8 @@ public class MetadataRolloverServiceTests extends ESTestCase {
             null,
             Instant.now(),
             randomBoolean(),
-            true
+            true,
+            null
         );
 
         newIndexName = newIndexName == null ? defaultRolloverIndexName : newIndexName;
@@ -735,7 +738,8 @@ public class MetadataRolloverServiceTests extends ESTestCase {
                 metConditions,
                 Instant.now(),
                 false,
-                randomBoolean()
+                randomBoolean(),
+                null
             )
         );
         assertThat(e.getMessage(), equalTo("no matching index template found for data stream [" + dataStream.getName() + "]"));

+ 22 - 0
server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java

@@ -32,6 +32,7 @@ import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.shard.IndexWriteLoad;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESTestCase;
@@ -281,6 +282,10 @@ public class ClusterStateTests extends ESTestCase {
                     "system": false,
                     "timestamp_range": {
                       "shards": []
+                    },
+                    "write_load": {
+                      "loads": [-1.0],
+                      "uptimes": [-1]
                     }
                   }
                 },
@@ -493,6 +498,14 @@ public class ClusterStateTests extends ESTestCase {
                     "system" : false,
                     "timestamp_range" : {
                       "shards" : [ ]
+                    },
+                    "write_load" : {
+                      "loads" : [
+                        -1.0
+                      ],
+                      "uptimes" : [
+                        -1
+                      ]
                     }
                   }
                 },
@@ -712,6 +725,14 @@ public class ClusterStateTests extends ESTestCase {
                     "system" : false,
                     "timestamp_range" : {
                       "shards" : [ ]
+                    },
+                    "write_load" : {
+                      "loads" : [
+                        -1.0
+                      ],
+                      "uptimes" : [
+                        -1
+                      ]
                     }
                   }
                 },
@@ -901,6 +922,7 @@ public class ClusterStateTests extends ESTestCase {
             })
             .numberOfReplicas(2)
             .putRolloverInfo(new RolloverInfo("rolloveAlias", new ArrayList<>(), 1L))
+            .indexWriteLoad(IndexWriteLoad.builder(1).build())
             .build();
 
         return ClusterState.builder(ClusterName.DEFAULT)

+ 14 - 0
server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java

@@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.shard.IndexWriteLoad;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesModule;
 import org.elasticsearch.test.ESTestCase;
@@ -72,6 +73,7 @@ public class IndexMetadataTests extends ESTestCase {
         Map<String, String> customMap = new HashMap<>();
         customMap.put(randomAlphaOfLength(5), randomAlphaOfLength(10));
         customMap.put(randomAlphaOfLength(10), randomAlphaOfLength(15));
+        IndexWriteLoad indexWriteLoad = randomBoolean() ? randomWriteLoad(numShard) : null;
         IndexMetadata metadata = IndexMetadata.builder("foo")
             .settings(
                 Settings.builder()
@@ -98,6 +100,7 @@ public class IndexMetadataTests extends ESTestCase {
                     randomNonNegativeLong()
                 )
             )
+            .indexWriteLoad(indexWriteLoad)
             .build();
         assertEquals(system, metadata.isSystem());
 
@@ -126,6 +129,7 @@ public class IndexMetadataTests extends ESTestCase {
         Map<String, DiffableStringMap> expectedCustom = Map.of("my_custom", new DiffableStringMap(customMap));
         assertEquals(metadata.getCustomData(), expectedCustom);
         assertEquals(metadata.getCustomData(), fromXContentMeta.getCustomData());
+        assertEquals(metadata.getWriteLoad(), fromXContentMeta.getWriteLoad());
 
         final BytesStreamOutput out = new BytesStreamOutput();
         metadata.writeTo(out);
@@ -146,6 +150,7 @@ public class IndexMetadataTests extends ESTestCase {
             assertEquals(deserialized.getCustomData(), expectedCustom);
             assertEquals(metadata.getCustomData(), deserialized.getCustomData());
             assertEquals(metadata.isSystem(), deserialized.isSystem());
+            assertEquals(metadata.getWriteLoad(), deserialized.getWriteLoad());
         }
     }
 
@@ -490,4 +495,13 @@ public class IndexMetadataTests extends ESTestCase {
             .put(DataTier.TIER_PREFERENCE, dataTier)
             .build();
     }
+
+    private IndexWriteLoad randomWriteLoad(int numberOfShards) {
+        IndexWriteLoad.Builder indexWriteLoadBuilder = IndexWriteLoad.builder(numberOfShards);
+        int numberOfPopulatedWriteLoads = randomIntBetween(0, numberOfShards);
+        for (int i = 0; i < numberOfPopulatedWriteLoads; i++) {
+            indexWriteLoadBuilder.withShardWriteLoad(i, randomDoubleBetween(0.0, 128.0, true), randomNonNegativeLong());
+        }
+        return indexWriteLoadBuilder.build();
+    }
 }

+ 60 - 0
server/src/test/java/org/elasticsearch/index/shard/IndexWriteLoadSerializationTests.java

@@ -0,0 +1,60 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.index.shard;
+
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractXContentSerializingTestCase;
+import org.elasticsearch.xcontent.XContentParser;
+
+import java.io.IOException;
+
+/**
+ * Round-trip serialization tests (wire and XContent) for {@link IndexWriteLoad}.
+ */
+public class IndexWriteLoadSerializationTests extends AbstractXContentSerializingTestCase<IndexWriteLoad> {
+
+    @Override
+    protected IndexWriteLoad doParseInstance(XContentParser parser) throws IOException {
+        return IndexWriteLoad.fromXContent(parser);
+    }
+
+    @Override
+    protected Writeable.Reader<IndexWriteLoad> instanceReader() {
+        return in -> new IndexWriteLoad(in);
+    }
+
+    /**
+     * Creates a write load with between 1 and 10 shards, all of them populated.
+     */
+    @Override
+    protected IndexWriteLoad createTestInstance() {
+        final int shardCount = randomIntBetween(1, 10);
+        final IndexWriteLoad.Builder builder = IndexWriteLoad.builder(shardCount);
+        int shard = 0;
+        while (shard < shardCount) {
+            builder.withShardWriteLoad(shard, randomDoubleBetween(1, 10, true), randomLongBetween(1, 1000));
+            shard++;
+        }
+        return builder.build();
+    }
+
+    /**
+     * Produces an instance with a different number of shards than the original, which guarantees inequality;
+     * values of shards that exist in both are randomly either carried over or regenerated.
+     */
+    @Override
+    protected IndexWriteLoad mutateInstance(IndexWriteLoad instance) throws IOException {
+        final int mutatedShardCount;
+        if (instance.numberOfShards() > 1 && randomBoolean()) {
+            // Shrink: only possible when there is more than one shard to begin with
+            mutatedShardCount = randomIntBetween(1, instance.numberOfShards() - 1);
+        } else {
+            mutatedShardCount = instance.numberOfShards() + randomIntBetween(1, 5);
+        }
+
+        final IndexWriteLoad.Builder builder = IndexWriteLoad.builder(mutatedShardCount);
+        for (int shard = 0; shard < mutatedShardCount; shard++) {
+            final boolean existingShard = shard < instance.numberOfShards();
+
+            final double load;
+            if (existingShard && randomBoolean()) {
+                load = instance.getWriteLoadForShard(shard).getAsDouble();
+            } else {
+                load = randomDoubleBetween(0, 128, true);
+            }
+
+            final long uptime;
+            if (existingShard && randomBoolean()) {
+                uptime = instance.getUptimeInMillisForShard(shard).getAsLong();
+            } else {
+                uptime = randomNonNegativeLong();
+            }
+
+            builder.withShardWriteLoad(shard, load, uptime);
+        }
+        return builder.build();
+    }
+}

+ 147 - 0
server/src/test/java/org/elasticsearch/index/shard/IndexWriteLoadTests.java

@@ -0,0 +1,147 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.index.shard;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.action.admin.indices.stats.CommonStats;
+import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
+import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
+import org.elasticsearch.action.admin.indices.stats.IndexStats;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingHelper;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link IndexWriteLoad}: per-shard accessors on a built instance and
+ * creation from an indices stats response.
+ */
+public class IndexWriteLoadTests extends ESTestCase {
+
+    /**
+     * Populates the first 10 shards of a builder sized for 10 to 20 shards and checks that the
+     * populated shards report the stored values while the remaining shards report nothing.
+     */
+    public void testGetWriteLoadForShardAndGetUptimeInMillisForShard() {
+        final int populatedShardCount = 10;
+        final int totalShardCount = randomIntBetween(populatedShardCount, 20);
+        final IndexWriteLoad.Builder builder = IndexWriteLoad.builder(totalShardCount);
+
+        final double[] expectedWriteLoads = new double[populatedShardCount];
+        final long[] expectedUptimes = new long[populatedShardCount];
+        for (int shardId = 0; shardId < populatedShardCount; shardId++) {
+            // Draw the load before the uptime so that seeded runs produce the same values.
+            expectedWriteLoads[shardId] = randomDoubleBetween(1, 128, true);
+            expectedUptimes[shardId] = randomNonNegativeLong();
+            builder.withShardWriteLoad(shardId, expectedWriteLoads[shardId], expectedUptimes[shardId]);
+        }
+
+        final IndexWriteLoad indexWriteLoad = builder.build();
+        for (int shardId = 0; shardId < populatedShardCount; shardId++) {
+            assertShardStatsPresent(indexWriteLoad, shardId, expectedWriteLoads[shardId], expectedUptimes[shardId]);
+        }
+        for (int shardId = populatedShardCount; shardId < totalShardCount; shardId++) {
+            assertThat(indexWriteLoad.getWriteLoadForShard(shardId).isPresent(), is(false));
+            assertThat(indexWriteLoad.getUptimeInMillisForShard(shardId).isPresent(), is(false));
+        }
+    }
+
+    /**
+     * Builds an {@link IndexWriteLoad} from a mocked stats response for a 3-shard index in which
+     * shard 0 reports a primary and a replica, shard 1 reports only a replica and shard 2 reports
+     * nothing, then verifies the values picked for each shard. Also verifies that a {@code null}
+     * response yields {@code null}.
+     */
+    public void testFromStatsCreation() {
+        final String indexName = "idx";
+        final Settings indexSettings = Settings.builder()
+            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+            .build();
+        final IndexMetadata indexMetadata = IndexMetadata.builder(indexName).settings(indexSettings).build();
+
+        final IndicesStatsResponse response = mock(IndicesStatsResponse.class);
+        final IndexStats indexStats = mock(IndexStats.class);
+
+        // Shard 0 has both primary/replica
+        final ShardStats shard0Primary = createShardStats(
+            indexName,
+            0,
+            true,
+            TimeValue.timeValueMillis(2048).nanos(),
+            TimeValue.timeValueMillis(1024).nanos()
+        );
+        final ShardStats shard0Replica = createShardStats(
+            indexName,
+            0,
+            false,
+            TimeValue.timeValueMillis(2048).nanos(),
+            TimeValue.timeValueMillis(512).nanos()
+        );
+        final IndexShardStats indexShard0Stats = new IndexShardStats(
+            new ShardId(indexName, "__na__", 0),
+            new ShardStats[] { shard0Primary, shard0Replica }
+        );
+
+        // Shard 1 only has a replica available
+        final ShardStats shard1Replica = createShardStats(
+            indexName,
+            1,
+            false,
+            TimeValue.timeValueMillis(4096).nanos(),
+            TimeValue.timeValueMillis(512).nanos()
+        );
+        final IndexShardStats indexShard1Stats = new IndexShardStats(
+            new ShardId(indexName, "__na__", 1),
+            new ShardStats[] { shard1Replica }
+        );
+        // Shard 2 was not available
+
+        when(response.getIndex(indexName)).thenReturn(indexStats);
+        when(indexStats.getIndexShards()).thenReturn(Map.of(0, indexShard0Stats, 1, indexShard1Stats));
+
+        final IndexWriteLoad indexWriteLoadFromStats = IndexWriteLoad.fromStats(indexMetadata, response);
+
+        // Shard 0 uses the results from the primary
+        assertShardStatsPresent(indexWriteLoadFromStats, 0, 2.0, 1024L);
+
+        // Shard 1 uses the only available stats from a replica
+        assertShardStatsPresent(indexWriteLoadFromStats, 1, 8.0, 512L);
+
+        // Shard 2 has no stats at all
+        assertThat(indexWriteLoadFromStats.getWriteLoadForShard(2).isPresent(), is(equalTo(false)));
+        assertThat(indexWriteLoadFromStats.getUptimeInMillisForShard(2).isPresent(), is(equalTo(false)));
+
+        assertThat(IndexWriteLoad.fromStats(indexMetadata, null), is(nullValue()));
+    }
+
+    /**
+     * Asserts that {@code indexWriteLoad} holds both a write load and an uptime for {@code shardId}
+     * and that they equal the expected values.
+     */
+    private static void assertShardStatsPresent(
+        IndexWriteLoad indexWriteLoad,
+        int shardId,
+        double expectedWriteLoad,
+        long expectedUptimeInMillis
+    ) {
+        final var writeLoad = indexWriteLoad.getWriteLoadForShard(shardId);
+        assertThat(writeLoad.isPresent(), is(equalTo(true)));
+        assertThat(writeLoad.getAsDouble(), is(equalTo(expectedWriteLoad)));
+
+        final var uptimeInMillis = indexWriteLoad.getUptimeInMillisForShard(shardId);
+        assertThat(uptimeInMillis.isPresent(), is(equalTo(true)));
+        assertThat(uptimeInMillis.getAsLong(), is(equalTo(expectedUptimeInMillis)));
+    }
+
+    /**
+     * Creates {@link ShardStats} for a started shard copy whose total indexing stats carry the given
+     * indexing time and active time (both in nanoseconds); all other indexing counters are zero.
+     *
+     * @param indexName name of the index the shard belongs to
+     * @param shard shard number
+     * @param primary whether the copy is a primary (empty-store recovery) or a replica (peer recovery)
+     * @param totalIndexingTimeSinceShardStartedInNanos indexing time to report
+     * @param totalActiveTimeInNanos active time to report
+     */
+    private ShardStats createShardStats(
+        String indexName,
+        int shard,
+        boolean primary,
+        long totalIndexingTimeSinceShardStartedInNanos,
+        long totalActiveTimeInNanos
+    ) {
+        final RecoverySource recoverySource;
+        if (primary) {
+            recoverySource = RecoverySource.EmptyStoreRecoverySource.INSTANCE;
+        } else {
+            recoverySource = RecoverySource.PeerRecoverySource.INSTANCE;
+        }
+
+        // Walk the routing entry through unassigned -> initializing -> started.
+        final ShardRouting unassigned = ShardRouting.newUnassigned(
+            new ShardId(indexName, "__na__", shard),
+            primary,
+            recoverySource,
+            new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")
+        );
+        final ShardRouting initializing = ShardRoutingHelper.initialize(unassigned, UUIDs.randomBase64UUID());
+        final ShardRouting started = ShardRoutingHelper.moveToStarted(initializing);
+
+        final CommonStats commonStats = new CommonStats(CommonStatsFlags.ALL);
+        final IndexingStats.Stats indexingTotals = new IndexingStats.Stats(
+            0,
+            0,
+            0,
+            0,
+            0,
+            0,
+            0,
+            0,
+            false,
+            0,
+            totalIndexingTimeSinceShardStartedInNanos,
+            totalActiveTimeInNanos
+        );
+        commonStats.getIndexing().getTotal().add(indexingTotals);
+        return new ShardStats(started, commonStats, null, null, null, null, null, false);
+    }
+}

+ 1 - 1
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java

@@ -386,7 +386,7 @@ public class IndexStatsMonitoringDocTests extends BaseFilteredMonitoringDocTestC
         commonStats.getStore().add(new StoreStats(++iota, no, no));
         commonStats.getRefresh().add(new RefreshStats(no, ++iota, no, ++iota, (int) no));
 
-        final IndexingStats.Stats indexingStats = new IndexingStats.Stats(++iota, ++iota, no, no, no, no, no, no, false, ++iota, no);
+        final IndexingStats.Stats indexingStats = new IndexingStats.Stats(++iota, ++iota, no, no, no, no, no, no, false, ++iota, no, no);
         commonStats.getIndexing().add(new IndexingStats(indexingStats));
 
         final SearchStats.Stats searchStats = new SearchStats.Stats(++iota, ++iota, no, no, no, no, no, no, no, no, no, no);

+ 1 - 1
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java

@@ -183,7 +183,7 @@ public class IndicesStatsMonitoringDocTests extends BaseFilteredMonitoringDocTes
         commonStats.getDocs().add(new DocsStats(1L, 0L, randomNonNegativeLong()));
         commonStats.getStore().add(new StoreStats(2L, 0L, 0L));
 
-        final IndexingStats.Stats indexingStats = new IndexingStats.Stats(3L, 4L, 0L, 0L, 0L, 0L, 0L, 0L, true, 5L, 0);
+        final IndexingStats.Stats indexingStats = new IndexingStats.Stats(3L, 4L, 0L, 0L, 0L, 0L, 0L, 0L, true, 5L, 0, 0);
         commonStats.getIndexing().add(new IndexingStats(indexingStats));
 
         final SearchStats.Stats searchStats = new SearchStats.Stats(6L, 7L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);

+ 14 - 1
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java

@@ -328,7 +328,20 @@ public class NodeStatsMonitoringDocTests extends BaseFilteredMonitoringDocTestCa
         indicesCommonStats.getFieldData().add(new FieldDataStats(++iota, ++iota, null));
         indicesCommonStats.getStore().add(new StoreStats(++iota, no, no));
 
-        final IndexingStats.Stats indexingStats = new IndexingStats.Stats(++iota, ++iota, ++iota, no, no, no, no, no, false, ++iota, no);
+        final IndexingStats.Stats indexingStats = new IndexingStats.Stats(
+            ++iota,
+            ++iota,
+            ++iota,
+            no,
+            no,
+            no,
+            no,
+            no,
+            false,
+            ++iota,
+            no,
+            no
+        );
         indicesCommonStats.getIndexing().add(new IndexingStats(indexingStats));
         indicesCommonStats.getQueryCache().add(new QueryCacheStats(++iota, ++iota, ++iota, ++iota, no));
         indicesCommonStats.getRequestCache().add(new RequestCacheStats(++iota, ++iota, ++iota, ++iota));