瀏覽代碼

Add enrich node cache (#76800)

Introduce a LRU cache to avoid searches that occur frequently
from the enrich processor.

Relates to #48988
Martijn van Groningen 4 年之前
父節點
當前提交
1ae4f3c937
共有 16 個文件被更改,包括 703 次插入36 次删除
  1. 82 4
      client/rest-high-level/src/main/java/org/elasticsearch/client/enrich/StatsResponse.java
  2. 19 2
      client/rest-high-level/src/test/java/org/elasticsearch/client/enrich/StatsResponseTests.java
  3. 43 0
      docs/reference/ingest/apis/enrich/enrich-stats.asciidoc
  4. 98 3
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/EnrichStatsAction.java
  5. 5 1
      x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java
  6. 106 0
      x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichCache.java
  7. 9 3
      x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java
  8. 20 8
      x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java
  9. 17 2
      x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorStatsAction.java
  10. 8 1
      x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportEnrichStatsAction.java
  11. 160 0
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichCacheTests.java
  12. 113 7
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java
  13. 1 0
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java
  14. 8 2
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java
  15. 13 2
      x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichStatsCollectorTests.java
  16. 1 1
      x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/LocalStateMonitoring.java

+ 82 - 4
client/rest-high-level/src/main/java/org/elasticsearch/client/enrich/StatsResponse.java

@@ -17,19 +17,21 @@ import java.util.Objects;
 
 public final class StatsResponse {
 
-    private static ParseField EXECUTING_POLICIES_FIELD = new ParseField("executing_policies");
-    private static ParseField COORDINATOR_STATS_FIELD = new ParseField("coordinator_stats");
+    private static final ParseField EXECUTING_POLICIES_FIELD = new ParseField("executing_policies");
+    private static final ParseField COORDINATOR_STATS_FIELD = new ParseField("coordinator_stats");
+    private static final ParseField CACHE_STATS_FIELD = new ParseField("cache_stats");
 
     @SuppressWarnings("unchecked")
     private static final ConstructingObjectParser<StatsResponse, Void> PARSER = new ConstructingObjectParser<>(
         "stats_response",
         true,
-        args -> new StatsResponse((List<ExecutingPolicy>) args[0], (List<CoordinatorStats>) args[1])
+        args -> new StatsResponse((List<ExecutingPolicy>) args[0], (List<CoordinatorStats>) args[1], (List<CacheStats>) args[2])
     );
 
     static {
         PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), ExecutingPolicy.PARSER::apply, EXECUTING_POLICIES_FIELD);
         PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), CoordinatorStats.PARSER::apply, COORDINATOR_STATS_FIELD);
+        PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), CacheStats.PARSER::apply, CACHE_STATS_FIELD);
     }
 
     public static StatsResponse fromXContent(XContentParser parser) {
@@ -38,10 +40,12 @@ public final class StatsResponse {
 
     private final List<ExecutingPolicy> executingPolicies;
     private final List<CoordinatorStats> coordinatorStats;
+    private final List<CacheStats> cacheStats;
 
-    public StatsResponse(List<ExecutingPolicy> executingPolicies, List<CoordinatorStats> coordinatorStats) {
+    public StatsResponse(List<ExecutingPolicy> executingPolicies, List<CoordinatorStats> coordinatorStats, List<CacheStats> cacheStats) {
         this.executingPolicies = executingPolicies;
         this.coordinatorStats = coordinatorStats;
+        this.cacheStats = cacheStats;
     }
 
     public List<ExecutingPolicy> getExecutingPolicies() {
@@ -52,6 +56,10 @@ public final class StatsResponse {
         return coordinatorStats;
     }
 
+    public List<CacheStats> getCacheStats() {
+        return cacheStats;
+    }
+
     public static final class CoordinatorStats {
 
         static ParseField NODE_ID_FIELD = new ParseField("node_id");
@@ -177,4 +185,74 @@ public final class StatsResponse {
         }
     }
 
+    public static final class CacheStats {
+
+        static ParseField NODE_ID_FIELD = new ParseField("node_id");
+        static ParseField COUNT_FIELD = new ParseField("count");
+        static ParseField HITS_FIELD = new ParseField("hits");
+        static ParseField MISSES_FIELD = new ParseField("misses");
+        static ParseField EVICTIONS_FIELD = new ParseField("evictions");
+
+        private static final ConstructingObjectParser<CacheStats, Void> PARSER = new ConstructingObjectParser<>(
+            "coordinator_stats_item",
+            true,
+            args -> new CacheStats((String) args[0], (long) args[1], (long) args[2], (long) args[3], (long) args[4])
+        );
+
+        static {
+            PARSER.declareString(ConstructingObjectParser.constructorArg(), NODE_ID_FIELD);
+            PARSER.declareLong(ConstructingObjectParser.constructorArg(), COUNT_FIELD);
+            PARSER.declareLong(ConstructingObjectParser.constructorArg(), HITS_FIELD);
+            PARSER.declareLong(ConstructingObjectParser.constructorArg(), MISSES_FIELD);
+            PARSER.declareLong(ConstructingObjectParser.constructorArg(), EVICTIONS_FIELD);
+        }
+
+        private final String nodeId;
+        private final long count;
+        private final long hits;
+        private final long misses;
+        private final long evictions;
+
+        public CacheStats(String nodeId, long count, long hits, long misses, long evictions) {
+            this.nodeId = nodeId;
+            this.count = count;
+            this.hits = hits;
+            this.misses = misses;
+            this.evictions = evictions;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public long getCount() {
+            return count;
+        }
+
+        public long getHits() {
+            return hits;
+        }
+
+        public long getMisses() {
+            return misses;
+        }
+
+        public long getEvictions() {
+            return evictions;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            CacheStats that = (CacheStats) o;
+            return count == that.count && hits == that.hits && misses == that.misses &&
+                evictions == that.evictions && nodeId.equals(that.nodeId);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(nodeId, count, hits, misses, evictions);
+        }
+    }
 }

+ 19 - 2
client/rest-high-level/src/test/java/org/elasticsearch/client/enrich/StatsResponseTests.java

@@ -34,13 +34,19 @@ public class StatsResponseTests extends AbstractResponseTestCase<EnrichStatsActi
         }
         int numCoordinatingStats = randomIntBetween(0, 16);
         List<EnrichStatsAction.Response.CoordinatorStats> coordinatorStats = new ArrayList<>(numCoordinatingStats);
+        List<EnrichStatsAction.Response.CacheStats> cacheStats = new ArrayList<>(numCoordinatingStats);
         for (int i = 0; i < numCoordinatingStats; i++) {
+            String nodeId = randomAlphaOfLength(4);
             EnrichStatsAction.Response.CoordinatorStats stats = new EnrichStatsAction.Response.CoordinatorStats(
-                randomAlphaOfLength(4), randomIntBetween(0, 8096), randomIntBetween(0, 8096), randomNonNegativeLong(),
+                nodeId, randomIntBetween(0, 8096), randomIntBetween(0, 8096), randomNonNegativeLong(),
                 randomNonNegativeLong());
             coordinatorStats.add(stats);
+            cacheStats.add(
+                new EnrichStatsAction.Response.CacheStats(nodeId, randomNonNegativeLong(), randomNonNegativeLong(),
+                    randomNonNegativeLong(), randomNonNegativeLong())
+            );
         }
-        return new EnrichStatsAction.Response(executingPolicies, coordinatorStats);
+        return new EnrichStatsAction.Response(executingPolicies, coordinatorStats, cacheStats);
     }
 
     @Override
@@ -68,6 +74,17 @@ public class StatsResponseTests extends AbstractResponseTestCase<EnrichStatsActi
             assertThat(actual.getRemoteRequestsTotal(), equalTo(expected.getRemoteRequestsTotal()));
             assertThat(actual.getExecutedSearchesTotal(), equalTo(expected.getExecutedSearchesTotal()));
         }
+
+        assertThat(clientInstance.getCacheStats().size(), equalTo(serverTestInstance.getCacheStats().size()));
+        for (int i = 0; i < clientInstance.getCacheStats().size(); i++) {
+            StatsResponse.CacheStats actual = clientInstance.getCacheStats().get(i);
+            EnrichStatsAction.Response.CacheStats expected = serverTestInstance.getCacheStats().get(i);
+            assertThat(actual.getNodeId(), equalTo(expected.getNodeId()));
+            assertThat(actual.getCount(), equalTo(expected.getCount()));
+            assertThat(actual.getHits(), equalTo(expected.getHits()));
+            assertThat(actual.getMisses(), equalTo(expected.getMisses()));
+            assertThat(actual.getEvictions(), equalTo(expected.getEvictions()));
+        }
     }
 
     private static TaskInfo randomTaskInfo() {

+ 43 - 0
docs/reference/ingest/apis/enrich/enrich-stats.asciidoc

@@ -85,6 +85,36 @@ that enrich processors have executed
 since node startup.
 --
 
+`cache_stats`::
++
+--
+(Array of objects)
+Objects containing information about the enrich
+cache stats on each ingest node.
+
+Returned parameters include:
+
+`node_id`::
+(String)
+ID of the ingest node with a enrich cache.
+
+`count`::
+(Integer)
+Number of cached entries.
+
+`hits`::
+(Integer)
+The number of enrich lookups served from cache.
+
+`missed`::
+(Integer)
+The number of time enrich lookups couldn't be
+served from cache.
+
+`evictions`::
+(Integer)
+The number cache entries evicted from the cache.
+--
 
 [[enrich-stats-api-example]]
 ==== {api-examples-title}
@@ -126,6 +156,15 @@ The API returns the following response:
       "remote_requests_total": 0,
       "executed_searches_total": 0
     }
+  ],
+  "cache_stats": [
+    {
+      "node_id": "1sFM8cmSROZYhPxVsiWew",
+      "count": 0,
+      "hits": 0,
+      "misses": 0,
+      "evictions": 0
+    }
   ]
 }
 ----
@@ -133,3 +172,7 @@ The API returns the following response:
 // TESTRESPONSE[s/"node_id": "1sFM8cmSROZYhPxVsiWew"/"node_id" : $body.coordinator_stats.0.node_id/]
 // TESTRESPONSE[s/"remote_requests_total": 0/"remote_requests_total" : $body.coordinator_stats.0.remote_requests_total/]
 // TESTRESPONSE[s/"executed_searches_total": 0/"executed_searches_total" : $body.coordinator_stats.0.executed_searches_total/]
+// TESTRESPONSE[s/"node_id": "1sFM8cmSROZYhPxVsiWew"/"node_id" : $body.cache_stats.0.node_id/]
+// TESTRESPONSE[s/"count": 0/"count" : $body.cache_stats.0.count/]
+// TESTRESPONSE[s/"misses": 0/"misses" : $body.cache_stats.0.misses/]
+// TESTRESPONSE[s/"evictions": 0/"evictions" : $body.cache_stats.0.evictions/]

+ 98 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/EnrichStatsAction.java

@@ -6,6 +6,7 @@
  */
 package org.elasticsearch.xpack.core.enrich.action;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionType;
@@ -50,16 +51,19 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
 
         private final List<ExecutingPolicy> executingPolicies;
         private final List<CoordinatorStats> coordinatorStats;
+        private final List<CacheStats> cacheStats;
 
-        public Response(List<ExecutingPolicy> executingPolicies, List<CoordinatorStats> coordinatorStats) {
+        public Response(List<ExecutingPolicy> executingPolicies, List<CoordinatorStats> coordinatorStats, List<CacheStats> cacheStats) {
             this.executingPolicies = executingPolicies;
             this.coordinatorStats = coordinatorStats;
+            this.cacheStats = cacheStats;
         }
 
         public Response(StreamInput in) throws IOException {
             super(in);
             executingPolicies = in.readList(ExecutingPolicy::new);
             coordinatorStats = in.readList(CoordinatorStats::new);
+            cacheStats = in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readList(CacheStats::new) : null;
         }
 
         public List<ExecutingPolicy> getExecutingPolicies() {
@@ -70,10 +74,17 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
             return coordinatorStats;
         }
 
+        public List<CacheStats> getCacheStats() {
+            return cacheStats;
+        }
+
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeList(executingPolicies);
             out.writeList(coordinatorStats);
+            if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+                out.writeList(cacheStats);
+            }
         }
 
         @Override
@@ -93,6 +104,15 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
                 builder.endObject();
             }
             builder.endArray();
+            if (cacheStats != null) {
+                builder.startArray("cache_stats");
+                for (CacheStats cacheStat : cacheStats) {
+                    builder.startObject();
+                    cacheStat.toXContent(builder, params);
+                    builder.endObject();
+                }
+                builder.endArray();
+            }
             builder.endObject();
             return builder;
         }
@@ -103,12 +123,13 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
             if (o == null || getClass() != o.getClass()) return false;
             Response response = (Response) o;
             return executingPolicies.equals(response.executingPolicies) &&
-                coordinatorStats.equals(response.coordinatorStats);
+                coordinatorStats.equals(response.coordinatorStats) &&
+                Objects.equals(cacheStats, response.cacheStats);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(executingPolicies, coordinatorStats);
+            return Objects.hash(executingPolicies, coordinatorStats, cacheStats);
         }
 
         public static class CoordinatorStats implements Writeable, ToXContentFragment {
@@ -245,6 +266,80 @@ public class EnrichStatsAction extends ActionType<EnrichStatsAction.Response> {
                 return Objects.hash(name, taskInfo);
             }
         }
+
+        public static class CacheStats implements Writeable, ToXContentFragment {
+
+            private final String nodeId;
+            private final long count;
+            private final long hits;
+            private final long misses;
+            private final long evictions;
+
+            public CacheStats(String nodeId, long count, long hits, long misses, long evictions) {
+                this.nodeId = nodeId;
+                this.count = count;
+                this.hits = hits;
+                this.misses = misses;
+                this.evictions = evictions;
+            }
+
+            public CacheStats(StreamInput in) throws IOException {
+                this(in.readString(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
+            }
+
+            public String getNodeId() {
+                return nodeId;
+            }
+
+            public long getCount() {
+                return count;
+            }
+
+            public long getHits() {
+                return hits;
+            }
+
+            public long getMisses() {
+                return misses;
+            }
+
+            public long getEvictions() {
+                return evictions;
+            }
+
+            @Override
+            public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+                builder.field("node_id", nodeId);
+                builder.field("count", count);
+                builder.field("hits", hits);
+                builder.field("misses", misses);
+                builder.field("evictions", evictions);
+                return builder;
+            }
+
+            @Override
+            public void writeTo(StreamOutput out) throws IOException {
+                out.writeString(nodeId);
+                out.writeVLong(count);
+                out.writeVLong(hits);
+                out.writeVLong(misses);
+                out.writeVLong(evictions);
+            }
+
+            @Override
+            public boolean equals(Object o) {
+                if (this == o) return true;
+                if (o == null || getClass() != o.getClass()) return false;
+                CacheStats that = (CacheStats) o;
+                return count == that.count && hits == that.hits && misses == that.misses && evictions == that.evictions &&
+                    nodeId.equals(that.nodeId);
+            }
+
+            @Override
+            public int hashCode() {
+                return Objects.hash(nodeId, count, hits, misses, evictions);
+            }
+        }
     }
 
 }

+ 5 - 1
x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java

@@ -51,9 +51,11 @@ import static org.elasticsearch.test.NodeRoles.masterOnlyNode;
 import static org.elasticsearch.test.NodeRoles.nonIngestNode;
 import static org.elasticsearch.test.NodeRoles.nonMasterNode;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
@@ -243,7 +245,9 @@ public class EnrichMultiNodeIT extends ESIntegTestCase {
         CoordinatorStats stats = statsResponse.getCoordinatorStats().stream().filter(s -> s.getNodeId().equals(nodeId)).findAny().get();
         assertThat(stats.getNodeId(), equalTo(nodeId));
         assertThat(stats.getRemoteRequestsTotal(), greaterThanOrEqualTo(1L));
-        assertThat(stats.getExecutedSearchesTotal(), equalTo((long) numDocs));
+        // 'numDocs' lookups are done, but not 'numDocs' searches, because searches may get cached:
+        // and not all enrichments may happen via the same node.
+        assertThat(stats.getExecutedSearchesTotal(), allOf(greaterThanOrEqualTo(0L), lessThanOrEqualTo((long) numDocs)));
     }
 
     private static List<String> createSourceIndex(int numDocs) {

+ 106 - 0
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichCache.java

@@ -0,0 +1,106 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.enrich;
+
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.cluster.metadata.IndexAbstraction;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.common.cache.Cache;
+import org.elasticsearch.common.cache.CacheBuilder;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
+
+import java.util.Objects;
+
+/**
+ * A simple cache for enrich that uses {@link Cache}. There is one instance of this cache and
+ * multiple enrich processors with different policies will use this cache.
+ *
+ * The key of the cache is based on the search request and the enrich index that will be used.
+ * Search requests that enrich generates target the alias for an enrich policy, this class
+ * resolves the alias to the actual enrich index and uses that for the cache key. This way
+ * no stale entries will be returned if a policy execution happens and a new enrich index is created.
+ *
+ * There is no cleanup mechanism of stale entries in case a new enrich index is created
+ * as part of a policy execution. This shouldn't be needed as cache entries for prior enrich
+ * indices will be eventually evicted, because these entries will not end up being used. The
+ * latest enrich index name will be used as cache key after an enrich policy execution.
+ * (Also a cleanup mechanism also wouldn't be straightforward to implement,
+ * since there is no easy check to see that an enrich index used as cache key no longer is the
+ * current enrich index the enrich alias of an policy refers to. It would require checking
+ * all cached entries on each cluster state update)
+ */
+public final class EnrichCache {
+
+    private final Cache<CacheKey, SearchResponse> cache;
+    private volatile Metadata metadata;
+
+    EnrichCache(long maxSize) {
+        this.cache = CacheBuilder.<CacheKey, SearchResponse>builder().setMaximumWeight(maxSize).build();
+    }
+
+    SearchResponse get(SearchRequest searchRequest) {
+        String enrichIndex = getEnrichIndexKey(searchRequest);
+        CacheKey cacheKey = new CacheKey(enrichIndex, searchRequest);
+
+        return cache.get(cacheKey);
+    }
+
+    void put(SearchRequest searchRequest, SearchResponse searchResponse) {
+        String enrichIndex = getEnrichIndexKey(searchRequest);
+        CacheKey cacheKey = new CacheKey(enrichIndex, searchRequest);
+
+        cache.put(cacheKey, searchResponse);
+    }
+
+    void setMetadata(Metadata metadata) {
+        this.metadata = metadata;
+    }
+
+    public EnrichStatsAction.Response.CacheStats getStats(String localNodeId) {
+        Cache.CacheStats cacheStats = cache.stats();
+        return new EnrichStatsAction.Response.CacheStats(
+            localNodeId,
+            cache.count(),
+            cacheStats.getHits(),
+            cacheStats.getMisses(),
+            cacheStats.getEvictions()
+        );
+    }
+
+    private String getEnrichIndexKey(SearchRequest searchRequest) {
+        String alias = searchRequest.indices()[0];
+        IndexAbstraction ia = metadata.getIndicesLookup().get(alias);
+        return ia.getIndices().get(0).getIndex().getName();
+    }
+
+    private static class CacheKey {
+
+        final String enrichIndex;
+        final SearchRequest searchRequest;
+
+        private CacheKey(String enrichIndex, SearchRequest searchRequest) {
+            this.enrichIndex = enrichIndex;
+            this.searchRequest = searchRequest;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            CacheKey cacheKey = (CacheKey) o;
+            return enrichIndex.equals(cacheKey.enrichIndex) && searchRequest.equals(cacheKey.searchRequest);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(enrichIndex, searchRequest);
+        }
+    }
+
+}

+ 9 - 3
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java

@@ -127,15 +127,19 @@ public class EnrichPlugin extends Plugin implements SystemIndexPlugin, IngestPlu
         return String.valueOf(maxConcurrentRequests * maxLookupsPerRequest);
     }, val -> Setting.parseInt(val, 1, Integer.MAX_VALUE, QUEUE_CAPACITY_SETTING_NAME), Setting.Property.NodeScope);
 
+    public static final Setting<Long> CACHE_SIZE = Setting.longSetting("enrich.cache_size", 1000, 0, Setting.Property.NodeScope);
+
     private final Settings settings;
+    private final EnrichCache enrichCache;
 
     public EnrichPlugin(final Settings settings) {
         this.settings = settings;
+        this.enrichCache = new EnrichCache(CACHE_SIZE.get(settings));
     }
 
     @Override
     public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
-        EnrichProcessorFactory factory = new EnrichProcessorFactory(parameters.client, parameters.scriptService);
+        EnrichProcessorFactory factory = new EnrichProcessorFactory(parameters.client, parameters.scriptService, enrichCache);
         parameters.ingestService.addIngestClusterStateListener(factory);
         return Map.of(EnrichProcessorFactory.TYPE, factory);
     }
@@ -215,7 +219,8 @@ public class EnrichPlugin extends Plugin implements SystemIndexPlugin, IngestPlu
             enrichPolicyLocks,
             new EnrichCoordinatorProxyAction.Coordinator(client, settings),
             enrichPolicyMaintenanceService,
-            enrichPolicyExecutor
+            enrichPolicyExecutor,
+            enrichCache
         );
     }
 
@@ -246,7 +251,8 @@ public class EnrichPlugin extends Plugin implements SystemIndexPlugin, IngestPlu
             COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS,
             COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST,
             COORDINATOR_PROXY_QUEUE_CAPACITY,
-            ENRICH_MAX_FORCE_MERGE_ATTEMPTS
+            ENRICH_MAX_FORCE_MERGE_ATTEMPTS,
+            CACHE_SIZE
         );
     }
 

+ 20 - 8
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java

@@ -26,6 +26,7 @@ import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction;
 
 import java.util.Map;
+import java.util.Objects;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
@@ -36,12 +37,14 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
     static final String TYPE = "enrich";
     private final Client client;
     private final ScriptService scriptService;
+    private final EnrichCache enrichCache;
 
     volatile Metadata metadata;
 
-    EnrichProcessorFactory(Client client, ScriptService scriptService) {
+    EnrichProcessorFactory(Client client, ScriptService scriptService, EnrichCache enrichCache) {
         this.client = client;
         this.scriptService = scriptService;
+        this.enrichCache = Objects.requireNonNull(enrichCache);
     }
 
     @Override
@@ -75,7 +78,7 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
         if (maxMatches < 1 || maxMatches > 128) {
             throw ConfigurationUtils.newConfigurationException(TYPE, tag, "max_matches", "should be between 1 and 128");
         }
-        BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner = createSearchRunner(client);
+        BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner = createSearchRunner(client, enrichCache);
         switch (policyType) {
             case EnrichPolicy.MATCH_TYPE:
                 return new MatchProcessor(
@@ -117,16 +120,25 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
     @Override
     public void accept(ClusterState state) {
         metadata = state.getMetadata();
+        enrichCache.setMetadata(metadata);
     }
 
-    private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(Client client) {
+    private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(
+        Client client,
+        EnrichCache enrichCache
+    ) {
         Client originClient = new OriginSettingClient(client, ENRICH_ORIGIN);
         return (req, handler) -> {
-            originClient.execute(
-                EnrichCoordinatorProxyAction.INSTANCE,
-                req,
-                ActionListener.wrap(resp -> { handler.accept(resp, null); }, e -> { handler.accept(null, e); })
-            );
+            // intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
+            SearchResponse response = enrichCache.get(req);
+            if (response != null) {
+                handler.accept(response, null);
+            } else {
+                originClient.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap(resp -> {
+                    enrichCache.put(req, resp);
+                    handler.accept(resp, null);
+                }, e -> { handler.accept(null, e); }));
+            }
         };
     }
 }

+ 17 - 2
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorStatsAction.java

@@ -6,6 +6,7 @@
  */
 package org.elasticsearch.xpack.enrich.action;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.support.ActionFilters;
@@ -26,6 +27,7 @@ import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
 import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
+import org.elasticsearch.xpack.enrich.EnrichCache;
 
 import java.io.IOException;
 import java.util.List;
@@ -88,15 +90,18 @@ public class EnrichCoordinatorStatsAction extends ActionType<EnrichCoordinatorSt
 
     public static class NodeResponse extends BaseNodeResponse {
 
+        private final EnrichStatsAction.Response.CacheStats cacheStats;
         private final CoordinatorStats coordinatorStats;
 
-        NodeResponse(DiscoveryNode node, CoordinatorStats coordinatorStats) {
+        NodeResponse(DiscoveryNode node, EnrichStatsAction.Response.CacheStats cacheStats, CoordinatorStats coordinatorStats) {
             super(node);
+            this.cacheStats = cacheStats;
             this.coordinatorStats = coordinatorStats;
         }
 
         NodeResponse(StreamInput in) throws IOException {
             super(in);
+            this.cacheStats = in.getVersion().onOrAfter(Version.V_8_0_0) ? new EnrichStatsAction.Response.CacheStats(in) : null;
             this.coordinatorStats = new CoordinatorStats(in);
         }
 
@@ -104,15 +109,23 @@ public class EnrichCoordinatorStatsAction extends ActionType<EnrichCoordinatorSt
             return coordinatorStats;
         }
 
+        public EnrichStatsAction.Response.CacheStats getCacheStats() {
+            return cacheStats;
+        }
+
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
+            if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+                cacheStats.writeTo(out);
+            }
             coordinatorStats.writeTo(out);
         }
     }
 
     public static class TransportAction extends TransportNodesAction<Request, Response, NodeRequest, NodeResponse> {
 
+        private final EnrichCache enrichCache;
         private final EnrichCoordinatorProxyAction.Coordinator coordinator;
 
         @Inject
@@ -121,6 +134,7 @@ public class EnrichCoordinatorStatsAction extends ActionType<EnrichCoordinatorSt
             ClusterService clusterService,
             TransportService transportService,
             ActionFilters actionFilters,
+            EnrichCache enrichCache,
             EnrichCoordinatorProxyAction.Coordinator coordinator
         ) {
             super(
@@ -134,6 +148,7 @@ public class EnrichCoordinatorStatsAction extends ActionType<EnrichCoordinatorSt
                 ThreadPool.Names.SAME,
                 NodeResponse.class
             );
+            this.enrichCache = enrichCache;
             this.coordinator = coordinator;
         }
 
@@ -161,7 +176,7 @@ public class EnrichCoordinatorStatsAction extends ActionType<EnrichCoordinatorSt
         @Override
         protected NodeResponse nodeOperation(NodeRequest request, Task task) {
             DiscoveryNode node = clusterService.localNode();
-            return new NodeResponse(node, coordinator.getStats(node.getId()));
+            return new NodeResponse(node, enrichCache.getStats(node.getId()), coordinator.getStats(node.getId()));
         }
     }
 

+ 8 - 1
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportEnrichStatsAction.java

@@ -27,6 +27,7 @@ import org.elasticsearch.xpack.enrich.EnrichPolicyExecutor;
 
 import java.util.Comparator;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 public class TransportEnrichStatsAction extends TransportMasterNodeAction<EnrichStatsAction.Request, EnrichStatsAction.Response> {
@@ -92,7 +93,13 @@ public class TransportEnrichStatsAction extends TransportMasterNodeAction<Enrich
                 .map(t -> new ExecutingPolicy(t.getDescription(), t))
                 .sorted(Comparator.comparing(ExecutingPolicy::getName))
                 .collect(Collectors.toList());
-            listener.onResponse(new EnrichStatsAction.Response(policyExecutionTasks, coordinatorStats));
+            List<EnrichStatsAction.Response.CacheStats> cacheStats = response.getNodes()
+                .stream()
+                .map(EnrichCoordinatorStatsAction.NodeResponse::getCacheStats)
+                .filter(Objects::nonNull)
+                .sorted(Comparator.comparing(EnrichStatsAction.Response.CacheStats::getNodeId))
+                .collect(Collectors.toList());
+            listener.onResponse(new EnrichStatsAction.Response(policyExecutionTasks, coordinatorStats, cacheStats));
         }, listener::onFailure);
         client.execute(EnrichCoordinatorStatsAction.INSTANCE, statsRequest, statsListener);
     }

+ 160 - 0
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichCacheTests.java

@@ -0,0 +1,160 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+package org.elasticsearch.xpack.enrich;
+
+import org.apache.lucene.search.TotalHits;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.cluster.metadata.AliasMetadata;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.index.query.MatchQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.internal.InternalSearchResponse;
+import org.elasticsearch.search.profile.SearchProfileShardResults;
+import org.elasticsearch.search.suggest.Suggest;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
+
+import java.util.Collections;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+public class EnrichCacheTests extends ESTestCase {
+
+    public void testCaching() {
+        // Emulate cluster metadata:
+        // (two enrich indices with corresponding alias entries)
+        var metadata = Metadata.builder()
+            .put(
+                IndexMetadata.builder(EnrichPolicy.getBaseName("policy1") + "-1")
+                    .settings(settings(Version.CURRENT))
+                    .numberOfShards(1)
+                    .numberOfReplicas(0)
+                    .putAlias(AliasMetadata.builder(EnrichPolicy.getBaseName("policy1")).build())
+            )
+            .put(
+                IndexMetadata.builder(EnrichPolicy.getBaseName("policy2") + "-1")
+                    .settings(settings(Version.CURRENT))
+                    .numberOfShards(1)
+                    .numberOfReplicas(0)
+                    .putAlias(AliasMetadata.builder(EnrichPolicy.getBaseName("policy2")).build())
+            )
+            .build();
+
+        // Emulated search requests that an enrich processor could generate:
+        // (two unique searches for two enrich policies)
+        var searchRequest1 = new SearchRequest(EnrichPolicy.getBaseName("policy1")).source(
+            new SearchSourceBuilder().query(new MatchQueryBuilder("match_field", "1"))
+        );
+        var searchRequest2 = new SearchRequest(EnrichPolicy.getBaseName("policy1")).source(
+            new SearchSourceBuilder().query(new MatchQueryBuilder("match_field", "2"))
+        );
+        var searchRequest3 = new SearchRequest(EnrichPolicy.getBaseName("policy2")).source(
+            new SearchSourceBuilder().query(new MatchQueryBuilder("match_field", "1"))
+        );
+        var searchRequest4 = new SearchRequest(EnrichPolicy.getBaseName("policy2")).source(
+            new SearchSourceBuilder().query(new MatchQueryBuilder("match_field", "2"))
+        );
+        // Emulated search response (content doesn't matter, since it isn't used, it just a cache entry)
+        var searchResponse = new SearchResponse(
+            new InternalSearchResponse(
+                new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0.0f),
+                InternalAggregations.EMPTY,
+                new Suggest(Collections.emptyList()),
+                new SearchProfileShardResults(Collections.emptyMap()),
+                false,
+                false,
+                1
+            ),
+            "",
+            1,
+            1,
+            0,
+            0,
+            ShardSearchFailure.EMPTY_ARRAY,
+            SearchResponse.Clusters.EMPTY
+        );
+
+        var enrichCache = new EnrichCache(3);
+        enrichCache.setMetadata(metadata);
+        enrichCache.put(searchRequest1, searchResponse);
+        enrichCache.put(searchRequest2, searchResponse);
+        enrichCache.put(searchRequest3, searchResponse);
+        var cacheStats = enrichCache.getStats("_id");
+        assertThat(cacheStats.getCount(), equalTo(3L));
+        assertThat(cacheStats.getHits(), equalTo(0L));
+        assertThat(cacheStats.getMisses(), equalTo(0L));
+        assertThat(cacheStats.getEvictions(), equalTo(0L));
+
+        assertThat(enrichCache.get(searchRequest1), notNullValue());
+        assertThat(enrichCache.get(searchRequest2), notNullValue());
+        assertThat(enrichCache.get(searchRequest3), notNullValue());
+        assertThat(enrichCache.get(searchRequest4), nullValue());
+        cacheStats = enrichCache.getStats("_id");
+        assertThat(cacheStats.getCount(), equalTo(3L));
+        assertThat(cacheStats.getHits(), equalTo(3L));
+        assertThat(cacheStats.getMisses(), equalTo(1L));
+        assertThat(cacheStats.getEvictions(), equalTo(0L));
+
+        enrichCache.put(searchRequest4, searchResponse);
+        cacheStats = enrichCache.getStats("_id");
+        assertThat(cacheStats.getCount(), equalTo(3L));
+        assertThat(cacheStats.getHits(), equalTo(3L));
+        assertThat(cacheStats.getMisses(), equalTo(1L));
+        assertThat(cacheStats.getEvictions(), equalTo(1L));
+
+        // Simulate enrich policy execution, which should make current cache entries unused.
+        metadata = Metadata.builder()
+            .put(
+                IndexMetadata.builder(EnrichPolicy.getBaseName("policy1") + "-2")
+                    .settings(settings(Version.CURRENT))
+                    .numberOfShards(1)
+                    .numberOfReplicas(0)
+                    .putAlias(AliasMetadata.builder(EnrichPolicy.getBaseName("policy1")).build())
+            )
+            .put(
+                IndexMetadata.builder(EnrichPolicy.getBaseName("policy2") + "-2")
+                    .settings(settings(Version.CURRENT))
+                    .numberOfShards(1)
+                    .numberOfReplicas(0)
+                    .putAlias(AliasMetadata.builder(EnrichPolicy.getBaseName("policy2")).build())
+            )
+            .build();
+        enrichCache.setMetadata(metadata);
+
+        // Because enrich index has changed, cache can't serve cached entries
+        assertThat(enrichCache.get(searchRequest1), nullValue());
+        assertThat(enrichCache.get(searchRequest2), nullValue());
+        assertThat(enrichCache.get(searchRequest3), nullValue());
+        assertThat(enrichCache.get(searchRequest4), nullValue());
+
+        // Add new entries using new enrich index name as key
+        enrichCache.put(searchRequest1, searchResponse);
+        enrichCache.put(searchRequest2, searchResponse);
+        enrichCache.put(searchRequest3, searchResponse);
+
+        // Entries can now be served:
+        assertThat(enrichCache.get(searchRequest1), notNullValue());
+        assertThat(enrichCache.get(searchRequest2), notNullValue());
+        assertThat(enrichCache.get(searchRequest3), notNullValue());
+        assertThat(enrichCache.get(searchRequest4), nullValue());
+        cacheStats = enrichCache.getStats("_id");
+        assertThat(cacheStats.getCount(), equalTo(3L));
+        assertThat(cacheStats.getHits(), equalTo(6L));
+        assertThat(cacheStats.getMisses(), equalTo(6L));
+        assertThat(cacheStats.getEvictions(), equalTo(4L));
+    }
+
+}

+ 113 - 7
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java

@@ -6,18 +6,36 @@
  */
 package org.elasticsearch.xpack.enrich;
 
+import org.apache.lucene.search.TotalHits;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Tuple;
+import org.elasticsearch.index.VersionType;
+import org.elasticsearch.ingest.IngestDocument;
 import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.internal.InternalSearchResponse;
+import org.elasticsearch.search.profile.SearchProfileShardResults;
+import org.elasticsearch.search.suggest.Suggest;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.client.NoOpClient;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
+import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction;
 import org.junit.Before;
 
 import java.io.IOException;
@@ -30,11 +48,13 @@ import java.util.Map;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.Mockito.mock;
 
 public class EnrichProcessorFactoryTests extends ESTestCase {
 
     private ScriptService scriptService;
+    private EnrichCache enrichCache = new EnrichCache(0L);
 
     @Before
     public void initializeScriptService() {
@@ -45,7 +65,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
         List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
         EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source_index"), "my_key", enrichValues);
         try (Client client = new NoOpClient(this.getClass().getSimpleName() + "TestClient")) {
-            EnrichProcessorFactory factory = new EnrichProcessorFactory(client, scriptService);
+            EnrichProcessorFactory factory = new EnrichProcessorFactory(client, scriptService, enrichCache);
             factory.metadata = createMetadata("majestic", policy);
 
             Map<String, Object> config = new HashMap<>();
@@ -96,7 +116,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
 
     public void testPolicyDoesNotExist() {
         List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
-        EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService);
+        EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService, enrichCache);
         factory.metadata = Metadata.builder().build();
 
         Map<String, Object> config = new HashMap<>();
@@ -125,7 +145,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
 
     public void testPolicyNameMissing() {
         List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
-        EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService);
+        EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService, enrichCache);
 
         Map<String, Object> config = new HashMap<>();
         config.put("enrich_key", "host");
@@ -154,7 +174,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
         List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
         EnrichPolicy policy = new EnrichPolicy("unsupported", null, List.of("source_index"), "my_key", enrichValues);
         try (Client client = new NoOpClient(this.getClass().getSimpleName() + "TestClient")) {
-            EnrichProcessorFactory factory = new EnrichProcessorFactory(client, scriptService);
+            EnrichProcessorFactory factory = new EnrichProcessorFactory(client, scriptService, enrichCache);
             factory.metadata = createMetadata("majestic", policy);
 
             Map<String, Object> config = new HashMap<>();
@@ -175,7 +195,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
         List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
         EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source_index"), "host", enrichValues);
         try (Client client = new NoOpClient(this.getClass().getSimpleName() + "TestClient")) {
-            EnrichProcessorFactory factory = new EnrichProcessorFactory(client, scriptService);
+            EnrichProcessorFactory factory = new EnrichProcessorFactory(client, scriptService, enrichCache);
             factory.metadata = createMetadata("majestic", policy);
 
             Map<String, Object> config = new HashMap<>();
@@ -194,7 +214,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
     public void testNoTargetField() throws Exception {
         List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
         EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source_index"), "host", enrichValues);
-        EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService);
+        EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService, enrichCache);
         factory.metadata = createMetadata("majestic", policy);
 
         Map<String, Object> config1 = new HashMap<>();
@@ -208,7 +228,7 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
     public void testIllegalMaxMatches() throws Exception {
         List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
         EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source_index"), "my_key", enrichValues);
-        EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService);
+        EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService, enrichCache);
         factory.metadata = createMetadata("majestic", policy);
 
         Map<String, Object> config = new HashMap<>();
@@ -221,6 +241,92 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
         assertThat(e.getMessage(), equalTo("[max_matches] should be between 1 and 128"));
     }
 
+    public void testCaching() throws Exception {
+        int[] requestCounter = new int[1];
+        enrichCache = new EnrichCache(100L);
+        List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
+        EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source_index"), "host", enrichValues);
+        try (Client client = new NoOpClient(this.getClass().getSimpleName() + "testCaching") {
+
+            @Override
+            @SuppressWarnings("unchecked")
+            protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
+                ActionType<Response> action,
+                Request request,
+                ActionListener<Response> listener
+            ) {
+                assert EnrichCoordinatorProxyAction.NAME.equals(action.name());
+                var emptyResponse = new SearchResponse(
+                    new InternalSearchResponse(
+                        new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0.0f),
+                        InternalAggregations.EMPTY,
+                        new Suggest(Collections.emptyList()),
+                        new SearchProfileShardResults(Collections.emptyMap()),
+                        false,
+                        false,
+                        1
+                    ),
+                    "",
+                    1,
+                    1,
+                    0,
+                    0,
+                    ShardSearchFailure.EMPTY_ARRAY,
+                    SearchResponse.Clusters.EMPTY
+                );
+                requestCounter[0]++;
+                listener.onResponse((Response) emptyResponse);
+            }
+        }) {
+            EnrichProcessorFactory factory = new EnrichProcessorFactory(client, scriptService, enrichCache);
+            factory.accept(ClusterState.builder(new ClusterName("_name")).metadata(createMetadata("majestic", policy)).build());
+
+            Map<String, Object> config = new HashMap<>();
+            config.put("policy_name", "majestic");
+            config.put("field", "domain");
+            config.put("target_field", "entry");
+            IngestDocument ingestDocument = new IngestDocument(
+                "_index",
+                "_id",
+                "_routing",
+                1L,
+                VersionType.INTERNAL,
+                Map.of("domain", "elastic.co")
+            );
+            MatchProcessor processor = (MatchProcessor) factory.create(Collections.emptyMap(), "_tag", null, config);
+
+            // A search is performed and that is cached:
+            IngestDocument[] result = new IngestDocument[1];
+            Exception[] failure = new Exception[1];
+            processor.execute(ingestDocument, (r, e) -> {
+                result[0] = r;
+                failure[0] = e;
+            });
+            assertThat(failure[0], nullValue());
+            assertThat(result[0], notNullValue());
+            assertThat(requestCounter[0], equalTo(1));
+            assertThat(enrichCache.getStats("_id").getCount(), equalTo(1L));
+            assertThat(enrichCache.getStats("_id").getMisses(), equalTo(1L));
+            assertThat(enrichCache.getStats("_id").getHits(), equalTo(0L));
+            assertThat(enrichCache.getStats("_id").getEvictions(), equalTo(0L));
+
+            // No search is performed, result is read from the cache:
+            result[0] = null;
+            failure[0] = null;
+            processor.execute(ingestDocument, (r, e) -> {
+                result[0] = r;
+                failure[0] = e;
+            });
+            assertThat(failure[0], nullValue());
+            assertThat(result[0], notNullValue());
+            assertThat(requestCounter[0], equalTo(1));
+            assertThat(enrichCache.getStats("_id").getCount(), equalTo(1L));
+            assertThat(enrichCache.getStats("_id").getMisses(), equalTo(1L));
+            assertThat(enrichCache.getStats("_id").getHits(), equalTo(1L));
+            assertThat(enrichCache.getStats("_id").getEvictions(), equalTo(0L));
+        }
+    }
+
     static Metadata createMetadata(String name, EnrichPolicy policy) throws IOException {
         Settings settings = Settings.builder()
             .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)

+ 1 - 0
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java

@@ -49,6 +49,7 @@ public class EnrichResiliencyTests extends ESSingleNodeTestCase {
     protected Settings nodeSettings() {
         // Severely throttle the processing throughput to reach max capacity easier
         return Settings.builder()
+            .put(EnrichPlugin.CACHE_SIZE.getKey(), 0)
             .put(EnrichPlugin.COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS.getKey(), 1)
             .put(EnrichPlugin.COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST.getKey(), 1)
             .put(EnrichPlugin.COORDINATOR_PROXY_QUEUE_CAPACITY.getKey(), 10)

+ 8 - 2
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/EnrichStatsResponseTests.java

@@ -11,6 +11,7 @@ import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.tasks.TaskInfo;
 import org.elasticsearch.test.AbstractWireSerializingTestCase;
 import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
+import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CacheStats;
 import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.CoordinatorStats;
 import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction.Response.ExecutingPolicy;
 
@@ -31,17 +32,22 @@ public class EnrichStatsResponseTests extends AbstractWireSerializingTestCase<En
         }
         int numCoordinatingStats = randomIntBetween(0, 16);
         List<CoordinatorStats> coordinatorStats = new ArrayList<>(numCoordinatingStats);
+        List<CacheStats> cacheStats = new ArrayList<>(numCoordinatingStats);
         for (int i = 0; i < numCoordinatingStats; i++) {
+            String nodeId = randomAlphaOfLength(4);
             CoordinatorStats stats = new CoordinatorStats(
-                randomAlphaOfLength(4),
+                nodeId,
                 randomIntBetween(0, 8096),
                 randomIntBetween(0, 8096),
                 randomNonNegativeLong(),
                 randomNonNegativeLong()
             );
             coordinatorStats.add(stats);
+            cacheStats.add(
+                new CacheStats(nodeId, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())
+            );
         }
-        return new EnrichStatsAction.Response(executingPolicies, coordinatorStats);
+        return new EnrichStatsAction.Response(executingPolicies, coordinatorStats, cacheStats);
     }
 
     @Override

+ 13 - 2
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/monitoring/collector/enrich/EnrichStatsCollectorTests.java

@@ -73,21 +73,32 @@ public class EnrichStatsCollectorTests extends BaseCollectorTestCase {
         }
         int numCoordinatorStats = randomIntBetween(0, 8);
         List<CoordinatorStats> coordinatorStats = new ArrayList<>(numCoordinatorStats);
+        List<EnrichStatsAction.Response.CacheStats> cacheStats = new ArrayList<>(numCoordinatorStats);
         for (int i = 0; i < numCoordinatorStats; i++) {
+            String nodeId = randomAlphaOfLength(4);
             coordinatorStats.add(
                 new CoordinatorStats(
-                    randomAlphaOfLength(4),
+                    nodeId,
                     randomIntBetween(0, Integer.MAX_VALUE),
                     randomIntBetween(0, Integer.MAX_VALUE),
                     randomNonNegativeLong(),
                     randomNonNegativeLong()
                 )
             );
+            cacheStats.add(
+                new EnrichStatsAction.Response.CacheStats(
+                    nodeId,
+                    randomNonNegativeLong(),
+                    randomNonNegativeLong(),
+                    randomNonNegativeLong(),
+                    randomNonNegativeLong()
+                )
+            );
         }
 
         @SuppressWarnings("unchecked")
         final ActionFuture<EnrichStatsAction.Response> future = (ActionFuture<EnrichStatsAction.Response>) mock(ActionFuture.class);
-        final EnrichStatsAction.Response response = new EnrichStatsAction.Response(executingPolicies, coordinatorStats);
+        final EnrichStatsAction.Response response = new EnrichStatsAction.Response(executingPolicies, coordinatorStats, cacheStats);
 
         when(client.execute(eq(EnrichStatsAction.INSTANCE), any(EnrichStatsAction.Request.class))).thenReturn(future);
         when(future.actionGet(timeout)).thenReturn(response);

+ 1 - 1
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/LocalStateMonitoring.java

@@ -139,7 +139,7 @@ public class LocalStateMonitoring extends LocalStateCompositeXPackPlugin {
 
         @Override
         protected void doExecute(Task task, EnrichStatsAction.Request request, ActionListener<EnrichStatsAction.Response> listener) {
-            listener.onResponse(new EnrichStatsAction.Response(Collections.emptyList(), Collections.emptyList()));
+            listener.onResponse(new EnrichStatsAction.Response(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
         }
     }