Browse Source

Delete empty .ml-state* indices during nightly maintenance task. (#53587)

Przemysław Witek 5 years ago
parent
commit
bb582f1317

+ 42 - 0
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java

@@ -6,6 +6,8 @@
 package org.elasticsearch.xpack.ml.integration;
 
 import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.admin.indices.alias.Alias;
+import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
@@ -14,6 +16,7 @@ import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.action.update.UpdateAction;
 import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.QueryBuilders;
@@ -41,6 +44,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static org.hamcrest.Matchers.arrayContaining;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -277,6 +281,44 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
             nonExistingJobDocsCount, equalTo(0));
     }
 
+    /**
+     * Verifies empty state indices deletion. Here is the summary of indices used by the test:
+     *
+     * +------------------+--------+----------+-------------------------+
+     * | index name       | empty? | current? | expected to be removed? |
+     * +------------------+--------+----------+-------------------------+
+     * | .ml-state        | yes    | no       | yes                     |
+     * | .ml-state-000001 | no     | no       | no                      |
+     * | .ml-state-000003 | yes    | no       | yes                     |
+     * | .ml-state-000005 | no     | no       | no                      |
+     * | .ml-state-000007 | yes    | yes      | no                      |
+     * +------------------+--------+----------+-------------------------+
+     */
+    public void testDeleteExpiredDataActionDeletesEmptyStateIndices() throws Exception {
+        // Create the five indices from the table above. Indices 000001 and 000005 each receive one document
+        // so that they are non-empty; 000007 is created with the hidden .ml-state-write alias ("current").
+        client().admin().indices().prepareCreate(".ml-state").get();
+        client().admin().indices().prepareCreate(".ml-state-000001").get();
+        client().prepareIndex(".ml-state-000001").setSource("field_1", "value_1").get();
+        client().admin().indices().prepareCreate(".ml-state-000003").get();
+        client().admin().indices().prepareCreate(".ml-state-000005").get();
+        client().prepareIndex(".ml-state-000005").setSource("field_5", "value_5").get();
+        client().admin().indices().prepareCreate(".ml-state-000007").addAlias(new Alias(".ml-state-write").isHidden(true)).get();
+        refresh();
+
+        // Precondition: all five state indices are present before the maintenance action runs.
+        // arrayContaining is order-sensitive, so this also relies on the response listing the indices in this order.
+        GetIndexResponse getIndexResponse = client().admin().indices().prepareGetIndex().setIndices(".ml-state*").get();
+        assertThat(Strings.toString(getIndexResponse),
+            getIndexResponse.getIndices(),
+            is(arrayContaining(".ml-state", ".ml-state-000001", ".ml-state-000003", ".ml-state-000005", ".ml-state-000007")));
+
+        // Run the delete-expired-data action and block until it has returned.
+        client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
+        refresh();
+
+        // Re-read the matching indices; the response is rendered as the assertion message to ease debugging.
+        getIndexResponse = client().admin().indices().prepareGetIndex().setIndices(".ml-state*").get();
+        assertThat(Strings.toString(getIndexResponse),
+            getIndexResponse.getIndices(),
+            // Only non-empty or current indices should survive deletion process
+            is(arrayContaining(".ml-state-000001", ".ml-state-000005", ".ml-state-000007")));
+    }
+
     private static Job.Builder newJobBuilder(String id) {
         Detector.Builder detector = new Detector.Builder();
         detector.setFunction("count");

+ 3 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java

@@ -21,6 +21,7 @@ import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
 import org.elasticsearch.xpack.ml.MachineLearning;
+import org.elasticsearch.xpack.ml.job.retention.EmptyStateIndexRemover;
 import org.elasticsearch.xpack.ml.job.retention.ExpiredForecastsRemover;
 import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover;
 import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover;
@@ -84,7 +85,8 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
                 new ExpiredResultsRemover(client, auditor, threadPool),
                 new ExpiredForecastsRemover(client, threadPool),
                 new ExpiredModelSnapshotsRemover(client, threadPool),
-                new UnusedStateRemover(client, clusterService)
+                new UnusedStateRemover(client, clusterService),
+                new EmptyStateIndexRemover(client)
         );
         Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);
         deleteExpiredData(dataRemoversIterator, listener, isTimedOutSupplier, true);

+ 109 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemover.java

@@ -0,0 +1,109 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.job.retention;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.action.admin.indices.stats.IndexStats;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
+import org.elasticsearch.client.OriginSettingClient;
+import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
+
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import static java.util.stream.Collectors.toSet;
+
+/**
+ * Removes indices matching the .ml-state* pattern that contain no documents, leaving alone
+ * any index that the .ml-state-write alias currently points at.
+ */
+public class EmptyStateIndexRemover implements MlDataRemover {
+
+    private final OriginSettingClient client;
+
+    public EmptyStateIndexRemover(OriginSettingClient client) {
+        this.client = Objects.requireNonNull(client);
+    }
+
+    /**
+     * Runs the removal asynchronously.
+     *
+     * The listener receives {@code false} when the timeout supplier already reports a timeout,
+     * {@code true} when there is nothing to delete, the acknowledged flag of the delete response
+     * when a delete request was issued, and a failure when any step fails.
+     */
+    @Override
+    public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
+        try {
+            if (isTimedOutSupplier.get()) {
+                listener.onResponse(false);
+                return;
+            }
+            ActionListener<Set<String>> onEmptyIndices =
+                ActionListener.wrap(emptyIndices -> deleteUnlessCurrent(emptyIndices, listener), listener::onFailure);
+            getEmptyStateIndices(onEmptyIndices);
+        } catch (Exception e) {
+            listener.onFailure(e);
+        }
+    }
+
+    /**
+     * Second step of the removal: given the empty state indices, looks up the current (write-alias) indices
+     * and deletes whatever is empty but not current.
+     */
+    private void deleteUnlessCurrent(Set<String> emptyIndices, ActionListener<Boolean> listener) {
+        if (emptyIndices.isEmpty()) {
+            listener.onResponse(true);
+            return;
+        }
+        ActionListener<Set<String>> onCurrentIndices = ActionListener.wrap(
+            currentIndices -> {
+                Set<String> removable = Sets.difference(emptyIndices, currentIndices);
+                if (removable.isEmpty()) {
+                    listener.onResponse(true);
+                } else {
+                    executeDeleteEmptyStateIndices(removable, listener);
+                }
+            },
+            listener::onFailure);
+        getCurrentStateIndices(onCurrentIndices);
+    }
+
+    /** Tells whether the total document count reported for the given index is zero. */
+    private static boolean hasNoDocs(IndexStats indexStats) {
+        return indexStats.getTotal().getDocs().getCount() == 0;
+    }
+
+    /** Fetches stats for the state index pattern and reports the names of the indices holding zero documents. */
+    private void getEmptyStateIndices(ActionListener<Set<String>> listener) {
+        IndicesStatsRequest statsRequest = new IndicesStatsRequest().indices(AnomalyDetectorsIndex.jobStateIndexPattern());
+        client.admin().indices().stats(
+            statsRequest,
+            ActionListener.wrap(
+                statsResponse -> {
+                    Set<String> withoutDocs =
+                        statsResponse.getIndices().values().stream()
+                            .filter(EmptyStateIndexRemover::hasNoDocs)
+                            .map(IndexStats::getIndex)
+                            .collect(toSet());
+                    listener.onResponse(withoutDocs);
+                },
+                listener::onFailure));
+    }
+
+    /** Resolves the state write alias and reports the indices returned for it. */
+    private void getCurrentStateIndices(ActionListener<Set<String>> listener) {
+        GetIndexRequest writeAliasRequest = new GetIndexRequest().indices(AnomalyDetectorsIndex.jobStateIndexWriteAlias());
+        client.admin().indices().getIndex(
+            writeAliasRequest,
+            ActionListener.wrap(response -> listener.onResponse(Set.of(response.getIndices())), listener::onFailure));
+    }
+
+    /** Issues a single delete request for the given indices and reports whether it was acknowledged. */
+    private void executeDeleteEmptyStateIndices(Set<String> emptyStateIndices, ActionListener<Boolean> listener) {
+        String[] indexNames = emptyStateIndices.toArray(new String[0]);
+        client.admin().indices().delete(
+            new DeleteIndexRequest(indexNames),
+            ActionListener.wrap(ack -> listener.onResponse(ack.isAcknowledged()), listener::onFailure));
+    }
+}

+ 2 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java

@@ -41,9 +41,9 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 
 /**
- * If for any reason a job is deleted by some of its state documents
+ * If for any reason a job is deleted but some of its state documents
  * are left behind, this class deletes any unused documents stored
- * in the .ml-state index.
+ * in the .ml-state* indices.
  */
 public class UnusedStateRemover implements MlDataRemover {
 

+ 176 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemoverTests.java

@@ -0,0 +1,176 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.job.retention;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.get.GetIndexAction;
+import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
+import org.elasticsearch.action.admin.indices.stats.CommonStats;
+import org.elasticsearch.action.admin.indices.stats.IndexStats;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.OriginSettingClient;
+import org.elasticsearch.index.shard.DocsStats;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.ml.test.MockOriginSettingClient;
+import org.junit.After;
+import org.junit.Before;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+import org.mockito.stubbing.Answer;
+
+import java.util.Map;
+
+import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link EmptyStateIndexRemover}, driven through a mocked {@link Client}
+ * wrapped in an {@link OriginSettingClient} and a mocked result listener.
+ */
+public class EmptyStateIndexRemoverTests extends ESTestCase {
+
+    private Client client;
+    private ActionListener<Boolean> listener;
+    private ArgumentCaptor<DeleteIndexRequest> deleteIndexRequestCaptor;
+
+    private EmptyStateIndexRemover remover;
+
+    /** Creates fresh mocks and a remover under test before every test method. */
+    @SuppressWarnings("unchecked")
+    @Before
+    public void setUpTests() {
+        client = mock(Client.class);
+        OriginSettingClient originSettingClient = MockOriginSettingClient.mockOriginSettingClient(client, ClientHelper.ML_ORIGIN);
+        listener = mock(ActionListener.class);
+        deleteIndexRequestCaptor = ArgumentCaptor.forClass(DeleteIndexRequest.class);
+
+        remover = new EmptyStateIndexRemover(originSettingClient);
+    }
+
+    /**
+     * Runs after every test and fails it if the client or the listener saw an interaction
+     * that the test did not explicitly verify.
+     */
+    @After
+    public void verifyNoOtherInteractionsWithMocks() {
+        // NOTE(review): settings() and threadPool() are presumably invoked while the OriginSettingClient
+        // is built in setUpTests - confirm against MockOriginSettingClient.
+        verify(client).settings();
+        verify(client, atLeastOnce()).threadPool();
+        verifyNoMoreInteractions(client, listener);
+    }
+
+    /** When the timeout supplier reports true, the listener gets {@code false} and no request is executed. */
+    public void testRemove_TimedOut() {
+        remover.remove(listener, () -> true);
+
+        InOrder inOrder = inOrder(client, listener);
+        inOrder.verify(listener).onResponse(false);
+    }
+
+    /** An indices-stats response with no indices at all ends the run with {@code true} after the stats call. */
+    public void testRemove_NoStateIndices() {
+        IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class);
+        when(indicesStatsResponse.getIndices()).thenReturn(Map.of());
+        doAnswer(withResponse(indicesStatsResponse)).when(client).execute(any(), any(), any());
+
+        remover.remove(listener, () -> false);
+
+        InOrder inOrder = inOrder(client, listener);
+        inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());
+        inOrder.verify(listener).onResponse(true);
+    }
+
+    /** When every reported index has at least one document, only the stats call is made and the result is {@code true}. */
+    public void testRemove_NoEmptyStateIndices() {
+        IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class);
+        doReturn(
+            Map.of(
+                ".ml-state-a", indexStats(".ml-state-a", 1),
+                ".ml-state-b", indexStats(".ml-state-b", 2),
+                ".ml-state-c", indexStats(".ml-state-c", 1),
+                ".ml-state-d", indexStats(".ml-state-d", 2))).when(indicesStatsResponse).getIndices();
+        doAnswer(withResponse(indicesStatsResponse)).when(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());
+
+        remover.remove(listener, () -> false);
+
+        InOrder inOrder = inOrder(client, listener);
+        inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());
+        inOrder.verify(listener).onResponse(true);
+    }
+
+    /**
+     * Shared scenario for the delete tests: indices b, d and e are empty and e is returned by the get-index call,
+     * so the delete request must name exactly b and d. The listener must receive the given {@code acknowledged} value.
+     */
+    private void assertDeleteActionExecuted(boolean acknowledged) {
+        IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class);
+        doReturn(
+            Map.of(
+                ".ml-state-a", indexStats(".ml-state-a", 1),
+                ".ml-state-b", indexStats(".ml-state-b", 0),
+                ".ml-state-c", indexStats(".ml-state-c", 2),
+                ".ml-state-d", indexStats(".ml-state-d", 0),
+                ".ml-state-e", indexStats(".ml-state-e", 0))).when(indicesStatsResponse).getIndices();
+        doAnswer(withResponse(indicesStatsResponse)).when(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());
+
+        GetIndexResponse getIndexResponse = new GetIndexResponse(new String[] { ".ml-state-e" }, null, null, null, null);
+        doAnswer(withResponse(getIndexResponse)).when(client).execute(eq(GetIndexAction.INSTANCE), any(), any());
+
+        AcknowledgedResponse deleteIndexResponse = new AcknowledgedResponse(acknowledged);
+        doAnswer(withResponse(deleteIndexResponse)).when(client).execute(eq(DeleteIndexAction.INSTANCE), any(), any());
+
+        remover.remove(listener, () -> false);
+
+        // The three requests must be issued in this order, followed by the final listener notification.
+        InOrder inOrder = inOrder(client, listener);
+        inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());
+        inOrder.verify(client).execute(eq(GetIndexAction.INSTANCE), any(), any());
+        inOrder.verify(client).execute(eq(DeleteIndexAction.INSTANCE), deleteIndexRequestCaptor.capture(), any());
+        inOrder.verify(listener).onResponse(acknowledged);
+
+        DeleteIndexRequest deleteIndexRequest = deleteIndexRequestCaptor.getValue();
+        assertThat(deleteIndexRequest.indices(), arrayContainingInAnyOrder(".ml-state-b", ".ml-state-d"));
+    }
+
+    /** An acknowledged delete response is reported to the listener as {@code true}. */
+    public void testRemove_DeleteAcknowledged() {
+        assertDeleteActionExecuted(true);
+    }
+
+    /** A non-acknowledged delete response is reported to the listener as {@code false}. */
+    public void testRemove_DeleteNotAcknowledged() {
+        assertDeleteActionExecuted(false);
+    }
+
+    /** When the only empty index is also the one returned by the get-index call, no delete is issued and the result is {@code true}. */
+    public void testRemove_NoIndicesToRemove() {
+        IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class);
+        doReturn(Map.of(".ml-state-a", indexStats(".ml-state-a", 0))).when(indicesStatsResponse).getIndices();
+        doAnswer(withResponse(indicesStatsResponse)).when(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());
+
+        GetIndexResponse getIndexResponse = new GetIndexResponse(new String[] { ".ml-state-a" }, null, null, null, null);
+        doAnswer(withResponse(getIndexResponse)).when(client).execute(eq(GetIndexAction.INSTANCE), any(), any());
+
+        remover.remove(listener, () -> false);
+
+        InOrder inOrder = inOrder(client, listener);
+        inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());
+        inOrder.verify(client).execute(eq(GetIndexAction.INSTANCE), any(), any());
+        inOrder.verify(listener).onResponse(true);
+    }
+
+    /**
+     * Builds a Mockito answer that treats the third invocation argument as an {@link ActionListener}
+     * and immediately completes it with the given response.
+     */
+    @SuppressWarnings("unchecked")
+    private static <Response> Answer<Response> withResponse(Response response) {
+        return invocationOnMock -> {
+            ActionListener<Response> listener = (ActionListener<Response>) invocationOnMock.getArguments()[2];
+            listener.onResponse(response);
+            return null;
+        };
+    }
+
+    /**
+     * Builds a mocked {@link IndexStats} that reports the given index name and whose total stats
+     * return a {@link DocsStats} constructed with {@code docCount} as its first argument.
+     */
+    private static IndexStats indexStats(String index, int docCount) {
+        CommonStats indexTotalStats = mock(CommonStats.class);
+        when(indexTotalStats.getDocs()).thenReturn(new DocsStats(docCount, 0, 0));
+        IndexStats indexStats = mock(IndexStats.class);
+        when(indexStats.getIndex()).thenReturn(index);
+        when(indexStats.getTotal()).thenReturn(indexTotalStats);
+        return indexStats;
+    }
+}