Browse Source

Keep track of average shard write load (#90768)

This commit adds a new field, write_load, to the shard indexing stats. This new stat exposes the average number of write threads used while indexing documents.

Closes #90102
Francisco Fernández Castaño 3 years ago
parent
commit
1a3032beb6
25 changed files with 412 additions and 39 deletions
  1. 6 0
      docs/changelog/90768.yaml
  2. 4 0
      docs/reference/cluster/nodes-stats.asciidoc
  3. 59 0
      rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.stats/70_write_load.yml
  4. 2 1
      server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java
  5. 2 1
      server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java
  6. 33 0
      server/src/internalClusterTest/java/org/elasticsearch/indices/stats/IndexStatsIT.java
  7. 2 1
      server/src/main/java/org/elasticsearch/index/IndexService.java
  8. 9 1
      server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
  9. 4 1
      server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
  10. 26 7
      server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
  11. 23 4
      server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java
  12. 27 5
      server/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java
  13. 1 0
      server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java
  14. 4 2
      server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
  15. 142 1
      server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
  16. 2 1
      server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
  17. 2 1
      server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java
  18. 4 2
      server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
  19. 10 5
      test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
  20. 2 1
      test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java
  21. 43 1
      test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
  22. 2 1
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
  23. 1 1
      x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java
  24. 1 1
      x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java
  25. 1 1
      x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java

+ 6 - 0
docs/changelog/90768.yaml

@@ -0,0 +1,6 @@
+pr: 90768
+summary: Keep track of average shard write load
+area: CRUD
+type: enhancement
+issues:
+ - 90102

+ 4 - 0
docs/reference/cluster/nodes-stats.asciidoc

@@ -346,6 +346,10 @@ Total time spent throttling operations.
 (integer)
 Total time in milliseconds
 spent throttling operations.
+
+`write_load`::
+(double)
+Average number of write threads used while indexing documents.
 =======
 
 `get`::

+ 59 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.stats/70_write_load.yml

@@ -0,0 +1,59 @@
+---
+setup:
+  - skip:
+      version: " - 8.5.99"
+      reason: Indices write load stats were introduced in 8.6
+---
+"Write load average is tracked at shard level":
+  - do:
+      indices.create:
+        index: testindex
+        body:
+          settings:
+            index.number_of_shards: 1
+            index.number_of_replicas: 0
+          mappings:
+            properties:
+              name:
+                type: text
+              description:
+                type: text
+              price:
+                type: double
+
+  - do:
+      indices.stats:
+        index: "testindex"
+        level: shards
+        metric: [ indexing ]
+
+  - match: { _all.total.indexing.write_load: 0.0 }
+  - match: { indices.testindex.total.indexing.write_load: 0.0 }
+  - match: { indices.testindex.shards.0.0.indexing.write_load: 0.0 }
+
+  - do:
+      index:
+        index: testindex
+        body: { "name": "specialty coffee", "description": "arabica coffee beans", "price": 100 }
+  - do:
+      index:
+        index: testindex
+        body: { "name": "commercial coffee", "description": "robusta coffee beans", "price": 50 }
+  - do:
+      index:
+        index: testindex
+        body: { "name": "raw coffee", "description": "colombian coffee beans", "price": 25 }
+  - do:
+      index:
+        index: testindex
+        body: { "name": "book", "description": "some book", "price": 1000 }
+
+  - do:
+      indices.stats:
+        index: "testindex"
+        level: shards
+        metric: [ indexing ]
+
+  - gte: { _all.total.indexing.write_load: 0.0 }
+  - gte: { indices.testindex.total.indexing.write_load: 0.0 }
+  - gte: { indices.testindex.shards.0.0.indexing.write_load: 0.0 }

+ 2 - 1
server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

@@ -659,7 +659,8 @@ public class IndexShardIT extends ESSingleNodeTestCase {
             () -> {},
             RetentionLeaseSyncer.EMPTY,
             cbs,
-            IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER
+            IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
+            System::nanoTime
         );
     }
 

+ 2 - 1
server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java

@@ -77,7 +77,8 @@ public class IndexingMemoryControllerIT extends ESSingleNodeTestCase {
                 config.retentionLeasesSupplier(),
                 config.getPrimaryTermSupplier(),
                 config.getSnapshotCommitSupplier(),
-                config.getLeafSorter()
+                config.getLeafSorter(),
+                config.getRelativeTimeInNanosSupplier()
             );
         }
 

+ 33 - 0
server/src/internalClusterTest/java/org/elasticsearch/indices/stats/IndexStatsIT.java

@@ -9,6 +9,7 @@
 package org.elasticsearch.indices.stats;
 
 import org.apache.lucene.tests.util.LuceneTestCase.SuppressCodecs;
+import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@@ -1437,6 +1438,38 @@ public class IndexStatsIT extends ESIntegTestCase {
         assertThat(executionFailures.get(), emptyCollectionOf(Exception.class));
     }
 
+    public void testWriteLoadIsCaptured() throws Exception {
+        final String indexName = "test-idx";
+        createIndex(indexName);
+        final IndicesStatsResponse statsResponseBeforeIndexing = client().admin().indices().prepareStats(indexName).get();
+        final IndexStats indexStatsBeforeIndexing = statsResponseBeforeIndexing.getIndices().get(indexName);
+        assertThat(indexStatsBeforeIndexing, is(notNullValue()));
+        assertThat(indexStatsBeforeIndexing.getPrimaries().getIndexing().getTotal().getWriteLoad(), is(equalTo(0.0)));
+
+        final AtomicInteger idGenerator = new AtomicInteger();
+        assertBusy(() -> {
+            final int numDocs = randomIntBetween(15, 25);
+            final List<ActionFuture<IndexResponse>> indexRequestFutures = new ArrayList<>(numDocs);
+            for (int i = 0; i < numDocs; i++) {
+                indexRequestFutures.add(
+                    client().prepareIndex(indexName)
+                        .setId(Integer.toString(idGenerator.incrementAndGet()))
+                        .setSource("{}", XContentType.JSON)
+                        .execute()
+                );
+            }
+
+            for (ActionFuture<IndexResponse> indexRequestFuture : indexRequestFutures) {
+                assertThat(indexRequestFuture.get().getResult(), equalTo(DocWriteResponse.Result.CREATED));
+            }
+
+            final IndicesStatsResponse statsResponseAfterIndexing = client().admin().indices().prepareStats(indexName).get();
+            final IndexStats indexStatsAfterIndexing = statsResponseAfterIndexing.getIndices().get(indexName);
+            assertThat(indexStatsAfterIndexing, is(notNullValue()));
+            assertThat(indexStatsAfterIndexing.getPrimaries().getIndexing().getTotal().getWriteLoad(), is(greaterThan(0.0)));
+        });
+    }
+
     /**
      * Persist the global checkpoint on all shards of the given index into disk.
      * This makes sure that the persisted global checkpoint on those shards will be equal to the in-memory value.

+ 2 - 1
server/src/main/java/org/elasticsearch/index/IndexService.java

@@ -513,7 +513,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
                 () -> globalCheckpointSyncer.accept(shardId),
                 retentionLeaseSyncer,
                 circuitBreakerService,
-                snapshotCommitSupplier
+                snapshotCommitSupplier,
+                System::nanoTime
             );
             eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
             eventListener.afterIndexShardCreated(indexShard);

+ 9 - 1
server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

@@ -110,6 +110,8 @@ public final class EngineConfig {
 
     private final TranslogConfig translogConfig;
 
+    private final LongSupplier relativeTimeInNanosSupplier;
+
     /**
      * Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
      */
@@ -136,7 +138,8 @@ public final class EngineConfig {
         Supplier<RetentionLeases> retentionLeasesSupplier,
         LongSupplier primaryTermSupplier,
         IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
-        Comparator<LeafReader> leafSorter
+        Comparator<LeafReader> leafSorter,
+        LongSupplier relativeTimeInNanosSupplier
     ) {
         this.shardId = shardId;
         this.indexSettings = indexSettings;
@@ -176,6 +179,7 @@ public final class EngineConfig {
         this.primaryTermSupplier = primaryTermSupplier;
         this.snapshotCommitSupplier = snapshotCommitSupplier;
         this.leafSorter = leafSorter;
+        this.relativeTimeInNanosSupplier = relativeTimeInNanosSupplier;
     }
 
     /**
@@ -374,4 +378,8 @@ public final class EngineConfig {
     public Comparator<LeafReader> getLeafSorter() {
         return leafSorter;
     }
+
+    public LongSupplier getRelativeTimeInNanosSupplier() {
+        return relativeTimeInNanosSupplier;
+    }
 }

+ 4 - 1
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -188,6 +188,8 @@ public class InternalEngine extends Engine {
     @Nullable
     private volatile String forceMergeUUID;
 
+    private final LongSupplier relativeTimeInNanosSupplier;
+
     public InternalEngine(EngineConfig engineConfig) {
         this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new);
     }
@@ -195,6 +197,7 @@ public class InternalEngine extends Engine {
     InternalEngine(EngineConfig engineConfig, int maxDocs, BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
         super(engineConfig);
         this.maxDocs = maxDocs;
+        this.relativeTimeInNanosSupplier = config().getRelativeTimeInNanosSupplier();
         final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
         store.incRef();
         IndexWriter writer = null;
@@ -1024,7 +1027,7 @@ public class InternalEngine extends Engine {
                     assert index.origin().isFromTranslog() || indexResult.getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO;
                     localCheckpointTracker.markSeqNoAsPersisted(indexResult.getSeqNo());
                 }
-                indexResult.setTook(System.nanoTime() - index.startTime());
+                indexResult.setTook(relativeTimeInNanosSupplier.getAsLong() - index.startTime());
                 indexResult.freeze();
                 return indexResult;
             } finally {

+ 26 - 7
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -286,6 +286,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     private final RefreshPendingLocationListener refreshPendingLocationListener;
     private volatile boolean useRetentionLeasesInPeerRecovery;
     private final boolean isDataStreamIndex; // if a shard is a part of data stream
+    private final LongSupplier relativeTimeInNanosSupplier;
+    private volatile long startedRelativeTimeInNanos;
+    private volatile long indexingTimeBeforeShardStartedInNanos;
 
     public IndexShard(
         final ShardRouting shardRouting,
@@ -307,7 +310,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         final Runnable globalCheckpointSyncer,
         final RetentionLeaseSyncer retentionLeaseSyncer,
         final CircuitBreakerService circuitBreakerService,
-        final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier
+        final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
+        final LongSupplier relativeTimeInNanosSupplier
     ) throws IOException {
         super(shardRouting.shardId(), indexSettings);
         assert shardRouting.initializing();
@@ -385,6 +389,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
         this.refreshPendingLocationListener = new RefreshPendingLocationListener();
         this.isDataStreamIndex = mapperService == null ? false : mapperService.mappingLookup().isDataStreamTimestampFieldEnabled();
+        this.relativeTimeInNanosSupplier = relativeTimeInNanosSupplier;
     }
 
     public ThreadPool getThreadPool() {
@@ -533,6 +538,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                     : "a primary relocation is completed by the master, but primary mode is not active " + currentRouting;
 
                 changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
+                startedRelativeTimeInNanos = getRelativeTimeInNanos();
+                indexingTimeBeforeShardStartedInNanos = internalIndexingStats.totalIndexingTimeInNanos();
             } else if (currentRouting.primary()
                 && currentRouting.relocating()
                 && replicationTracker.isRelocated()
@@ -956,7 +963,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                 autoGeneratedTimeStamp,
                 isRetry,
                 ifSeqNo,
-                ifPrimaryTerm
+                ifPrimaryTerm,
+                getRelativeTimeInNanos()
             );
             Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
             if (update != null) {
@@ -985,9 +993,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         long autoGeneratedIdTimestamp,
         boolean isRetry,
         long ifSeqNo,
-        long ifPrimaryTerm
+        long ifPrimaryTerm,
+        long startTimeInNanos
     ) {
-        long startTime = System.nanoTime();
         assert source.dynamicTemplates().isEmpty() || origin == Engine.Operation.Origin.PRIMARY
             : "dynamic_templates parameter can only be associated with primary operations";
         DocumentMapper documentMapper = mapperService.documentMapper();
@@ -1013,7 +1021,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             version,
             versionType,
             origin,
-            startTime,
+            startTimeInNanos,
             autoGeneratedIdTimestamp,
             isRetry,
             ifSeqNo,
@@ -1272,7 +1280,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             throttled = engine.isThrottled();
             throttleTimeInMillis = engine.getIndexThrottleTimeInMillis();
         }
-        return internalIndexingStats.stats(throttled, throttleTimeInMillis);
+
+        return internalIndexingStats.stats(
+            throttled,
+            throttleTimeInMillis,
+            indexingTimeBeforeShardStartedInNanos,
+            getRelativeTimeInNanos() - startedRelativeTimeInNanos
+        );
     }
 
     public SearchStats searchStats(String... groups) {
@@ -3255,7 +3269,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             replicationTracker::getRetentionLeases,
             this::getOperationPrimaryTerm,
             snapshotCommitSupplier,
-            isTimeseriesIndex ? TIMESERIES_LEAF_READERS_SORTER : null
+            isTimeseriesIndex ? TIMESERIES_LEAF_READERS_SORTER : null,
+            relativeTimeInNanosSupplier
         );
     }
 
@@ -4091,6 +4106,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         return retentionLeaseSyncer;
     }
 
+    public long getRelativeTimeInNanos() {
+        return relativeTimeInNanosSupplier.getAsLong();
+    }
+
     @Override
     public String toString() {
         return "IndexShard(shardRouting=" + shardRouting + ")";

+ 23 - 4
server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java

@@ -26,6 +26,7 @@ import java.util.Objects;
 public class IndexingStats implements Writeable, ToXContentFragment {
 
     public static class Stats implements Writeable, ToXContentFragment {
+        private static final Version WRITE_LOAD_AVG_SUPPORTED_VERSION = Version.V_8_6_0;
 
         private long indexCount;
         private long indexTimeInMillis;
@@ -37,6 +38,7 @@ public class IndexingStats implements Writeable, ToXContentFragment {
         private long noopUpdateCount;
         private long throttleTimeInMillis;
         private boolean isThrottled;
+        private double writeLoad;
 
         Stats() {}
 
@@ -51,6 +53,9 @@ public class IndexingStats implements Writeable, ToXContentFragment {
             noopUpdateCount = in.readVLong();
             isThrottled = in.readBoolean();
             throttleTimeInMillis = in.readLong();
+            if (in.getVersion().onOrAfter(WRITE_LOAD_AVG_SUPPORTED_VERSION)) {
+                writeLoad = in.readDouble();
+            }
         }
 
         public Stats(
@@ -63,7 +68,8 @@ public class IndexingStats implements Writeable, ToXContentFragment {
             long deleteCurrent,
             long noopUpdateCount,
             boolean isThrottled,
-            long throttleTimeInMillis
+            long throttleTimeInMillis,
+            double writeLoad
         ) {
             this.indexCount = indexCount;
             this.indexTimeInMillis = indexTimeInMillis;
@@ -75,6 +81,7 @@ public class IndexingStats implements Writeable, ToXContentFragment {
             this.noopUpdateCount = noopUpdateCount;
             this.isThrottled = isThrottled;
             this.throttleTimeInMillis = throttleTimeInMillis;
+            this.writeLoad = writeLoad;
         }
 
         public void add(Stats stats) {
@@ -92,6 +99,7 @@ public class IndexingStats implements Writeable, ToXContentFragment {
             if (isThrottled != stats.isThrottled) {
                 isThrottled = true; // When combining if one is throttled set result to throttled.
             }
+            writeLoad += stats.writeLoad;
         }
 
         /**
@@ -161,6 +169,10 @@ public class IndexingStats implements Writeable, ToXContentFragment {
             return noopUpdateCount;
         }
 
+        public double getWriteLoad() {
+            return writeLoad;
+        }
+
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeVLong(indexCount);
@@ -173,7 +185,9 @@ public class IndexingStats implements Writeable, ToXContentFragment {
             out.writeVLong(noopUpdateCount);
             out.writeBoolean(isThrottled);
             out.writeLong(throttleTimeInMillis);
-
+            if (out.getVersion().onOrAfter(WRITE_LOAD_AVG_SUPPORTED_VERSION)) {
+                out.writeDouble(writeLoad);
+            }
         }
 
         @Override
@@ -191,6 +205,8 @@ public class IndexingStats implements Writeable, ToXContentFragment {
 
             builder.field(Fields.IS_THROTTLED, isThrottled);
             builder.humanReadableField(Fields.THROTTLED_TIME_IN_MILLIS, Fields.THROTTLED_TIME, getThrottleTime());
+
+            builder.field(Fields.WRITE_LOAD, writeLoad);
             return builder;
         }
 
@@ -208,7 +224,8 @@ public class IndexingStats implements Writeable, ToXContentFragment {
                 && deleteCurrent == that.deleteCurrent
                 && noopUpdateCount == that.noopUpdateCount
                 && isThrottled == that.isThrottled
-                && throttleTimeInMillis == that.throttleTimeInMillis;
+                && throttleTimeInMillis == that.throttleTimeInMillis
+                && writeLoad == that.writeLoad;
         }
 
         @Override
@@ -223,7 +240,8 @@ public class IndexingStats implements Writeable, ToXContentFragment {
                 deleteCurrent,
                 noopUpdateCount,
                 isThrottled,
-                throttleTimeInMillis
+                throttleTimeInMillis,
+                writeLoad
             );
         }
     }
@@ -311,6 +329,7 @@ public class IndexingStats implements Writeable, ToXContentFragment {
         static final String IS_THROTTLED = "is_throttled";
         static final String THROTTLED_TIME_IN_MILLIS = "throttle_time_in_millis";
         static final String THROTTLED_TIME = "throttle_time";
+        static final String WRITE_LOAD = "write_load";
     }
 
     @Override

+ 27 - 5
server/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java

@@ -27,11 +27,25 @@ final class InternalIndexingStats implements IndexingOperationListener {
      * is returned for them. If they are set, then only types provided will be returned, or
      * {@code _all} for all types.
      */
-    IndexingStats stats(boolean isThrottled, long currentThrottleInMillis) {
-        IndexingStats.Stats total = totalStats.stats(isThrottled, currentThrottleInMillis);
+    IndexingStats stats(
+        boolean isThrottled,
+        long currentThrottleInMillis,
+        long indexingTimeBeforeShardStartedInNanos,
+        long timeSinceShardStartedInNanos
+    ) {
+        IndexingStats.Stats total = totalStats.stats(
+            isThrottled,
+            currentThrottleInMillis,
+            indexingTimeBeforeShardStartedInNanos,
+            timeSinceShardStartedInNanos
+        );
         return new IndexingStats(total);
     }
 
+    long totalIndexingTimeInNanos() {
+        return totalStats.indexMetric.sum();
+    }
+
     @Override
     public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
         if (operation.origin().isRecovery() == false) {
@@ -112,10 +126,17 @@ final class InternalIndexingStats implements IndexingOperationListener {
         private final CounterMetric deleteCurrent = new CounterMetric();
         private final CounterMetric noopUpdates = new CounterMetric();
 
-        IndexingStats.Stats stats(boolean isThrottled, long currentThrottleMillis) {
+        IndexingStats.Stats stats(
+            boolean isThrottled,
+            long currentThrottleMillis,
+            long indexingTimeBeforeShardStartedInNanos,
+            long timeSinceShardStartedInNanos
+        ) {
+            final long totalIndexingTimeInNanos = indexMetric.sum();
+            final long totalIndexingTimeSinceShardStarted = totalIndexingTimeInNanos - indexingTimeBeforeShardStartedInNanos;
             return new IndexingStats.Stats(
                 indexMetric.count(),
-                TimeUnit.NANOSECONDS.toMillis(indexMetric.sum()),
+                TimeUnit.NANOSECONDS.toMillis(totalIndexingTimeInNanos),
                 indexCurrent.count(),
                 indexFailed.count(),
                 deleteMetric.count(),
@@ -123,7 +144,8 @@ final class InternalIndexingStats implements IndexingOperationListener {
                 deleteCurrent.count(),
                 noopUpdates.count(),
                 isThrottled,
-                TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis)
+                TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis),
+                timeSinceShardStartedInNanos > 0 ? (double) totalIndexingTimeSinceShardStarted / timeSinceShardStartedInNanos : 0
             );
         }
     }

+ 1 - 0
server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java

@@ -574,6 +574,7 @@ public class NodeStatsTests extends ESTestCase {
             ++iota,
             ++iota,
             false,
+            ++iota,
             ++iota
         );
         indicesCommonStats.getIndexing().add(new IndexingStats(indexingStats));

+ 4 - 2
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -3587,7 +3587,8 @@ public class InternalEngineTests extends EngineTestCase {
             () -> RetentionLeases.EMPTY,
             primaryTerm::get,
             IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
-            null
+            null,
+            config.getRelativeTimeInNanosSupplier()
         );
         expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig));
 
@@ -7255,7 +7256,8 @@ public class InternalEngineTests extends EngineTestCase {
                 config.retentionLeasesSupplier(),
                 config.getPrimaryTermSupplier(),
                 config.getSnapshotCommitSupplier(),
-                config.getLeafSorter()
+                config.getLeafSorter(),
+                config.getRelativeTimeInNanosSupplier()
             );
             try (InternalEngine engine = createEngine(configWithWarmer)) {
                 assertThat(warmedUpReaders, empty());

+ 142 - 1
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -155,6 +155,7 @@ import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.LongFunction;
+import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -4517,7 +4518,8 @@ public class IndexShardTests extends IndexShardTestCase {
                 config.retentionLeasesSupplier(),
                 config.getPrimaryTermSupplier(),
                 IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
-                config.getLeafSorter()
+                config.getLeafSorter(),
+                config.getRelativeTimeInNanosSupplier()
             );
             return new InternalEngine(configWithWarmer);
         });
@@ -4552,6 +4554,145 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(shard);
     }
 
+    public void testShardExposesWriteLoadStats() throws Exception {
+        final IndexShard primary = newStartedShard(true);
+        for (int i = 0; i < 10; i++) {
+            indexDoc(primary, "_doc", "primary-" + i);
+        }
+
+        final FakeClock fakeClock = new FakeClock();
+        final ShardRouting shardRouting = newShardRouting(
+            primary.shardId(),
+            randomAlphaOfLength(10),
+            false,
+            ShardRoutingState.INITIALIZING,
+            RecoverySource.PeerRecoverySource.INSTANCE
+        );
+        final ShardId shardId = shardRouting.shardId();
+        final NodeEnvironment.DataPath dataPath = new NodeEnvironment.DataPath(createTempDir());
+        final ShardPath shardPath = new ShardPath(false, dataPath.resolve(shardId), dataPath.resolve(shardId), shardId);
+        final IndexShard replicaShard = newShard(
+            shardRouting,
+            shardPath,
+            primary.indexSettings().getIndexMetadata(),
+            null,
+            null,
+            new InternalEngineFactory(),
+            () -> {},
+            RetentionLeaseSyncer.EMPTY,
+            EMPTY_EVENT_LISTENER,
+            fakeClock
+        );
+
+        // Now simulate that each operation takes 1 minute to complete.
+        // This applies both to replaying translog ops and to new indexing ops.
+        fakeClock.setSimulatedElapsedRelativeTime(TimeValue.timeValueMinutes(1));
+
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+        final CountDownLatch concurrentIndexingFinished = new CountDownLatch(1);
+        // Index some documents concurrently while the shard is replaying the primary
+        // translog operations to ensure that those operations are not counted when
+        // computing the shard write load.
+        final Thread indexingThread = new Thread(() -> {
+            try {
+                barrier.await(10, TimeUnit.SECONDS);
+                for (int i = 1; i <= 10; i++) {
+                    long seqNo = primary.seqNoStats().getMaxSeqNo() + i;
+                    replicaShard.applyIndexOperationOnReplica(
+                        seqNo,
+                        primaryTerm,
+                        1,
+                        IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
+                        false,
+                        new SourceToParse("id-" + seqNo, new BytesArray("{}"), XContentType.JSON)
+                    );
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            } finally {
+                concurrentIndexingFinished.countDown();
+            }
+        });
+        indexingThread.start();
+
+        recoverReplica(replicaShard, primary, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, null, null, recoveryListener) {
+            @Override
+            public void indexTranslogOperations(
+                List<Translog.Operation> operations,
+                int totalTranslogOps,
+                long maxSeenAutoIdTimestampOnPrimary,
+                long maxSeqNoOfDeletesOrUpdatesOnPrimary,
+                RetentionLeases retentionLeases,
+                long mappingVersionOnPrimary,
+                ActionListener<Long> listener
+            ) {
+                try {
+                    barrier.await(10, TimeUnit.SECONDS);
+                    assertTrue(concurrentIndexingFinished.await(10, TimeUnit.SECONDS));
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                super.indexTranslogOperations(
+                    operations,
+                    totalTranslogOps,
+                    maxSeenAutoIdTimestampOnPrimary,
+                    maxSeqNoOfDeletesOrUpdatesOnPrimary,
+                    retentionLeases,
+                    mappingVersionOnPrimary,
+                    listener
+                );
+            }
+        }, true, true);
+
+        fakeClock.setSimulatedElapsedRelativeTime(TimeValue.ZERO);
+        final IndexingStats indexingStatsBeforeIndexingDocs = replicaShard.indexingStats();
+        assertThat(indexingStatsBeforeIndexingDocs.getTotal().getWriteLoad(), is(equalTo(0.0)));
+
+        // Now simulate that each operation takes 1 second to complete.
+        fakeClock.setSimulatedElapsedRelativeTime(TimeValue.timeValueSeconds(1));
+        final int numberOfDocs = randomIntBetween(5, 10);
+        for (int i = 0; i < numberOfDocs; i++) {
+            long seqNo = replicaShard.seqNoStats().getMaxSeqNo() + 1;
+            replicaShard.applyIndexOperationOnReplica(
+                seqNo,
+                primaryTerm,
+                1,
+                IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
+                false,
+                new SourceToParse("id-" + seqNo, new BytesArray("{}"), XContentType.JSON)
+            );
+        }
+
+        fakeClock.setSimulatedElapsedRelativeTime(TimeValue.ZERO);
+        final IndexingStats indexingStatsAfterIndexingDocs = replicaShard.indexingStats();
+        assertThat(indexingStatsAfterIndexingDocs.getTotal().getWriteLoad(), is(equalTo(1.0)));
+
+        closeShards(primary, replicaShard);
+    }
+
+    static class FakeClock implements LongSupplier {
+        private final AtomicLong currentRelativeTime = new AtomicLong();
+        private final AtomicInteger tick = new AtomicInteger();
+        private volatile TimeValue elapsedTimePerPairOfQueries = TimeValue.ZERO;
+
+        @Override
+        public long getAsLong() {
+            // Since the clock is checked at the beginning and at the end of
+            // the indexing op, just increase the current relative time at the
+            // end.
+            if (tick.getAndIncrement() % 2 == 0) {
+                return currentRelativeTime.get();
+            } else {
+                return currentRelativeTime.addAndGet(elapsedTimePerPairOfQueries.nanos());
+            }
+        }
+
+        void setSimulatedElapsedRelativeTime(TimeValue elapsedTimePerPairOfQueries) {
+            tick.set(0);
+            this.elapsedTimePerPairOfQueries = elapsedTimePerPairOfQueries;
+        }
+    }
+
     private static void blockingCallRelocated(
         IndexShard indexShard,
         ShardRouting routing,

+ 2 - 1
server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java

@@ -151,7 +151,8 @@ public class RefreshListenersTests extends ESTestCase {
             () -> RetentionLeases.EMPTY,
             () -> primaryTerm,
             IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
-            null
+            null,
+            System::nanoTime
         );
         engine = new InternalEngine(config);
         engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE);

+ 2 - 1
server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java

@@ -400,7 +400,8 @@ public class IndexingMemoryControllerTests extends IndexShardTestCase {
             config.retentionLeasesSupplier(),
             config.getPrimaryTermSupplier(),
             config.getSnapshotCommitSupplier(),
-            config.getLeafSorter()
+            config.getLeafSorter(),
+            config.getRelativeTimeInNanosSupplier()
         );
     }
 

+ 4 - 2
server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

@@ -485,7 +485,8 @@ public class RecoverySourceHandlerTests extends MapperServiceTestCase {
                 -1,
                 false,
                 UNASSIGNED_SEQ_NO,
-                0
+                0,
+                System.nanoTime()
             );
         }
     }
@@ -521,7 +522,8 @@ public class RecoverySourceHandlerTests extends MapperServiceTestCase {
                 -1,
                 false,
                 UNASSIGNED_SEQ_NO,
-                0
+                0,
+                System.nanoTime()
             );
         }
     }

+ 10 - 5
test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

@@ -273,7 +273,8 @@ public abstract class EngineTestCase extends ESTestCase {
             config.retentionLeasesSupplier(),
             config.getPrimaryTermSupplier(),
             config.getSnapshotCommitSupplier(),
-            config.getLeafSorter()
+            config.getLeafSorter(),
+            config.getRelativeTimeInNanosSupplier()
         );
     }
 
@@ -301,7 +302,8 @@ public abstract class EngineTestCase extends ESTestCase {
             config.retentionLeasesSupplier(),
             config.getPrimaryTermSupplier(),
             config.getSnapshotCommitSupplier(),
-            config.getLeafSorter()
+            config.getLeafSorter(),
+            config.getRelativeTimeInNanosSupplier()
         );
     }
 
@@ -329,7 +331,8 @@ public abstract class EngineTestCase extends ESTestCase {
             config.retentionLeasesSupplier(),
             config.getPrimaryTermSupplier(),
             config.getSnapshotCommitSupplier(),
-            config.getLeafSorter()
+            config.getLeafSorter(),
+            config.getRelativeTimeInNanosSupplier()
         );
     }
 
@@ -847,7 +850,8 @@ public abstract class EngineTestCase extends ESTestCase {
             retentionLeasesSupplier,
             primaryTerm,
             IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
-            null
+            null,
+            System::nanoTime
         );
     }
 
@@ -883,7 +887,8 @@ public abstract class EngineTestCase extends ESTestCase {
             config.retentionLeasesSupplier(),
             config.getPrimaryTermSupplier(),
             config.getSnapshotCommitSupplier(),
-            config.getLeafSorter()
+            config.getLeafSorter(),
+            config.getRelativeTimeInNanosSupplier()
         );
     }
 

+ 2 - 1
test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java

@@ -103,7 +103,8 @@ public class TranslogHandler implements Engine.TranslogRecoveryRunner {
                     index.getAutoGeneratedIdTimestamp(),
                     true,
                     SequenceNumbers.UNASSIGNED_SEQ_NO,
-                    SequenceNumbers.UNASSIGNED_PRIMARY_TERM
+                    SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
+                    System.nanoTime()
                 );
                 return engineIndex;
             }

+ 43 - 1
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -93,6 +93,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
+import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
@@ -412,6 +413,46 @@ public abstract class IndexShardTestCase extends ESTestCase {
         RetentionLeaseSyncer retentionLeaseSyncer,
         IndexEventListener indexEventListener,
         IndexingOperationListener... listeners
+    ) throws IOException {
+        return newShard(
+            routing,
+            shardPath,
+            indexMetadata,
+            storeProvider,
+            indexReaderWrapper,
+            engineFactory,
+            globalCheckpointSyncer,
+            retentionLeaseSyncer,
+            indexEventListener,
+            System::nanoTime,
+            listeners
+        );
+    }
+
+    /**
+     * Creates a new initializing shard.
+     * @param routing                       shard routing to use
+     * @param shardPath                     path to use for shard data
+     * @param indexMetadata                 indexMetadata for the shard, including any mapping
+     * @param storeProvider                 an optional custom store provider to use. If null a default file based store will be created
+     * @param indexReaderWrapper            an optional wrapper to be used during search
+     * @param engineFactory                 an optional engine factory to use
+     * @param globalCheckpointSyncer        callback for syncing global checkpoints
+     * @param retentionLeaseSyncer          the retention lease syncer handed to the shard
+     * @param indexEventListener            index event listener
+     * @param relativeTimeSupplier          the clock used to measure relative time
+     * @param listeners                     an optional set of listeners to add to the shard
+     */
+    protected IndexShard newShard(
+        ShardRouting routing,
+        ShardPath shardPath,
+        IndexMetadata indexMetadata,
+        @Nullable CheckedFunction<IndexSettings, Store, IOException> storeProvider,
+        @Nullable CheckedFunction<DirectoryReader, DirectoryReader, IOException> indexReaderWrapper,
+        @Nullable EngineFactory engineFactory,
+        Runnable globalCheckpointSyncer,
+        RetentionLeaseSyncer retentionLeaseSyncer,
+        IndexEventListener indexEventListener,
+        LongSupplier relativeTimeSupplier,
+        IndexingOperationListener... listeners
     ) throws IOException {
         final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
         final IndexSettings indexSettings = new IndexSettings(indexMetadata, nodeSettings);
@@ -461,7 +502,8 @@ public abstract class IndexShardTestCase extends ESTestCase {
                 globalCheckpointSyncer,
                 retentionLeaseSyncer,
                 breakerService,
-                IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER
+                IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
+                relativeTimeSupplier
             );
             indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
             success = true;

+ 2 - 1
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java

@@ -286,7 +286,8 @@ public class FollowingEngineTests extends ESTestCase {
             () -> RetentionLeases.EMPTY,
             () -> primaryTerm.get(),
             IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
-            null
+            null,
+            System::nanoTime
         );
     }
 

+ 1 - 1
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java

@@ -386,7 +386,7 @@ public class IndexStatsMonitoringDocTests extends BaseFilteredMonitoringDocTestC
         commonStats.getStore().add(new StoreStats(++iota, no, no));
         commonStats.getRefresh().add(new RefreshStats(no, ++iota, no, ++iota, (int) no));
 
-        final IndexingStats.Stats indexingStats = new IndexingStats.Stats(++iota, ++iota, no, no, no, no, no, no, false, ++iota);
+        final IndexingStats.Stats indexingStats = new IndexingStats.Stats(++iota, ++iota, no, no, no, no, no, no, false, ++iota, no);
         commonStats.getIndexing().add(new IndexingStats(indexingStats));
 
         final SearchStats.Stats searchStats = new SearchStats.Stats(++iota, ++iota, no, no, no, no, no, no, no, no, no, no);

+ 1 - 1
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java

@@ -183,7 +183,7 @@ public class IndicesStatsMonitoringDocTests extends BaseFilteredMonitoringDocTes
         commonStats.getDocs().add(new DocsStats(1L, 0L, randomNonNegativeLong()));
         commonStats.getStore().add(new StoreStats(2L, 0L, 0L));
 
-        final IndexingStats.Stats indexingStats = new IndexingStats.Stats(3L, 4L, 0L, 0L, 0L, 0L, 0L, 0L, true, 5L);
+        final IndexingStats.Stats indexingStats = new IndexingStats.Stats(3L, 4L, 0L, 0L, 0L, 0L, 0L, 0L, true, 5L, 0);
         commonStats.getIndexing().add(new IndexingStats(indexingStats));
 
         final SearchStats.Stats searchStats = new SearchStats.Stats(6L, 7L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);

+ 1 - 1
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java

@@ -328,7 +328,7 @@ public class NodeStatsMonitoringDocTests extends BaseFilteredMonitoringDocTestCa
         indicesCommonStats.getFieldData().add(new FieldDataStats(++iota, ++iota, null));
         indicesCommonStats.getStore().add(new StoreStats(++iota, no, no));
 
-        final IndexingStats.Stats indexingStats = new IndexingStats.Stats(++iota, ++iota, ++iota, no, no, no, no, no, false, ++iota);
+        final IndexingStats.Stats indexingStats = new IndexingStats.Stats(++iota, ++iota, ++iota, no, no, no, no, no, false, ++iota, no);
         indicesCommonStats.getIndexing().add(new IndexingStats(indexingStats));
         indicesCommonStats.getQueryCache().add(new QueryCacheStats(++iota, ++iota, ++iota, ++iota, no));
         indicesCommonStats.getRequestCache().add(new RequestCacheStats(++iota, ++iota, ++iota, ++iota));