Browse Source

Add size_in_bytes to enrich cache stats (#110578)

As preparation for #106081, this PR adds the `size_in_bytes`
field to the enrich cache stats. This field is calculated by summing
the `BytesReference` sizes of the sources of all the search hits in the cache.
It's not a perfect representation of the size of the enrich cache
on the heap, but some experimentation showed that it's quite close.
Niels Bauman 1 year ago
parent
commit
86727a8741

+ 5 - 0
docs/changelog/110578.yaml

@@ -0,0 +1,5 @@
+pr: 110578
+summary: Add `size_in_bytes` to enrich cache stats
+area: Ingest Node
+type: enhancement
+issues: []

+ 7 - 1
docs/reference/ingest/apis/enrich/enrich-stats.asciidoc

@@ -121,6 +121,10 @@ The amount of time in milliseconds spent fetching data from the cache on success
 `misses_time_in_millis`::
 (Long)
 The amount of time in milliseconds spent fetching data from the enrich index and updating the cache, on cache misses only.
+
+`size_in_bytes`::
+(Long)
+An _approximation_ of the size in bytes that the enrich cache takes up on the heap.
 --
 
 [[enrich-stats-api-example]]
@@ -172,7 +176,8 @@ The API returns the following response:
       "misses": 0,
       "evictions": 0,
       "hits_time_in_millis": 0,
-      "misses_time_in_millis": 0
+      "misses_time_in_millis": 0,
+      "size_in_bytes": 0
     }
   ]
 }
@@ -187,3 +192,4 @@ The API returns the following response:
 // TESTRESPONSE[s/"evictions": 0/"evictions" : $body.cache_stats.0.evictions/]
 // TESTRESPONSE[s/"hits_time_in_millis": 0/"hits_time_in_millis" : $body.cache_stats.0.hits_time_in_millis/]
 // TESTRESPONSE[s/"misses_time_in_millis": 0/"misses_time_in_millis" : $body.cache_stats.0.misses_time_in_millis/]
+// TESTRESPONSE[s/"size_in_bytes": 0/"size_in_bytes" : $body.cache_stats.0.size_in_bytes/]

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -213,6 +213,7 @@ public class TransportVersions {
     public static final TransportVersion INFERENCE_ADAPTIVE_ALLOCATIONS = def(8_704_00_0);
     public static final TransportVersion INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN = def(8_705_00_0);
     public static final TransportVersion ML_INFERENCE_COHERE_UNUSED_RERANK_SETTINGS_REMOVED = def(8_706_00_0);
+    public static final TransportVersion ENRICH_CACHE_STATS_SIZE_ADDED = def(8_707_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 9 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/EnrichStatsAction.java

@@ -14,6 +14,7 @@ import org.elasticsearch.action.support.master.MasterNodeRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.tasks.TaskInfo;
 import org.elasticsearch.xcontent.ToXContentFragment;
@@ -195,7 +196,8 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
             long misses,
             long evictions,
             long hitsTimeInMillis,
-            long missesTimeInMillis
+            long missesTimeInMillis,
+            long cacheSizeInBytes
         ) implements Writeable, ToXContentFragment {
 
             public CacheStats(StreamInput in) throws IOException {
@@ -206,7 +208,8 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
                     in.readVLong(),
                     in.readVLong(),
                     in.getTransportVersion().onOrAfter(TransportVersions.ENRICH_CACHE_ADDITIONAL_STATS) ? in.readLong() : -1,
-                    in.getTransportVersion().onOrAfter(TransportVersions.ENRICH_CACHE_ADDITIONAL_STATS) ? in.readLong() : -1
+                    in.getTransportVersion().onOrAfter(TransportVersions.ENRICH_CACHE_ADDITIONAL_STATS) ? in.readLong() : -1,
+                    in.getTransportVersion().onOrAfter(TransportVersions.ENRICH_CACHE_STATS_SIZE_ADDED) ? in.readLong() : -1
                 );
             }
 
@@ -219,6 +222,7 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
                 builder.field("evictions", evictions);
                 builder.humanReadableField("hits_time_in_millis", "hits_time", new TimeValue(hitsTimeInMillis));
                 builder.humanReadableField("misses_time_in_millis", "misses_time", new TimeValue(missesTimeInMillis));
+                builder.humanReadableField("size_in_bytes", "size", ByteSizeValue.ofBytes(cacheSizeInBytes));
                 return builder;
             }
 
@@ -233,6 +237,9 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
                     out.writeLong(hitsTimeInMillis);
                     out.writeLong(missesTimeInMillis);
                 }
+                if (out.getTransportVersion().onOrAfter(TransportVersions.ENRICH_CACHE_STATS_SIZE_ADDED)) {
+                    out.writeLong(cacheSizeInBytes);
+                }
             }
         }
     }

+ 1 - 1
x-pack/plugin/enrich/qa/rest/build.gradle

@@ -8,7 +8,7 @@ import org.elasticsearch.gradle.internal.info.BuildParams
 
 restResources {
   restApi {
-    include '_common', 'bulk', 'indices', 'index', 'ingest.delete_pipeline', 'ingest.put_pipeline', 'enrich', 'get'
+    include '_common', 'bulk', 'indices', 'index', 'ingest.delete_pipeline', 'ingest.put_pipeline', 'enrich', 'get', 'capabilities'
   }
   restTests {
     includeXpack 'enrich'

+ 20 - 12
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichCache.java

@@ -50,10 +50,11 @@ import java.util.function.LongSupplier;
  */
 public final class EnrichCache {
 
-    private final Cache<CacheKey, List<Map<?, ?>>> cache;
+    private final Cache<CacheKey, CacheValue> cache;
     private final LongSupplier relativeNanoTimeProvider;
     private final AtomicLong hitsTimeInNanos = new AtomicLong(0);
     private final AtomicLong missesTimeInNanos = new AtomicLong(0);
+    private final AtomicLong sizeInBytes = new AtomicLong(0);
     private volatile Metadata metadata;
 
     EnrichCache(long maxSize) {
@@ -63,7 +64,9 @@ public final class EnrichCache {
     // non-private for unit testing only
     EnrichCache(long maxSize, LongSupplier relativeNanoTimeProvider) {
         this.relativeNanoTimeProvider = relativeNanoTimeProvider;
-        this.cache = CacheBuilder.<CacheKey, List<Map<?, ?>>>builder().setMaximumWeight(maxSize).build();
+        this.cache = CacheBuilder.<CacheKey, CacheValue>builder().setMaximumWeight(maxSize).removalListener(notification -> {
+            sizeInBytes.getAndAdd(-1 * notification.getValue().sizeInBytes);
+        }).build();
     }
 
     /**
@@ -86,12 +89,11 @@ public final class EnrichCache {
             hitsTimeInNanos.addAndGet(cacheRequestTime);
             listener.onResponse(response);
         } else {
-
             final long retrieveStart = relativeNanoTimeProvider.getAsLong();
             searchResponseFetcher.accept(searchRequest, ActionListener.wrap(resp -> {
-                List<Map<?, ?>> value = toCacheValue(resp);
+                CacheValue value = toCacheValue(resp);
                 put(searchRequest, value);
-                List<Map<?, ?>> copy = deepCopy(value, false);
+                List<Map<?, ?>> copy = deepCopy(value.hits, false);
                 long databaseQueryAndCachePutTime = relativeNanoTimeProvider.getAsLong() - retrieveStart;
                 missesTimeInNanos.addAndGet(cacheRequestTime + databaseQueryAndCachePutTime);
                 listener.onResponse(copy);
@@ -104,20 +106,21 @@ public final class EnrichCache {
         String enrichIndex = getEnrichIndexKey(searchRequest);
         CacheKey cacheKey = new CacheKey(enrichIndex, searchRequest);
 
-        List<Map<?, ?>> response = cache.get(cacheKey);
+        CacheValue response = cache.get(cacheKey);
         if (response != null) {
-            return deepCopy(response, false);
+            return deepCopy(response.hits, false);
         } else {
             return null;
         }
     }
 
     // non-private for unit testing only
-    void put(SearchRequest searchRequest, List<Map<?, ?>> response) {
+    void put(SearchRequest searchRequest, CacheValue cacheValue) {
         String enrichIndex = getEnrichIndexKey(searchRequest);
         CacheKey cacheKey = new CacheKey(enrichIndex, searchRequest);
 
-        cache.put(cacheKey, response);
+        cache.put(cacheKey, cacheValue);
+        sizeInBytes.addAndGet(cacheValue.sizeInBytes);
     }
 
     void setMetadata(Metadata metadata) {
@@ -133,7 +136,8 @@ public final class EnrichCache {
             cacheStats.getMisses(),
             cacheStats.getEvictions(),
             TimeValue.nsecToMSec(hitsTimeInNanos.get()),
-            TimeValue.nsecToMSec(missesTimeInNanos.get())
+            TimeValue.nsecToMSec(missesTimeInNanos.get()),
+            sizeInBytes.get()
         );
     }
 
@@ -146,12 +150,14 @@ public final class EnrichCache {
         return ia.getIndices().get(0).getName();
     }
 
-    static List<Map<?, ?>> toCacheValue(SearchResponse response) {
+    static CacheValue toCacheValue(SearchResponse response) {
         List<Map<?, ?>> result = new ArrayList<>(response.getHits().getHits().length);
+        long size = 0;
         for (SearchHit hit : response.getHits()) {
             result.add(deepCopy(hit.getSourceAsMap(), true));
+            size += hit.getSourceRef() != null ? hit.getSourceRef().ramBytesUsed() : 0;
         }
-        return Collections.unmodifiableList(result);
+        return new CacheValue(Collections.unmodifiableList(result), size);
     }
 
     @SuppressWarnings("unchecked")
@@ -205,4 +211,6 @@ public final class EnrichCache {
         }
     }
 
+    /**
+     * A cached enrich lookup result together with its approximate size.
+     *
+     * @param hits        the (copied) source maps of the search hits stored for one cache key
+     * @param sizeInBytes the size attributed to this entry; it is added to the cache-wide counter
+     *                    on put and subtracted again by the removal listener. Kept as a primitive
+     *                    so it can never be {@code null} and is not unboxed on every update.
+     */
+    // Visibility for testing
+    record CacheValue(List<Map<?, ?>> hits, long sizeInBytes) {}
 }

+ 8 - 0
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestEnrichStatsAction.java

@@ -16,12 +16,15 @@ import org.elasticsearch.rest.action.RestToXContentListener;
 import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
 
 import java.util.List;
+import java.util.Set;
 
 import static org.elasticsearch.rest.RestRequest.Method.GET;
 
 @ServerlessScope(Scope.INTERNAL)
 public class RestEnrichStatsAction extends BaseRestHandler {
 
+    private static final Set<String> SUPPORTED_CAPABILITIES = Set.of("size-in-bytes");
+
     @Override
     public List<Route> routes() {
         return List.of(new Route(GET, "/_enrich/_stats"));
@@ -32,6 +35,11 @@ public class RestEnrichStatsAction extends BaseRestHandler {
         return "enrich_stats";
     }
 
+    @Override
+    public Set<String> supportedCapabilities() {
+        return SUPPORTED_CAPABILITIES;
+    }
+
     @Override
     protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
         final var request = new EnrichStatsAction.Request(RestUtils.getMasterNodeTimeout(restRequest));

+ 6 - 2
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichCacheTests.java

@@ -79,7 +79,7 @@ public class EnrichCacheTests extends ESTestCase {
             new SearchSourceBuilder().query(new MatchQueryBuilder("match_field", "2"))
         );
         // Emulated search response (content doesn't matter, since it isn't used, it's just a cache entry)
-        List<Map<?, ?>> searchResponse = List.of(Map.of("test", "entry"));
+        EnrichCache.CacheValue searchResponse = new EnrichCache.CacheValue(List.of(Map.of("test", "entry")), 1L);
 
         EnrichCache enrichCache = new EnrichCache(3);
         enrichCache.setMetadata(metadata);
@@ -91,6 +91,7 @@ public class EnrichCacheTests extends ESTestCase {
         assertThat(cacheStats.hits(), equalTo(0L));
         assertThat(cacheStats.misses(), equalTo(0L));
         assertThat(cacheStats.evictions(), equalTo(0L));
+        assertThat(cacheStats.cacheSizeInBytes(), equalTo(3L));
 
         assertThat(enrichCache.get(searchRequest1), notNullValue());
         assertThat(enrichCache.get(searchRequest2), notNullValue());
@@ -101,6 +102,7 @@ public class EnrichCacheTests extends ESTestCase {
         assertThat(cacheStats.hits(), equalTo(3L));
         assertThat(cacheStats.misses(), equalTo(1L));
         assertThat(cacheStats.evictions(), equalTo(0L));
+        assertThat(cacheStats.cacheSizeInBytes(), equalTo(3L));
 
         enrichCache.put(searchRequest4, searchResponse);
         cacheStats = enrichCache.getStats("_id");
@@ -108,6 +110,7 @@ public class EnrichCacheTests extends ESTestCase {
         assertThat(cacheStats.hits(), equalTo(3L));
         assertThat(cacheStats.misses(), equalTo(1L));
         assertThat(cacheStats.evictions(), equalTo(1L));
+        assertThat(cacheStats.cacheSizeInBytes(), equalTo(3L));
 
         // Simulate enrich policy execution, which should make current cache entries unused.
         metadata = Metadata.builder()
@@ -149,6 +152,7 @@ public class EnrichCacheTests extends ESTestCase {
         assertThat(cacheStats.hits(), equalTo(6L));
         assertThat(cacheStats.misses(), equalTo(6L));
         assertThat(cacheStats.evictions(), equalTo(4L));
+        assertThat(cacheStats.cacheSizeInBytes(), equalTo(3L));
     }
 
     public void testComputeIfAbsent() throws InterruptedException {
@@ -331,7 +335,7 @@ public class EnrichCacheTests extends ESTestCase {
             new SearchSourceBuilder().query(new MatchQueryBuilder("test", "query"))
         );
         // Emulated search response (content doesn't matter, since it isn't used, it's just a cache entry)
-        List<Map<?, ?>> searchResponse = List.of(Map.of("test", "entry"));
+        EnrichCache.CacheValue searchResponse = new EnrichCache.CacheValue(List.of(Map.of("test", "entry")), 1L);
 
         EnrichCache enrichCache = new EnrichCache(1);
         enrichCache.setMetadata(metadata);

+ 1 - 0
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java

@@ -51,6 +51,7 @@ public class EnrichStatsResponseTests extends AbstractWireSerializingTestCase<En
                     randomNonNegativeLong(),
                     randomNonNegativeLong(),
                     randomNonNegativeLong(),
+                    randomNonNegativeLong(),
                     randomNonNegativeLong()
                 )
             );

+ 1 - 0
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichStatsCollectorTests.java

@@ -93,6 +93,7 @@ public class EnrichStatsCollectorTests extends BaseCollectorTestCase {
                     randomNonNegativeLong(),
                     randomNonNegativeLong(),
                     randomNonNegativeLong(),
+                    randomNonNegativeLong(),
                     randomNonNegativeLong()
                 )
             );

+ 58 - 0
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/enrich/20_standard_index.yml

@@ -187,3 +187,61 @@ enrich documents over _bulk via an alias:
   - do:
       enrich.delete_policy:
         name: test_alias_policy
+
+---
+enrich stats REST response structure:
+  - requires:
+      test_runner_features: [capabilities]
+      capabilities:
+        - method: GET
+          # Must match the route registered by RestEnrichStatsAction (GET /_enrich/_stats).
+          # A path that matches no registered route cannot satisfy the capability check,
+          # which would cause this test to be skipped instead of run.
+          path: /_enrich/_stats
+          capabilities:
+            - size-in-bytes
+      reason: "Capability required to run test"
+
+  # Run a few documents through the enrich pipeline so the enrich stats are exercised.
+  # NOTE(review): `test_pipeline` is not created inside this test; presumably the file's
+  # setup section provides it - confirm, since the test now actually runs.
+  - do:
+      ingest.simulate:
+        id: test_pipeline
+        body: >
+          {
+            "docs": [
+              {
+                "_index": "enrich-cache-stats-index",
+                "_id": "1",
+                "_source": {"baz": "quick", "c": 1}
+              },
+              {
+                "_index": "enrich-cache-stats-index",
+                "_id": "2",
+                "_source": {"baz": "lazy", "c": 2}
+              },
+              {
+                "_index": "enrich-cache-stats-index",
+                "_id": "3",
+                "_source": {"baz": "slow", "c": 3}
+              }
+            ]
+          }
+  - length: { docs: 3 }
+
+  # This test's main purpose is to verify the REST response structure.
+  # So, rather than assessing specific values, we only assess the existence of fields.
+  - do:
+      enrich.stats: {}
+  - exists: executing_policies
+  - is_true: coordinator_stats
+  # We know there will be at least one node, but we don't want to be dependent on the exact number of nodes.
+  - is_true: coordinator_stats.0.node_id
+  - exists: coordinator_stats.0.queue_size
+  - exists: coordinator_stats.0.remote_requests_current
+  - exists: coordinator_stats.0.remote_requests_total
+  - exists: coordinator_stats.0.executed_searches_total
+  - is_true: cache_stats
+  - is_true: cache_stats.0.node_id
+  - exists: cache_stats.0.count
+  - exists: cache_stats.0.hits
+  - exists: cache_stats.0.misses
+  - exists: cache_stats.0.evictions
+  - exists: cache_stats.0.hits_time_in_millis
+  - exists: cache_stats.0.misses_time_in_millis
+  - exists: cache_stats.0.size_in_bytes