Forráskód Böngészése

[Transform] Delete Alias Write Index (#122074)

When the Transform is configured to write to an alias, specifying
`DELETE _transform/<id>?delete_dest_index` will follow the alias
to the concrete destination index.

Fix #121913

Co-authored-by: Przemysław Witek <przemyslaw.witek@elastic.co>
Pat Whelan 8 hónapja
szülő
commit
99c5398137

+ 8 - 0
docs/changelog/122074.yaml

@@ -0,0 +1,8 @@
+pr: 122074
+summary: If the Transform is configured to write to an alias as its destination index,
+  when the delete_dest_index parameter is set to true, then the Delete API will now
+  delete the write index backing the alias
+area: Transform
+type: bug
+issues:
+ - 121913

+ 86 - 11
x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformDeleteIT.java

@@ -110,7 +110,7 @@ public class TransformDeleteIT extends TransformRestTestCase {
 
         deleteTransform(transformId, false, true);
         assertFalse(indexExists(transformDest));
-        assertFalse(aliasExists(transformDest));
+        assertFalse(aliasExists(transformDestAlias));
     }
 
     public void testDeleteWithParamDeletesManuallyCreatedDestinationIndex() throws Exception {
@@ -139,7 +139,7 @@ public class TransformDeleteIT extends TransformRestTestCase {
         assertFalse(aliasExists(transformDestAlias));
     }
 
-    public void testDeleteWithParamDoesNotDeleteManuallySetUpAlias() throws Exception {
+    public void testDeleteWithManuallyCreatedIndexAndManuallyCreatedAlias() throws Exception {
         String transformId = "transform-4";
         String transformDest = transformId + "_idx";
         String transformDestAlias = transformId + "_alias";
@@ -158,31 +158,106 @@ public class TransformDeleteIT extends TransformRestTestCase {
         assertTrue(indexExists(transformDest));
         assertTrue(aliasExists(transformDestAlias));
 
+        deleteTransform(transformId, false, true);
+        assertFalse(indexExists(transformDest));
+        assertFalse(aliasExists(transformDestAlias));
+    }
+
+    public void testDeleteDestinationIndexIsNoOpWhenNoDestinationIndexExists() throws Exception {
+        String transformId = "transform-5";
+        String transformDest = transformId + "_idx";
+        String transformDestAlias = transformId + "_alias";
+        setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest, transformDestAlias);
+
+        createTransform(transformId, transformDest, transformDestAlias);
+        assertFalse(indexExists(transformDest));
+        assertFalse(aliasExists(transformDestAlias));
+
+        deleteTransform(transformId, false, true);
+        assertFalse(indexExists(transformDest));
+        assertFalse(aliasExists(transformDestAlias));
+    }
+
+    public void testDeleteWithAliasPointingToManyIndices() throws Exception {
+        var transformId = "transform-6";
+        var transformDest = transformId + "_idx";
+        var otherIndex = "some-other-index-6";
+        String transformDestAlias = transformId + "_alias";
+        setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest, otherIndex, transformDestAlias);
+
+        createIndex(transformDest, null, null, "\"" + transformDestAlias + "\": { \"is_write_index\": true }");
+        createIndex(otherIndex, null, null, "\"" + transformDestAlias + "\": {}");
+
+        assertTrue(indexExists(transformDest));
+        assertTrue(indexExists(otherIndex));
+        assertTrue(aliasExists(transformDestAlias));
+
+        createTransform(transformId, transformDestAlias, null);
+
+        startTransform(transformId);
+        waitForTransformCheckpoint(transformId, 1);
+
+        stopTransform(transformId, false);
+
+        assertTrue(indexExists(transformDest));
+        assertTrue(indexExists(otherIndex));
+        assertTrue(aliasExists(transformDestAlias));
+
+        deleteTransform(transformId, false, true);
+
+        assertFalse(indexExists(transformDest));
+        assertTrue(indexExists(otherIndex));
+        assertTrue(aliasExists(transformDestAlias));
+    }
+
+    public void testDeleteWithNoWriteIndexThrowsException() throws Exception {
+        var transformId = "transform-7";
+        var transformDest = transformId + "_idx";
+        var otherIndex = "some-other-index-7";
+        String transformDestAlias = transformId + "_alias";
+        setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest, otherIndex, transformDestAlias);
+
+        createIndex(transformDest, null, null, "\"" + transformDestAlias + "\": {}");
+
+        assertTrue(indexExists(transformDest));
+        assertTrue(aliasExists(transformDestAlias));
+
+        createTransform(transformId, transformDestAlias, null);
+
+        createIndex(otherIndex, null, null, "\"" + transformDestAlias + "\": {}");
+        assertTrue(indexExists(otherIndex));
+
         ResponseException e = expectThrows(ResponseException.class, () -> deleteTransform(transformId, false, true));
         assertThat(
             e.getMessage(),
             containsString(
                 Strings.format(
-                    "The provided expression [%s] matches an alias, specify the corresponding concrete indices instead.",
+                    "Cannot disambiguate destination index alias [%s]. Alias points to many indices with no clear write alias."
+                        + " Retry with delete_dest_index=false and manually clean up destination index.",
                     transformDestAlias
                 )
             )
         );
     }
 
-    public void testDeleteDestinationIndexIsNoOpWhenNoDestinationIndexExists() throws Exception {
-        String transformId = "transform-5";
-        String transformDest = transformId + "_idx";
-        String transformDestAlias = transformId + "_alias";
-        setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest, transformDestAlias);
+    public void testDeleteWithAlreadyDeletedIndex() throws Exception {
+        var transformId = "transform-8";
+        var transformDest = transformId + "_idx";
+        setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest);
+
+        createIndex(transformDest);
+
+        assertTrue(indexExists(transformDest));
+
+        createTransform(transformId, transformDest, null);
+
+        deleteIndex(transformDest);
 
-        createTransform(transformId, transformDest, transformDestAlias);
         assertFalse(indexExists(transformDest));
-        assertFalse(aliasExists(transformDestAlias));
 
         deleteTransform(transformId, false, true);
+
         assertFalse(indexExists(transformDest));
-        assertFalse(aliasExists(transformDestAlias));
     }
 
     private void createTransform(String transformId, String destIndex, String destAlias) throws IOException {

+ 1 - 1
x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java

@@ -412,7 +412,7 @@ public abstract class TransformRestTestCase extends TransformCommonRestTestCase
         }
         updateTransformRequest.setJsonEntity(update);
 
-        client().performRequest(updateTransformRequest);
+        assertOKAndConsume(client().performRequest(updateTransformRequest));
     }
 
     protected void startTransform(String transformId) throws IOException {

+ 80 - 19
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportDeleteTransformAction.java

@@ -10,9 +10,13 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.alias.get.GetAliasesAction;
+import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
+import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
 import org.elasticsearch.client.internal.Client;
@@ -27,6 +31,7 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.injection.guice.Inject;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -42,6 +47,8 @@ import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
 import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
 import org.elasticsearch.xpack.transform.transforms.TransformTask;
 
+import java.util.Objects;
+
 import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
 import static org.elasticsearch.xpack.core.ClientHelper.executeWithHeadersAsync;
@@ -146,20 +153,31 @@ public class TransportDeleteTransformAction extends AcknowledgedTransportMasterN
         TimeValue timeout,
         ActionListener<AcknowledgedResponse> listener
     ) {
-        // <3> Check if the error is "index not found" error. If so, just move on. The index is already deleted.
-        ActionListener<AcknowledgedResponse> deleteDestIndexListener = ActionListener.wrap(listener::onResponse, e -> {
-            if (e instanceof IndexNotFoundException) {
-                listener.onResponse(AcknowledgedResponse.TRUE);
-            } else {
-                listener.onFailure(e);
-            }
-        });
+        getTransformConfig(transformId).<AcknowledgedResponse>andThen((l, r) -> deleteDestinationIndex(r.v1(), parentTaskId, timeout, l))
+            .addListener(listener.delegateResponse((l, e) -> {
+                if (e instanceof IndexNotFoundException) {
+                    l.onResponse(AcknowledgedResponse.TRUE);
+                } else {
+                    l.onFailure(e);
+                }
+            }));
+    }
 
-        // <2> Delete destination index
-        ActionListener<Tuple<TransformConfig, SeqNoPrimaryTermAndIndex>> getTransformConfigurationListener = ActionListener.wrap(
-            transformConfigAndVersion -> {
-                TransformConfig config = transformConfigAndVersion.v1();
-                String destIndex = config.getDestination().getIndex();
+    private SubscribableListener<Tuple<TransformConfig, SeqNoPrimaryTermAndIndex>> getTransformConfig(String transformId) {
+        return SubscribableListener.newForked(l -> transformConfigManager.getTransformConfigurationForUpdate(transformId, l));
+    }
+
+    /**
+     * Delete the destination index.  If the Transform is configured to write to an alias, then follow that alias to the concrete index.
+     */
+    private void deleteDestinationIndex(
+        TransformConfig config,
+        TaskId parentTaskId,
+        TimeValue timeout,
+        ActionListener<AcknowledgedResponse> listener
+    ) {
+        SubscribableListener.<String>newForked(l -> resolveDestinationIndex(config, parentTaskId, timeout, l))
+            .<AcknowledgedResponse>andThen((l, destIndex) -> {
                 DeleteIndexRequest deleteDestIndexRequest = new DeleteIndexRequest(destIndex);
                 deleteDestIndexRequest.ackTimeout(timeout);
                 deleteDestIndexRequest.setParentTask(parentTaskId);
@@ -169,14 +187,57 @@ public class TransportDeleteTransformAction extends AcknowledgedTransportMasterN
                     client,
                     TransportDeleteIndexAction.TYPE,
                     deleteDestIndexRequest,
-                    deleteDestIndexListener
+                    l
                 );
-            },
-            listener::onFailure
-        );
+            })
+            .addListener(listener);
+    }
+
+    private void resolveDestinationIndex(TransformConfig config, TaskId parentTaskId, TimeValue timeout, ActionListener<String> listener) {
+        var destIndex = config.getDestination().getIndex();
+        var responseListener = ActionListener.<GetAliasesResponse>wrap(r -> findDestinationIndexInAliases(r, destIndex, listener), e -> {
+            if (e instanceof AliasesNotFoundException) {
+                // no alias == the destIndex is our concrete index
+                listener.onResponse(destIndex);
+            } else {
+                listener.onFailure(e);
+            }
+        });
+
+        GetAliasesRequest request = new GetAliasesRequest(timeout, destIndex);
+        request.setParentTask(parentTaskId);
+        executeWithHeadersAsync(config.getHeaders(), TRANSFORM_ORIGIN, client, GetAliasesAction.INSTANCE, request, responseListener);
+    }
 
-        // <1> Fetch transform configuration
-        transformConfigManager.getTransformConfigurationForUpdate(transformId, getTransformConfigurationListener);
+    private static void findDestinationIndexInAliases(GetAliasesResponse aliases, String destIndex, ActionListener<String> listener) {
+        var indexToAliases = aliases.getAliases();
+        if (indexToAliases.isEmpty()) {
+            // if the alias list is empty, that means the index is a concrete index
+            listener.onResponse(destIndex);
+        } else if (indexToAliases.size() == 1) {
+            // if there is one value, the alias will treat it as the write index, so it's our destination index
+            listener.onResponse(indexToAliases.keySet().iterator().next());
+        } else {
+            // if there is more than one index, there may be more than one alias for each index
+            // we have to search for the alias that matches our destination index name AND is declared the write index for that alias
+            indexToAliases.entrySet().stream().map(entry -> {
+                if (entry.getValue().stream().anyMatch(md -> destIndex.equals(md.getAlias()) && Boolean.TRUE.equals(md.writeIndex()))) {
+                    return entry.getKey();
+                } else {
+                    return null;
+                }
+            }).filter(Objects::nonNull).findFirst().ifPresentOrElse(listener::onResponse, () -> {
+                listener.onFailure(
+                    new ElasticsearchStatusException(
+                        "Cannot disambiguate destination index alias ["
+                            + destIndex
+                            + "]. Alias points to many indices with no clear write alias. Retry with delete_dest_index=false and manually"
+                            + " clean up destination index.",
+                        RestStatus.CONFLICT
+                    )
+                );
+            });
+        }
     }
 
     @Override