Browse Source

Remove enrich indices on delete policy (#45870)

When a policy is deleted, the enrich indices that are backing the policy
alias should also be deleted. This commit does that work and cleans up
the transport action a bit so that the lock release is easier to see, as
well as to ensure that any action carried out, regardless of exception,
unlocks the policy.
Michael Basnight 6 years ago
parent
commit
1c4ffd30cd

+ 71 - 22
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyAction.java

@@ -6,10 +6,14 @@
 package org.elasticsearch.xpack.enrich.action;
 
 import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -37,6 +41,11 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction
 
     private final EnrichPolicyLocks enrichPolicyLocks;
     private final IngestService ingestService;
+    private final Client client;
+    // the most lenient we can get in order to not bomb out if no indices are found, which is a valid case
+    // where a user creates and deletes a policy before running execute
+    private static final IndicesOptions LENIENT_OPTIONS = IndicesOptions.fromOptions(true, true, true, true);
+
 
     @Inject
     public TransportDeleteEnrichPolicyAction(TransportService transportService,
@@ -44,10 +53,12 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction
                                              ThreadPool threadPool,
                                              ActionFilters actionFilters,
                                              IndexNameExpressionResolver indexNameExpressionResolver,
+                                             Client client,
                                              EnrichPolicyLocks enrichPolicyLocks,
                                              IngestService ingestService) {
         super(DeleteEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters,
             DeleteEnrichPolicyAction.Request::new, indexNameExpressionResolver);
+        this.client = client;
         this.enrichPolicyLocks = enrichPolicyLocks;
         this.ingestService = ingestService;
     }
@@ -69,36 +80,74 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction
     @Override
     protected void masterOperation(Task task, DeleteEnrichPolicyAction.Request request, ClusterState state,
                                    ActionListener<AcknowledgedResponse> listener) throws Exception {
+        EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state); // ensure the policy exists first
+        if (policy == null) {
+            throw new ResourceNotFoundException("policy [{}] not found", request.getName());
+        }
+
         enrichPolicyLocks.lockPolicy(request.getName());
-        List<PipelineConfiguration> pipelines = IngestService.getPipelines(state);
-        EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state);
-        List<String> pipelinesWithProcessors = new ArrayList<>();
-
-        for (PipelineConfiguration pipelineConfiguration : pipelines) {
-            List<AbstractEnrichProcessor> enrichProcessors =
-                ingestService.getProcessorsInPipeline(pipelineConfiguration.getId(), AbstractEnrichProcessor.class);
-            for (AbstractEnrichProcessor processor: enrichProcessors) {
-                if (processor.getPolicyName().equals(request.getName())) {
-                    pipelinesWithProcessors.add(pipelineConfiguration.getId());
+        try {
+            List<PipelineConfiguration> pipelines = IngestService.getPipelines(state);
+            List<String> pipelinesWithProcessors = new ArrayList<>();
+
+            for (PipelineConfiguration pipelineConfiguration : pipelines) {
+                List<AbstractEnrichProcessor> enrichProcessors =
+                    ingestService.getProcessorsInPipeline(pipelineConfiguration.getId(), AbstractEnrichProcessor.class);
+                for (AbstractEnrichProcessor processor : enrichProcessors) {
+                    if (processor.getPolicyName().equals(request.getName())) {
+                        pipelinesWithProcessors.add(pipelineConfiguration.getId());
+                    }
                 }
             }
-        }
 
-        if (pipelinesWithProcessors.isEmpty() == false) {
+            if (pipelinesWithProcessors.isEmpty() == false) {
+                throw new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it {}",
+                        RestStatus.CONFLICT, request.getName(), pipelinesWithProcessors);
+            }
+        } catch (Exception e) {
             enrichPolicyLocks.releasePolicy(request.getName());
-            listener.onFailure(
-                new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it {}",
-                    RestStatus.CONFLICT, request.getName(), pipelinesWithProcessors));
+            listener.onFailure(e);
             return;
         }
 
-        EnrichStore.deletePolicy(request.getName(), clusterService, e -> {
-            enrichPolicyLocks.releasePolicy(request.getName());
-           if (e == null) {
-               listener.onResponse(new AcknowledgedResponse(true));
-           } else {
-               listener.onFailure(e);
-           }
+        deleteIndicesAndPolicy(request.getName(), ActionListener.wrap(
+            (response) -> {
+                enrichPolicyLocks.releasePolicy(request.getName());
+                listener.onResponse(response);
+            },
+            (exc) -> {
+                enrichPolicyLocks.releasePolicy(request.getName());
+                listener.onFailure(exc);
+            }
+        ));
+    }
+
+    private void deleteIndicesAndPolicy(String name, ActionListener<AcknowledgedResponse> listener) {
+        // delete all enrich indices for this policy
+        DeleteIndexRequest deleteRequest = new DeleteIndexRequest()
+            .indices(EnrichPolicy.getBaseName(name) + "-*")
+            .indicesOptions(LENIENT_OPTIONS);
+
+        client.admin().indices().delete(deleteRequest, ActionListener.wrap(
+            (response) -> {
+                if (response.isAcknowledged() == false) {
+                    listener.onFailure(new ElasticsearchStatusException("Could not fetch indices to delete during policy delete of [{}]",
+                        RestStatus.INTERNAL_SERVER_ERROR, name));
+                } else {
+                    deletePolicy(name, listener);
+                }
+            },
+            (error) -> listener.onFailure(error)
+        ));
+    }
+
+    private void deletePolicy(String name, ActionListener<AcknowledgedResponse> listener) {
+        EnrichStore.deletePolicy(name, clusterService, e -> {
+            if (e == null) {
+                listener.onResponse(new AcknowledgedResponse(true));
+            } else {
+                listener.onFailure(e);
+            }
         });
     }
 

+ 1 - 1
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyAction.java

@@ -63,7 +63,7 @@ public class TransportGetEnrichPolicyAction extends TransportMasterNodeReadActio
         } else {
             EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state);
             if (policy == null) {
-                throw new ResourceNotFoundException("Policy [{}] was not found", request.getName());
+                throw new ResourceNotFoundException("Policy [{}] not found", request.getName());
             }
             policies = Map.of(request.getName(), policy);
 

+ 1 - 1
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/AbstractEnrichTestCase.java

@@ -34,7 +34,7 @@ public abstract class AbstractEnrichTestCase extends ESSingleNodeTestCase {
         return error;
     }
 
-    void deleteEnrichPolicy(String name, ClusterService clusterService) throws Exception {
+    protected void deleteEnrichPolicy(String name, ClusterService clusterService) throws Exception {
         CountDownLatch latch = new CountDownLatch(1);
         AtomicReference<Exception> error = new AtomicReference<>();
         EnrichStore.deletePolicy(name, clusterService, e -> {

+ 109 - 3
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnricyPolicyActionTests.java → x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyActionTests.java

@@ -6,16 +6,20 @@
 
 package org.elasticsearch.xpack.enrich.action;
 
+import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
-import org.elasticsearch.xpack.enrich.EnrichPolicyLocks;
 import org.elasticsearch.xpack.enrich.AbstractEnrichTestCase;
+import org.elasticsearch.xpack.enrich.EnrichPolicyLocks;
+import org.elasticsearch.xpack.enrich.EnrichStore;
+import org.junit.After;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
@@ -25,9 +29,56 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 
-public class TransportDeleteEnricyPolicyActionTests extends AbstractEnrichTestCase {
+public class TransportDeleteEnrichPolicyActionTests extends AbstractEnrichTestCase {
+
+    @After
+    private void cleanupPolicy() {
+        ClusterService clusterService = getInstanceFromNode(ClusterService.class);
+        String name = "my-policy";
+
+        try {
+            deleteEnrichPolicy(name, clusterService);
+        } catch (Exception e) {
+            // if the enrich policy does not exist, then just keep going
+        }
 
-    public void testDeleteIsNotLocked() throws InterruptedException {
+        // fail if the state of this is left locked
+        EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
+        assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
+    }
+
+    public void testDeletePolicyDoesNotExistUnlocksPolicy() throws InterruptedException {
+        String fakeId = "fake-id";
+        createIndex(EnrichPolicy.getBaseName(fakeId) + "-foo1");
+        createIndex(EnrichPolicy.getBaseName(fakeId) + "-foo2");
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicReference<Exception> reference = new AtomicReference<>();
+        final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
+        ActionTestUtils.execute(transportAction, null,
+            new DeleteEnrichPolicyAction.Request(fakeId),
+            new ActionListener<AcknowledgedResponse>() {
+                @Override
+                public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                    fail();
+                }
+
+                public void onFailure(final Exception e) {
+                    reference.set(e);
+                    latch.countDown();
+                }
+            });
+        latch.await();
+        assertNotNull(reference.get());
+        assertThat(reference.get(), instanceOf(ResourceNotFoundException.class));
+        assertThat(reference.get().getMessage(), equalTo("policy [fake-id] not found"));
+
+        // fail if the state of this is left locked
+        EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
+        assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
+    }
+
+    public void testDeleteWithoutIndex() throws Exception {
         EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
         ClusterService clusterService = getInstanceFromNode(ClusterService.class);
         String name = "my-policy";
@@ -54,6 +105,56 @@ public class TransportDeleteEnricyPolicyActionTests extends AbstractEnrichTestCa
         latch.await();
         assertNotNull(reference.get());
         assertTrue(reference.get().isAcknowledged());
+
+        EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
+        assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
+
+        assertNull(EnrichStore.getPolicy(name, clusterService.state()));
+    }
+
+    public void testDeleteIsNotLocked() throws Exception {
+        EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
+        ClusterService clusterService = getInstanceFromNode(ClusterService.class);
+        String name = "my-policy";
+
+        AtomicReference<Exception> error = saveEnrichPolicy(name, policy, clusterService);
+        assertThat(error.get(), nullValue());
+
+        createIndex(EnrichPolicy.getBaseName(name) + "-foo1");
+        createIndex(EnrichPolicy.getBaseName(name) + "-foo2");
+
+        client().admin().indices().prepareGetIndex().setIndices(
+            EnrichPolicy.getBaseName(name) + "-foo1",
+            EnrichPolicy.getBaseName(name) + "-foo2").get();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicReference<AcknowledgedResponse> reference = new AtomicReference<>();
+        final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
+        ActionTestUtils.execute(transportAction, null,
+            new DeleteEnrichPolicyAction.Request(name),
+            new ActionListener<AcknowledgedResponse>() {
+                @Override
+                public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                    reference.set(acknowledgedResponse);
+                    latch.countDown();
+                }
+
+                public void onFailure(final Exception e) {
+                    fail();
+                }
+            });
+        latch.await();
+        assertNotNull(reference.get());
+        assertTrue(reference.get().isAcknowledged());
+
+        expectThrows(IndexNotFoundException.class, () -> client().admin().indices().prepareGetIndex().setIndices(
+            EnrichPolicy.getBaseName(name) + "-foo1",
+            EnrichPolicy.getBaseName(name) + "-foo2").get());
+
+        EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
+        assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
+
+        assertNull(EnrichStore.getPolicy(name, clusterService.state()));
     }
 
     public void testDeleteLocked() throws InterruptedException {
@@ -64,6 +165,9 @@ public class TransportDeleteEnricyPolicyActionTests extends AbstractEnrichTestCa
         AtomicReference<Exception> error = saveEnrichPolicy(name, policy, clusterService);
         assertThat(error.get(), nullValue());
 
+        createIndex(EnrichPolicy.getBaseName(name) + "-foo1");
+        createIndex(EnrichPolicy.getBaseName(name) + "-foo2");
+
         EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
         assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
 
@@ -117,6 +221,8 @@ public class TransportDeleteEnricyPolicyActionTests extends AbstractEnrichTestCa
             assertTrue(reference.get().isAcknowledged());
 
             assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
+
+            assertNull(EnrichStore.getPolicy(name, clusterService.state()));
         }
     }
 }

+ 13 - 22
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyActionTests.java

@@ -9,13 +9,12 @@ package org.elasticsearch.xpack.enrich.action;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionTestUtils;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
-import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
 import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
 import org.elasticsearch.xpack.enrich.AbstractEnrichTestCase;
+import org.elasticsearch.xpack.enrich.EnrichPolicyLocks;
 import org.junit.After;
 
 import java.util.concurrent.CountDownLatch;
@@ -31,6 +30,8 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
 
     @After
     private void cleanupPolicies() throws InterruptedException {
+        ClusterService clusterService = getInstanceFromNode(ClusterService.class);
+
         final CountDownLatch latch = new CountDownLatch(1);
         final AtomicReference<GetEnrichPolicyAction.Response> reference = new AtomicReference<>();
         final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class);
@@ -53,26 +54,16 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
         GetEnrichPolicyAction.Response response = reference.get();
 
         for (EnrichPolicy.NamedPolicy policy: response.getPolicies()) {
-            final CountDownLatch loopLatch = new CountDownLatch(1);
-            final AtomicReference<AcknowledgedResponse> loopReference = new AtomicReference<>();
-            final TransportDeleteEnrichPolicyAction deleteAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
-            ActionTestUtils.execute(deleteAction, null,
-                new DeleteEnrichPolicyAction.Request(policy.getName()),
-                new ActionListener<>() {
-                    @Override
-                    public void onResponse(AcknowledgedResponse acknowledgedResponse) {
-                        loopReference.set(acknowledgedResponse);
-                        loopLatch.countDown();
-                    }
-
-                    public void onFailure(final Exception e) {
-                        fail();
-                    }
-                });
-            loopLatch.await();
-            assertNotNull(loopReference.get());
-            assertTrue(loopReference.get().isAcknowledged());
+            try {
+                deleteEnrichPolicy(policy.getName(), clusterService);
+            } catch (Exception e) {
+                // if the enrich policy does not exist, then just keep going
+            }
         }
+
+        // fail if the state of this is left locked
+        EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
+        assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
     }
 
     public void testListPolicies() throws InterruptedException {
@@ -205,6 +196,6 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
         assertNotNull(reference.get());
         assertThat(reference.get(), instanceOf(ResourceNotFoundException.class));
         assertThat(reference.get().getMessage(),
-            equalTo("Policy [non-exists] was not found"));
+            equalTo("Policy [non-exists] not found"));
     }
 }