Browse Source

Removing local abort availability checks (#75785)

Now that #74115 is backported to 7.x the code to check whether
local abort is supported within a cluster is redundant, as 8.x
only supports running in a mixed cluster with 7.last, and 7.last
contains the local abort functionality.

This change removes the redundant code.

Followup to #74115
David Roberts 4 years ago
parent
commit
f0008d3c27

+ 0 - 11
server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java

@@ -122,28 +122,17 @@ public class AllocatedPersistentTask extends CancellableTask {
      * reassigned to the same node unless separate measures have been taken to prevent
      * this. The task should complete any graceful shutdown actions before calling this
      * method.
-     * @throws IllegalStateException This exception will be thrown if the cluster contains
-     *                               nodes that are too old to understand the concept of
-     *                               locally aborting tasks. In this situation callers
-     *                               should revert to whatever the functionality was prior
-     *                               to local abort being possible. It is possible to check
-     *                               if local abort is possible before starting a sequence
-     *                               of steps that will end in a local abort by calling
-     *                               {@link PersistentTasksService#isLocalAbortSupported}.
      * @param localAbortReason Reason for the task being aborted on this node. This
      *                         will be recorded as the reason for unassignment of the
      *                         persistent task.
      */
     public void markAsLocallyAborted(String localAbortReason) {
-        persistentTasksService.validateLocalAbortSupported();
         completeAndNotifyIfNeeded(null, Objects.requireNonNull(localAbortReason));
     }
 
     private void completeAndNotifyIfNeeded(@Nullable Exception failure, @Nullable String localAbortReason) {
         assert failure == null || localAbortReason == null
             : "completion notification has both exception " + failure + " and local abort reason " + localAbortReason;
-        assert localAbortReason == null || persistentTasksService.isLocalAbortSupported()
-            : "local abort reason provided to inner implementation when it is not supported: " + localAbortReason;
         final State desiredState = (localAbortReason == null) ? State.COMPLETED : State.LOCAL_ABORTED;
         final State prevState = state.getAndUpdate(
             currentState -> (currentState != State.COMPLETED && currentState != State.LOCAL_ABORTED) ? desiredState : currentState);

+ 2 - 19
server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java

@@ -7,7 +7,6 @@
  */
 package org.elasticsearch.persistent;
 
-import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionType;
@@ -42,8 +41,6 @@ public class CompletionPersistentTaskAction extends ActionType<PersistentTaskRes
     public static final CompletionPersistentTaskAction INSTANCE = new CompletionPersistentTaskAction();
     public static final String NAME = "cluster:admin/persistent/completion";
 
-    public static final Version LOCAL_ABORT_AVAILABLE_VERSION = Version.V_7_15_0;
-
     private CompletionPersistentTaskAction() {
         super(NAME, PersistentTaskResponse::new);
     }
@@ -65,9 +62,7 @@ public class CompletionPersistentTaskAction extends ActionType<PersistentTaskRes
             taskId = in.readString();
             allocationId = in.readLong();
             exception = in.readException();
-            if (in.getVersion().onOrAfter(LOCAL_ABORT_AVAILABLE_VERSION)) {
-                localAbortReason = in.readOptionalString();
-            }
+            localAbortReason = in.readOptionalString();
         }
 
         public Request(String taskId, long allocationId, Exception exception, String localAbortReason) {
@@ -79,23 +74,11 @@ public class CompletionPersistentTaskAction extends ActionType<PersistentTaskRes
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
-            if (localAbortReason != null && out.getVersion().before(LOCAL_ABORT_AVAILABLE_VERSION)) {
-                // This case cannot be handled by simply not serializing the new field, as the
-                // old master node would then treat it as a signal that the task should NOT be
-                // reassigned to a different node, i.e. completely different semantics. We
-                // should never get here in reality, as this action is for internal use only
-                // (it has no REST layer) and the places where it's called defend against this
-                // situation.
-                throw new IllegalArgumentException("attempt to abort a persistent task locally in a cluster that contains a node that is "
-                    + "too old: found node version [" + out.getVersion() + "], minimum required [" + LOCAL_ABORT_AVAILABLE_VERSION + "]");
-            }
             super.writeTo(out);
             out.writeString(taskId);
             out.writeLong(allocationId);
             out.writeException(exception);
-            if (out.getVersion().onOrAfter(LOCAL_ABORT_AVAILABLE_VERSION)) {
-                out.writeOptionalString(localAbortReason);
-            }
+            out.writeOptionalString(localAbortReason);
         }
 
         @Override

+ 0 - 34
server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java

@@ -9,7 +9,6 @@ package org.elasticsearch.persistent;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRequest;
@@ -29,8 +28,6 @@ import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.function.Predicate;
 
-import static org.elasticsearch.persistent.CompletionPersistentTaskAction.LOCAL_ABORT_AVAILABLE_VERSION;
-
 /**
  * This service is used by persistent tasks and allocated persistent tasks to communicate changes
  * to the master node so that the master can update the cluster state and can track of the states
@@ -71,18 +68,12 @@ public class PersistentTasksService {
      * At most one of {@code failure} and {@code localAbortReason} may be
      * provided. When both {@code failure} and {@code localAbortReason} are
      * {@code null}, the persistent task is considered as successfully completed.
-     * {@code localAbortReason} must not be provided unless all nodes in the cluster
-     * are on version {@link CompletionPersistentTaskAction#LOCAL_ABORT_AVAILABLE_VERSION}
-     * or higher.
      */
     public void sendCompletionRequest(final String taskId,
                                       final long taskAllocationId,
                                       final @Nullable Exception taskFailure,
                                       final @Nullable String localAbortReason,
                                       final ActionListener<PersistentTask<?>> listener) {
-        if (localAbortReason != null) {
-            validateLocalAbortSupported();
-        }
         CompletionPersistentTaskAction.Request request =
             new CompletionPersistentTaskAction.Request(taskId, taskAllocationId, taskFailure, localAbortReason);
         execute(request, CompletionPersistentTaskAction.INSTANCE, listener);
@@ -125,31 +116,6 @@ public class PersistentTasksService {
         execute(request, RemovePersistentTaskAction.INSTANCE, listener);
     }
 
-    /**
-     * Is the cluster able to support locally aborting persistent tasks?
-     * This requires that every node in the cluster is on version
-     * {@link CompletionPersistentTaskAction#LOCAL_ABORT_AVAILABLE_VERSION}
-     * or above.
-     */
-    public boolean isLocalAbortSupported() {
-        return isLocalAbortSupported(clusterService.state());
-    }
-
-    public static boolean isLocalAbortSupported(ClusterState state) {
-        return state.nodes().getMinNodeVersion().onOrAfter(LOCAL_ABORT_AVAILABLE_VERSION);
-    }
-
-    /**
-     * Throw an exception if the cluster is not able locally abort persistent tasks.
-     */
-    public void validateLocalAbortSupported() {
-        Version minNodeVersion = clusterService.state().nodes().getMinNodeVersion();
-        if (minNodeVersion.before(LOCAL_ABORT_AVAILABLE_VERSION)) {
-            throw new IllegalStateException("attempt to abort a persistent task locally in a cluster that does not support this: "
-                + "minimum node version [" + minNodeVersion + "], version required [" + LOCAL_ABORT_AVAILABLE_VERSION + "]");
-        }
-    }
-
     /**
      * Executes an asynchronous persistent task action using the client.
      * <p>

+ 0 - 15
server/src/test/java/org/elasticsearch/persistent/CompletionPersistentTaskRequestTests.java

@@ -7,16 +7,10 @@
  */
 package org.elasticsearch.persistent;
 
-import org.elasticsearch.Version;
-import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.persistent.CompletionPersistentTaskAction.Request;
 import org.elasticsearch.test.AbstractWireSerializingTestCase;
 
-import static org.hamcrest.Matchers.equalTo;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public class CompletionPersistentTaskRequestTests extends AbstractWireSerializingTestCase<Request> {
 
     @Override
@@ -32,13 +26,4 @@ public class CompletionPersistentTaskRequestTests extends AbstractWireSerializin
     protected Writeable.Reader<Request> instanceReader() {
         return Request::new;
     }
-
-    public void testSerializeToOldNodeThrows() {
-        Request request = new Request(randomAlphaOfLength(10), randomNonNegativeLong(), null, randomAlphaOfLength(20));
-        StreamOutput out = mock(StreamOutput.class);
-        when(out.getVersion()).thenReturn(Version.V_7_14_0);
-        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> request.writeTo(out));
-        assertThat(e.getMessage(), equalTo("attempt to abort a persistent task locally in a cluster that contains a node that is too "
-            + "old: found node version [7.14.0], minimum required [7.15.0]"));
-    }
 }

+ 0 - 33
server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java

@@ -243,18 +243,6 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
                                               final ActionListener<PersistentTask<?>> listener) {
                 fail("Shouldn't be called during Cluster State cancellation");
             }
-
-            @Override
-            public void validateLocalAbortSupported() {
-                if (isLocalAbortSupported() == false) {
-                    fail("this test should not cover local abort");
-                }
-            }
-
-            @Override
-            public boolean isLocalAbortSupported() {
-                return randomBoolean();
-            }
         };
         @SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class);
         when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
@@ -341,15 +329,6 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
                 capturedLocalAbortReason.set(localAbortReason);
                 assertThat(taskFailure, nullValue());
             }
-
-            @Override
-            public void validateLocalAbortSupported() {
-            }
-
-            @Override
-            public boolean isLocalAbortSupported() {
-                return true;
-            }
         };
         @SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class);
         when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
@@ -436,18 +415,6 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
                 listener.onResponse(mock(PersistentTask.class));
                 latch.countDown();
             }
-
-            @Override
-            public void validateLocalAbortSupported() {
-                if (isLocalAbortSupported() == false) {
-                    fail("this test should not cover local abort");
-                }
-            }
-
-            @Override
-            public boolean isLocalAbortSupported() {
-                return randomBoolean();
-            }
         };
 
         @SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class);

+ 0 - 15
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java

@@ -10,7 +10,6 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.component.LifecycleListener;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
-import org.elasticsearch.persistent.PersistentTasksService;
 import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedRunner;
 import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager;
@@ -74,12 +73,6 @@ public class MlLifeCycleService {
     }
 
     static boolean isNodeSafeToShutdown(String nodeId, ClusterState state) {
-        // If we are in a mixed version cluster that doesn't support locally aborting persistent tasks then
-        // we cannot perform graceful shutdown, so just revert to the behaviour of previous versions where
-        // the node shutdown API didn't exist
-        if (PersistentTasksService.isLocalAbortSupported(state) == false) {
-            return true;
-        }
         PersistentTasksCustomMetadata tasks = state.metadata().custom(PersistentTasksCustomMetadata.TYPE);
         // TODO: currently only considering anomaly detection jobs - could extend in the future
         // Ignore failed jobs - the persistent task still exists to remember the failure (because no
@@ -101,14 +94,6 @@ public class MlLifeCycleService {
     }
 
     void signalGracefulShutdown(ClusterState state, Collection<String> shutdownNodeIds) {
-
-        // If we are in a mixed version cluster that doesn't support locally aborting persistent tasks then
-        // we cannot perform graceful shutdown, so just revert to the behaviour of previous versions where
-        // the node shutdown API didn't exist
-        if (PersistentTasksService.isLocalAbortSupported(state) == false) {
-            return;
-        }
-
         if (shutdownNodeIds.contains(state.nodes().getLocalNodeId())) {
 
             datafeedRunner.vacateAllDatafeedsOnThisNode(