1
0
Эх сурвалжийг харах

Add metrics to the shared blob cache (#101577)

Adds pattern for metrics in the shared blob cache.
John Verwolf 1 жил өмнө
parent
commit
772474d52a

+ 5 - 0
docs/changelog/101577.yaml

@@ -0,0 +1,5 @@
+pr: 101577
+summary: Add metrics to the shared blob cache
+area: Search
+type: enhancement
+issues: []

+ 48 - 0
x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCacheMetrics.java

@@ -0,0 +1,48 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.blobcache;
+
+import org.elasticsearch.telemetry.TelemetryProvider;
+import org.elasticsearch.telemetry.metric.LongCounter;
+import org.elasticsearch.telemetry.metric.LongHistogram;
+import org.elasticsearch.telemetry.metric.MeterRegistry;
+
+public class BlobCacheMetrics {
+    private final LongCounter cacheMissCounter;
+    private final LongHistogram cacheMissLoadTimes;
+
+    public BlobCacheMetrics(MeterRegistry meterRegistry) {
+        this(
+            meterRegistry.registerLongCounter(
+                "elasticsearch.blob_cache.miss_that_triggered_read",
+                "The number of times there was a cache miss that triggered a read from the blob store",
+                "count"
+            ),
+            meterRegistry.registerLongHistogram(
+                "elasticsearch.blob_cache.cache_miss_load_times",
+                "The timing data for populating entries in the blob store resulting from a cache miss.",
+                "count"
+            )
+        );
+    }
+
+    BlobCacheMetrics(LongCounter cacheMissCounter, LongHistogram cacheMissLoadTimes) {
+        this.cacheMissCounter = cacheMissCounter;
+        this.cacheMissLoadTimes = cacheMissLoadTimes;
+    }
+
+    public static BlobCacheMetrics NOOP = new BlobCacheMetrics(TelemetryProvider.NOOP.getMeterRegistry());
+
+    public LongCounter getCacheMissCounter() {
+        return cacheMissCounter;
+    }
+
+    public LongHistogram getCacheMissLoadTimes() {
+        return cacheMissLoadTimes;
+    }
+}

+ 31 - 5
x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java

@@ -13,6 +13,7 @@ import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.RefCountingListener;
+import org.elasticsearch.blobcache.BlobCacheMetrics;
 import org.elasticsearch.blobcache.BlobCacheUtils;
 import org.elasticsearch.blobcache.common.ByteRange;
 import org.elasticsearch.blobcache.common.SparseFileTracker;
@@ -298,8 +299,16 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
 
     private final LongAdder evictCount = new LongAdder();
 
-    public SharedBlobCacheService(NodeEnvironment environment, Settings settings, ThreadPool threadPool, String ioExecutor) {
-        this(environment, settings, threadPool, ioExecutor, ioExecutor);
+    private final BlobCacheMetrics blobCacheMetrics;
+
+    public SharedBlobCacheService(
+        NodeEnvironment environment,
+        Settings settings,
+        ThreadPool threadPool,
+        String ioExecutor,
+        BlobCacheMetrics blobCacheMetrics
+    ) {
+        this(environment, settings, threadPool, ioExecutor, ioExecutor, blobCacheMetrics);
     }
 
     public SharedBlobCacheService(
@@ -307,7 +316,8 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
         Settings settings,
         ThreadPool threadPool,
         String ioExecutor,
-        String bulkExecutor
+        String bulkExecutor,
+        BlobCacheMetrics blobCacheMetrics
     ) {
         this.threadPool = threadPool;
         this.ioExecutor = threadPool.executor(ioExecutor);
@@ -347,6 +357,8 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
 
         this.rangeSize = SHARED_CACHE_RANGE_SIZE_SETTING.get(settings);
         this.recoveryRangeSize = SHARED_CACHE_RECOVERY_RANGE_SIZE_SETTING.get(settings);
+
+        this.blobCacheMetrics = blobCacheMetrics;
     }
 
     public static long calculateCacheSize(Settings settings, long totalFsSize) {
@@ -795,6 +807,20 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
             final RangeAvailableHandler reader,
             final RangeMissingHandler writer
         ) throws Exception {
+            // We are interested in the total time that the system spends when fetching a result (including time spent queuing), so we start
+            // our measurement here.
+            final long startTime = threadPool.relativeTimeInMillis();
+            RangeMissingHandler writerInstrumentationDecorator = (
+                SharedBytes.IO channel,
+                int channelPos,
+                int relativePos,
+                int length,
+                IntConsumer progressUpdater) -> {
+                writer.fillCacheRange(channel, channelPos, relativePos, length, progressUpdater);
+                var elapsedTime = threadPool.relativeTimeInMillis() - startTime;
+                SharedBlobCacheService.this.blobCacheMetrics.getCacheMissLoadTimes().record(elapsedTime);
+                SharedBlobCacheService.this.blobCacheMetrics.getCacheMissCounter().increment();
+            };
             if (rangeToRead.isEmpty()) {
                 // nothing to read, skip
                 return 0;
@@ -802,9 +828,9 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
             final int startRegion = getRegion(rangeToWrite.start());
             final int endRegion = getEndingRegion(rangeToWrite.end());
             if (startRegion == endRegion) {
-                return readSingleRegion(rangeToWrite, rangeToRead, reader, writer, startRegion);
+                return readSingleRegion(rangeToWrite, rangeToRead, reader, writerInstrumentationDecorator, startRegion);
             }
-            return readMultiRegions(rangeToWrite, rangeToRead, reader, writer, startRegion, endRegion);
+            return readMultiRegions(rangeToWrite, rangeToRead, reader, writerInstrumentationDecorator, startRegion, endRegion);
         }
 
         private int readSingleRegion(

+ 63 - 10
x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java

@@ -10,6 +10,7 @@ package org.elasticsearch.blobcache.shared;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.blobcache.BlobCacheMetrics;
 import org.elasticsearch.blobcache.common.ByteRange;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.common.settings.Setting;
@@ -66,7 +67,13 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
         final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
         try (
             NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
-            var cacheService = new SharedBlobCacheService<>(environment, settings, taskQueue.getThreadPool(), ThreadPool.Names.GENERIC)
+            var cacheService = new SharedBlobCacheService<>(
+                environment,
+                settings,
+                taskQueue.getThreadPool(),
+                ThreadPool.Names.GENERIC,
+                BlobCacheMetrics.NOOP
+            )
         ) {
             final var cacheKey = generateCacheKey();
             assertEquals(5, cacheService.freeRegionCount());
@@ -126,7 +133,13 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
         final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
         try (
             NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
-            var cacheService = new SharedBlobCacheService<>(environment, settings, taskQueue.getThreadPool(), ThreadPool.Names.GENERIC)
+            var cacheService = new SharedBlobCacheService<>(
+                environment,
+                settings,
+                taskQueue.getThreadPool(),
+                ThreadPool.Names.GENERIC,
+                BlobCacheMetrics.NOOP
+            )
         ) {
             final var cacheKey = generateCacheKey();
             assertEquals(2, cacheService.freeRegionCount());
@@ -164,7 +177,13 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
         final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
         try (
             NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
-            var cacheService = new SharedBlobCacheService<>(environment, settings, taskQueue.getThreadPool(), ThreadPool.Names.GENERIC)
+            var cacheService = new SharedBlobCacheService<>(
+                environment,
+                settings,
+                taskQueue.getThreadPool(),
+                ThreadPool.Names.GENERIC,
+                BlobCacheMetrics.NOOP
+            )
         ) {
             final var cacheKey1 = generateCacheKey();
             final var cacheKey2 = generateCacheKey();
@@ -192,7 +211,13 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
         final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
         try (
             NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
-            var cacheService = new SharedBlobCacheService<>(environment, settings, taskQueue.getThreadPool(), ThreadPool.Names.GENERIC)
+            var cacheService = new SharedBlobCacheService<>(
+                environment,
+                settings,
+                taskQueue.getThreadPool(),
+                ThreadPool.Names.GENERIC,
+                BlobCacheMetrics.NOOP
+            )
         ) {
             final var cacheKey1 = generateCacheKey();
             final var cacheKey2 = generateCacheKey();
@@ -219,7 +244,13 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
         final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
         try (
             NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
-            var cacheService = new SharedBlobCacheService<>(environment, settings, taskQueue.getThreadPool(), ThreadPool.Names.GENERIC)
+            var cacheService = new SharedBlobCacheService<>(
+                environment,
+                settings,
+                taskQueue.getThreadPool(),
+                ThreadPool.Names.GENERIC,
+                BlobCacheMetrics.NOOP
+            )
         ) {
             final var cacheKey1 = generateCacheKey();
             final var cacheKey2 = generateCacheKey();
@@ -284,7 +315,13 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
         Set<String> files = randomSet(1, 10, () -> randomAlphaOfLength(5));
         try (
             NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
-            var cacheService = new SharedBlobCacheService<String>(environment, settings, threadPool, ThreadPool.Names.GENERIC)
+            var cacheService = new SharedBlobCacheService<String>(
+                environment,
+                settings,
+                threadPool,
+                ThreadPool.Names.GENERIC,
+                BlobCacheMetrics.NOOP
+            )
         ) {
             CyclicBarrier ready = new CyclicBarrier(threads);
             List<Thread> threadList = IntStream.range(0, threads).mapToObj(no -> {
@@ -364,7 +401,14 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
 
         try (
             NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
-            var cacheService = new SharedBlobCacheService<>(environment, settings, threadPool, ThreadPool.Names.GENERIC, "bulk")
+            var cacheService = new SharedBlobCacheService<>(
+                environment,
+                settings,
+                threadPool,
+                ThreadPool.Names.GENERIC,
+                "bulk",
+                BlobCacheMetrics.NOOP
+            )
         ) {
             {
                 final var cacheKey = generateCacheKey();
@@ -418,7 +462,14 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
 
         try (
             NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
-            var cacheService = new SharedBlobCacheService<>(environment, settings, threadPool, ThreadPool.Names.GENERIC, "bulk")
+            var cacheService = new SharedBlobCacheService<>(
+                environment,
+                settings,
+                threadPool,
+                ThreadPool.Names.GENERIC,
+                "bulk",
+                BlobCacheMetrics.NOOP
+            )
         ) {
 
             final long size = size(randomIntBetween(1, 100));
@@ -620,7 +671,8 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
                 environment,
                 settings,
                 taskQueue.getThreadPool(),
-                ThreadPool.Names.GENERIC
+                ThreadPool.Names.GENERIC,
+                BlobCacheMetrics.NOOP
             )
         ) {
             assertEquals(val1.getBytes(), cacheService.getStats().size());
@@ -637,7 +689,8 @@ public class SharedBlobCacheServiceTests extends ESTestCase {
                 environment,
                 settings,
                 taskQueue.getThreadPool(),
-                ThreadPool.Names.GENERIC
+                ThreadPool.Names.GENERIC,
+                BlobCacheMetrics.NOOP
             )
         ) {
             assertEquals(val2.getBytes(), cacheService.getStats().size());

+ 3 - 1
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java

@@ -12,6 +12,7 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.blobcache.BlobCacheMetrics;
 import org.elasticsearch.blobcache.shared.SharedBlobCacheService;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterChangedEvent;
@@ -328,7 +329,8 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng
                 nodeEnvironment,
                 settings,
                 threadPool,
-                SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME
+                SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME,
+                new BlobCacheMetrics(services.telemetryProvider().getMeterRegistry())
             );
             this.frozenCacheService.set(sharedBlobCacheService);
             components.add(cacheService);

+ 7 - 3
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java

@@ -14,6 +14,7 @@ import org.apache.lucene.store.ByteBuffersIndexOutput;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
+import org.elasticsearch.blobcache.BlobCacheMetrics;
 import org.elasticsearch.blobcache.common.ByteRange;
 import org.elasticsearch.blobcache.shared.SharedBlobCacheService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -143,7 +144,8 @@ public abstract class AbstractSearchableSnapshotsTestCase extends ESIndexInputTe
             nodeEnvironment,
             Settings.EMPTY,
             threadPool,
-            SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME
+            SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME,
+            BlobCacheMetrics.NOOP
         );
     }
 
@@ -165,7 +167,8 @@ public abstract class AbstractSearchableSnapshotsTestCase extends ESIndexInputTe
             singlePathNodeEnvironment,
             cacheSettings.build(),
             threadPool,
-            SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME
+            SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME,
+            BlobCacheMetrics.NOOP
         );
     }
 
@@ -189,7 +192,8 @@ public abstract class AbstractSearchableSnapshotsTestCase extends ESIndexInputTe
                 .put(SharedBlobCacheService.SHARED_CACHE_RANGE_SIZE_SETTING.getKey(), cacheRangeSize)
                 .build(),
             threadPool,
-            SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME
+            SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME,
+            BlobCacheMetrics.NOOP
         );
     }
 

+ 3 - 1
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/input/FrozenIndexInputTests.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.searchablesnapshots.store.input;
 
 import org.apache.lucene.store.IndexInput;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.blobcache.BlobCacheMetrics;
 import org.elasticsearch.blobcache.shared.SharedBlobCacheService;
 import org.elasticsearch.blobcache.shared.SharedBytes;
 import org.elasticsearch.common.settings.Settings;
@@ -108,7 +109,8 @@ public class FrozenIndexInputTests extends AbstractSearchableSnapshotsTestCase {
                 nodeEnvironment,
                 settings,
                 threadPool,
-                SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME
+                SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME,
+                BlobCacheMetrics.NOOP
             );
             CacheService cacheService = randomCacheService();
             TestSearchableSnapshotDirectory directory = new TestSearchableSnapshotDirectory(