Browse Source

Add new setting to disable persistent tasks allocations (#29137)

This commit adds a new setting `cluster.persistent_tasks.allocation.enable`
that can be used to enable or disable the allocation of persistent tasks.
The setting accepts the values `all` (default) or `none`. When set to
none, the persistent tasks that are created (or that must be reassigned)
won't be assigned to a node but will reside in the cluster state with
a no "executor node" and a reason describing why it is not assigned:

```
"assignment" : {
  "executor_node" : null,
  "explanation" : "persistent task [foo/bar] cannot be assigned [no
  persistent task assignments are allowed due to cluster settings]"
}
```
Tanguy Leroux 7 years ago
parent
commit
edf27a599e

+ 26 - 0
docs/reference/modules/cluster/misc.asciidoc

@@ -56,3 +56,29 @@ PUT /_cluster/settings
 }
 -------------------------------
 // CONSOLE
+
+
+[[persistent-tasks-allocation]]
+==== Persistent Tasks Allocations
+
+Plugins can create a kind of tasks called persistent tasks. Those tasks are
+usually long-live tasks and are stored in the cluster state, allowing the
+tasks to be revived after a full cluster restart.
+
+Every time a persistent task is created, the master nodes takes care of
+assigning the task to a node of the cluster, and the assigned node will then
+pick up the task and execute it locally. The process of assigning persistent
+tasks to nodes is controlled by the following property, which can be updated
+dynamically:
+
+`cluster.persistent_tasks.allocation.enable`::
++
+--
+Enable or disable allocation for persistent tasks:
+
+* `all` -             (default) Allows persistent tasks to be assigned to nodes
+* `none` -            No allocations are allowed for any type of persistent task
+
+This setting does not affect the persistent tasks that are already being executed.
+Only newly created persistent tasks, or tasks that must be reassigned (after a node
+left the cluster, for example), are impacted by this setting.

+ 3 - 1
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -79,6 +79,7 @@ import org.elasticsearch.monitor.jvm.JvmService;
 import org.elasticsearch.monitor.os.OsService;
 import org.elasticsearch.monitor.process.ProcessService;
 import org.elasticsearch.node.Node;
+import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
 import org.elasticsearch.plugins.PluginsService;
 import org.elasticsearch.repositories.fs.FsRepository;
 import org.elasticsearch.rest.BaseRestHandler;
@@ -420,6 +421,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
                     FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE,
                     Node.BREAKER_TYPE_KEY,
                     OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING,
-                    IndexGraveyard.SETTING_MAX_TOMBSTONES
+                    IndexGraveyard.SETTING_MAX_TOMBSTONES,
+                    EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING
             )));
 }

+ 18 - 2
server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java

@@ -34,6 +34,8 @@ import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
+import org.elasticsearch.persistent.decider.AssignmentDecision;
+import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
 import org.elasticsearch.tasks.Task;
 
 import java.util.Objects;
@@ -45,12 +47,14 @@ public class PersistentTasksClusterService extends AbstractComponent implements
 
     private final ClusterService clusterService;
     private final PersistentTasksExecutorRegistry registry;
+    private final EnableAssignmentDecider decider;
 
     public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorRegistry registry, ClusterService clusterService) {
         super(settings);
         this.clusterService = clusterService;
         clusterService.addListener(this);
         this.registry = registry;
+        this.decider = new EnableAssignmentDecider(settings, clusterService.getClusterSettings());
     }
 
     /**
@@ -224,6 +228,12 @@ public class PersistentTasksClusterService extends AbstractComponent implements
                                                                               final @Nullable Params taskParams,
                                                                               final ClusterState currentState) {
         PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
+
+        AssignmentDecision decision = decider.canAssign();
+        if (decision.getType() == AssignmentDecision.Type.NO) {
+            return new Assignment(null, "persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]");
+        }
+
         return persistentTasksExecutor.getAssignment(taskParams, currentState);
     }
 
@@ -249,7 +259,8 @@ public class PersistentTasksClusterService extends AbstractComponent implements
 
     /**
      * Returns true if the cluster state change(s) require to reassign some persistent tasks. It can happen in the following
-     * situations: a node left or is added, the routing table changed, the master node changed or the persistent tasks changed.
+     * situations: a node left or is added, the routing table changed, the master node changed, the metadata changed or the
+     * persistent tasks changed.
      */
     boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) {
         final PersistentTasksCustomMetaData tasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
@@ -259,7 +270,12 @@ public class PersistentTasksClusterService extends AbstractComponent implements
 
         boolean masterChanged = event.previousState().nodes().isLocalNodeElectedMaster() == false;
 
-        if (persistentTasksChanged(event) || event.nodesChanged() || event.routingTableChanged() || masterChanged) {
+        if (persistentTasksChanged(event)
+            || event.nodesChanged()
+            || event.routingTableChanged()
+            || event.metaDataChanged()
+            || masterChanged) {
+
             for (PersistentTask<?> task : tasks.tasks()) {
                 if (needsReassignment(task.getAssignment(), event.state().nodes())) {
                     Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), event.state());

+ 72 - 0
server/src/main/java/org/elasticsearch/persistent/decider/AssignmentDecision.java

@@ -0,0 +1,72 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.persistent.decider;
+
+import java.util.Locale;
+import java.util.Objects;
+
+/**
+ * {@link AssignmentDecision} represents the decision made during the process of
+ * assigning a persistent task to a node of the cluster.
+ *
+ * @see EnableAssignmentDecider
+ */
+public final class AssignmentDecision {
+
+    public static final AssignmentDecision YES = new AssignmentDecision(Type.YES, "");
+
+    private final Type type;
+    private final String reason;
+
+    public AssignmentDecision(final Type type, final String reason) {
+        this.type = Objects.requireNonNull(type);
+        this.reason = Objects.requireNonNull(reason);
+    }
+
+    public Type getType() {
+        return type;
+    }
+
+    public String getReason() {
+        return reason;
+    }
+
+    @Override
+    public String toString() {
+        return "assignment decision [type=" + type + ", reason=" + reason + "]";
+    }
+
+    public enum Type {
+        NO(0), YES(1);
+
+        private final int id;
+
+        Type(int id) {
+            this.id = id;
+        }
+
+        public int getId() {
+            return id;
+        }
+
+        public static Type resolve(final String s) {
+            return Type.valueOf(s.toUpperCase(Locale.ROOT));
+        }
+    }
+}

+ 101 - 0
server/src/main/java/org/elasticsearch/persistent/decider/EnableAssignmentDecider.java

@@ -0,0 +1,101 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.persistent.decider;
+
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+
+import java.util.Locale;
+
+import static org.elasticsearch.common.settings.Setting.Property.Dynamic;
+import static org.elasticsearch.common.settings.Setting.Property.NodeScope;
+
+/**
+ * {@link EnableAssignmentDecider} is used to allow/disallow the persistent tasks
+ * to be assigned to cluster nodes.
+ * <p>
+ * Allocation settings can have the following values (non-casesensitive):
+ * <ul>
+ *     <li> <code>NONE</code> - no persistent tasks can be assigned
+ *     <li> <code>ALL</code> - all persistent tasks can be assigned to nodes
+ * </ul>
+ *
+ * @see Allocation
+ */
+public class EnableAssignmentDecider {
+
+    public static final Setting<Allocation> CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING =
+        new Setting<>("cluster.persistent_tasks.allocation.enable", Allocation.ALL.toString(), Allocation::fromString, Dynamic, NodeScope);
+
+    private volatile Allocation enableAssignment;
+
+    public EnableAssignmentDecider(final Settings settings, final ClusterSettings clusterSettings) {
+        this.enableAssignment = CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.get(settings);
+        clusterSettings.addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, this::setEnableAssignment);
+    }
+
+    public void setEnableAssignment(final Allocation enableAssignment) {
+        this.enableAssignment = enableAssignment;
+    }
+
+    /**
+     * Returns a {@link AssignmentDecision} whether the given persistent task can be assigned
+     * to a node of the cluster. The decision depends on the current value of the setting
+     * {@link EnableAssignmentDecider#CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING}.
+     *
+     * @return the {@link AssignmentDecision}
+     */
+    public AssignmentDecision canAssign() {
+        if (enableAssignment == Allocation.NONE) {
+            return new AssignmentDecision(AssignmentDecision.Type.NO, "no persistent task assignments are allowed due to cluster settings");
+        }
+        return AssignmentDecision.YES;
+    }
+
+    /**
+     * Allocation values or rather their string representation to be used used with
+     * {@link EnableAssignmentDecider#CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING}
+     * via cluster settings.
+     */
+    public enum Allocation {
+
+        NONE,
+        ALL;
+
+        public static Allocation fromString(final String strValue) {
+            if (strValue == null) {
+                return null;
+            } else {
+                String value = strValue.toUpperCase(Locale.ROOT);
+                try {
+                    return valueOf(value);
+                } catch (IllegalArgumentException e) {
+                    throw new IllegalArgumentException("Illegal value [" + value + "] for ["
+                        + CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey() + "]");
+                }
+            }
+        }
+
+        @Override
+        public String toString() {
+            return name().toLowerCase(Locale.ROOT);
+        }
+    }
+}

+ 1 - 1
server/src/main/java/org/elasticsearch/persistent/package-info.java

@@ -30,7 +30,7 @@
  * task.
  * <p>
  * 2. The master node updates the {@link org.elasticsearch.persistent.PersistentTasksCustomMetaData} in the cluster state to indicate
- * that there is a new persistent task is running in the system.
+ * that there is a new persistent task running in the system.
  * <p>
  * 3. The {@link org.elasticsearch.persistent.PersistentTasksNodeService} running on every node in the cluster monitors changes in
  * the cluster state and starts execution of all new tasks assigned to the node it is running on.

+ 86 - 4
server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java

@@ -36,9 +36,16 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
 import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
 import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
+import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.VersionUtils;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -52,14 +59,41 @@ import static java.util.Collections.singleton;
 import static org.elasticsearch.persistent.PersistentTasksClusterService.needsReassignment;
 import static org.elasticsearch.persistent.PersistentTasksClusterService.persistentTasksChanged;
 import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND;
+import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
-import static org.mockito.Mockito.mock;
 
 public class PersistentTasksClusterServiceTests extends ESTestCase {
 
+    /** Needed by {@link ClusterService} **/
+    private static ThreadPool threadPool;
+    /** Needed by {@link PersistentTasksClusterService} **/
+    private ClusterService clusterService;
+
+    @BeforeClass
+    public static void setUpThreadPool() {
+        threadPool = new TestThreadPool(PersistentTasksClusterServiceTests.class.getSimpleName());
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        clusterService = createClusterService(threadPool);
+    }
+
+    @AfterClass
+    public static void tearDownThreadPool() throws Exception {
+        terminate(threadPool);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+        clusterService.close();
+    }
+
     public void testReassignmentRequired() {
         final PersistentTasksClusterService service = createService((params, clusterState) ->
             "never_assign".equals(((TestParams) params).getTestParam()) ? NO_NODE_FOUND : randomNodeAssignment(clusterState.nodes())
@@ -81,6 +115,55 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
         }
     }
 
+    public void testReassignmentRequiredOnMetadataChanges() {
+        EnableAssignmentDecider.Allocation allocation = randomFrom(EnableAssignmentDecider.Allocation.values());
+
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+            .add(new DiscoveryNode("_node", buildNewFakeTransportAddress(), Version.CURRENT))
+            .localNodeId("_node")
+            .masterNodeId("_node")
+            .build();
+
+        boolean unassigned = randomBoolean();
+        PersistentTasksCustomMetaData tasks = PersistentTasksCustomMetaData.builder()
+            .addTask("_task_1", TestPersistentTasksExecutor.NAME, null, new Assignment(unassigned ? null : "_node", "_reason"))
+            .build();
+
+        MetaData metaData = MetaData.builder()
+            .putCustom(PersistentTasksCustomMetaData.TYPE, tasks)
+            .persistentSettings(Settings.builder()
+                    .put(EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey(), allocation.toString())
+                    .build())
+            .build();
+
+        ClusterState previous = ClusterState.builder(new ClusterName("_name"))
+            .nodes(nodes)
+            .metaData(metaData)
+            .build();
+
+        ClusterState current;
+
+        final boolean changed = randomBoolean();
+        if (changed) {
+            allocation = randomValueOtherThan(allocation, () -> randomFrom(EnableAssignmentDecider.Allocation.values()));
+
+            current = ClusterState.builder(previous)
+                .metaData(MetaData.builder(previous.metaData())
+                    .persistentSettings(Settings.builder()
+                        .put(EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey(), allocation.toString())
+                        .build())
+                    .build())
+                .build();
+        } else {
+            current = ClusterState.builder(previous).build();
+        }
+
+        final ClusterChangedEvent event = new ClusterChangedEvent("test", current, previous);
+
+        final PersistentTasksClusterService service = createService((params, clusterState) -> randomNodeAssignment(clusterState.nodes()));
+        assertThat(dumpEvent(event), service.shouldReassignPersistentTasks(event), equalTo(changed && unassigned));
+    }
+
     public void testReassignTasksWithNoTasks() {
         ClusterState clusterState = initialState();
         assertThat(reassign(clusterState).metaData().custom(PersistentTasksCustomMetaData.TYPE), nullValue());
@@ -527,7 +610,6 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
                 Version.CURRENT);
     }
 
-
     private ClusterState initialState() {
         MetaData.Builder metaData = MetaData.builder();
         RoutingTable.Builder routingTable = RoutingTable.builder();
@@ -558,7 +640,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
     }
 
     /** Creates a PersistentTasksClusterService with a single PersistentTasksExecutor implemented by a BiFunction **/
-    static <P extends PersistentTaskParams> PersistentTasksClusterService createService(final BiFunction<P, ClusterState, Assignment> fn) {
+    private <P extends PersistentTaskParams> PersistentTasksClusterService createService(final BiFunction<P, ClusterState, Assignment> fn) {
         PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY,
             singleton(new PersistentTasksExecutor<P>(Settings.EMPTY, TestPersistentTasksExecutor.NAME, null) {
                 @Override
@@ -571,6 +653,6 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
                     throw new UnsupportedOperationException();
                 }
             }));
-        return new PersistentTasksClusterService(Settings.EMPTY, registry, mock(ClusterService.class));
+        return new PersistentTasksClusterService(Settings.EMPTY, registry, clusterService);
     }
 }

+ 134 - 0
server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java

@@ -0,0 +1,134 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.persistent;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import java.util.function.Predicate;
+
+import static java.util.Collections.emptyList;
+import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
+
+public abstract class PersistentTasksDecidersTestCase extends ESTestCase {
+
+    /** Needed by {@link ClusterService} **/
+    private static ThreadPool threadPool;
+    /** Needed by {@link PersistentTasksClusterService} **/
+    private ClusterService clusterService;
+
+    private PersistentTasksClusterService persistentTasksClusterService;
+
+    @BeforeClass
+    public static void setUpThreadPool() {
+        threadPool = new TestThreadPool(getTestClass().getSimpleName());
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        clusterService = createClusterService(threadPool);
+        PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(clusterService.getSettings(), emptyList()) {
+            @Override
+            public <Params extends PersistentTaskParams> PersistentTasksExecutor<Params> getPersistentTaskExecutorSafe(String taskName) {
+                return new PersistentTasksExecutor<Params>(clusterService.getSettings(), taskName, null) {
+                    @Override
+                    protected void nodeOperation(AllocatedPersistentTask task, Params params, Task.Status status) {
+                        logger.debug("Executing task {}", task);
+                    }
+                };
+            }
+        };
+        persistentTasksClusterService = new PersistentTasksClusterService(clusterService.getSettings(), registry, clusterService);
+    }
+
+    @AfterClass
+    public static void tearDownThreadPool() throws Exception {
+        terminate(threadPool);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+        clusterService.close();
+    }
+
+    protected ClusterState reassign(final ClusterState clusterState) {
+        return persistentTasksClusterService.reassignTasks(clusterState);
+    }
+
+    protected void updateSettings(final Settings settings) {
+        ClusterSettings clusterSettings = clusterService.getClusterSettings();
+        Settings.Builder updated = Settings.builder();
+        clusterSettings.updateDynamicSettings(settings, updated, Settings.builder(), getTestClass().getName());
+        clusterSettings.applySettings(updated.build());
+    }
+
+    protected static ClusterState createClusterStateWithTasks(final int nbNodes, final int nbTasks) {
+        DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
+        for (int i = 0; i < nbNodes; i++) {
+            nodes.add(new DiscoveryNode("_node_" + i, buildNewFakeTransportAddress(), Version.CURRENT));
+        }
+
+        PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder();
+        for (int i = 0; i < nbTasks; i++) {
+            tasks.addTask("_task_" + i, "test", null, new PersistentTasksCustomMetaData.Assignment(null, "initialized"));
+        }
+
+        MetaData metaData = MetaData.builder()
+            .putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build())
+            .build();
+
+        return ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).metaData(metaData).build();
+    }
+
+    /** Asserts that the given cluster state contains nbTasks tasks that are assigned **/
+    protected static void assertNbAssignedTasks(final long nbTasks, final ClusterState clusterState) {
+        assertPersistentTasks(nbTasks, clusterState, PersistentTasksCustomMetaData.PersistentTask::isAssigned);
+    }
+
+    /** Asserts that the given cluster state contains nbTasks tasks that are NOT assigned **/
+    protected static void assertNbUnassignedTasks(final long nbTasks, final ClusterState clusterState) {
+        assertPersistentTasks(nbTasks, clusterState, task -> task.isAssigned() == false);
+    }
+
+    /** Asserts that the cluster state contains nbTasks tasks that verify the given predicate **/
+    protected static void assertPersistentTasks(final long nbTasks,
+                                              final ClusterState clusterState,
+                                              final Predicate<PersistentTasksCustomMetaData.PersistentTask> predicate) {
+        PersistentTasksCustomMetaData tasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
+        assertNotNull("Persistent tasks must be not null", tasks);
+        assertEquals(nbTasks, tasks.tasks().stream().filter(predicate).count());
+    }
+}

+ 33 - 0
server/src/test/java/org/elasticsearch/persistent/decider/AssignmentDecisionTests.java

@@ -0,0 +1,33 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.persistent.decider;
+
+import org.elasticsearch.test.ESTestCase;
+
+public class AssignmentDecisionTests extends ESTestCase {
+
+    public void testConstantsTypes() {
+        assertEquals(AssignmentDecision.Type.YES, AssignmentDecision.YES.getType());
+    }
+
+    public void testResolveFromType() {
+        final AssignmentDecision.Type expected = randomFrom(AssignmentDecision.Type.values());
+        assertEquals(expected, AssignmentDecision.Type.resolve(expected.toString()));
+    }
+}

+ 173 - 0
server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java

@@ -0,0 +1,173 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.persistent.decider;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.persistent.PersistentTaskParams;
+import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
+import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
+import org.elasticsearch.persistent.PersistentTasksService;
+import org.elasticsearch.persistent.TestPersistentTasksPlugin;
+import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
+import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.ESIntegTestCase;
+
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+
+import static java.util.Collections.singletonList;
+import static org.elasticsearch.persistent.decider.EnableAssignmentDecider.Allocation;
+import static org.elasticsearch.persistent.decider.EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+
+@ESIntegTestCase.ClusterScope(minNumDataNodes = 1)
+public class EnableAssignmentDeciderIT extends ESIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return singletonList(TestPersistentTasksPlugin.class);
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> transportClientPlugins() {
+        return nodePlugins();
+    }
+
+    @Override
+    protected boolean ignoreExternalCluster() {
+        return true;
+    }
+
+    /**
+     * Test that the {@link EnableAssignmentDecider#CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING} setting correctly
+     * prevents persistent tasks to be assigned after a cluster restart.
+     */
+    public void testEnableAssignmentAfterRestart() throws Exception {
+        final int numberOfTasks = randomIntBetween(1, 10);
+        logger.trace("creating {} persistent tasks", numberOfTasks);
+
+        final CountDownLatch latch = new CountDownLatch(numberOfTasks);
+        for (int i = 0; i < numberOfTasks; i++) {
+            PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class);
+            service.startPersistentTask("task_" + i, TestPersistentTasksExecutor.NAME, randomTaskParams(),
+                new ActionListener<PersistentTask<PersistentTaskParams>>() {
+                    @Override
+                    public void onResponse(PersistentTask<PersistentTaskParams> task) {
+                        latch.countDown();
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        latch.countDown();
+                    }
+                });
+        }
+        latch.await();
+
+        ClusterService clusterService = internalCluster().clusterService(internalCluster().getMasterName());
+        PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+        assertEquals(numberOfTasks, tasks.tasks().stream().filter(t -> TestPersistentTasksExecutor.NAME.equals(t.getTaskName())).count());
+
+        logger.trace("waiting for the tasks to be running");
+        assertBusy(() -> {
+            ListTasksResponse listTasks = client().admin().cluster().prepareListTasks()
+                                                                    .setActions(TestPersistentTasksExecutor.NAME + "[c]")
+                                                                    .get();
+            assertThat(listTasks.getTasks().size(), equalTo(numberOfTasks));
+        });
+
+        try {
+            logger.trace("disable persistent tasks assignment");
+            disablePersistentTasksAssignment();
+
+            logger.trace("restart the cluster");
+            internalCluster().fullRestart();
+            ensureYellow();
+
+            logger.trace("persistent tasks assignment is still disabled");
+            assertEnableAssignmentSetting(Allocation.NONE);
+
+            logger.trace("persistent tasks are not assigned");
+            tasks = internalCluster().clusterService().state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+            assertEquals(numberOfTasks, tasks.tasks().stream()
+                .filter(t -> TestPersistentTasksExecutor.NAME.equals(t.getTaskName()))
+                .filter(t -> t.isAssigned() == false)
+                .count());
+
+            ListTasksResponse runningTasks = client().admin().cluster().prepareListTasks()
+                .setActions(TestPersistentTasksExecutor.NAME + "[c]")
+                .get();
+            assertThat(runningTasks.getTasks().size(), equalTo(0));
+
+            logger.trace("enable persistent tasks assignment");
+            if (randomBoolean()) {
+                enablePersistentTasksAssignment();
+            } else {
+                resetPersistentTasksAssignment();
+            }
+
+            assertBusy(() -> {
+                ListTasksResponse listTasks = client().admin().cluster().prepareListTasks()
+                    .setActions(TestPersistentTasksExecutor.NAME + "[c]")
+                    .get();
+                assertThat(listTasks.getTasks().size(), equalTo(numberOfTasks));
+            });
+
+        } finally {
+            resetPersistentTasksAssignment();
+        }
+    }
+
+    private void assertEnableAssignmentSetting(final Allocation expected) {
+        ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().clear().setMetaData(true).get();
+        Settings settings = clusterStateResponse.getState().getMetaData().settings();
+
+        String value = settings.get(CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey());
+        assertThat(Allocation.fromString(value), equalTo(expected));
+    }
+
+    private void disablePersistentTasksAssignment() {
+        Settings.Builder settings = Settings.builder().put(CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey(), Allocation.NONE);
+        assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));
+    }
+
+    private void enablePersistentTasksAssignment() {
+        Settings.Builder settings = Settings.builder().put(CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey(), Allocation.ALL);
+        assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));
+    }
+
+    private void resetPersistentTasksAssignment() {
+        Settings.Builder settings = Settings.builder().putNull(CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey());
+        assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));
+    }
+
+    /** Returns a random task parameter **/
+    private static PersistentTaskParams randomTaskParams() {
+        if (randomBoolean()) {
+            return null;
+        }
+        return new TestParams(randomAlphaOfLength(10));
+    }
+}

+ 52 - 0
server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderTests.java

@@ -0,0 +1,52 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.persistent.decider;
+
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.persistent.PersistentTasksDecidersTestCase;
+
+public class EnableAssignmentDeciderTests extends PersistentTasksDecidersTestCase {
+
+    public void testAllocationValues() {
+        final String all = randomFrom("all", "All", "ALL");
+        assertEquals(EnableAssignmentDecider.Allocation.ALL, EnableAssignmentDecider.Allocation.fromString(all));
+
+        final String none = randomFrom("none", "None", "NONE");
+        assertEquals(EnableAssignmentDecider.Allocation.NONE, EnableAssignmentDecider.Allocation.fromString(none));
+    }
+
+    public void testEnableAssignment() {
+        final int nbTasks = randomIntBetween(1, 10);
+        final int nbNodes = randomIntBetween(1, 5);
+        final EnableAssignmentDecider.Allocation allocation = randomFrom(EnableAssignmentDecider.Allocation.values());
+
+        Settings settings = Settings.builder()
+            .put(EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey(), allocation.toString())
+            .build();
+        updateSettings(settings);
+
+        ClusterState clusterState = reassign(createClusterStateWithTasks(nbNodes, nbTasks));
+        if (allocation == EnableAssignmentDecider.Allocation.ALL) {
+            assertNbAssignedTasks(nbTasks, clusterState);
+        } else {
+            assertNbUnassignedTasks(nbTasks, clusterState);
+        }
+    }
+}