Browse Source

Periodically try to reassign unassigned persistent tasks (#36069)

Previously persistent task assignment was checked in the
following situations:

- Persistent tasks are changed
- A node joins or leaves the cluster
- The routing table is changed
- Custom metadata in the cluster state is changed
- A new master node is elected

However, there could be situations when a persistent
task that could not be assigned to a node could become
assignable due to some other change, such as memory
usage on the nodes.

This change adds a timed recheck of persistent task
assignment to account for such situations.  The timer
is suspended while checks triggered by cluster state
changes are in-flight to avoid adding burden to an
already busy cluster.

Closes #35792
David Roberts 6 years ago
parent
commit
13cb0fb98b

+ 12 - 2
docs/reference/modules/cluster/misc.asciidoc

@@ -135,10 +135,10 @@ Plugins can create a kind of tasks called persistent tasks. Those tasks are
 usually long-live tasks and are stored in the cluster state, allowing the
 tasks to be revived after a full cluster restart.
 
-Every time a persistent task is created, the master nodes takes care of
+Every time a persistent task is created, the master node takes care of
 assigning the task to a node of the cluster, and the assigned node will then
 pick up the task and execute it locally. The process of assigning persistent
-tasks to nodes is controlled by the following property, which can be updated
+tasks to nodes is controlled by the following properties, which can be updated
 dynamically:
 
 `cluster.persistent_tasks.allocation.enable`::
@@ -153,3 +153,13 @@ This setting does not affect the persistent tasks that are already being execute
 Only newly created persistent tasks, or tasks that must be reassigned (after a node
 left the cluster, for example), are impacted by this setting.
 --
+
+`cluster.persistent_tasks.allocation.recheck_interval`::
+
+     The master node will automatically check whether persistent tasks need to
+     be assigned when the cluster state changes significantly. However, there
+     may be other factors, such as memory usage, that affect whether persistent
+     tasks can be assigned to nodes but do not cause the cluster state to change.
+     This setting controls how often assignment checks are performed to react to
+     these factors. The default is 30 seconds. The minimum permitted value is 10
+     seconds.

+ 2 - 0
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -92,6 +92,7 @@ import org.elasticsearch.monitor.jvm.JvmService;
 import org.elasticsearch.monitor.os.OsService;
 import org.elasticsearch.monitor.process.ProcessService;
 import org.elasticsearch.node.Node;
+import org.elasticsearch.persistent.PersistentTasksClusterService;
 import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
 import org.elasticsearch.plugins.PluginsService;
 import org.elasticsearch.repositories.fs.FsRepository;
@@ -456,6 +457,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
                     Node.BREAKER_TYPE_KEY,
                     OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING,
                     IndexGraveyard.SETTING_MAX_TOMBSTONES,
+                    PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING,
                     EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,
                     PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING,
                     PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING,

+ 184 - 0
server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java

@@ -0,0 +1,184 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.common.util.concurrent;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.io.Closeable;
+import java.util.Objects;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A base class for tasks that need to repeat.
+ */
+public abstract class AbstractAsyncTask implements Runnable, Closeable {
+
+    private final Logger logger;
+    private final ThreadPool threadPool;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final boolean autoReschedule;
+    private volatile ScheduledFuture<?> scheduledFuture;
+    private volatile boolean isScheduledOrRunning;
+    private volatile Exception lastThrownException;
+    private volatile TimeValue interval;
+
+    protected AbstractAsyncTask(Logger logger, ThreadPool threadPool, TimeValue interval, boolean autoReschedule) {
+        this.logger = logger;
+        this.threadPool = threadPool;
+        this.interval = interval;
+        this.autoReschedule = autoReschedule;
+    }
+
+    /**
+     * Change the interval between runs.
+     * If a future run is scheduled then this will reschedule it.
+     * @param interval The new interval between runs.
+     */
+    public synchronized void setInterval(TimeValue interval) {
+        this.interval = interval;
+        if (scheduledFuture != null) {
+            rescheduleIfNecessary();
+        }
+    }
+
+    public TimeValue getInterval() {
+        return interval;
+    }
+
+    /**
+     * Test any external conditions that determine whether the task
+     * should be scheduled.  This method does *not* need to test if
+     * the task is closed, as being closed automatically prevents
+     * scheduling.
+     * @return Should the task be scheduled to run?
+     */
+    protected abstract boolean mustReschedule();
+
+    /**
+     * Schedule the task to run after the configured interval if it
+     * is not closed and any further conditions imposed by derived
+     * classes are met.  Any previously scheduled invocation is
+     * cancelled.
+     */
+    public synchronized void rescheduleIfNecessary() {
+        if (isClosed()) {
+            return;
+        }
+        if (scheduledFuture != null) {
+            FutureUtils.cancel(scheduledFuture);
+        }
+        if (interval.millis() > 0 && mustReschedule()) {
+            if (logger.isTraceEnabled()) {
+                logger.trace("scheduling {} every {}", toString(), interval);
+            }
+            scheduledFuture = threadPool.schedule(interval, getThreadPool(), this);
+            isScheduledOrRunning = true;
+        } else {
+            logger.trace("scheduled {} disabled", toString());
+            scheduledFuture = null;
+            isScheduledOrRunning = false;
+        }
+    }
+
+    public boolean isScheduled() {
+        // Currently running counts as scheduled to avoid an oscillating return value
+        // from this method when a task is repeatedly running and rescheduling itself.
+        return isScheduledOrRunning;
+    }
+
+    /**
+     * Cancel any scheduled run, but do not prevent subsequent restarts.
+     */
+    public synchronized void cancel() {
+        FutureUtils.cancel(scheduledFuture);
+        scheduledFuture = null;
+        isScheduledOrRunning = false;
+    }
+
+    /**
+     * Cancel any scheduled run
+     */
+    @Override
+    public synchronized void close() {
+        if (closed.compareAndSet(false, true)) {
+            cancel();
+        }
+    }
+
+    public boolean isClosed() {
+        return this.closed.get();
+    }
+
+    @Override
+    public final void run() {
+        synchronized (this) {
+            scheduledFuture = null;
+            isScheduledOrRunning = autoReschedule;
+        }
+        try {
+            runInternal();
+        } catch (Exception ex) {
+            if (lastThrownException == null || sameException(lastThrownException, ex) == false) {
+                // prevent the annoying fact of logging the same stuff all the time with an interval of 1 sec will spam all your logs
+                logger.warn(
+                    () -> new ParameterizedMessage(
+                        "failed to run task {} - suppressing re-occurring exceptions unless the exception changes",
+                        toString()),
+                    ex);
+                lastThrownException = ex;
+            }
+        } finally {
+            if (autoReschedule) {
+                rescheduleIfNecessary();
+            }
+        }
+    }
+
+    private static boolean sameException(Exception left, Exception right) {
+        if (left.getClass() == right.getClass()) {
+            if (Objects.equals(left.getMessage(), right.getMessage())) {
+                StackTraceElement[] stackTraceLeft = left.getStackTrace();
+                StackTraceElement[] stackTraceRight = right.getStackTrace();
+                if (stackTraceLeft.length == stackTraceRight.length) {
+                    for (int i = 0; i < stackTraceLeft.length; i++) {
+                        if (stackTraceLeft[i].equals(stackTraceRight[i]) == false) {
+                            return false;
+                        }
+                    }
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    protected abstract void runInternal();
+
+    /**
+     * Use the same threadpool by default.
+     * Derived classes can change this if required.
+     */
+    protected String getThreadPool() {
+        return ThreadPool.Names.SAME;
+    }
+}

+ 6 - 89
server/src/main/java/org/elasticsearch/index/IndexService.java

@@ -38,8 +38,8 @@ import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.concurrent.AbstractAsyncTask;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
-import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.NodeEnvironment;
@@ -87,7 +87,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
@@ -805,100 +804,18 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
         }
     }
 
-    abstract static class BaseAsyncTask implements Runnable, Closeable {
+    abstract static class BaseAsyncTask extends AbstractAsyncTask {
         protected final IndexService indexService;
-        protected final ThreadPool threadPool;
-        private final TimeValue interval;
-        private ScheduledFuture<?> scheduledFuture;
-        private final AtomicBoolean closed = new AtomicBoolean(false);
-        private volatile Exception lastThrownException;
 
         BaseAsyncTask(IndexService indexService, TimeValue interval) {
+            super(indexService.logger, indexService.threadPool, interval, true);
             this.indexService = indexService;
-            this.threadPool = indexService.getThreadPool();
-            this.interval = interval;
-            onTaskCompletion();
+            rescheduleIfNecessary();
         }
 
-        boolean mustReschedule() {
+        protected boolean mustReschedule() {
             // don't re-schedule if its closed or if we don't have a single shard here..., we are done
-            return indexService.closed.get() == false
-                && closed.get() == false && interval.millis() > 0;
-        }
-
-        private synchronized void onTaskCompletion() {
-            if (mustReschedule()) {
-                if (indexService.logger.isTraceEnabled()) {
-                    indexService.logger.trace("scheduling {} every {}", toString(), interval);
-                }
-                this.scheduledFuture = threadPool.schedule(interval, getThreadPool(), BaseAsyncTask.this);
-            } else {
-                indexService.logger.trace("scheduled {} disabled", toString());
-                this.scheduledFuture = null;
-            }
-        }
-
-        boolean isScheduled() {
-            return scheduledFuture != null;
-        }
-
-        @Override
-        public final void run() {
-            try {
-                runInternal();
-            } catch (Exception ex) {
-                if (lastThrownException == null || sameException(lastThrownException, ex) == false) {
-                    // prevent the annoying fact of logging the same stuff all the time with an interval of 1 sec will spam all your logs
-                    indexService.logger.warn(
-                        () -> new ParameterizedMessage(
-                            "failed to run task {} - suppressing re-occurring exceptions unless the exception changes",
-                            toString()),
-                        ex);
-                    lastThrownException = ex;
-                }
-            } finally {
-                onTaskCompletion();
-            }
-        }
-
-        private static boolean sameException(Exception left, Exception right) {
-            if (left.getClass() == right.getClass()) {
-                if (Objects.equals(left.getMessage(), right.getMessage())) {
-                    StackTraceElement[] stackTraceLeft = left.getStackTrace();
-                    StackTraceElement[] stackTraceRight = right.getStackTrace();
-                    if (stackTraceLeft.length == stackTraceRight.length) {
-                        for (int i = 0; i < stackTraceLeft.length; i++) {
-                            if (stackTraceLeft[i].equals(stackTraceRight[i]) == false) {
-                                return false;
-                            }
-                        }
-                        return true;
-                    }
-                }
-            }
-            return false;
-        }
-
-        protected abstract void runInternal();
-
-        protected String getThreadPool() {
-            return ThreadPool.Names.SAME;
-        }
-
-        @Override
-        public synchronized void close() {
-            if (closed.compareAndSet(false, true)) {
-                FutureUtils.cancel(scheduledFuture);
-                scheduledFuture = null;
-            }
-        }
-
-        TimeValue getInterval() {
-            return interval;
-        }
-
-        boolean isClosed() {
-            return this.closed.get();
+            return indexService.closed.get() == false;
         }
     }
 

+ 2 - 1
server/src/main/java/org/elasticsearch/node/Node.java

@@ -501,7 +501,8 @@ public class Node implements Closeable {
 
             final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(tasksExecutors);
             final PersistentTasksClusterService persistentTasksClusterService =
-                new PersistentTasksClusterService(settings, registry, clusterService);
+                new PersistentTasksClusterService(settings, registry, clusterService, threadPool);
+            resourcesToClose.add(persistentTasksClusterService);
             final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
 
             modules.add(b -> {

+ 102 - 16
server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java

@@ -31,30 +31,55 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.AbstractAsyncTask;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
 import org.elasticsearch.persistent.decider.AssignmentDecision;
 import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
+import org.elasticsearch.threadpool.ThreadPool;
 
+import java.io.Closeable;
 import java.util.Objects;
 
 /**
  * Component that runs only on the master node and is responsible for assigning running tasks to nodes
  */
-public class PersistentTasksClusterService implements ClusterStateListener {
+public class PersistentTasksClusterService implements ClusterStateListener, Closeable {
+
+    public static final Setting<TimeValue> CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING =
+        Setting.timeSetting("cluster.persistent_tasks.allocation.recheck_interval", TimeValue.timeValueSeconds(30),
+            TimeValue.timeValueSeconds(10), Setting.Property.Dynamic, Setting.Property.NodeScope);
 
     private static final Logger logger = LogManager.getLogger(PersistentTasksClusterService.class);
 
     private final ClusterService clusterService;
     private final PersistentTasksExecutorRegistry registry;
     private final EnableAssignmentDecider decider;
+    private final ThreadPool threadPool;
+    private final PeriodicRechecker periodicRechecker;
 
-    public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorRegistry registry, ClusterService clusterService) {
+    public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorRegistry registry, ClusterService clusterService,
+                                         ThreadPool threadPool) {
         this.clusterService = clusterService;
-        clusterService.addListener(this);
         this.registry = registry;
         this.decider = new EnableAssignmentDecider(settings, clusterService.getClusterSettings());
+        this.threadPool = threadPool;
+        this.periodicRechecker = new PeriodicRechecker(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING.get(settings));
+        clusterService.addListener(this);
+        clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING,
+            this::setRecheckInterval);
+    }
+
+    void setRecheckInterval(TimeValue recheckInterval) {
+        periodicRechecker.setInterval(recheckInterval);
+    }
+
+    @Override
+    public void close() {
+        periodicRechecker.close();
     }
 
     /**
@@ -91,7 +116,11 @@ public class PersistentTasksClusterService implements ClusterStateListener {
             public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                 PersistentTasksCustomMetaData tasks = newState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
                 if (tasks != null) {
-                    listener.onResponse(tasks.getTask(taskId));
+                    PersistentTask<?> task = tasks.getTask(taskId);
+                    listener.onResponse(task);
+                    if (task != null && task.isAssigned() == false && periodicRechecker.isScheduled() == false) {
+                        periodicRechecker.rescheduleIfNecessary();
+                    }
                 } else {
                     listener.onResponse(null);
                 }
@@ -155,7 +184,7 @@ public class PersistentTasksClusterService implements ClusterStateListener {
     public void removePersistentTask(String id, ActionListener<PersistentTask<?>> listener) {
         clusterService.submitStateUpdateTask("remove persistent task", new ClusterStateUpdateTask() {
             @Override
-            public ClusterState execute(ClusterState currentState) throws Exception {
+            public ClusterState execute(ClusterState currentState) {
                 PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState);
                 if (tasksInProgress.hasTask(id)) {
                     return update(currentState, tasksInProgress.removeTask(id));
@@ -243,22 +272,41 @@ public class PersistentTasksClusterService implements ClusterStateListener {
     public void clusterChanged(ClusterChangedEvent event) {
         if (event.localNodeMaster()) {
             if (shouldReassignPersistentTasks(event)) {
+                // We want to avoid a periodic check duplicating this work
+                periodicRechecker.cancel();
                 logger.trace("checking task reassignment for cluster state {}", event.state().getVersion());
-                clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() {
-                    @Override
-                    public ClusterState execute(ClusterState currentState) {
-                        return reassignTasks(currentState);
-                    }
-
-                    @Override
-                    public void onFailure(String source, Exception e) {
-                        logger.warn("failed to reassign persistent tasks", e);
-                    }
-                });
+                reassignPersistentTasks();
             }
         }
     }
 
+    /**
+     * Submit a cluster state update to reassign any persistent tasks that need reassigning
+     */
+    private void reassignPersistentTasks() {
+        clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() {
+            @Override
+            public ClusterState execute(ClusterState currentState) {
+                return reassignTasks(currentState);
+            }
+
+            @Override
+            public void onFailure(String source, Exception e) {
+                logger.warn("failed to reassign persistent tasks", e);
+                // There must be a task that's worth rechecking because there was one
+                // that caused this method to be called and the method failed to assign it
+                periodicRechecker.rescheduleIfNecessary();
+            }
+
+            @Override
+            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                if (isAnyTaskUnassigned(newState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE))) {
+                    periodicRechecker.rescheduleIfNecessary();
+                }
+            }
+        });
+    }
+
     /**
      * Returns true if the cluster state change(s) require to reassign some persistent tasks. It can happen in the following
      * situations: a node left or is added, the routing table changed, the master node changed, the metadata changed or the
@@ -290,6 +338,13 @@ public class PersistentTasksClusterService implements ClusterStateListener {
         return false;
     }
 
+    /**
+     * Returns true if any persistent task is unassigned.
+     */
+    private boolean isAnyTaskUnassigned(final PersistentTasksCustomMetaData tasks) {
+        return tasks != null && tasks.tasks().stream().anyMatch(task -> task.getAssignment().isAssigned() == false);
+    }
+
     /**
      * Evaluates the cluster state and tries to assign tasks to nodes.
      *
@@ -347,4 +402,35 @@ public class PersistentTasksClusterService implements ClusterStateListener {
             return currentState;
         }
     }
+
+    /**
+     * Class to periodically try to reassign unassigned persistent tasks.
+     */
+    private class PeriodicRechecker extends AbstractAsyncTask {
+
+        PeriodicRechecker(TimeValue recheckInterval) {
+            super(logger, threadPool, recheckInterval, false);
+        }
+
+        @Override
+        protected boolean mustReschedule() {
+            return true;
+        }
+
+        @Override
+        public void runInternal() {
+            if (clusterService.localNode().isMasterNode()) {
+                final ClusterState state = clusterService.state();
+                logger.trace("periodic persistent task assignment check running for cluster state {}", state.getVersion());
+                if (isAnyTaskUnassigned(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE))) {
+                    reassignPersistentTasks();
+                }
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "persistent_task_recheck";
+        }
+    }
 }

+ 206 - 0
server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java

@@ -0,0 +1,206 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.common.util.concurrent;
+
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class AbstractAsyncTaskTests extends ESTestCase {
+
+    private static ThreadPool threadPool;
+
+    @BeforeClass
+    public static void setUpThreadPool() {
+        threadPool = new TestThreadPool(AbstractAsyncTaskTests.class.getSimpleName());
+    }
+
+    @AfterClass
+    public static void tearDownThreadPool() {
+        terminate(threadPool);
+    }
+
+    public void testAutoRepeat() throws Exception {
+
+        boolean shouldRunThrowException = randomBoolean();
+        final CyclicBarrier barrier1 = new CyclicBarrier(2); // 1 for runInternal plus 1 for the test sequence
+        final CyclicBarrier barrier2 = new CyclicBarrier(2); // 1 for runInternal plus 1 for the test sequence
+        final AtomicInteger count = new AtomicInteger();
+        AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMillis(1), true) {
+
+            @Override
+            protected boolean mustReschedule() {
+                return true;
+            }
+
+            @Override
+            protected void runInternal() {
+                assertTrue("generic threadpool is configured", Thread.currentThread().getName().contains("[generic]"));
+                try {
+                    barrier1.await();
+                } catch (Exception e) {
+                    fail("interrupted");
+                }
+                count.incrementAndGet();
+                try {
+                    barrier2.await();
+                } catch (Exception e) {
+                    fail("interrupted");
+                }
+                if (shouldRunThrowException) {
+                    throw new RuntimeException("foo");
+                }
+            }
+
+            @Override
+            protected String getThreadPool() {
+                return ThreadPool.Names.GENERIC;
+            }
+        };
+
+        assertFalse(task.isScheduled());
+        task.rescheduleIfNecessary();
+        assertTrue(task.isScheduled());
+        barrier1.await();
+        assertTrue(task.isScheduled());
+        barrier2.await();
+        assertEquals(1, count.get());
+        barrier1.reset();
+        barrier2.reset();
+        barrier1.await();
+        assertTrue(task.isScheduled());
+        task.close();
+        barrier2.await();
+        assertEquals(2, count.get());
+        assertTrue(task.isClosed());
+        assertFalse(task.isScheduled());
+        assertEquals(2, count.get());
+    }
+
+    public void testManualRepeat() throws Exception {
+
+        boolean shouldRunThrowException = randomBoolean();
+        final CyclicBarrier barrier = new CyclicBarrier(2); // 1 for runInternal plus 1 for the test sequence
+        final AtomicInteger count = new AtomicInteger();
+        AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMillis(1), false) {
+
+            @Override
+            protected boolean mustReschedule() {
+                return true;
+            }
+
+            @Override
+            protected void runInternal() {
+                assertTrue("generic threadpool is configured", Thread.currentThread().getName().contains("[generic]"));
+                count.incrementAndGet();
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    fail("interrupted");
+                }
+                if (shouldRunThrowException) {
+                    throw new RuntimeException("foo");
+                }
+            }
+
+            @Override
+            protected String getThreadPool() {
+                return ThreadPool.Names.GENERIC;
+            }
+        };
+
+        assertFalse(task.isScheduled());
+        task.rescheduleIfNecessary();
+        barrier.await();
+        assertEquals(1, count.get());
+        assertFalse(task.isScheduled());
+        barrier.reset();
+        expectThrows(TimeoutException.class, () -> barrier.await(10, TimeUnit.MILLISECONDS));
+        assertEquals(1, count.get());
+        barrier.reset();
+        task.rescheduleIfNecessary();
+        barrier.await();
+        assertEquals(2, count.get());
+        assertFalse(task.isScheduled());
+        assertFalse(task.isClosed());
+        task.close();
+        assertTrue(task.isClosed());
+    }
+
+    public void testCloseWithNoRun() {
+
+        AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMinutes(10), true) {
+
+            @Override
+            protected boolean mustReschedule() {
+                return true;
+            }
+
+            @Override
+            protected void runInternal() {
+            }
+        };
+
+        assertFalse(task.isScheduled());
+        task.rescheduleIfNecessary();
+        assertTrue(task.isScheduled());
+        task.close();
+        assertTrue(task.isClosed());
+        assertFalse(task.isScheduled());
+    }
+
+    public void testChangeInterval() throws Exception {
+
+        final CountDownLatch latch = new CountDownLatch(2);
+
+        AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueHours(1), true) {
+
+            @Override
+            protected boolean mustReschedule() {
+                return latch.getCount() > 0;
+            }
+
+            @Override
+            protected void runInternal() {
+                latch.countDown();
+            }
+        };
+
+        assertFalse(task.isScheduled());
+        task.rescheduleIfNecessary();
+        assertTrue(task.isScheduled());
+        task.setInterval(TimeValue.timeValueMillis(1));
+        assertTrue(task.isScheduled());
+        // This should only take 2 milliseconds in ideal conditions, but allow 10 seconds in case of VM stalls
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertBusy(() -> assertFalse(task.isScheduled()));
+        task.close();
+        assertFalse(task.isScheduled());
+        assertTrue(task.isClosed());
+    }
+}

+ 0 - 1
server/src/test/java/org/elasticsearch/index/IndexServiceTests.java

@@ -136,7 +136,6 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
         assertNotSame(refreshTask, indexService.getRefreshTask());
         assertTrue(refreshTask.isClosed());
         assertFalse(refreshTask.isScheduled());
-        assertFalse(indexService.getRefreshTask().mustReschedule());
 
         // set it to 100ms
         client().admin().indices().prepareUpdateSettings("test")

+ 142 - 6
server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java

@@ -24,6 +24,7 @@ import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -32,6 +33,7 @@ import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
 import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
@@ -51,6 +53,8 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 
 import static java.util.Collections.emptyMap;
@@ -63,6 +67,11 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class PersistentTasksClusterServiceTests extends ESTestCase {
 
@@ -71,6 +80,8 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
     /** Needed by {@link PersistentTasksClusterService} **/
     private ClusterService clusterService;
 
+    private volatile boolean nonClusterStateCondition;
+
     @BeforeClass
     public static void setUpThreadPool() {
         threadPool = new TestThreadPool(PersistentTasksClusterServiceTests.class.getSimpleName());
@@ -83,7 +94,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
     }
 
     @AfterClass
-    public static void tearDownThreadPool() throws Exception {
+    public static void tearDownThreadPool() {
         terminate(threadPool);
     }
 
@@ -177,7 +188,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
         addTestNodes(nodes, randomIntBetween(1, 10));
         int numberOfTasks = randomIntBetween(2, 40);
         for (int i = 0; i < numberOfTasks; i++) {
-            addTask(tasks, "assign_one", randomBoolean() ? null : "no_longer_exits");
+            addTask(tasks, "assign_one", randomBoolean() ? null : "no_longer_exists");
         }
 
         MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
@@ -186,7 +197,42 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
 
         PersistentTasksCustomMetaData tasksInProgress = newClusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
         assertThat(tasksInProgress, notNullValue());
+    }
+
+    public void testNonClusterStateConditionAssignment() {
+        ClusterState clusterState = initialState();
+        ClusterState.Builder builder = ClusterState.builder(clusterState);
+        PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(
+            clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE));
+        DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes());
+        addTestNodes(nodes, randomIntBetween(1, 3));
+        addTask(tasks, "assign_based_on_non_cluster_state_condition", null);
+        MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
+        clusterState = builder.metaData(metaData).nodes(nodes).build();
+
+        nonClusterStateCondition = false;
+        ClusterState newClusterState = reassign(clusterState);
+
+        PersistentTasksCustomMetaData tasksInProgress = newClusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+        assertThat(tasksInProgress, notNullValue());
+        for (PersistentTask<?> task : tasksInProgress.tasks()) {
+            assertThat(task.getExecutorNode(), nullValue());
+            assertThat(task.isAssigned(), equalTo(false));
+            assertThat(task.getAssignment().getExplanation(), equalTo("non-cluster state condition prevents assignment"));
+        }
+        assertThat(tasksInProgress.tasks().size(), equalTo(1));
+
+        nonClusterStateCondition = true;
+        ClusterState finalClusterState = reassign(newClusterState);
 
+        tasksInProgress = finalClusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+        assertThat(tasksInProgress, notNullValue());
+        for (PersistentTask<?> task : tasksInProgress.tasks()) {
+            assertThat(task.getExecutorNode(), notNullValue());
+            assertThat(task.isAssigned(), equalTo(true));
+            assertThat(task.getAssignment().getExplanation(), equalTo("test assignment"));
+        }
+        assertThat(tasksInProgress.tasks().size(), equalTo(1));
     }
 
     public void testReassignTasks() {
@@ -201,14 +247,14 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
             switch (randomInt(2)) {
                 case 0:
                     // add an unassigned task that should get assigned because it's assigned to a non-existing node or unassigned
-                    addTask(tasks, "assign_me", randomBoolean() ? null : "no_longer_exits");
+                    addTask(tasks, "assign_me", randomBoolean() ? null : "no_longer_exists");
                     break;
                 case 1:
                     // add a task assigned to non-existing node that should not get assigned
-                    addTask(tasks, "dont_assign_me", randomBoolean() ? null : "no_longer_exits");
+                    addTask(tasks, "dont_assign_me", randomBoolean() ? null : "no_longer_exists");
                     break;
                 case 2:
-                    addTask(tasks, "assign_one", randomBoolean() ? null : "no_longer_exits");
+                    addTask(tasks, "assign_one", randomBoolean() ? null : "no_longer_exists");
                     break;
 
             }
@@ -368,6 +414,80 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
         assertFalse(needsReassignment(new Assignment("_node_1", "assigned"), nodes));
     }
 
+    public void testPeriodicRecheck() throws Exception {
+        ClusterState initialState = initialState();
+        ClusterState.Builder builder = ClusterState.builder(initialState);
+        PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(
+            initialState.metaData().custom(PersistentTasksCustomMetaData.TYPE));
+        DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(initialState.nodes());
+        addTestNodes(nodes, randomIntBetween(1, 3));
+        addTask(tasks, "assign_based_on_non_cluster_state_condition", null);
+        MetaData.Builder metaData = MetaData.builder(initialState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
+        ClusterState clusterState = builder.metaData(metaData).nodes(nodes).build();
+
+        nonClusterStateCondition = false;
+
+        boolean shouldSimulateFailure = randomBoolean();
+        ClusterService recheckTestClusterService = createRecheckTestClusterService(clusterState, shouldSimulateFailure);
+        PersistentTasksClusterService service = createService(recheckTestClusterService,
+            (params, currentState) -> assignBasedOnNonClusterStateCondition(currentState.nodes()));
+
+        ClusterChangedEvent event = new ClusterChangedEvent("test", clusterState, initialState);
+        service.clusterChanged(event);
+        ClusterState newClusterState = recheckTestClusterService.state();
+
+        {
+            PersistentTasksCustomMetaData tasksInProgress = newClusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+            assertThat(tasksInProgress, notNullValue());
+            for (PersistentTask<?> task : tasksInProgress.tasks()) {
+                assertThat(task.getExecutorNode(), nullValue());
+                assertThat(task.isAssigned(), equalTo(false));
+                assertThat(task.getAssignment().getExplanation(), equalTo(shouldSimulateFailure ?
+                    "explanation: assign_based_on_non_cluster_state_condition" : "non-cluster state condition prevents assignment"));
+            }
+            assertThat(tasksInProgress.tasks().size(), equalTo(1));
+        }
+
+        nonClusterStateCondition = true;
+        service.setRecheckInterval(TimeValue.timeValueMillis(1));
+
+        assertBusy(() -> {
+            PersistentTasksCustomMetaData tasksInProgress =
+                recheckTestClusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+            assertThat(tasksInProgress, notNullValue());
+            for (PersistentTask<?> task : tasksInProgress.tasks()) {
+                assertThat(task.getExecutorNode(), notNullValue());
+                assertThat(task.isAssigned(), equalTo(true));
+                assertThat(task.getAssignment().getExplanation(), equalTo("test assignment"));
+            }
+            assertThat(tasksInProgress.tasks().size(), equalTo(1));
+        });
+    }
+
+    private ClusterService createRecheckTestClusterService(ClusterState initialState, boolean shouldSimulateFailure) {
+        AtomicBoolean testFailureNextTime = new AtomicBoolean(shouldSimulateFailure);
+        AtomicReference<ClusterState> state = new AtomicReference<>(initialState);
+        ClusterService recheckTestClusterService = mock(ClusterService.class);
+        when(recheckTestClusterService.getClusterSettings()).thenReturn(clusterService.getClusterSettings());
+        doAnswer(invocationOnMock -> state.get().getNodes().getLocalNode()).when(recheckTestClusterService).localNode();
+        doAnswer(invocationOnMock -> state.get()).when(recheckTestClusterService).state();
+        doAnswer(invocationOnMock -> {
+            @SuppressWarnings("unchecked")
+            ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocationOnMock.getArguments()[1];
+            ClusterState before = state.get();
+            ClusterState after = task.execute(before);
+            if (testFailureNextTime.compareAndSet(true, false)) {
+                task.onFailure("testing failure", new RuntimeException("foo"));
+            } else {
+                state.set(after);
+                task.clusterStateProcessed("test", before, after);
+            }
+            return null;
+        }).when(recheckTestClusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class));
+
+        return recheckTestClusterService;
+    }
+
     private void addTestNodes(DiscoveryNodes.Builder nodes, int nonLocalNodesCount) {
         for (int i = 0; i < nonLocalNodesCount; i++) {
             nodes.add(new DiscoveryNode("other_node_" + i, buildNewFakeTransportAddress(), Version.CURRENT));
@@ -387,6 +507,8 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
                     return null;
                 case "assign_one":
                     return assignOnlyOneTaskAtATime(currentState);
+                case "assign_based_on_non_cluster_state_condition":
+                    return assignBasedOnNonClusterStateCondition(currentState.nodes());
                 default:
                     fail("unknown param " + testParams.getTestParam());
             }
@@ -408,6 +530,14 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
         }
     }
 
+    private Assignment assignBasedOnNonClusterStateCondition(DiscoveryNodes nodes) {
+        if (nonClusterStateCondition) {
+            return randomNodeAssignment(nodes);
+        } else {
+            return new Assignment(null, "non-cluster state condition prevents assignment");
+        }
+    }
+
     private Assignment randomNodeAssignment(DiscoveryNodes nodes) {
         if (nodes.getNodes().isEmpty()) {
             return NO_NODE_FOUND;
@@ -623,6 +753,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
         nodes.masterNodeId("this_node");
 
         return ClusterState.builder(ClusterName.DEFAULT)
+                .nodes(nodes)
                 .metaData(metaData)
                 .routingTable(routingTable.build())
                 .build();
@@ -640,6 +771,11 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
 
     /** Creates a PersistentTasksClusterService with a single PersistentTasksExecutor implemented by a BiFunction **/
     private <P extends PersistentTaskParams> PersistentTasksClusterService createService(final BiFunction<P, ClusterState, Assignment> fn) {
+        return createService(clusterService, fn);
+    }
+
+    private <P extends PersistentTaskParams> PersistentTasksClusterService createService(ClusterService clusterService,
+                                                                                         final BiFunction<P, ClusterState, Assignment> fn) {
         PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(
             singleton(new PersistentTasksExecutor<P>(TestPersistentTasksExecutor.NAME, null) {
                 @Override
@@ -652,6 +788,6 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
                     throw new UnsupportedOperationException();
                 }
             }));
-        return new PersistentTasksClusterService(Settings.EMPTY, registry, clusterService);
+        return new PersistentTasksClusterService(Settings.EMPTY, registry, clusterService, threadPool);
     }
 }

+ 3 - 2
server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java

@@ -69,11 +69,12 @@ public abstract class PersistentTasksDecidersTestCase extends ESTestCase {
                 };
             }
         };
-        persistentTasksClusterService = new PersistentTasksClusterService(clusterService.getSettings(), registry, clusterService);
+        persistentTasksClusterService = new PersistentTasksClusterService(clusterService.getSettings(), registry, clusterService,
+            threadPool);
     }
 
     @AfterClass
-    public static void tearDownThreadPool() throws Exception {
+    public static void tearDownThreadPool() {
         terminate(threadPool);
     }
 

+ 43 - 2
server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java

@@ -36,6 +36,7 @@ import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTask
 import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
 import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestTasksRequestBuilder;
 import org.junit.After;
+import org.junit.Before;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -45,6 +46,7 @@ import java.util.Objects;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
@@ -64,6 +66,11 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
         return true;
     }
 
+    @Before
+    public void resetNonClusterStateCondition() {
+        TestPersistentTasksExecutor.setNonClusterStateCondition(true);
+    }
+
     @After
     public void cleanup() throws Exception {
         assertNoRunningTasks();
@@ -173,6 +180,42 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
         assertEquals(removeFuture.get().getId(), taskId);
     }
 
+    public void testPersistentActionWithNonClusterStateCondition() throws Exception {
+        PersistentTasksClusterService persistentTasksClusterService =
+            internalCluster().getInstance(PersistentTasksClusterService.class, internalCluster().getMasterName());
+        // Speed up rechecks to a rate that is quicker than what settings would allow
+        persistentTasksClusterService.setRecheckInterval(TimeValue.timeValueMillis(1));
+
+        TestPersistentTasksExecutor.setNonClusterStateCondition(false);
+
+        PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
+        PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
+        TestParams testParams = new TestParams("Blah");
+        persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, future);
+        String taskId = future.get().getId();
+
+        assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks(),
+            empty());
+
+        TestPersistentTasksExecutor.setNonClusterStateCondition(true);
+
+        assertBusy(() -> {
+            // Wait for the task to start
+            assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks()
+                .size(), equalTo(1));
+        });
+        TaskInfo taskInfo = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
+            .get().getTasks().get(0);
+
+        // Verifying the the task can now be assigned
+        assertThat(taskInfo.getTaskId().getNodeId(), notNullValue());
+
+        // Remove the persistent task
+        PlainActionFuture<PersistentTask<?>> removeFuture = new PlainActionFuture<>();
+        persistentTasksService.sendRemoveRequest(taskId, removeFuture);
+        assertEquals(removeFuture.get().getId(), taskId);
+    }
+
     public void testPersistentActionStatusUpdate() throws Exception {
         PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
         PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
@@ -277,8 +320,6 @@ public class PersistentTasksExecutorIT extends ESIntegTestCase {
             assertThat(client().admin().cluster().prepareCancelTasks().setTaskId(taskId)
                     .get().getTasks().size(), equalTo(1));
         }
-
-
     }
 
     private void assertNoRunningTasks() throws Exception {

+ 9 - 1
server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java

@@ -298,13 +298,22 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
         public static final String NAME = "cluster:admin/persistent/test";
         private final ClusterService clusterService;
 
+        private static volatile boolean nonClusterStateCondition = true;
+
         public TestPersistentTasksExecutor(ClusterService clusterService) {
             super(NAME, ThreadPool.Names.GENERIC);
             this.clusterService = clusterService;
         }
 
+        public static void setNonClusterStateCondition(boolean nonClusterStateCondition) {
+            TestPersistentTasksExecutor.nonClusterStateCondition = nonClusterStateCondition;
+        }
+
         @Override
         public Assignment getAssignment(TestParams params, ClusterState clusterState) {
+            if (nonClusterStateCondition == false) {
+                return new Assignment(null, "non cluster state condition prevents assignment");
+            }
             if (params == null || params.getExecutorNodeAttr() == null) {
                 return super.getAssignment(params, clusterState);
             } else {
@@ -315,7 +324,6 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
                 } else {
                     return NO_NODE_FOUND;
                 }
-
             }
         }