Ver Fonte

[Transform] Make force-stopping the transform always remove persistent task from cluster state (#106989)

Przemysław Witek há 1 ano atrás
pai
commit
e21f2e30fb

+ 7 - 0
docs/changelog/106989.yaml

@@ -0,0 +1,7 @@
+pr: 106989
+summary: Make force-stopping the transform always remove persistent task from cluster
+  state
+area: Transform
+type: bug
+issues:
+ - 106811

+ 14 - 13
x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java

@@ -241,38 +241,39 @@ public class TransformIT extends TransformRestTestCase {
             long sleepAfterStartMillis = randomLongBetween(0, 5_000);
             boolean force = randomBoolean();
             try {
-                // Create the continuous transform
+                // Create the continuous transform.
                 putTransform(transformId, config, RequestOptions.DEFAULT);
                 assertThat(getTransformTasks(), is(empty()));
                 assertThat(getTransformTasksFromClusterState(transformId), is(empty()));
 
                 startTransform(transformId, RequestOptions.DEFAULT);
-                // There is 1 transform task after start
+                // There is 1 transform task after start.
                 assertThat(getTransformTasks(), hasSize(1));
                 assertThat(getTransformTasksFromClusterState(transformId), hasSize(1));
 
                 Thread.sleep(sleepAfterStartMillis);
-                // There should still be 1 transform task as the transform is continuous
+                // There should still be 1 transform task as the transform is continuous.
                 assertThat(getTransformTasks(), hasSize(1));
                 assertThat(getTransformTasksFromClusterState(transformId), hasSize(1));
 
-                // Stop the transform with force set randomly
+                // Stop the transform with force set randomly.
                 stopTransform(transformId, true, null, false, force);
-                // After the transform is stopped, there should be no transform task left
-                assertThat(getTransformTasks(), is(empty()));
+                if (force) {
+                    // If the "force" has been used, then the persistent task is removed from the cluster state but the local task can still
+                    // be seen by the PersistentTasksNodeService. We need to wait until PersistentTasksNodeService reconciles the state.
+                    assertBusy(() -> assertThat(getTransformTasks(), is(empty())));
+                } else {
+                    // If the "force" hasn't been used then we can expect the local task to be already gone.
+                    assertThat(getTransformTasks(), is(empty()));
+                }
+                // After the transform is stopped, there should be no transform task left in the cluster state.
                 assertThat(getTransformTasksFromClusterState(transformId), is(empty()));
 
                 // Delete the transform
                 deleteTransform(transformId);
             } catch (AssertionError | Exception e) {
                 throw new AssertionError(
-                    format(
-                        "Failure at iteration %d (sleepAfterStartMillis=%s,force=%s): %s",
-                        i,
-                        sleepAfterStartMillis,
-                        force,
-                        e.getMessage()
-                    ),
+                    format("Failure at iteration %d (sleepAfterStart=%sms,force=%s): %s", i, sleepAfterStartMillis, force, e.getMessage()),
                     e
                 );
             }

+ 0 - 3
x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java

@@ -65,9 +65,6 @@ import static org.hamcrest.core.Is.is;
 
 public abstract class TransformRestTestCase extends TransformCommonRestTestCase {
 
-    protected static final String AUTH_KEY = "Authorization";
-    protected static final String SECONDARY_AUTH_KEY = "es-secondary-authorization";
-
     private final Set<String> createdTransformIds = new HashSet<>();
 
     protected void cleanUp() throws Exception {

+ 46 - 5
x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRobustnessIT.java

@@ -10,7 +10,6 @@ package org.elasticsearch.xpack.transform.integration;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
-import org.elasticsearch.core.Strings;
 import org.elasticsearch.xpack.core.transform.TransformField;
 import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
 
@@ -19,6 +18,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static org.elasticsearch.core.Strings.format;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
@@ -86,10 +86,10 @@ public class TransformRobustnessIT extends TransformRestTestCase {
         deleteTransform(transformId);
     }
 
-    public void testCreateAndDeleteTransformInALoop() throws IOException {
+    public void testBatchTransformLifecycltInALoop() throws IOException {
         createReviewsIndex();
 
-        String transformId = "test_create_and_delete_in_a_loop";
+        String transformId = "test_batch_lifecycle_in_a_loop";
         String destIndex = transformId + "-dest";
         for (int i = 0; i < 100; ++i) {
             try {
@@ -108,7 +108,48 @@ public class TransformRobustnessIT extends TransformRestTestCase {
                 // Delete the transform
                 deleteTransform(transformId);
             } catch (AssertionError | Exception e) {
-                fail("Failure at iteration " + i + ": " + e.getMessage());
+                throw new AssertionError(format("Failure at iteration %d: %s", i, e.getMessage()), e);
+            }
+        }
+    }
+
+    public void testContinuousTransformLifecycleInALoop() throws Exception {
+        createReviewsIndex();
+
+        String transformId = "test_cont_lifecycle_in_a_loop";
+        String destIndex = transformId + "-dest";
+        for (int i = 0; i < 100; ++i) {
+            long sleepAfterStartMillis = randomLongBetween(0, 5_000);
+            boolean force = randomBoolean();
+            try {
+                // Create the continuous transform.
+                createContinuousPivotReviewsTransform(transformId, destIndex, null);
+                assertThat(getTransformTasks(), is(empty()));
+                assertThat(getTransformTasksFromClusterState(transformId), is(empty()));
+
+                startTransform(transformId);
+                // There is 1 transform task after start.
+                assertThat(getTransformTasks(), hasSize(1));
+                assertThat(getTransformTasksFromClusterState(transformId), hasSize(1));
+
+                Thread.sleep(sleepAfterStartMillis);
+                // There should still be 1 transform task as the transform is continuous.
+                assertThat(getTransformTasks(), hasSize(1));
+                assertThat(getTransformTasksFromClusterState(transformId), hasSize(1));
+
+                // Stop the transform with force set randomly.
+                stopTransform(transformId, force);
+                // After the transform is stopped, there should be no transform task left.
+                assertThat(getTransformTasks(), is(empty()));
+                assertThat(getTransformTasksFromClusterState(transformId), is(empty()));
+
+                // Delete the transform.
+                deleteTransform(transformId);
+            } catch (AssertionError | Exception e) {
+                throw new AssertionError(
+                    format("Failure at iteration %d (sleepAfterStart=%sms,force=%s): %s", i, sleepAfterStartMillis, force, e.getMessage()),
+                    e
+                );
             }
         }
     }
@@ -168,7 +209,7 @@ public class TransformRobustnessIT extends TransformRestTestCase {
     }
 
     private static String createConfig(String sourceIndex, String destIndex) {
-        return Strings.format("""
+        return format("""
             {
               "source": {
                 "index": "%s"

+ 9 - 0
x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java

@@ -65,6 +65,7 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
         createContinuousPivotReviewsTransform(transformId, transformIndex, null);
 
         assertThat(getTransformTasks(), is(empty()));
+        assertThat(getTransformTasksFromClusterState(transformId), is(empty()));
 
         startTransform(transformId);
         awaitState(transformId, TransformStats.State.FAILED);
@@ -78,6 +79,7 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
         assertThat((String) XContentMapValues.extractValue("reason", fullState), startsWith(failureReason));
 
         assertThat(getTransformTasks(), hasSize(1));
+        assertThat(getTransformTasksFromClusterState(transformId), hasSize(1));
 
         // verify that we cannot stop a failed transform
         ResponseException ex = expectThrows(ResponseException.class, () -> stopTransform(transformId, false));
@@ -99,6 +101,7 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
         assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue()));
 
         assertThat(getTransformTasks(), is(empty()));
+        assertThat(getTransformTasksFromClusterState(transformId), is(empty()));
     }
 
     public void testForceResetFailedTransform() throws Exception {
@@ -109,6 +112,7 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
         createContinuousPivotReviewsTransform(transformId, transformIndex, null);
 
         assertThat(getTransformTasks(), is(empty()));
+        assertThat(getTransformTasksFromClusterState(transformId), is(empty()));
 
         startTransform(transformId);
         awaitState(transformId, TransformStats.State.FAILED);
@@ -122,6 +126,7 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
         assertThat((String) XContentMapValues.extractValue("reason", fullState), startsWith(failureReason));
 
         assertThat(getTransformTasks(), hasSize(1));
+        assertThat(getTransformTasksFromClusterState(transformId), hasSize(1));
 
         // verify that we cannot reset a failed transform
         ResponseException ex = expectThrows(ResponseException.class, () -> resetTransform(transformId, false));
@@ -135,6 +140,7 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
         resetTransform(transformId, true);
 
         assertThat(getTransformTasks(), is(empty()));
+        assertThat(getTransformTasksFromClusterState(transformId), is(empty()));
     }
 
     public void testStartFailedTransform() throws Exception {
@@ -145,6 +151,7 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
         createContinuousPivotReviewsTransform(transformId, transformIndex, null);
 
         assertThat(getTransformTasks(), is(empty()));
+        assertThat(getTransformTasksFromClusterState(transformId), is(empty()));
 
         startTransform(transformId);
         awaitState(transformId, TransformStats.State.FAILED);
@@ -158,6 +165,7 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
         assertThat((String) XContentMapValues.extractValue("reason", fullState), startsWith(failureReason));
 
         assertThat(getTransformTasks(), hasSize(1));
+        assertThat(getTransformTasksFromClusterState(transformId), hasSize(1));
 
         var expectedFailure = "Unable to start transform [test-force-start-failed-transform] "
             + "as it is in a failed state. Use force stop and then restart the transform once error is resolved. More details: ["
@@ -172,6 +180,7 @@ public class TransformTaskFailedStateIT extends TransformRestTestCase {
         stopTransform(transformId, true);
 
         assertThat(getTransformTasks(), is(empty()));
+        assertThat(getTransformTasksFromClusterState(transformId), is(empty()));
     }
 
     private void awaitState(String transformId, TransformStats.State state) throws Exception {

+ 41 - 50
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java

@@ -164,18 +164,23 @@ public class TransportStopTransformAction extends TransportTasksAction<Transform
                         state
                     );
 
-                    final ActionListener<Response> doExecuteListener;
-                    if (transformNodeAssignments.getWaitingForAssignment().size() > 0) {
-                        doExecuteListener = cancelTransformTasksWithNoAssignment(finalListener, transformNodeAssignments);
-                    } else {
-                        doExecuteListener = finalListener;
-                    }
+                    final ActionListener<Response> doExecuteListener = cancelTransformTasksListener(
+                        transformNodeAssignments.getWaitingForAssignment(),
+                        finalListener
+                    );
 
-                    if (transformNodeAssignments.getExecutorNodes().size() > 0) {
+                    if (request.isForce()) {
+                        // When force==true, we **do not** fan out to individual tasks (i.e. taskOperation method will not be called) as we
+                        // want to make sure that the persistent tasks will be removed from cluster state even if these tasks are no longer
+                        // visible by the PersistentTasksService.
+                        cancelTransformTasksListener(transformNodeAssignments.getAssigned(), doExecuteListener).onResponse(
+                            new Response(true)
+                        );
+                    } else if (transformNodeAssignments.getExecutorNodes().isEmpty()) {
+                        doExecuteListener.onResponse(new Response(true));
+                    } else {
                         request.setNodes(transformNodeAssignments.getExecutorNodes().toArray(new String[0]));
                         super.doExecute(task, request, doExecuteListener);
-                    } else {
-                        doExecuteListener.onResponse(new Response(true));
                     }
                 }, e -> {
                     if (e instanceof ResourceNotFoundException) {
@@ -189,13 +194,10 @@ public class TransportStopTransformAction extends TransportTasksAction<Transform
                             listener.onFailure(e);
                             // found transforms without a config
                         } else if (request.isForce()) {
-                            final ActionListener<Response> doExecuteListener;
-
-                            if (transformNodeAssignments.getWaitingForAssignment().size() > 0) {
-                                doExecuteListener = cancelTransformTasksWithNoAssignment(finalListener, transformNodeAssignments);
-                            } else {
-                                doExecuteListener = finalListener;
-                            }
+                            final ActionListener<Response> doExecuteListener = cancelTransformTasksListener(
+                                transformNodeAssignments.getWaitingForAssignment(),
+                                finalListener
+                            );
 
                             if (transformNodeAssignments.getExecutorNodes().size() > 0) {
                                 request.setExpandedIds(transformNodeAssignments.getAssigned());
@@ -235,7 +237,6 @@ public class TransportStopTransformAction extends TransportTasksAction<Transform
         TransformTask transformTask,
         ActionListener<Response> listener
     ) {
-
         Set<String> ids = request.getExpandedIds();
         if (ids == null) {
             listener.onFailure(new IllegalStateException("Request does not have expandedIds set"));
@@ -243,20 +244,6 @@ public class TransportStopTransformAction extends TransportTasksAction<Transform
         }
 
         if (ids.contains(transformTask.getTransformId())) {
-            if (request.isForce()) {
-                // If force==true, we skip the additional step (setShouldStopAtCheckpoint) and move directly to shutting down the task.
-                // This way we ensure that the persistent task is removed ASAP (as opposed to being removed in one of the listeners).
-                try {
-                    // Here the task is deregistered in scheduler and marked as completed in persistent task service.
-                    transformTask.shutdown();
-                    // Here the indexer is aborted so that its thread finishes work ASAP.
-                    transformTask.onCancelled();
-                    listener.onResponse(new Response(true));
-                } catch (ElasticsearchException ex) {
-                    listener.onFailure(ex);
-                }
-                return;
-            }
             // move the call to the generic thread pool, so we do not block the network thread
             threadPool.generic().execute(() -> {
                 transformTask.setShouldStopAtCheckpoint(request.isWaitForCheckpoint(), ActionListener.wrap(r -> {
@@ -306,7 +293,6 @@ public class TransportStopTransformAction extends TransportTasksAction<Transform
     }
 
     private ActionListener<Response> waitForStopListener(Request request, ActionListener<Response> listener) {
-
         ActionListener<Response> onStopListener = ActionListener.wrap(
             waitResponse -> transformConfigManager.refresh(ActionListener.wrap(r -> listener.onResponse(waitResponse), e -> {
                 if ((ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) == false) {
@@ -393,6 +379,7 @@ public class TransportStopTransformAction extends TransportTasksAction<Transform
     ) {
         // This map is accessed in the predicate and the listener callbacks
         final Map<String, ElasticsearchException> exceptions = new ConcurrentHashMap<>();
+
         persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetadata -> {
             if (persistentTasksCustomMetadata == null) {
                 return true;
@@ -501,34 +488,38 @@ public class TransportStopTransformAction extends TransportTasksAction<Transform
         }));
     }
 
-    private ActionListener<Response> cancelTransformTasksWithNoAssignment(
-        final ActionListener<Response> finalListener,
-        final TransformNodeAssignments transformNodeAssignments
+    /**
+     * Creates and returns the listener that sends remove request for every task in the given set.
+     *
+     * @param transformTasks set of transform tasks that should be removed
+     * @param finalListener listener that should be called once all the given tasks are removed
+     * @return listener that removes given tasks in parallel
+     */
+    private ActionListener<Response> cancelTransformTasksListener(
+        final Set<String> transformTasks,
+        final ActionListener<Response> finalListener
     ) {
-        final ActionListener<Response> doExecuteListener = ActionListener.wrap(response -> {
+        if (transformTasks.isEmpty()) {
+            return finalListener;
+        }
+        return ActionListener.wrap(response -> {
             GroupedActionListener<PersistentTask<?>> groupedListener = new GroupedActionListener<>(
-                transformNodeAssignments.getWaitingForAssignment().size(),
-                ActionListener.wrap(r -> {
-                    finalListener.onResponse(response);
-                }, finalListener::onFailure)
+                transformTasks.size(),
+                ActionListener.wrap(r -> finalListener.onResponse(response), finalListener::onFailure)
             );
 
-            for (String unassignedTaskId : transformNodeAssignments.getWaitingForAssignment()) {
-                persistentTasksService.sendRemoveRequest(unassignedTaskId, null, groupedListener);
+            for (String taskId : transformTasks) {
+                persistentTasksService.sendRemoveRequest(taskId, null, groupedListener);
             }
-
         }, e -> {
             GroupedActionListener<PersistentTask<?>> groupedListener = new GroupedActionListener<>(
-                transformNodeAssignments.getWaitingForAssignment().size(),
-                ActionListener.wrap(r -> {
-                    finalListener.onFailure(e);
-                }, finalListener::onFailure)
+                transformTasks.size(),
+                ActionListener.wrap(r -> finalListener.onFailure(e), finalListener::onFailure)
             );
 
-            for (String unassignedTaskId : transformNodeAssignments.getWaitingForAssignment()) {
-                persistentTasksService.sendRemoveRequest(unassignedTaskId, null, groupedListener);
+            for (String taskId : transformTasks) {
+                persistentTasksService.sendRemoveRequest(taskId, null, groupedListener);
             }
         });
-        return doExecuteListener;
     }
 }

+ 14 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformNodeAssignments.java

@@ -65,4 +65,18 @@ public final class TransformNodeAssignments {
     public Set<String> getStopped() {
         return stopped;
     }
+
+    @Override
+    public String toString() {
+        return new StringBuilder("TransformNodeAssignments[").append("executorNodes=")
+            .append(executorNodes)
+            .append(",assigned=")
+            .append(assigned)
+            .append(",waitingForAssignment=")
+            .append(waitingForAssignment)
+            .append(",stopped=")
+            .append(stopped)
+            .append("]")
+            .toString();
+    }
 }

+ 46 - 6
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformNodeAssignmentsTests.java

@@ -9,8 +9,6 @@ package org.elasticsearch.xpack.transform.transforms;
 
 import org.elasticsearch.test.ESTestCase;
 
-import java.util.Arrays;
-import java.util.HashSet;
 import java.util.Set;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -19,10 +17,11 @@ import static org.hamcrest.Matchers.is;
 public class TransformNodeAssignmentsTests extends ESTestCase {
 
     public void testConstructorAndGetters() {
-        Set<String> executorNodes = new HashSet<>(Arrays.asList("executor-1", "executor-2"));
-        Set<String> assigned = new HashSet<>(Arrays.asList("assigned-1", "assigned-2"));
-        Set<String> waitingForAssignment = new HashSet<>(Arrays.asList("waiting-1", "waitingv-2"));
-        Set<String> stopped = new HashSet<>(Arrays.asList("stopped-1", "stopped-2"));
+        Set<String> executorNodes = Set.of("executor-1", "executor-2");
+        Set<String> assigned = Set.of("assigned-1", "assigned-2");
+        Set<String> waitingForAssignment = Set.of("waiting-1", "waiting-2");
+        Set<String> stopped = Set.of("stopped-1", "stopped-2");
+
         TransformNodeAssignments assignments = new TransformNodeAssignments(executorNodes, assigned, waitingForAssignment, stopped);
 
         assertThat(assignments.getExecutorNodes(), is(equalTo(executorNodes)));
@@ -30,4 +29,45 @@ public class TransformNodeAssignmentsTests extends ESTestCase {
         assertThat(assignments.getWaitingForAssignment(), is(equalTo(waitingForAssignment)));
         assertThat(assignments.getStopped(), is(equalTo(stopped)));
     }
+
+    public void testToString() {
+        Set<String> executorNodes = Set.of("executor-1");
+        Set<String> assigned = Set.of("assigned-1");
+        Set<String> waitingForAssignment = Set.of("waiting-1");
+        Set<String> stopped = Set.of("stopped-1");
+
+        TransformNodeAssignments assignments = new TransformNodeAssignments(executorNodes, assigned, waitingForAssignment, stopped);
+
+        assertThat(
+            assignments.toString(),
+            is(
+                equalTo(
+                    "TransformNodeAssignments["
+                        + "executorNodes=[executor-1],"
+                        + "assigned=[assigned-1],"
+                        + "waitingForAssignment=[waiting-1],"
+                        + "stopped=[stopped-1]"
+                        + "]"
+                )
+            )
+        );
+    }
+
+    public void testToString_EmptyCollections() {
+        Set<String> executorNodes = Set.of();
+        Set<String> assigned = Set.of();
+        Set<String> waitingForAssignment = Set.of();
+        Set<String> stopped = Set.of();
+
+        TransformNodeAssignments assignments = new TransformNodeAssignments(executorNodes, assigned, waitingForAssignment, stopped);
+
+        assertThat(
+            assignments.toString(),
+            is(
+                equalTo(
+                    "TransformNodeAssignments[" + "executorNodes=[]," + "assigned=[]," + "waitingForAssignment=[]," + "stopped=[]" + "]"
+                )
+            )
+        );
+    }
 }