ソースを参照

Fail delete policy if pipeline exists (#44438)

If a pipeline that refrences the policy exists, we should not allow the
policy to be deleted. The user will need to remove the processor from
the pipeline before deleting the policy. This commit adds a check to
ensure that the policy cannot be deleted if it is referenced by any
pipeline in the system.
Michael Basnight 6 年 前
コミット
9e22fd4db8

+ 1 - 0
docs/reference/ingest/ingest-node.asciidoc

@@ -950,6 +950,7 @@ Which returns:
 
 [source,js]
 --------------------------------------------------
+DELETE /_ingest/pipeline/user_lookup
 DELETE /_enrich/policy/users-policy
 --------------------------------------------------
 // CONSOLE

+ 1 - 0
docs/reference/ingest/processors/enrich.asciidoc

@@ -92,6 +92,7 @@ PUT _ingest/pipeline/user_lookup
 
 [source,js]
 --------------------------------------------------
+DELETE /_ingest/pipeline/user_lookup
 DELETE /_enrich/policy/users-policy
 --------------------------------------------------
 // CONSOLE

+ 44 - 1
x-pack/plugin/enrich/qa/common/src/main/java/org/elasticsearch/test/enrich/CommonEnrichRestTestCase.java

@@ -33,7 +33,7 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
         }
     }
 
-    public void testBasicFlow() throws Exception {
+    private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception {
         // Create the policy:
         Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy");
         putPolicyRequest.setJsonEntity("{\"type\": \"exact_match\",\"indices\": [\"my-source-index\"], \"match_field\": \"host\", " +
@@ -75,6 +75,15 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
         assertThat(_source.get("host"), equalTo("elastic.co"));
         assertThat(_source.get("global_rank"), equalTo(25));
         assertThat(_source.get("tld_rank"), equalTo(7));
+
+        if (deletePipeilne) {
+            // delete the pipeline so the policies can be deleted
+            client().performRequest(new Request("DELETE", "/_ingest/pipeline/my_pipeline"));
+        }
+    }
+
+    public void testBasicFlow() throws Exception {
+        setupGenericLifecycleTest(true);
     }
 
     public void testImmutablePolicy() throws IOException {
@@ -87,6 +96,40 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
         assertTrue(exc.getMessage().contains("policy [my_policy] already exists"));
     }
 
+    public void testDeleteIsCaseSensitive() throws Exception {
+        Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy");
+        putPolicyRequest.setJsonEntity("{\"type\": \"exact_match\",\"indices\": [\"my-source-index\"], \"enrich_key\": \"host\", " +
+            "\"enrich_values\": [\"globalRank\", \"tldRank\", \"tld\"]}");
+        assertOK(client().performRequest(putPolicyRequest));
+
+        ResponseException exc = expectThrows(ResponseException.class,
+            () -> client().performRequest(new Request("DELETE", "/_enrich/policy/MY_POLICY")));
+        assertTrue(exc.getMessage().contains("policy [MY_POLICY] not found"));
+    }
+
+    public void testDeleteExistingPipeline() throws Exception {
+        // lets not delete the pipeline at first, to test the failure
+        setupGenericLifecycleTest(false);
+
+        Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/another_pipeline");
+        putPipelineRequest.setJsonEntity("{\"processors\":[" +
+            "{\"enrich\":{\"policy_name\":\"my_policy\",\"enrich_key\":\"host\",\"set_from\":[" +
+            "{\"source\":\"globalRank\",\"target\":\"global_rank\"}," +
+            "{\"source\":\"tldRank\",\"target\":\"tld_rank\"}" +
+            "]}}" +
+            "]}");
+        assertOK(client().performRequest(putPipelineRequest));
+
+        ResponseException exc = expectThrows(ResponseException.class,
+            () -> client().performRequest(new Request("DELETE", "/_enrich/policy/my_policy")));
+        assertTrue(exc.getMessage().contains("Could not delete policy [my_policy] because" +
+            " a pipeline is referencing it [my_pipeline, another_pipeline]"));
+
+        // delete the pipelines so the policies can be deleted
+        client().performRequest(new Request("DELETE", "/_ingest/pipeline/my_pipeline"));
+        client().performRequest(new Request("DELETE", "/_ingest/pipeline/another_pipeline"));
+    }
+
     private static Map<String, Object> toMap(Response response) throws IOException {
         return toMap(EntityUtils.toString(response.getEntity()));
     }

+ 22 - 0
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java

@@ -0,0 +1,22 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.enrich;
+
+import org.elasticsearch.ingest.AbstractProcessor;
+
+public abstract class AbstractEnrichProcessor extends AbstractProcessor {
+
+    private final String policyName;
+
+    protected AbstractEnrichProcessor(String tag, String policyName) {
+        super(tag);
+        this.policyName = policyName;
+    }
+
+    public String getPolicyName() {
+        return policyName;
+    }
+}

+ 3 - 10
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java

@@ -12,7 +12,6 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.routing.Preference;
 import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
 import org.elasticsearch.index.query.TermQueryBuilder;
-import org.elasticsearch.ingest.AbstractProcessor;
 import org.elasticsearch.ingest.IngestDocument;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -24,12 +23,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
 
-final class ExactMatchProcessor extends AbstractProcessor {
+public final class ExactMatchProcessor extends AbstractEnrichProcessor {
 
     static final String ENRICH_KEY_FIELD_NAME = "enrich_key_field";
 
     private final BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner;
-    private final String policyName;
     private final String enrichKey;
     private final boolean ignoreMissing;
     private final boolean overrideEnabled;
@@ -60,9 +58,8 @@ final class ExactMatchProcessor extends AbstractProcessor {
                         boolean ignoreMissing,
                         boolean overrideEnabled,
                         List<EnrichSpecification> specifications) {
-        super(tag);
+        super(tag, policyName);
         this.searchRunner = searchRunner;
-        this.policyName = policyName;
         this.enrichKey = enrichKey;
         this.ignoreMissing = ignoreMissing;
         this.overrideEnabled = overrideEnabled;
@@ -89,7 +86,7 @@ final class ExactMatchProcessor extends AbstractProcessor {
             searchBuilder.query(constantScore);
 
             SearchRequest req = new SearchRequest();
-            req.indices(EnrichPolicy.getBaseName(policyName));
+            req.indices(EnrichPolicy.getBaseName(getPolicyName()));
             req.preference(Preference.LOCAL.type());
             req.source(searchBuilder);
 
@@ -137,10 +134,6 @@ final class ExactMatchProcessor extends AbstractProcessor {
         return EnrichProcessorFactory.TYPE;
     }
 
-    String getPolicyName() {
-        return policyName;
-    }
-
     String getEnrichKey() {
         return enrichKey;
     }

+ 33 - 1
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyAction.java

@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.enrich.action;
 
+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -16,24 +17,35 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.ingest.IngestService;
+import org.elasticsearch.ingest.PipelineConfiguration;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
+import org.elasticsearch.xpack.enrich.AbstractEnrichProcessor;
 import org.elasticsearch.xpack.enrich.EnrichStore;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction<DeleteEnrichPolicyAction.Request, AcknowledgedResponse> {
 
+    private final IngestService ingestService;
+
     @Inject
     public TransportDeleteEnrichPolicyAction(TransportService transportService,
                                              ClusterService clusterService,
                                              ThreadPool threadPool,
                                              ActionFilters actionFilters,
-                                             IndexNameExpressionResolver indexNameExpressionResolver) {
+                                             IndexNameExpressionResolver indexNameExpressionResolver,
+                                             IngestService ingestService) {
         super(DeleteEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters,
             DeleteEnrichPolicyAction.Request::new, indexNameExpressionResolver);
+        this.ingestService = ingestService;
     }
 
     @Override
@@ -53,6 +65,26 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction
     @Override
     protected void masterOperation(Task task, DeleteEnrichPolicyAction.Request request, ClusterState state,
                                    ActionListener<AcknowledgedResponse> listener) throws Exception {
+        List<PipelineConfiguration> pipelines = IngestService.getPipelines(state);
+        EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state);
+        List<String> pipelinesWithProcessors = new ArrayList<>();
+
+        for (PipelineConfiguration pipelineConfiguration : pipelines) {
+            List<AbstractEnrichProcessor> enrichProcessors =
+                ingestService.getProcessorsInPipeline(pipelineConfiguration.getId(), AbstractEnrichProcessor.class);
+            for (AbstractEnrichProcessor processor: enrichProcessors) {
+                if (processor.getPolicyName().equals(request.getName())) {
+                    pipelinesWithProcessors.add(pipelineConfiguration.getId());
+                }
+            }
+        }
+
+        if (pipelinesWithProcessors.isEmpty() == false) {
+            listener.onFailure(
+                new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it {}",
+                    RestStatus.CONFLICT, request.getName(), pipelinesWithProcessors));
+        }
+
         EnrichStore.deletePolicy(request.getName(), clusterService, e -> {
            if (e == null) {
                listener.onResponse(new AcknowledgedResponse(true));