浏览代码

Add metric for calculating index flush time excluding waiting on locks (#107196)

Add a new `total_time_excluding_waiting_on_lock` metric to the index flush stats that measures the flushing time excluding waiting on the flush lock. This metric provides a more granular view of flush performance, without the overhead of flush throttling.

Resolves ES-7201
Artem Prigoda 1 年之前
父节点
当前提交
6a300509cd

+ 5 - 0
docs/changelog/107196.yaml

@@ -0,0 +1,5 @@
+pr: 107196
+summary: Add metric for calculating index flush time excluding waiting on locks
+area: Engine
+type: enhancement
+issues: []

+ 1 - 0
docs/reference/cluster/nodes-stats.asciidoc

@@ -626,6 +626,7 @@ Total time spent performing flush operations.
 (integer)
 Total time in milliseconds
 spent performing flush operations.
+
 =======
 
 `warmer`::

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -171,6 +171,7 @@ public class TransportVersions {
     public static final TransportVersion MODIFY_DATA_STREAM_FAILURE_STORES = def(8_630_00_0);
     public static final TransportVersion ML_INFERENCE_RERANK_NEW_RESPONSE_FORMAT = def(8_631_00_0);
     public static final TransportVersion HIGHLIGHTERS_TAGS_ON_FIELD_LEVEL = def(8_632_00_0);
+    public static final TransportVersion TRACK_FLUSH_TIME_EXCLUDING_WAITING_ON_LOCKS = def(8_633_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 7 - 0
server/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -395,6 +395,13 @@ public abstract class Engine implements Closeable {
      */
     public abstract void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException;
 
+    /**
+     * Returns the total time, in milliseconds, that flushes have been executing, excluding time spent waiting on locks.
+     * This base implementation always returns 0; engines that track the value override this method.
+     */
+    public long getTotalFlushTimeExcludingWaitingOnLockInMillis() {
+        return 0;
+    }
+
     /** A Lock implementation that always allows the lock to be acquired */
     protected static final class NoOpLock implements Lock {
 

+ 11 - 0
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -52,6 +52,7 @@ import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
 import org.elasticsearch.common.metrics.CounterMetric;
+import org.elasticsearch.common.metrics.MeanMetric;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.Maps;
@@ -107,6 +108,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -177,6 +179,8 @@ public class InternalEngine extends Engine {
     private final CounterMetric numDocDeletes = new CounterMetric();
     private final CounterMetric numDocAppends = new CounterMetric();
     private final CounterMetric numDocUpdates = new CounterMetric();
+    private final MeanMetric totalFlushTimeExcludingWaitingOnLock = new MeanMetric();
+
     private final NumericDocValuesField softDeletesField = Lucene.newSoftDeletesField();
     private final SoftDeletesPolicy softDeletesPolicy;
     private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;
@@ -2195,6 +2199,7 @@ public class InternalEngine extends Engine {
             logger.trace("acquired flush lock immediately");
         }
 
+        final long startTime = System.nanoTime();
         try {
             // Only flush if (1) Lucene has uncommitted docs, or (2) forced by caller, or (3) the
             // newly created commit points to a different translog generation (can free translog),
@@ -2246,6 +2251,7 @@ public class InternalEngine extends Engine {
             listener.onFailure(e);
             return;
         } finally {
+            totalFlushTimeExcludingWaitingOnLock.inc(System.nanoTime() - startTime);
             flushLock.unlock();
             logger.trace("released flush lock");
         }
@@ -3066,6 +3072,11 @@ public class InternalEngine extends Engine {
         return numDocUpdates.count();
     }
 
+    @Override
+    public long getTotalFlushTimeExcludingWaitingOnLockInMillis() {
+        // The metric accumulates System.nanoTime() deltas, so the sum is in nanoseconds and must be converted.
+        final long elapsedNanos = totalFlushTimeExcludingWaitingOnLock.sum();
+        return TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
+    }
+
     @Override
     public int countChanges(String source, long fromSeqNo, long toSeqNo) throws IOException {
         ensureOpen();

+ 32 - 5
server/src/main/java/org/elasticsearch/index/flush/FlushStats.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.index.flush;
 
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -23,6 +24,7 @@ public class FlushStats implements Writeable, ToXContentFragment {
     private long total;
     private long periodic;
     private long totalTimeInMillis;
+    private long totalTimeExcludingWaitingOnLockInMillis;
 
     public FlushStats() {
 
@@ -32,18 +34,22 @@ public class FlushStats implements Writeable, ToXContentFragment {
         total = in.readVLong();
         totalTimeInMillis = in.readVLong();
         periodic = in.readVLong();
+        totalTimeExcludingWaitingOnLockInMillis = in.getTransportVersion()
+            .onOrAfter(TransportVersions.TRACK_FLUSH_TIME_EXCLUDING_WAITING_ON_LOCKS) ? in.readVLong() : 0L;
     }
 
-    public FlushStats(long total, long periodic, long totalTimeInMillis) {
+    public FlushStats(long total, long periodic, long totalTimeInMillis, long totalTimeExcludingWaitingOnLockInMillis) {
         this.total = total;
         this.periodic = periodic;
         this.totalTimeInMillis = totalTimeInMillis;
+        this.totalTimeExcludingWaitingOnLockInMillis = totalTimeExcludingWaitingOnLockInMillis;
     }
 
+    /**
+     * Adds the given amounts to the corresponding counters of this instance.
+     *
+     * @param total amount added to {@code total}
+     * @param periodic amount added to {@code periodic}
+     * @param totalTimeInMillis amount added to {@code totalTimeInMillis}
+     * @param totalTimeWithoutWaitingInMillis amount added to {@code totalTimeExcludingWaitingOnLockInMillis}
+     */
-    public void add(long total, long periodic, long totalTimeInMillis) {
+    public void add(long total, long periodic, long totalTimeInMillis, long totalTimeWithoutWaitingInMillis) {
         this.total += total;
         this.periodic += periodic;
         this.totalTimeInMillis += totalTimeInMillis;
+        this.totalTimeExcludingWaitingOnLockInMillis += totalTimeWithoutWaitingInMillis;
     }
 
     public void add(FlushStats flushStats) {
@@ -57,6 +63,7 @@ public class FlushStats implements Writeable, ToXContentFragment {
         this.total += flushStats.total;
         this.periodic += flushStats.periodic;
         this.totalTimeInMillis += flushStats.totalTimeInMillis;
+        this.totalTimeExcludingWaitingOnLockInMillis += flushStats.totalTimeExcludingWaitingOnLockInMillis;
     }
 
     /**
@@ -81,18 +88,30 @@ public class FlushStats implements Writeable, ToXContentFragment {
     }
 
     /**
-     * The total time merges have been executed.
+     * The total time flushes have been executed.
      */
     public TimeValue getTotalTime() {
         return new TimeValue(totalTimeInMillis);
     }
 
+    /**
+     * The total time flushes have been executed excluding waiting time on locks (in milliseconds).
+     */
+    public long getTotalTimeExcludingWaitingOnLockMillis() {
+        return totalTimeExcludingWaitingOnLockInMillis;
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject(Fields.FLUSH);
         builder.field(Fields.TOTAL, total);
         builder.field(Fields.PERIODIC, periodic);
         builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, getTotalTime());
+        builder.humanReadableField(
+            Fields.TOTAL_TIME_EXCLUDING_WAITING_ON_LOCK_IN_MILLIS,
+            Fields.TOTAL_TIME_EXCLUDING_WAITING,
+            new TimeValue(getTotalTimeExcludingWaitingOnLockMillis())
+        );
         builder.endObject();
         return builder;
     }
@@ -103,6 +122,8 @@ public class FlushStats implements Writeable, ToXContentFragment {
         static final String PERIODIC = "periodic";
         static final String TOTAL_TIME = "total_time";
         static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis";
+        static final String TOTAL_TIME_EXCLUDING_WAITING = "total_time_excluding_waiting";
+        static final String TOTAL_TIME_EXCLUDING_WAITING_ON_LOCK_IN_MILLIS = "total_time_excluding_waiting_on_lock_in_millis";
     }
 
     @Override
@@ -110,6 +131,9 @@ public class FlushStats implements Writeable, ToXContentFragment {
         out.writeVLong(total);
         out.writeVLong(totalTimeInMillis);
         out.writeVLong(periodic);
+        if (out.getTransportVersion().onOrAfter(TransportVersions.TRACK_FLUSH_TIME_EXCLUDING_WAITING_ON_LOCKS)) {
+            out.writeVLong(totalTimeExcludingWaitingOnLockInMillis);
+        }
     }
 
     @Override
@@ -117,11 +141,14 @@ public class FlushStats implements Writeable, ToXContentFragment {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         FlushStats that = (FlushStats) o;
-        return total == that.total && totalTimeInMillis == that.totalTimeInMillis && periodic == that.periodic;
+        return total == that.total
+            && totalTimeInMillis == that.totalTimeInMillis
+            && periodic == that.periodic
+            && totalTimeExcludingWaitingOnLockInMillis == that.totalTimeExcludingWaitingOnLockInMillis;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(total, totalTimeInMillis, periodic);
+        return Objects.hash(total, totalTimeInMillis, periodic, totalTimeExcludingWaitingOnLockInMillis);
     }
 }

+ 6 - 1
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -1329,7 +1329,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     }
 
     public FlushStats flushStats() {
-        return new FlushStats(flushMetric.count(), periodicFlushMetric.count(), TimeUnit.NANOSECONDS.toMillis(flushMetric.sum()));
+        // Read the engine reference exactly once: calling getEngineOrNull() separately for the null check and
+        // for the dereference may observe two different results, the second of which can be null.
+        final var engine = getEngineOrNull();
+        return new FlushStats(
+            flushMetric.count(),
+            periodicFlushMetric.count(),
+            // flushMetric.sum() is converted from nanoseconds to milliseconds
+            TimeUnit.NANOSECONDS.toMillis(flushMetric.sum()),
+            // without an engine there is no lock-excluding flush time to report
+            engine != null ? engine.getTotalFlushTimeExcludingWaitingOnLockInMillis() : 0L
+        );
     }
 
     public DocsStats docStats() {

+ 24 - 0
server/src/main/java/org/elasticsearch/monitor/metrics/NodeMetrics.java

@@ -651,6 +651,29 @@ public class NodeMetrics extends AbstractLifecycleComponent {
             )
         );
 
+        // Total flush time, including any time spent waiting on locks.
+        metrics.add(
+            registry.registerLongAsyncCounter(
+                "es.flush.total.time",
+                "The total time flushes have been executed",
+                "milliseconds",
+                () -> {
+                    // Fetch the stats once so the null check and the read use the same object.
+                    final var nodeStats = stats.getOrRefresh();
+                    return new LongWithAttributes(nodeStats != null ? nodeStats.getIndices().getFlush().getTotalTimeInMillis() : 0L);
+                }
+            )
+        );
+
+        // Total flush time with the time spent waiting on locks left out.
+        metrics.add(
+            registry.registerLongAsyncCounter(
+                "es.flush.total_excluding_lock_waiting.time",
+                "The total time flushes have been executed excluding waiting time on locks",
+                "milliseconds",
+                () -> {
+                    // Fetch the stats once so the null check and the read use the same object.
+                    final var nodeStats = stats.getOrRefresh();
+                    final long millis = nodeStats != null
+                        ? nodeStats.getIndices().getFlush().getTotalTimeExcludingWaitingOnLockMillis()
+                        : 0L;
+                    return new LongWithAttributes(millis);
+                }
+            )
+        );
     }
 
     /**
@@ -680,6 +703,7 @@ public class NodeMetrics extends AbstractLifecycleComponent {
     private NodeStats getNodeStats() {
         CommonStatsFlags flags = new CommonStatsFlags(
             CommonStatsFlags.Flag.Indexing,
+            CommonStatsFlags.Flag.Flush,
             CommonStatsFlags.Flag.Get,
             CommonStatsFlags.Flag.Search,
             CommonStatsFlags.Flag.Merge,

+ 1 - 1
server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java

@@ -628,7 +628,7 @@ public class NodeStatsTests extends ESTestCase {
 
         indicesCommonStats.getMerge().add(mergeStats);
         indicesCommonStats.getRefresh().add(new RefreshStats(++iota, ++iota, ++iota, ++iota, ++iota));
-        indicesCommonStats.getFlush().add(new FlushStats(++iota, ++iota, ++iota));
+        indicesCommonStats.getFlush().add(new FlushStats(++iota, ++iota, ++iota, ++iota));
         indicesCommonStats.getWarmer().add(new WarmerStats(++iota, ++iota, ++iota));
         indicesCommonStats.getCompletion().add(new CompletionStats(++iota, null));
         indicesCommonStats.getTranslog().add(new TranslogStats(++iota, ++iota, ++iota, ++iota, ++iota));

+ 36 - 0
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -89,6 +89,7 @@ import org.elasticsearch.index.fielddata.FieldDataStats;
 import org.elasticsearch.index.fielddata.IndexFieldData;
 import org.elasticsearch.index.fielddata.IndexFieldDataCache;
 import org.elasticsearch.index.fielddata.IndexFieldDataService;
+import org.elasticsearch.index.flush.FlushStats;
 import org.elasticsearch.index.mapper.DocumentParsingException;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.LuceneDocument;
@@ -155,6 +156,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -192,6 +194,7 @@ import static org.hamcrest.Matchers.hasToString;
 import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.matchesRegex;
 import static org.hamcrest.Matchers.not;
@@ -3998,6 +4001,39 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(shard);
     }
 
+    /**
+     * Runs several forced flushes concurrently (each waiting for an ongoing flush) and checks that the flush time
+     * excluding waiting on locks is recorded and is smaller than the overall flush time.
+     */
+    public void testFlushTimeExcludingWaiting() throws Exception {
+        IndexShard shard = newStartedShard();
+        // Pick the document count once; a random call in the loop condition would re-roll the bound on every iteration.
+        final int numDocs = randomIntBetween(4, 10);
+        for (int i = 0; i < numDocs; i++) {
+            indexDoc(shard, "_doc", Integer.toString(i));
+        }
+
+        int numFlushes = randomIntBetween(2, 5);
+        var flushesLatch = new CountDownLatch(numFlushes);
+        var executor = Executors.newFixedThreadPool(numFlushes);
+        try {
+            for (int i = 0; i < numFlushes; i++) {
+                executor.submit(() -> {
+                    shard.flush(new FlushRequest().waitIfOngoing(true).force(true));
+                    flushesLatch.countDown();
+                });
+            }
+            safeAwait(flushesLatch);
+
+            FlushStats flushStats = shard.flushStats();
+            assertThat(
+                "Flush time excluding waiting should be captured",
+                flushStats.getTotalTimeExcludingWaitingOnLockMillis(),
+                greaterThan(0L)
+            );
+            assertThat(
+                "Flush time excluding waiting should be less than flush time with waiting",
+                flushStats.getTotalTimeExcludingWaitingOnLockMillis(),
+                lessThan(flushStats.getTotalTime().millis())
+            );
+        } finally {
+            // Always release the worker threads, even when an assertion above fails.
+            executor.shutdown();
+        }
+
+        closeShards(shard);
+    }
+
     @TestLogging(reason = "testing traces of concurrent flushes", value = "org.elasticsearch.index.engine.Engine:TRACE")
     public void testFlushOnIdleConcurrentFlushDoesNotWait() throws Exception {
         final MockLogAppender mockLogAppender = new MockLogAppender();