Browse Source

Fix enrich caches outdated value after policy run (#133680)

Joe Gallo 1 month ago
parent
commit
dc57b3e001

+ 5 - 0
docs/changelog/133680.yaml

@@ -0,0 +1,5 @@
+pr: 133680
+summary: Fix enrich caches outdated value after policy run
+area: Ingest Node
+type: bug
+issues: []

+ 173 - 0
x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichPolicyChangeIT.java

@@ -0,0 +1,173 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.enrich;
+
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.Strings;
+import org.elasticsearch.ingest.common.IngestCommonPlugin;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.reindex.ReindexPlugin;
+import org.elasticsearch.test.ESSingleNodeTestCase;
+import org.elasticsearch.xcontent.XContentType;
+import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
+import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
+import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.elasticsearch.ingest.IngestPipelineTestUtils.jsonSimulatePipelineRequest;
+import static org.elasticsearch.xpack.enrich.AbstractEnrichTestCase.createSourceIndices;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+
+public class EnrichPolicyChangeIT extends ESSingleNodeTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> getPlugins() {
+        return List.of(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class);
+    }
+
+    @Override
+    protected Settings nodeSettings() {
+        return Settings.builder()
+            // TODO Change this to run with security enabled
+            // https://github.com/elastic/elasticsearch/issues/75940
+            .put(XPackSettings.SECURITY_ENABLED.getKey(), false)
+            .build();
+    }
+
+    private final String policyName = "device-enrich-policy";
+    private final String sourceIndexName = "devices-idx";
+
+    public void testEnrichCacheValuesCannotBeCorrupted() throws Exception {
+        // create and store the enrich policy
+        final var enrichPolicy = new EnrichPolicy(
+            EnrichPolicy.MATCH_TYPE,
+            null,
+            List.of(sourceIndexName),
+            "host.ip",
+            List.of("device.name", "host.ip")
+        );
+
+        // create the source index
+        createSourceIndices(client(), enrichPolicy);
+
+        // add a single document to the enrich index
+        setEnrichDeviceName("some.device." + randomAlphaOfLength(10));
+
+        // store the enrich policy
+        var putPolicyRequest = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
+        client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet();
+
+        // execute the policy once
+        executeEnrichPolicy();
+
+        // add a low priority cluster state applier to increase the odds of a race occurring between
+        // the cluster state *appliers* having been run (this adjusts the enrich index pointer) and the
+        // cluster state *listeners* having been run (which adjusts the alias and therefore the search results)
+        final var clusterService = node().injector().getInstance(ClusterService.class);
+        clusterService.addLowPriorityApplier((event) -> safeSleep(10));
+
+        // kick off some threads that just bang on _simulate in the background
+        final var finished = new AtomicBoolean(false);
+        for (int i = 0; i < 5; i++) {
+            new Thread(() -> {
+                while (finished.get() == false) {
+                    simulatePipeline();
+                }
+            }).start();
+        }
+
+        try {
+            for (int i = 0; i < randomIntBetween(10, 100); i++) {
+                final String deviceName = "some.device." + randomAlphaOfLength(10);
+
+                // add a single document to the enrich index
+                setEnrichDeviceName(deviceName);
+
+                // execute the policy
+                executeEnrichPolicy();
+
+                // simulate the pipeline and confirm that we see the expected result
+                assertBusy(() -> {
+                    var result = simulatePipeline();
+                    assertThat(result.getFailure(), nullValue());
+                    assertThat(result.getIngestDocument().getFieldValue("device.name", String.class), equalTo(deviceName));
+                });
+            }
+        } finally {
+            // we're finished, so those threads can all quit now
+            finished.set(true);
+        }
+    }
+
+    private SimulateDocumentBaseResult simulatePipeline() {
+        final var simulatePipelineRequest = jsonSimulatePipelineRequest("""
+            {
+              "pipeline": {
+                "processors": [
+                  {
+                    "enrich": {
+                      "policy_name": "device-enrich-policy",
+                      "field": "host.ip",
+                      "target_field": "_tmp.device"
+                    }
+                  },
+                  {
+                    "rename" : {
+                      "field" : "_tmp.device.device.name",
+                      "target_field" : "device.name"
+                    }
+                  }
+                ]
+              },
+              "docs": [
+                {
+                  "_source": {
+                    "host": {
+                      "ip": "10.151.80.8"
+                    }
+                  }
+                }
+              ]
+            }
+            """);
+        final var response = clusterAdmin().simulatePipeline(simulatePipelineRequest).actionGet();
+        return (SimulateDocumentBaseResult) response.getResults().getFirst();
+    }
+
+    private void setEnrichDeviceName(final String deviceName) {
+        final var indexRequest = new IndexRequest(sourceIndexName);
+        indexRequest.id("1"); // there's only one document, and we keep overwriting it
+        indexRequest.source(Strings.format("""
+            {
+              "host": {
+                "ip": "10.151.80.8"
+              },
+              "device": {
+                "name": "%s"
+              }
+            }
+            """, deviceName), XContentType.JSON);
+        indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+        client().index(indexRequest).actionGet();
+    }
+
+    private void executeEnrichPolicy() {
+        final var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName);
+        client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet();
+    }
+
+}

+ 4 - 8
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java

@@ -14,13 +14,12 @@ import org.elasticsearch.ingest.AbstractProcessor;
 import org.elasticsearch.ingest.IngestDocument;
 import org.elasticsearch.script.TemplateScript;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
-import java.util.function.Supplier;
+import java.util.function.Function;
 
 public abstract class AbstractEnrichProcessor extends AbstractProcessor {
 
@@ -32,7 +31,6 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
     private final boolean overrideEnabled;
     protected final String matchField;
     protected final int maxMatches;
-    private final String indexAlias;
 
     protected AbstractEnrichProcessor(
         String tag,
@@ -55,8 +53,6 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
         this.overrideEnabled = overrideEnabled;
         this.matchField = matchField;
         this.maxMatches = maxMatches;
-        // note: since the policyName determines the indexAlias, we can calculate this once
-        this.indexAlias = EnrichPolicy.getBaseName(policyName);
     }
 
     public abstract QueryBuilder getQueryBuilder(Object fieldValue);
@@ -72,7 +68,7 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
                 return;
             }
 
-            Supplier<SearchRequest> searchRequestSupplier = () -> {
+            final Function<String, SearchRequest> searchRequestBuilder = (concreteEnrichIndex) -> {
                 QueryBuilder queryBuilder = getQueryBuilder(value);
                 ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(queryBuilder);
                 SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
@@ -82,13 +78,13 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
                 searchBuilder.fetchSource(true);
                 searchBuilder.query(constantScore);
                 SearchRequest req = new SearchRequest();
-                req.indices(indexAlias);
+                req.indices(concreteEnrichIndex);
                 req.preference(Preference.LOCAL.type());
                 req.source(searchBuilder);
                 return req;
             };
 
-            searchRunner.accept(value, maxMatches, searchRequestSupplier, (searchHits, e) -> {
+            searchRunner.accept(value, maxMatches, searchRequestBuilder, (searchHits, e) -> {
                 if (e != null) {
                     handler.accept(null, e);
                     return;

+ 6 - 5
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java

@@ -31,7 +31,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
+import java.util.function.Function;
 
 import static org.elasticsearch.xpack.core.ClientHelper.ENRICH_ORIGIN;
 
@@ -137,17 +137,18 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
     }
 
     private SearchRunner createSearchRunner(final ProjectId projectId, final String indexAlias) {
-        Client originClient = new OriginSettingClient(client, ENRICH_ORIGIN);
+        final Client originClient = new OriginSettingClient(client, ENRICH_ORIGIN);
         return (value, maxMatches, reqSupplier, handler) -> {
+            final String concreteEnrichIndex = getEnrichIndexKey(projectId, indexAlias);
             // intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
             enrichCache.computeIfAbsent(
                 projectId,
-                getEnrichIndexKey(projectId, indexAlias),
+                concreteEnrichIndex,
                 value,
                 maxMatches,
                 (searchResponseActionListener) -> originClient.execute(
                     EnrichCoordinatorProxyAction.INSTANCE,
-                    reqSupplier.get(),
+                    reqSupplier.apply(concreteEnrichIndex),
                     searchResponseActionListener
                 ),
                 ActionListener.wrap(resp -> handler.accept(resp, null), e -> handler.accept(null, e))
@@ -167,7 +168,7 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
         void accept(
             Object value,
             int maxMatches,
-            Supplier<SearchRequest> searchRequestSupplier,
+            Function<String, SearchRequest> searchRequestBuilder,
             BiConsumer<List<Map<?, ?>>, Exception> handler
         );
     }

+ 3 - 3
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java

@@ -28,7 +28,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
-import java.util.function.Supplier;
+import java.util.function.Function;
 
 import static org.elasticsearch.xpack.enrich.MatchProcessorTests.str;
 import static org.hamcrest.Matchers.emptyArray;
@@ -162,10 +162,10 @@ public class GeoMatchProcessorTests extends ESTestCase {
         public void accept(
             Object value,
             int maxMatches,
-            Supplier<SearchRequest> searchRequestSupplier,
+            Function<String, SearchRequest> searchRequestBuilder,
             BiConsumer<List<Map<?, ?>>, Exception> handler
         ) {
-            capturedRequest.set(searchRequestSupplier.get());
+            capturedRequest.set(searchRequestBuilder.apply(".enrich-_name"));
             if (exception != null) {
                 handler.accept(null, exception);
             } else {

+ 3 - 3
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/MatchProcessorTests.java

@@ -25,7 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
-import java.util.function.Supplier;
+import java.util.function.Function;
 
 import static org.hamcrest.Matchers.emptyArray;
 import static org.hamcrest.Matchers.equalTo;
@@ -405,10 +405,10 @@ public class MatchProcessorTests extends ESTestCase {
         public void accept(
             Object value,
             int maxMatches,
-            Supplier<SearchRequest> searchRequestSupplier,
+            Function<String, SearchRequest> searchRequestBuilder,
             BiConsumer<List<Map<?, ?>>, Exception> handler
         ) {
-            capturedRequest.set(searchRequestSupplier.get());
+            capturedRequest.set(searchRequestBuilder.apply(".enrich-_name"));
             if (exception != null) {
                 handler.accept(null, exception);
             } else {