Browse Source

Adding a putIfAbsent() method on EnrichCache (#107499)

Keith Massey 1 year ago
parent
commit
5f5947799b

+ 29 - 0
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichCache.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.enrich;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.cluster.metadata.IndexAbstraction;
@@ -24,6 +25,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.BiConsumer;
 
 /**
  * A simple cache for enrich that uses {@link Cache}. There is one instance of this cache and
@@ -52,6 +54,32 @@ public final class EnrichCache {
         this.cache = CacheBuilder.<CacheKey, List<Map<?, ?>>>builder().setMaximumWeight(maxSize).build();
     }
 
+    /**
+     * This method notifies the given listener of the value in this cache for the given searchRequest. If there is no value in the cache
+     * for the searchRequest, then the new cache value is computed using searchResponseFetcher.
+     * @param searchRequest The key for the cache request
+     * @param searchResponseFetcher The function used to compute the value to be put in the cache, if there is no value in the cache already
+     * @param listener A listener to be notified of the value in the cache
+     */
+    public void computeIfAbsent(
+        SearchRequest searchRequest,
+        BiConsumer<SearchRequest, ActionListener<SearchResponse>> searchResponseFetcher,
+        ActionListener<List<Map<?, ?>>> listener
+    ) {
+        // intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
+        List<Map<?, ?>> response = get(searchRequest);
+        if (response != null) {
+            listener.onResponse(response);
+        } else {
+            searchResponseFetcher.accept(searchRequest, ActionListener.wrap(resp -> {
+                List<Map<?, ?>> value = toCacheValue(resp);
+                put(searchRequest, value);
+                listener.onResponse(deepCopy(value, false));
+            }, listener::onFailure));
+        }
+    }
+
+    // non-private for unit testing only
     List<Map<?, ?>> get(SearchRequest searchRequest) {
         String enrichIndex = getEnrichIndexKey(searchRequest);
         CacheKey cacheKey = new CacheKey(enrichIndex, searchRequest);
@@ -64,6 +92,7 @@ public final class EnrichCache {
         }
     }
 
+    // non-private for unit testing only
     void put(SearchRequest searchRequest, List<Map<?, ?>> response) {
         String enrichIndex = getEnrichIndexKey(searchRequest);
         CacheKey cacheKey = new CacheKey(enrichIndex, searchRequest);

+ 9 - 10
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java

@@ -131,16 +131,15 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
         Client originClient = new OriginSettingClient(client, ENRICH_ORIGIN);
         return (req, handler) -> {
             // intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
-            List<Map<?, ?>> response = enrichCache.get(req);
-            if (response != null) {
-                handler.accept(response, null);
-            } else {
-                originClient.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap(resp -> {
-                    List<Map<?, ?>> value = EnrichCache.toCacheValue(resp);
-                    enrichCache.put(req, value);
-                    handler.accept(EnrichCache.deepCopy(value, false), null);
-                }, e -> { handler.accept(null, e); }));
-            }
+            enrichCache.computeIfAbsent(
+                req,
+                (searchRequest, searchResponseActionListener) -> originClient.execute(
+                    EnrichCoordinatorProxyAction.INSTANCE,
+                    searchRequest,
+                    searchResponseActionListener
+                ),
+                ActionListener.wrap(resp -> handler.accept(resp, null), e -> handler.accept(null, e))
+            );
         };
     }
 }

+ 118 - 0
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichCacheTests.java

@@ -6,21 +6,31 @@
  */
 package org.elasticsearch.xpack.enrich;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.query.MatchQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.json.JsonXContent;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -138,6 +148,114 @@ public class EnrichCacheTests extends ESTestCase {
         assertThat(cacheStats.getEvictions(), equalTo(4L));
     }
 
+    public void testPutIfAbsent() throws InterruptedException {
+        // Emulate cluster metadata:
+        // (two enrich indices with corresponding alias entries)
+        var metadata = Metadata.builder()
+            .put(
+                IndexMetadata.builder(EnrichPolicy.getBaseName("policy1") + "-1")
+                    .settings(settings(IndexVersion.current()))
+                    .numberOfShards(1)
+                    .numberOfReplicas(0)
+                    .putAlias(AliasMetadata.builder(EnrichPolicy.getBaseName("policy1")).build())
+            )
+            .put(
+                IndexMetadata.builder(EnrichPolicy.getBaseName("policy2") + "-1")
+                    .settings(settings(IndexVersion.current()))
+                    .numberOfShards(1)
+                    .numberOfReplicas(0)
+                    .putAlias(AliasMetadata.builder(EnrichPolicy.getBaseName("policy2")).build())
+            )
+            .build();
+
+        // Emulated search requests that an enrich processor could generate:
+        // (two unique searches for two enrich policies)
+        var searchRequest1 = new SearchRequest(EnrichPolicy.getBaseName("policy1")).source(
+            new SearchSourceBuilder().query(new MatchQueryBuilder("match_field", "1"))
+        );
+        final List<Map<String, ?>> searchResponseMap = List.of(
+            Map.of("key1", "value1", "key2", "value2"),
+            Map.of("key3", "value3", "key4", "value4")
+        );
+        EnrichCache enrichCache = new EnrichCache(3);
+        enrichCache.setMetadata(metadata);
+
+        {
+            CountDownLatch queriedDatabaseLatch = new CountDownLatch(1);
+            CountDownLatch notifiedOfResultLatch = new CountDownLatch(1);
+            enrichCache.computeIfAbsent(searchRequest1, (searchRequest, searchResponseActionListener) -> {
+                SearchResponse searchResponse = convertToSearchResponse(searchResponseMap);
+                searchResponseActionListener.onResponse(searchResponse);
+                searchResponse.decRef();
+                queriedDatabaseLatch.countDown();
+            }, new ActionListener<>() {
+                @Override
+                public void onResponse(List<Map<?, ?>> response) {
+                    assertThat(response, equalTo(searchResponseMap));
+                    notifiedOfResultLatch.countDown();
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    fail(e);
+                }
+            });
+            assertThat(queriedDatabaseLatch.await(5, TimeUnit.SECONDS), equalTo(true));
+            assertThat(notifiedOfResultLatch.await(5, TimeUnit.SECONDS), equalTo(true));
+        }
+
+        {
+            CountDownLatch notifiedOfResultLatch = new CountDownLatch(1);
+            enrichCache.computeIfAbsent(searchRequest1, (searchRequest, searchResponseActionListener) -> {
+                fail("Expected no call to the database because item should have been in the cache");
+            }, new ActionListener<>() {
+                @Override
+                public void onResponse(List<Map<?, ?>> maps) {
+                    notifiedOfResultLatch.countDown();
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    fail(e);
+                }
+            });
+            assertThat(notifiedOfResultLatch.await(5, TimeUnit.SECONDS), equalTo(true));
+        }
+    }
+
+    private SearchResponse convertToSearchResponse(List<Map<String, ?>> searchResponseList) {
+        SearchHit[] hitArray = searchResponseList.stream().map(map -> {
+            try {
+                return SearchHit.unpooled(0, "id").sourceRef(convertMapToJson(map));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }).toArray(SearchHit[]::new);
+        SearchHits hits = SearchHits.unpooled(hitArray, null, 0);
+        return new SearchResponse(
+            hits,
+            null,
+            null,
+            false,
+            false,
+            null,
+            1,
+            null,
+            5,
+            4,
+            0,
+            randomLong(),
+            null,
+            SearchResponse.Clusters.EMPTY
+        );
+    }
+
+    private BytesReference convertMapToJson(Map<String, ?> simpleMap) throws IOException {
+        try (XContentBuilder builder = JsonXContent.contentBuilder().map(simpleMap)) {
+            return BytesReference.bytes(builder);
+        }
+    }
+
     public void testDeepCopy() {
         Map<String, Object> original = new HashMap<>();
         {