Browse Source

[ML] Migrate unallocated jobs and datafeeds (#37430)

Migrate ml job and datafeed config of open jobs and update
the parameters of the persistent tasks as they become unallocated
during a rolling upgrade. Block allocation of ml persistent tasks
until the configs are migrated.
David Kyle 6 years ago
parent
commit
bea46f7b52

+ 4 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java

@@ -155,6 +155,10 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
 
     private static <T extends ToXContent> void mapValuesToXContent(ParseField field, Map<String, T> map, XContentBuilder builder,
                                                                    Params params) throws IOException {
+        if (map.isEmpty()) {
+            return;
+        }
+
         builder.startArray(field.getPreferredName());
         for (Map.Entry<String, T> entry : map.entrySet()) {
             entry.getValue().toXContent(builder, params);

+ 65 - 14
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java

@@ -6,14 +6,16 @@
 
 package org.elasticsearch.xpack.core.ml;
 
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.persistent.PersistentTasksClusterService;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
 
+import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -133,6 +135,42 @@ public final class MlTasks {
                 .collect(Collectors.toSet());
     }
 
+    /**
+     * Get the job Ids of anomaly detector job tasks that do
+     * not have an assignment.
+     *
+     * @param tasks Persistent tasks. If null an empty set is returned.
+     * @param nodes The cluster nodes
+     * @return The job Ids of tasks that do not have an assignment.
+     */
+    public static Set<String> unallocatedJobIds(@Nullable PersistentTasksCustomMetaData tasks,
+                                                DiscoveryNodes nodes) {
+        return unallocatedJobTasks(tasks, nodes).stream()
+                .map(task -> task.getId().substring(JOB_TASK_ID_PREFIX.length()))
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * The job tasks that do not have an allocation as determined by
+     * {@link PersistentTasksClusterService#needsReassignment(PersistentTasksCustomMetaData.Assignment, DiscoveryNodes)}
+     *
+     * @param tasks Persistent tasks. If null an empty collection is returned.
+     * @param nodes The cluster nodes
+     * @return Unallocated job tasks
+     */
+    public static Collection<PersistentTasksCustomMetaData.PersistentTask> unallocatedJobTasks(
+            @Nullable PersistentTasksCustomMetaData tasks,
+            DiscoveryNodes nodes) {
+        if (tasks == null) {
+            return Collections.emptyList();
+        }
+
+        return tasks.findTasks(JOB_TASK_NAME, task -> true)
+                .stream()
+                .filter(task -> PersistentTasksClusterService.needsReassignment(task.getAssignment(), nodes))
+                .collect(Collectors.toList());
+    }
+
     /**
      * The datafeed Ids of started datafeed tasks
      *
@@ -151,26 +189,39 @@ public final class MlTasks {
     }
 
     /**
-     * Is there an ml anomaly detector job task for the job {@code jobId}?
-     * @param jobId The job id
-     * @param tasks Persistent tasks
-     * @return True if the job has a task
+     * Get the datafeed Ids of started datafeed tasks
+     * that do not have an assignment.
+     *
+     * @param tasks Persistent tasks. If null an empty set is returned.
+     * @param nodes The cluster nodes
+     * @return The datafeed Ids of tasks that do not have an assignment.
      */
-    public static boolean taskExistsForJob(String jobId, PersistentTasksCustomMetaData tasks) {
-        return openJobIds(tasks).contains(jobId);
+    public static Set<String> unallocatedDatafeedIds(@Nullable PersistentTasksCustomMetaData tasks,
+                                                DiscoveryNodes nodes) {
+
+        return unallocatedDatafeedTasks(tasks, nodes).stream()
+                .map(task -> task.getId().substring(DATAFEED_TASK_ID_PREFIX.length()))
+                .collect(Collectors.toSet());
     }
 
     /**
-     * Read the active anomaly detector job tasks.
-     * Active tasks are not {@code JobState.CLOSED} or {@code JobState.FAILED}.
+     * The datafeed tasks that do not have an allocation as determined by
+     * {@link PersistentTasksClusterService#needsReassignment(PersistentTasksCustomMetaData.Assignment, DiscoveryNodes)}
      *
-     * @param tasks Persistent tasks
-     * @return The job tasks excluding closed and failed jobs
+     * @param tasks Persistent tasks. If null an empty collection is returned.
+     * @param nodes The cluster nodes
+     * @return Unallocated datafeed tasks
      */
-    public static List<PersistentTasksCustomMetaData.PersistentTask<?>> activeJobTasks(PersistentTasksCustomMetaData tasks) {
-        return tasks.findTasks(JOB_TASK_NAME, task -> true)
+    public static Collection<PersistentTasksCustomMetaData.PersistentTask> unallocatedDatafeedTasks(
+            @Nullable PersistentTasksCustomMetaData tasks,
+            DiscoveryNodes nodes) {
+        if (tasks == null) {
+            return Collections.emptyList();
+        }
+
+        return tasks.findTasks(DATAFEED_TASK_NAME, task -> true)
                 .stream()
-                .filter(task -> ((JobTaskState) task.getState()).getState().isAnyOf(JobState.CLOSED, JobState.FAILED) == false)
+                .filter(task -> PersistentTasksClusterService.needsReassignment(task.getAssignment(), nodes))
                 .collect(Collectors.toList());
     }
 }

+ 51 - 13
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java

@@ -6,6 +6,10 @@
 
 package org.elasticsearch.xpack.core.ml;
 
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
@@ -14,12 +18,14 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
 
+import java.net.InetAddress;
+
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 
 public class MlTasksTests extends ESTestCase {
     public void testGetJobState() {
-        PersistentTasksCustomMetaData.Builder tasksBuilder =  PersistentTasksCustomMetaData.builder();
+        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
         // A missing task is a closed job
         assertEquals(JobState.CLOSED, MlTasks.getJobState("foo", tasksBuilder.build()));
         // A task with no status is opening
@@ -52,7 +58,7 @@ public class MlTasksTests extends ESTestCase {
     public void testGetJobTask() {
         assertNull(MlTasks.getJobTask("foo", null));
 
-        PersistentTasksCustomMetaData.Builder tasksBuilder =  PersistentTasksCustomMetaData.builder();
+        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
         tasksBuilder.addTask(MlTasks.jobTaskId("foo"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo"),
                 new PersistentTasksCustomMetaData.Assignment("bar", "test assignment"));
 
@@ -73,7 +79,7 @@ public class MlTasksTests extends ESTestCase {
     }
 
     public void testOpenJobIds() {
-        PersistentTasksCustomMetaData.Builder tasksBuilder =  PersistentTasksCustomMetaData.builder();
+        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
         assertThat(MlTasks.openJobIds(tasksBuilder.build()), empty());
 
         tasksBuilder.addTask(MlTasks.jobTaskId("foo-1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"),
@@ -92,7 +98,7 @@ public class MlTasksTests extends ESTestCase {
     }
 
     public void testStartedDatafeedIds() {
-        PersistentTasksCustomMetaData.Builder tasksBuilder =  PersistentTasksCustomMetaData.builder();
+        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
         assertThat(MlTasks.openJobIds(tasksBuilder.build()), empty());
 
         tasksBuilder.addTask(MlTasks.jobTaskId("job-1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"),
@@ -111,16 +117,48 @@ public class MlTasksTests extends ESTestCase {
         assertThat(MlTasks.startedDatafeedIds(null), empty());
     }
 
-    public void testTaskExistsForJob() {
-        PersistentTasksCustomMetaData.Builder tasksBuilder =  PersistentTasksCustomMetaData.builder();
-        assertFalse(MlTasks.taskExistsForJob("job-1", tasksBuilder.build()));
-
-        tasksBuilder.addTask(MlTasks.jobTaskId("foo"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo"),
-                new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
-        tasksBuilder.addTask(MlTasks.jobTaskId("bar"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("bar"),
+    public void testUnallocatedJobIds() {
+        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
+        tasksBuilder.addTask(MlTasks.jobTaskId("job_with_assignment"), MlTasks.JOB_TASK_NAME,
+                new OpenJobAction.JobParams("job_with_assignment"),
                 new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
+        tasksBuilder.addTask(MlTasks.jobTaskId("job_without_assignment"), MlTasks.JOB_TASK_NAME,
+                new OpenJobAction.JobParams("job_without_assignment"),
+                new PersistentTasksCustomMetaData.Assignment(null, "test assignment"));
+        tasksBuilder.addTask(MlTasks.jobTaskId("job_without_node"), MlTasks.JOB_TASK_NAME,
+                new OpenJobAction.JobParams("job_without_node"),
+                new PersistentTasksCustomMetaData.Assignment("dead-node", "expired node"));
+
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+                .add(new DiscoveryNode("node-1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT))
+                .localNodeId("node-1")
+                .masterNodeId("node-1")
+                .build();
+
+        assertThat(MlTasks.unallocatedJobIds(tasksBuilder.build(), nodes),
+                containsInAnyOrder("job_without_assignment", "job_without_node"));
+    }
 
-        assertFalse(MlTasks.taskExistsForJob("job-1", tasksBuilder.build()));
-        assertTrue(MlTasks.taskExistsForJob("foo", tasksBuilder.build()));
+    public void testUnallocatedDatafeedIds() {
+        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
+        tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed_with_assignment"), MlTasks.DATAFEED_TASK_NAME,
+                new StartDatafeedAction.DatafeedParams("datafeed_with_assignment", 0L),
+                new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
+        tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed_without_assignment"), MlTasks.DATAFEED_TASK_NAME,
+                new StartDatafeedAction.DatafeedParams("datafeed_without_assignment", 0L),
+                new PersistentTasksCustomMetaData.Assignment(null, "test assignment"));
+        tasksBuilder.addTask(MlTasks.datafeedTaskId("datafeed_without_node"), MlTasks.DATAFEED_TASK_NAME,
+                new StartDatafeedAction.DatafeedParams("datafeed_without_node", 0L),
+                new PersistentTasksCustomMetaData.Assignment("dead_node", "expired node"));
+
+
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+                .add(new DiscoveryNode("node-1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT))
+                .localNodeId("node-1")
+                .masterNodeId("node-1")
+                .build();
+
+        assertThat(MlTasks.unallocatedDatafeedIds(tasksBuilder.build(), nodes),
+                containsInAnyOrder("datafeed_without_assignment", "datafeed_without_node"));
     }
 }

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java

@@ -58,7 +58,7 @@ public class MlAssignmentNotifier implements ClusterStateListener {
             return;
         }
 
-        mlConfigMigrator.migrateConfigsWithoutTasks(event.state(), ActionListener.wrap(
+        mlConfigMigrator.migrateConfigs(event.state(), ActionListener.wrap(
                 response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(event)),
                 e -> {
                     logger.error("error migrating ml configurations", e);

+ 10 - 6
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java

@@ -79,9 +79,10 @@ public class MlConfigMigrationEligibilityCheck {
      *     False if {@link #canStartMigration(ClusterState)} returns {@code false}
      *     False if the job is not in the cluster state
      *     False if the {@link Job#isDeleting()}
-     *     False if the job has a persistent task
+     *     False if the job has an allocated persistent task
      *     True otherwise i.e. the job is present, not deleting
-     *     and does not have a persistent task.
+     *     and does not have a persistent task or its persistent
+     *     task is un-allocated
      *
      * @param jobId         The job Id
      * @param clusterState  The cluster state
@@ -100,15 +101,17 @@ public class MlConfigMigrationEligibilityCheck {
         }
 
         PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
-        return MlTasks.openJobIds(persistentTasks).contains(jobId) == false;
+        return MlTasks.openJobIds(persistentTasks).contains(jobId) == false ||
+                MlTasks.unallocatedJobIds(persistentTasks, clusterState.nodes()).contains(jobId);
     }
 
     /**
      * Is the datafeed eligible for migration? Returns:
      *     False if {@link #canStartMigration(ClusterState)} returns {@code false}
      *     False if the datafeed is not in the cluster state
-     *     False if the datafeed has a persistent task
-     *     True otherwise i.e. the datafeed is present and does not have a persistent task.
+     *     False if the datafeed has an allocated persistent task
+     *     True otherwise i.e. the datafeed is present and does not have a persistent
+     *     task or its persistent task is un-allocated
      *
      * @param datafeedId   The datafeed Id
      * @param clusterState  The cluster state
@@ -125,6 +128,7 @@ public class MlConfigMigrationEligibilityCheck {
         }
 
         PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
-        return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false;
+        return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false
+                || MlTasks.unallocatedDatafeedIds(persistentTasks, clusterState.nodes()).contains(datafeedId);
     }
 }

+ 128 - 33
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java

@@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -36,6 +37,8 @@ import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.xpack.core.ml.MlMetadata;
 import org.elasticsearch.xpack.core.ml.MlTasks;
+import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
+import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -67,25 +70,28 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
 
 /**
  * Migrates job and datafeed configurations from the clusterstate to
- * index documents.
+ * index documents for closed or unallocated tasks.
  *
  * There are 3 steps to the migration process
  * 1. Read config from the clusterstate
+ *     - Find all job and datafeed configs that do not have an associated persistent
+ *       task or the persistent task is unallocated
  *     - If a job or datafeed is added after this call it will be added to the index
  *     - If deleted then it's possible the config will be copied before it is deleted.
  *       Mitigate against this by filtering out jobs marked as deleting
  * 2. Copy the config to the index
  *     - The index operation could fail, don't delete from clusterstate in this case
- * 3. Remove config from the clusterstate
+ * 3. Remove config from the clusterstate and update persistent task parameters
  *     - Before this happens config is duplicated in index and clusterstate, all ops
- *       must prefer to use the index config at this stage
+ *       must prefer to use the clusterstate config at this stage
  *     - If the clusterstate update fails then the config will remain duplicated
  *       and the migration process should try again
+ *     - Job and datafeed tasks opened prior to v6.6.0 need to be updated with new
+ *       parameters
  *
  * If there was an error in step 3 and the config is in both the clusterstate and
- * index then when the migrator retries it must not overwrite an existing job config
- * document as once the index document is present all update operations will function
- * on that rather than the clusterstate.
+ * index then the clusterstate config is preferred and all update
+ * operations will function on that rather than the index.
  *
  * The number of configs indexed in each bulk operation is limited by {@link #MAX_BULK_WRITE_SIZE}.
  * Pairs of datafeeds and jobs are migrated together.
@@ -130,7 +136,7 @@ public class MlConfigMigrator {
      * @param clusterState The current clusterstate
      * @param listener     The success listener
      */
-    public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener<Boolean> listener) {
+    public void migrateConfigs(ClusterState clusterState, ActionListener<Boolean> listener) {
         if (migrationInProgress.compareAndSet(false, true) == false) {
             listener.onResponse(Boolean.FALSE);
             return;
@@ -183,8 +189,8 @@ public class MlConfigMigrator {
         for (JobsAndDatafeeds batch : batches) {
             voidChainTaskExecutor.add(chainedListener -> writeConfigToIndex(batch.datafeedConfigs, batch.jobs, ActionListener.wrap(
                 failedDocumentIds -> {
-                    List<String> successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, batch.jobs);
-                    List<String> successfulDatafeedWrites =
+                    List<Job> successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, batch.jobs);
+                    List<DatafeedConfig> successfulDatafeedWrites =
                         filterFailedDatafeedConfigWrites(failedDocumentIds, batch.datafeedConfigs);
                     removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, chainedListener);
                 },
@@ -215,24 +221,33 @@ public class MlConfigMigrator {
         );
     }
 
-    private void removeFromClusterState(List<String> jobsToRemoveIds, List<String> datafeedsToRemoveIds,
+    private void removeFromClusterState(List<Job> jobsToRemove, List<DatafeedConfig> datafeedsToRemove,
                                         ActionListener<Void> listener) {
-        if (jobsToRemoveIds.isEmpty() && datafeedsToRemoveIds.isEmpty()) {
+        if (jobsToRemove.isEmpty() && datafeedsToRemove.isEmpty()) {
             listener.onResponse(null);
             return;
         }
 
+        Map<String, Job> jobsMap = jobsToRemove.stream().collect(Collectors.toMap(Job::getId, Function.identity()));
+        Map<String, DatafeedConfig> datafeedMap =
+                datafeedsToRemove.stream().collect(Collectors.toMap(DatafeedConfig::getId, Function.identity()));
+
         AtomicReference<RemovalResult> removedConfigs = new AtomicReference<>();
 
         clusterService.submitStateUpdateTask("remove-migrated-ml-configs", new ClusterStateUpdateTask() {
             @Override
             public ClusterState execute(ClusterState currentState) {
-                RemovalResult removed = removeJobsAndDatafeeds(jobsToRemoveIds, datafeedsToRemoveIds,
+                RemovalResult removed = removeJobsAndDatafeeds(jobsToRemove, datafeedsToRemove,
                         MlMetadata.getMlMetadata(currentState));
                 removedConfigs.set(removed);
+
+                PersistentTasksCustomMetaData updatedTasks = rewritePersistentTaskParams(jobsMap, datafeedMap,
+                        currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE), currentState.nodes());
+
                 ClusterState.Builder newState = ClusterState.builder(currentState);
                 newState.metaData(MetaData.builder(currentState.getMetaData())
                         .putCustom(MlMetadata.TYPE, removed.mlMetadata)
+                        .putCustom(PersistentTasksCustomMetaData.TYPE, updatedTasks)
                         .build());
                 return newState.build();
             }
@@ -257,6 +272,82 @@ public class MlConfigMigrator {
         });
     }
 
+    /**
+     * Find any unallocated datafeed and job tasks and update their persistent
+     * task parameters if they have missing fields that were added in v6.6. If
+     * a task exists with a missing field it must have been created in an earlier
+     * version and survived an elasticsearch upgrade.
+     *
+     * If there are no unallocated tasks the {@code currentTasks} argument is returned.
+     *
+     * @param jobs          Job configs
+     * @param datafeeds     Datafeed configs
+     * @param currentTasks  The persistent tasks
+     * @param nodes         The nodes in the cluster
+     * @return  The updated tasks
+     */
+    public static PersistentTasksCustomMetaData rewritePersistentTaskParams(Map<String, Job> jobs, Map<String, DatafeedConfig> datafeeds,
+                                                                            PersistentTasksCustomMetaData currentTasks,
+                                                                            DiscoveryNodes nodes) {
+
+        Collection<PersistentTasksCustomMetaData.PersistentTask> unallocatedJobTasks = MlTasks.unallocatedJobTasks(currentTasks, nodes);
+        Collection<PersistentTasksCustomMetaData.PersistentTask> unallocatedDatafeedsTasks =
+                MlTasks.unallocatedDatafeedTasks(currentTasks, nodes);
+
+        if (unallocatedJobTasks.isEmpty() && unallocatedDatafeedsTasks.isEmpty()) {
+            return currentTasks;
+        }
+
+        PersistentTasksCustomMetaData.Builder taskBuilder = PersistentTasksCustomMetaData.builder(currentTasks);
+
+        for (PersistentTasksCustomMetaData.PersistentTask jobTask : unallocatedJobTasks) {
+            OpenJobAction.JobParams originalParams = (OpenJobAction.JobParams) jobTask.getParams();
+            if (originalParams.getJob() == null) {
+                Job job = jobs.get(originalParams.getJobId());
+                if (job != null) {
+                    logger.debug("updating persistent task params for job [{}]", originalParams.getJobId());
+
+                    // copy and update the job parameters
+                    OpenJobAction.JobParams updatedParams = new OpenJobAction.JobParams(originalParams.getJobId());
+                    updatedParams.setTimeout(originalParams.getTimeout());
+                    updatedParams.setJob(job);
+
+                    // replace with the updated params
+                    taskBuilder.removeTask(jobTask.getId());
+                    taskBuilder.addTask(jobTask.getId(), jobTask.getTaskName(), updatedParams, jobTask.getAssignment());
+                } else {
+                    logger.error("cannot find job for task [{}]", jobTask.getId());
+                }
+            }
+        }
+
+        for (PersistentTasksCustomMetaData.PersistentTask datafeedTask : unallocatedDatafeedsTasks) {
+            StartDatafeedAction.DatafeedParams originalParams = (StartDatafeedAction.DatafeedParams) datafeedTask.getParams();
+
+            if (originalParams.getJobId() == null) {
+                DatafeedConfig datafeedConfig = datafeeds.get(originalParams.getDatafeedId());
+                if (datafeedConfig != null) {
+                    logger.debug("Updating persistent task params for datafeed [{}]", originalParams.getDatafeedId());
+
+                    StartDatafeedAction.DatafeedParams updatedParams =
+                            new StartDatafeedAction.DatafeedParams(originalParams.getDatafeedId(), originalParams.getStartTime());
+                    updatedParams.setTimeout(originalParams.getTimeout());
+                    updatedParams.setEndTime(originalParams.getEndTime());
+                    updatedParams.setJobId(datafeedConfig.getJobId());
+                    updatedParams.setDatafeedIndices(datafeedConfig.getIndices());
+
+                    // replace with the updated params
+                    taskBuilder.removeTask(datafeedTask.getId());
+                    taskBuilder.addTask(datafeedTask.getId(), datafeedTask.getTaskName(), updatedParams, datafeedTask.getAssignment());
+                } else {
+                    logger.error("cannot find datafeed for task [{}]", datafeedTask.getId());
+                }
+            }
+        }
+
+        return taskBuilder.build();
+    }
+
     static class RemovalResult {
         MlMetadata mlMetadata;
         List<String> removedJobIds;
@@ -281,20 +372,20 @@ public class MlConfigMigrator {
      * @return Structure tracking which jobs and datafeeds were actually removed
      * and the new MlMetadata
      */
-    static RemovalResult removeJobsAndDatafeeds(List<String> jobsToRemove, List<String> datafeedsToRemove, MlMetadata mlMetadata) {
+    static RemovalResult removeJobsAndDatafeeds(List<Job> jobsToRemove, List<DatafeedConfig> datafeedsToRemove, MlMetadata mlMetadata) {
         Map<String, Job> currentJobs = new HashMap<>(mlMetadata.getJobs());
         List<String> removedJobIds = new ArrayList<>();
-        for (String jobId : jobsToRemove) {
-            if (currentJobs.remove(jobId) != null) {
-                removedJobIds.add(jobId);
+        for (Job job : jobsToRemove) {
+            if (currentJobs.remove(job.getId()) != null) {
+                removedJobIds.add(job.getId());
             }
         }
 
         Map<String, DatafeedConfig> currentDatafeeds = new HashMap<>(mlMetadata.getDatafeeds());
         List<String> removedDatafeedIds = new ArrayList<>();
-        for (String datafeedId : datafeedsToRemove) {
-            if (currentDatafeeds.remove(datafeedId) != null) {
-                removedDatafeedIds.add(datafeedId);
+        for (DatafeedConfig datafeed : datafeedsToRemove) {
+            if (currentDatafeeds.remove(datafeed.getId()) != null) {
+                removedDatafeedIds.add(datafeed.getId());
             }
         }
 
@@ -441,15 +532,18 @@ public class MlConfigMigrator {
     }
 
     /**
-     * Find the configurations for all closed jobs in the cluster state.
-     * Closed jobs are those that do not have an associated persistent task.
+     * Find the configurations for all closed jobs and the jobs that
+     * do not have an allocation in the cluster state.
+     * Closed jobs are those that do not have an associated persistent task;
+     * unallocated jobs have a task but no executing node.
      *
      * @param clusterState The cluster state
      * @return The closed or unallocated job configurations
       */
-    public static List<Job> closedJobConfigs(ClusterState clusterState) {
+    public static List<Job> closedOrUnallocatedJobs(ClusterState clusterState) {
         PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
         Set<String> openJobIds = MlTasks.openJobIds(persistentTasks);
+        openJobIds.removeAll(MlTasks.unallocatedJobIds(persistentTasks, clusterState.nodes()));
 
         MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
         return mlMetadata.getJobs().values().stream()
@@ -458,15 +552,18 @@ public class MlConfigMigrator {
     }
 
     /**
-     * Find the configurations for stopped datafeeds in the cluster state.
-     * Stopped datafeeds are those that do not have an associated persistent task.
+     * Find the configurations for stopped datafeeds and datafeeds that do
+     * not have an allocation in the cluster state.
+     * Stopped datafeeds are those that do not have an associated persistent task;
+     * unallocated datafeeds have a task but no executing node.
      *
      * @param clusterState The cluster state
      * @return The stopped or unallocated datafeed configurations
      */
-    public static List<DatafeedConfig> stoppedDatafeedConfigs(ClusterState clusterState) {
+    public static List<DatafeedConfig> stopppedOrUnallocatedDatafeeds(ClusterState clusterState) {
         PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
         Set<String> startedDatafeedIds = MlTasks.startedDatafeedIds(persistentTasks);
+        startedDatafeedIds.removeAll(MlTasks.unallocatedDatafeedIds(persistentTasks, clusterState.nodes()));
 
         MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
         return mlMetadata.getDatafeeds().values().stream()
@@ -489,8 +586,8 @@ public class MlConfigMigrator {
     }
 
     public static List<JobsAndDatafeeds> splitInBatches(ClusterState clusterState) {
-        Collection<DatafeedConfig> stoppedDatafeeds = stoppedDatafeedConfigs(clusterState);
-        Map<String, Job> eligibleJobs = nonDeletingJobs(closedJobConfigs(clusterState)).stream()
+        Collection<DatafeedConfig> stoppedDatafeeds = stopppedOrUnallocatedDatafeeds(clusterState);
+        Map<String, Job> eligibleJobs = nonDeletingJobs(closedOrUnallocatedJobs(clusterState)).stream()
             .map(MlConfigMigrator::updateJobForMigration)
             .collect(Collectors.toMap(Job::getId, Function.identity(), (a, b) -> a));
 
@@ -572,17 +669,15 @@ public class MlConfigMigrator {
         return failedDocumentIds;
     }
 
-    static List<String> filterFailedJobConfigWrites(Set<String> failedDocumentIds, List<Job> jobs) {
+    static List<Job> filterFailedJobConfigWrites(Set<String> failedDocumentIds, List<Job> jobs) {
         return jobs.stream()
-                .map(Job::getId)
-                .filter(id -> failedDocumentIds.contains(Job.documentId(id)) == false)
+                .filter(job -> failedDocumentIds.contains(Job.documentId(job.getId())) == false)
                 .collect(Collectors.toList());
     }
 
-    static List<String> filterFailedDatafeedConfigWrites(Set<String> failedDocumentIds, Collection<DatafeedConfig> datafeeds) {
+    static List<DatafeedConfig> filterFailedDatafeedConfigWrites(Set<String> failedDocumentIds, Collection<DatafeedConfig> datafeeds) {
         return datafeeds.stream()
-                .map(DatafeedConfig::getId)
-                .filter(id -> failedDocumentIds.contains(DatafeedConfig.documentId(id)) == false)
+                .filter(datafeed -> failedDocumentIds.contains(DatafeedConfig.documentId(datafeed.getId())) == false)
                 .collect(Collectors.toList());
     }
 }

+ 26 - 19
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

@@ -31,7 +31,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.CheckedSupplier;
-import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.inject.Inject;
@@ -98,6 +97,9 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
     private static final PersistentTasksCustomMetaData.Assignment AWAITING_LAZY_ASSIGNMENT =
         new PersistentTasksCustomMetaData.Assignment(null, "persistent task is awaiting node assignment.");
 
+    static final PersistentTasksCustomMetaData.Assignment AWAITING_MIGRATION =
+            new PersistentTasksCustomMetaData.Assignment(null, "job cannot be assigned until it has been migrated.");
+
     private final XPackLicenseState licenseState;
     private final PersistentTasksService persistentTasksService;
     private final Client client;
@@ -142,13 +144,13 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
         }
     }
 
-    static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String jobId, @Nullable Job job,
+    static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String jobId, Job job,
                                                                             ClusterState clusterState,
                                                                             int maxConcurrentJobAllocations,
                                                                             int maxMachineMemoryPercent,
                                                                             MlMemoryTracker memoryTracker,
                                                                             Logger logger) {
-        String resultsIndexName = job != null ? job.getResultsIndexName() : null;
+        String resultsIndexName = job.getResultsIndexName();
         List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsIndexName, clusterState);
         if (unavailableIndices.size() != 0) {
             String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" +
@@ -199,23 +201,21 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
                 continue;
             }
 
-            if (job != null) {
-                Set<String> compatibleJobTypes = Job.getCompatibleJobTypes(node.getVersion());
-                if (compatibleJobTypes.contains(job.getJobType()) == false) {
-                    String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndVersion(node) +
-                            "], because this node does not support jobs of type [" + job.getJobType() + "]";
-                    logger.trace(reason);
-                    reasons.add(reason);
-                    continue;
-                }
+            Set<String> compatibleJobTypes = Job.getCompatibleJobTypes(node.getVersion());
+            if (compatibleJobTypes.contains(job.getJobType()) == false) {
+                String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndVersion(node) +
+                        "], because this node does not support jobs of type [" + job.getJobType() + "]";
+                logger.trace(reason);
+                reasons.add(reason);
+                continue;
+            }
 
-                if (jobHasRules(job) && node.getVersion().before(DetectionRule.VERSION_INTRODUCED)) {
-                    String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndVersion(node) + "], because jobs using " +
-                            "custom_rules require a node of version [" + DetectionRule.VERSION_INTRODUCED + "] or higher";
-                    logger.trace(reason);
-                    reasons.add(reason);
-                    continue;
-                }
+            if (jobHasRules(job) && node.getVersion().before(DetectionRule.VERSION_INTRODUCED)) {
+                String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndVersion(node) + "], because jobs using " +
+                        "custom_rules require a node of version [" + DetectionRule.VERSION_INTRODUCED + "] or higher";
+                logger.trace(reason);
+                reasons.add(reason);
+                continue;
             }
 
             long numberOfAssignedJobs = 0;
@@ -693,6 +693,13 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
 
         @Override
         public PersistentTasksCustomMetaData.Assignment getAssignment(OpenJobAction.JobParams params, ClusterState clusterState) {
+
+            // If the task parameters do not have a job field then the job
+            // was first opened on a pre v6.6 node and has not been migrated
+            if (params.getJob() == null) {
+                return AWAITING_MIGRATION;
+            }
+
             PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(params.getJobId(),
                 params.getJob(),
                 clusterState,

+ 4 - 4
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java

@@ -63,7 +63,7 @@ public class MlAssignmentNotifierTests extends ESTestCase {
             ActionListener<Boolean> listener = (ActionListener<Boolean>) invocation.getArguments()[1];
             listener.onResponse(Boolean.TRUE);
             return null;
-        }).when(configMigrator).migrateConfigsWithoutTasks(any(ClusterState.class), any(ActionListener.class));
+        }).when(configMigrator).migrateConfigs(any(ClusterState.class), any(ActionListener.class));
     }
 
     public void testClusterChanged_info() {
@@ -87,7 +87,7 @@ public class MlAssignmentNotifierTests extends ESTestCase {
                 .build();
         notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
         verify(auditor, times(1)).info(eq("job_id"), any());
-        verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(newState), any());
+        verify(configMigrator, times(1)).migrateConfigs(eq(newState), any());
 
         // no longer master
         newState = ClusterState.builder(new ClusterName("_name"))
@@ -120,7 +120,7 @@ public class MlAssignmentNotifierTests extends ESTestCase {
                 .build();
         notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
         verify(auditor, times(1)).warning(eq("job_id"), any());
-        verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(newState), any());
+        verify(configMigrator, times(1)).migrateConfigs(eq(newState), any());
 
         // no longer master
         newState = ClusterState.builder(new ClusterName("_name"))
@@ -153,7 +153,7 @@ public class MlAssignmentNotifierTests extends ESTestCase {
                 .build();
 
         notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
-        verify(configMigrator, times(1)).migrateConfigsWithoutTasks(any(), any());
+        verify(configMigrator, times(1)).migrateConfigs(any(), any());
         verifyNoMoreInteractions(auditor);
 
         // no longer master

+ 58 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java

@@ -270,6 +270,34 @@ public class MlConfigMigrationEligibilityCheckTests extends ESTestCase {
         assertTrue(check.jobIsEligibleForMigration(closedJob.getId(), clusterState));
     }
 
+    public void testJobIsEligibleForMigration_givenOpenAndUnallocatedJob() {
+        Job openJob = JobTests.buildJobBuilder("open-job").build();
+        MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(openJob, false);
+
+        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
+        tasksBuilder.addTask(MlTasks.jobTaskId(openJob.getId()), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(openJob.getId()),
+                new PersistentTasksCustomMetaData.Assignment(null, "no assignment"));
+
+        MetaData.Builder metaData = MetaData.builder();
+        RoutingTable.Builder routingTable = RoutingTable.builder();
+        addMlConfigIndex(metaData, routingTable);
+
+        ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests"))
+                .metaData(metaData
+                        .putCustom(MlMetadata.TYPE, mlMetadata.build())
+                        .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())
+                )
+                .routingTable(routingTable.build())
+                .build();
+
+        Settings settings = newSettings(true);
+        givenClusterSettings(settings);
+
+        MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService);
+
+        assertTrue(check.jobIsEligibleForMigration(openJob.getId(), clusterState));
+    }
+
     public void testDatafeedIsEligibleForMigration_givenNodesNotUpToVersion() {
         // mixed 6.5 and 6.6 nodes
         ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
@@ -371,6 +399,36 @@ public class MlConfigMigrationEligibilityCheckTests extends ESTestCase {
         assertTrue(check.datafeedIsEligibleForMigration(datafeedId, clusterState));
     }
 
+    public void testDatafeedIsEligibleForMigration_givenUnallocatedDatafeed() {
+        Job job = JobTests.buildJobBuilder("closed-job").build();
+        MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(job, false);
+        mlMetadata.putDatafeed(createCompatibleDatafeed(job.getId()), Collections.emptyMap());
+        String datafeedId = "df-" + job.getId();
+
+        MetaData.Builder metaData = MetaData.builder();
+        RoutingTable.Builder routingTable = RoutingTable.builder();
+        addMlConfigIndex(metaData, routingTable);
+
+        PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
+        tasksBuilder.addTask(MlTasks.datafeedTaskId(datafeedId), MlTasks.DATAFEED_TASK_NAME,
+                new StartDatafeedAction.DatafeedParams(datafeedId, 0L),
+                new PersistentTasksCustomMetaData.Assignment(null, "no assignment"));
+
+        ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests"))
+                .metaData(metaData
+                        .putCustom(MlMetadata.TYPE, mlMetadata.build())
+                        .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()))
+                .routingTable(routingTable.build())
+                .build();
+
+        Settings settings = newSettings(true);
+        givenClusterSettings(settings);
+
+        MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService);
+
+        assertTrue(check.datafeedIsEligibleForMigration(datafeedId, clusterState));
+    }
+
     private void givenClusterSettings(Settings settings) {
         ClusterSettings clusterSettings = new ClusterSettings(settings, new HashSet<>(Collections.singletonList(
                 MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION)));

+ 171 - 48
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java

@@ -11,7 +11,10 @@ import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.ml.MlMetadata;
@@ -23,6 +26,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobTests;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -37,6 +41,7 @@ import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.sameInstance;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -50,74 +55,82 @@ public class MlConfigMigratorTests extends ESTestCase {
         assertThat(MlConfigMigrator.nonDeletingJobs(Arrays.asList(job1, job2, deletingJob)), containsInAnyOrder(job1, job2));
     }
 
-    public void testClosedJobConfigs() {
-        Job openJob1 = JobTests.buildJobBuilder("openjob1").build();
-        Job openJob2 = JobTests.buildJobBuilder("openjob2").build();
+    public void testClosedOrUnallocatedJobs() {
+        Job closedJob = JobTests.buildJobBuilder("closedjob").build();
+        Job jobWithoutAllocation = JobTests.buildJobBuilder("jobwithoutallocation").build();
+        Job openJob = JobTests.buildJobBuilder("openjob").build();
 
         MlMetadata.Builder mlMetadata = new MlMetadata.Builder()
-                .putJob(openJob1, false)
-                .putJob(openJob2, false)
-                .putDatafeed(createCompatibleDatafeed(openJob1.getId()), Collections.emptyMap());
-
-        ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests"))
-                .metaData(MetaData.builder()
-                        .putCustom(MlMetadata.TYPE, mlMetadata.build())
-                        .putCustom(PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData.builder().build())
-                )
-                .build();
-
-        assertThat(MlConfigMigrator.closedJobConfigs(clusterState), containsInAnyOrder(openJob1, openJob2));
+                .putJob(closedJob, false)
+                .putJob(jobWithoutAllocation, false)
+                .putJob(openJob, false)
+                .putDatafeed(createCompatibleDatafeed(closedJob.getId()), Collections.emptyMap());
 
         PersistentTasksCustomMetaData.Builder tasksBuilder =  PersistentTasksCustomMetaData.builder();
-        tasksBuilder.addTask(MlTasks.jobTaskId("openjob1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"),
-                new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
+        tasksBuilder.addTask(MlTasks.jobTaskId("jobwithoutallocation"), MlTasks.JOB_TASK_NAME,
+                new OpenJobAction.JobParams("jobwithoutallocation"),
+                new PersistentTasksCustomMetaData.Assignment(null, "test assignment"));
+        tasksBuilder.addTask(MlTasks.jobTaskId("openjob"), MlTasks.JOB_TASK_NAME,
+                new OpenJobAction.JobParams("openjob"),
+                new PersistentTasksCustomMetaData.Assignment("node1", "test assignment"));
+
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+                .add(new DiscoveryNode("node1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT))
+                .localNodeId("node1")
+                .masterNodeId("node1")
+                .build();
 
-        clusterState = ClusterState.builder(new ClusterName("migratortests"))
+        ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests"))
                 .metaData(MetaData.builder()
                         .putCustom(MlMetadata.TYPE, mlMetadata.build())
                         .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())
                 )
+                .nodes(nodes)
                 .build();
 
-        assertThat(MlConfigMigrator.closedJobConfigs(clusterState), containsInAnyOrder(openJob2));
+        assertThat(MlConfigMigrator.closedOrUnallocatedJobs(clusterState), containsInAnyOrder(closedJob, jobWithoutAllocation));
     }
 
     public void testStoppedDatafeedConfigs() {
-        Job openJob1 = JobTests.buildJobBuilder("openjob1").build();
-        Job openJob2 = JobTests.buildJobBuilder("openjob2").build();
-        DatafeedConfig datafeedConfig1 = createCompatibleDatafeed(openJob1.getId());
-        DatafeedConfig datafeedConfig2 = createCompatibleDatafeed(openJob2.getId());
-        MlMetadata.Builder mlMetadata = new MlMetadata.Builder()
-                .putJob(openJob1, false)
-                .putJob(openJob2, false)
-                .putDatafeed(datafeedConfig1, Collections.emptyMap())
-                .putDatafeed(datafeedConfig2, Collections.emptyMap());
-
-        ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests"))
-                .metaData(MetaData.builder()
-                        .putCustom(MlMetadata.TYPE, mlMetadata.build())
-                        .putCustom(PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData.builder().build())
-                )
-                .build();
-
-        assertThat(MlConfigMigrator.stoppedDatafeedConfigs(clusterState), containsInAnyOrder(datafeedConfig1, datafeedConfig2));
+        Job job1 = JobTests.buildJobBuilder("job1").build();
+        Job job2 = JobTests.buildJobBuilder("job2").build();
+        Job job3 = JobTests.buildJobBuilder("job3").build();
+        DatafeedConfig stopppedDatafeed = createCompatibleDatafeed(job1.getId());
+        DatafeedConfig datafeedWithoutAllocation = createCompatibleDatafeed(job2.getId());
+        DatafeedConfig startedDatafeed = createCompatibleDatafeed(job3.getId());
 
+        MlMetadata.Builder mlMetadata = new MlMetadata.Builder()
+                .putJob(job1, false)
+                .putJob(job2, false)
+                .putJob(job3, false)
+                .putDatafeed(stopppedDatafeed, Collections.emptyMap())
+                .putDatafeed(datafeedWithoutAllocation, Collections.emptyMap())
+                .putDatafeed(startedDatafeed, Collections.emptyMap());
 
         PersistentTasksCustomMetaData.Builder tasksBuilder =  PersistentTasksCustomMetaData.builder();
-        tasksBuilder.addTask(MlTasks.jobTaskId("openjob1"), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams("foo-1"),
-                new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
-        tasksBuilder.addTask(MlTasks.datafeedTaskId(datafeedConfig1.getId()), MlTasks.DATAFEED_TASK_NAME,
-                new StartDatafeedAction.DatafeedParams(datafeedConfig1.getId(), 0L),
-                new PersistentTasksCustomMetaData.Assignment("node-2", "test assignment"));
+        tasksBuilder.addTask(MlTasks.datafeedTaskId(stopppedDatafeed.getId()), MlTasks.DATAFEED_TASK_NAME,
+                new StartDatafeedAction.DatafeedParams(stopppedDatafeed.getId(), 0L),
+                new PersistentTasksCustomMetaData.Assignment(null, "test assignment"));
+        tasksBuilder.addTask(MlTasks.datafeedTaskId(startedDatafeed.getId()), MlTasks.DATAFEED_TASK_NAME,
+                new StartDatafeedAction.DatafeedParams(stopppedDatafeed.getId(), 0L),
+                new PersistentTasksCustomMetaData.Assignment("node1", "test assignment"));
+
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+                .add(new DiscoveryNode("node1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT))
+                .localNodeId("node1")
+                .masterNodeId("node1")
+                .build();
 
-        clusterState = ClusterState.builder(new ClusterName("migratortests"))
+        ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests"))
                 .metaData(MetaData.builder()
                         .putCustom(MlMetadata.TYPE, mlMetadata.build())
                         .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())
                 )
+                .nodes(nodes)
                 .build();
 
-        assertThat(MlConfigMigrator.stoppedDatafeedConfigs(clusterState), containsInAnyOrder(datafeedConfig2));
+        assertThat(MlConfigMigrator.stopppedOrUnallocatedDatafeeds(clusterState),
+                containsInAnyOrder(stopppedDatafeed, datafeedWithoutAllocation));
     }
 
     public void testUpdateJobForMigration() {
@@ -155,7 +168,7 @@ public class MlConfigMigratorTests extends ESTestCase {
 
         assertThat(MlConfigMigrator.filterFailedJobConfigWrites(Collections.emptySet(), jobs), hasSize(3));
         assertThat(MlConfigMigrator.filterFailedJobConfigWrites(Collections.singleton(Job.documentId("bar")), jobs),
-                contains("foo", "baz"));
+                contains(jobs.get(0), jobs.get(2)));
     }
 
     public void testFilterFailedDatafeedConfigWrites() {
@@ -166,7 +179,7 @@ public class MlConfigMigratorTests extends ESTestCase {
 
         assertThat(MlConfigMigrator.filterFailedDatafeedConfigWrites(Collections.emptySet(), datafeeds), hasSize(3));
         assertThat(MlConfigMigrator.filterFailedDatafeedConfigWrites(Collections.singleton(DatafeedConfig.documentId("df-foo")), datafeeds),
-                contains("df-bar", "df-baz"));
+                contains(datafeeds.get(1), datafeeds.get(2)));
     }
 
     public void testDocumentsNotWritten() {
@@ -197,7 +210,7 @@ public class MlConfigMigratorTests extends ESTestCase {
                 .putDatafeed(datafeedConfig2, Collections.emptyMap());
 
         MlConfigMigrator.RemovalResult removalResult = MlConfigMigrator.removeJobsAndDatafeeds(
-                Arrays.asList("job1", "job2"), Arrays.asList("df-job1", "df-job2"), mlMetadata.build());
+                Arrays.asList(job1, job2), Arrays.asList(datafeedConfig1, datafeedConfig2), mlMetadata.build());
 
         assertThat(removalResult.mlMetadata.getJobs().keySet(), empty());
         assertThat(removalResult.mlMetadata.getDatafeeds().keySet(), empty());
@@ -215,7 +228,8 @@ public class MlConfigMigratorTests extends ESTestCase {
                 .putDatafeed(datafeedConfig1, Collections.emptyMap());
 
         MlConfigMigrator.RemovalResult removalResult = MlConfigMigrator.removeJobsAndDatafeeds(
-                Arrays.asList("job1", "job-none"), Collections.singletonList("df-none"), mlMetadata.build());
+                Arrays.asList(job1, JobTests.buildJobBuilder("job-none").build()),
+                Collections.singletonList(createCompatibleDatafeed("job-none")), mlMetadata.build());
 
         assertThat(removalResult.mlMetadata.getJobs().keySet(), contains("job2"));
         assertThat(removalResult.mlMetadata.getDatafeeds().keySet(), contains("df-job1"));
@@ -300,6 +314,115 @@ public class MlConfigMigratorTests extends ESTestCase {
         assertThat(jobsAndDatafeeds.jobs, empty());
     }
 
+    public void testRewritePersistentTaskParams() {
+        Map<String, Job> jobs = new HashMap<>();
+        Job closedJob = JobTests.buildJobBuilder("closed-job").build();
+        Job unallocatedJob = JobTests.buildJobBuilder("job-to-update").build();
+        Job allocatedJob = JobTests.buildJobBuilder("allocated-job").build();
+        jobs.put(closedJob.getId(), closedJob);
+        jobs.put(unallocatedJob.getId(), unallocatedJob);
+        jobs.put(allocatedJob.getId(), allocatedJob);
+
+        Map<String, DatafeedConfig> datafeeds = new HashMap<>();
+        DatafeedConfig stoppedDatafeed = createCompatibleDatafeed(closedJob.getId());
+        DatafeedConfig unallocatedDatafeed = createCompatibleDatafeed(unallocatedJob.getId());
+        DatafeedConfig allocatedDatafeed = createCompatibleDatafeed(allocatedJob.getId());
+        datafeeds.put(stoppedDatafeed.getId(), stoppedDatafeed);
+        datafeeds.put(unallocatedDatafeed.getId(), unallocatedDatafeed);
+        datafeeds.put(allocatedDatafeed.getId(), allocatedDatafeed);
+
+        PersistentTasksCustomMetaData.Builder tasksBuilder =  PersistentTasksCustomMetaData.builder();
+        // job tasks
+        tasksBuilder.addTask(MlTasks.jobTaskId(unallocatedJob.getId()), MlTasks.JOB_TASK_NAME,
+                new OpenJobAction.JobParams(unallocatedJob.getId()),
+                new PersistentTasksCustomMetaData.Assignment(null, "no assignment"));
+        tasksBuilder.addTask(MlTasks.jobTaskId(allocatedJob.getId()), MlTasks.JOB_TASK_NAME,
+                new OpenJobAction.JobParams(allocatedJob.getId()),
+                new PersistentTasksCustomMetaData.Assignment("node1", "test assignment"));
+        // datafeed tasks
+        tasksBuilder.addTask(MlTasks.datafeedTaskId(unallocatedDatafeed.getId()), MlTasks.DATAFEED_TASK_NAME,
+                new StartDatafeedAction.DatafeedParams(unallocatedDatafeed.getId(), 0L),
+                new PersistentTasksCustomMetaData.Assignment(null, "no assignment"));
+        tasksBuilder.addTask(MlTasks.datafeedTaskId(allocatedDatafeed.getId()), MlTasks.DATAFEED_TASK_NAME,
+                new StartDatafeedAction.DatafeedParams(allocatedDatafeed.getId(), 0L),
+                new PersistentTasksCustomMetaData.Assignment("node1", "test assignment"));
+
+        PersistentTasksCustomMetaData originalTasks = tasksBuilder.build();
+        OpenJobAction.JobParams originalUnallocatedTaskParams = (OpenJobAction.JobParams) originalTasks.getTask(
+                MlTasks.jobTaskId(unallocatedJob.getId())).getParams();
+        assertNull(originalUnallocatedTaskParams.getJob());
+        StartDatafeedAction.DatafeedParams originalUnallocatedDatafeedParams = (StartDatafeedAction.DatafeedParams) originalTasks.getTask(
+                MlTasks.datafeedTaskId(unallocatedDatafeed.getId())).getParams();
+        assertNull(originalUnallocatedDatafeedParams.getJobId());
+
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+                .add(new DiscoveryNode("node1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT))
+                .localNodeId("node1")
+                .masterNodeId("node1")
+                .build();
+
+        PersistentTasksCustomMetaData modifedTasks = MlConfigMigrator.rewritePersistentTaskParams(jobs, datafeeds, originalTasks, nodes);
+
+        // The unallocated task should be modified
+        OpenJobAction.JobParams modifedUnallocatedTaskParams =
+                (OpenJobAction.JobParams) modifedTasks.getTask(MlTasks.jobTaskId(unallocatedJob.getId())).getParams();
+        assertNotEquals(originalUnallocatedTaskParams, modifedUnallocatedTaskParams);
+        assertEquals(unallocatedJob, modifedUnallocatedTaskParams.getJob());
+
+        // the allocated task should not be modified
+        OpenJobAction.JobParams allocatedJobParams =
+                (OpenJobAction.JobParams) modifedTasks.getTask(MlTasks.jobTaskId(allocatedJob.getId())).getParams();
+        assertEquals(null, allocatedJobParams.getJob());
+        OpenJobAction.JobParams originalAllocatedJobParams =
+                (OpenJobAction.JobParams) originalTasks.getTask(MlTasks.jobTaskId(allocatedJob.getId())).getParams();
+        assertEquals(originalAllocatedJobParams, allocatedJobParams);
+
+
+        // unallocated datafeed should be updated
+        StartDatafeedAction.DatafeedParams modifiedUnallocatedDatafeedParams = (StartDatafeedAction.DatafeedParams) modifedTasks.getTask(
+                MlTasks.datafeedTaskId(unallocatedDatafeed.getId())).getParams();
+        assertNotEquals(originalUnallocatedDatafeedParams, modifiedUnallocatedDatafeedParams);
+        assertEquals(unallocatedDatafeed.getJobId(), modifiedUnallocatedDatafeedParams.getJobId());
+        assertEquals(unallocatedDatafeed.getIndices(), modifiedUnallocatedDatafeedParams.getDatafeedIndices());
+
+        // allocated datafeed will not be updated
+        StartDatafeedAction.DatafeedParams allocatedDatafeedParams = (StartDatafeedAction.DatafeedParams) modifedTasks.getTask(
+                MlTasks.datafeedTaskId(allocatedDatafeed.getId())).getParams();
+        assertNull(allocatedDatafeedParams.getJobId());
+        assertThat(allocatedDatafeedParams.getDatafeedIndices(), empty());
+        StartDatafeedAction.DatafeedParams originalAllocatedDatafeedParams = (StartDatafeedAction.DatafeedParams) originalTasks.getTask(
+                MlTasks.datafeedTaskId(allocatedDatafeed.getId())).getParams();
+        assertEquals(originalAllocatedDatafeedParams, allocatedDatafeedParams);
+    }
+
+    public void testRewritePersistentTaskParams_GivenNoUnallocatedTasks() {
+        Map<String, Job> jobs = new HashMap<>();
+        Job allocatedJob = JobTests.buildJobBuilder("allocated-job").build();
+        jobs.put(allocatedJob.getId(), allocatedJob);
+
+        Map<String, DatafeedConfig> datafeeds = new HashMap<>();
+        DatafeedConfig allocatedDatafeed = createCompatibleDatafeed(allocatedJob.getId());
+        datafeeds.put(allocatedDatafeed.getId(), allocatedDatafeed);
+
+        PersistentTasksCustomMetaData.Builder tasksBuilder =  PersistentTasksCustomMetaData.builder();
+        tasksBuilder.addTask(MlTasks.jobTaskId(allocatedJob.getId()), MlTasks.JOB_TASK_NAME,
+                new OpenJobAction.JobParams(allocatedJob.getId()),
+                new PersistentTasksCustomMetaData.Assignment("node1", "test assignment"));
+        tasksBuilder.addTask(MlTasks.datafeedTaskId(allocatedDatafeed.getId()), MlTasks.DATAFEED_TASK_NAME,
+                new StartDatafeedAction.DatafeedParams(allocatedDatafeed.getId(), 0L),
+                new PersistentTasksCustomMetaData.Assignment("node1", "test assignment"));
+
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+                .add(new DiscoveryNode("node1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT))
+                .localNodeId("node1")
+                .masterNodeId("node1")
+                .build();
+
+        PersistentTasksCustomMetaData originalTasks = tasksBuilder.build();
+        PersistentTasksCustomMetaData modifedTasks = MlConfigMigrator.rewritePersistentTaskParams(jobs, datafeeds, originalTasks, nodes);
+        assertThat(originalTasks, sameInstance(modifedTasks));
+    }
+
     private DatafeedConfig createCompatibleDatafeed(String jobId) {
         // create a datafeed without aggregations or anything
         // else that may cause validation errors

+ 21 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java

@@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -23,6 +24,8 @@ import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -52,6 +55,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFiel
 import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
 import org.elasticsearch.xpack.ml.MachineLearning;
+import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
 import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
 import org.junit.Before;
@@ -59,9 +63,11 @@ import org.junit.Before;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
@@ -577,6 +583,21 @@ public class TransportOpenJobActionTests extends ESTestCase {
         assertThat(OpenJobAction.JobTaskMatcher.match(jobTask2, "ml-2"), is(true));
     }
 
+    /**
+     * Checks that {@code getAssignment} returns the {@code AWAITING_MIGRATION}
+     * assignment when it is given job params built from a job id alone.
+     */
+    public void testGetAssignment_GivenJobThatRequiresMigration() {
+        // The cluster settings are stubbed on the mocked ClusterService
+        // before the executor is constructed with it.
+        ClusterSettings mlSettings = new ClusterSettings(Settings.EMPTY, new HashSet<>(Arrays.asList(
+                MachineLearning.CONCURRENT_JOB_ALLOCATIONS,
+                MachineLearning.MAX_MACHINE_MEMORY_PERCENT,
+                MachineLearning.MAX_LAZY_ML_NODES)));
+        ClusterService mockedClusterService = mock(ClusterService.class);
+        when(mockedClusterService.getClusterSettings()).thenReturn(mlSettings);
+
+        TransportOpenJobAction.OpenJobPersistentTasksExecutor tasksExecutor =
+                new TransportOpenJobAction.OpenJobPersistentTasksExecutor(Settings.EMPTY, mockedClusterService,
+                        mock(AutodetectProcessManager.class), mock(MlMemoryTracker.class), mock(Client.class));
+
+        OpenJobAction.JobParams paramsWithoutJob = new OpenJobAction.JobParams("missing_job_field");
+        ClusterState mockedState = mock(ClusterState.class);
+        assertEquals(TransportOpenJobAction.AWAITING_MIGRATION, tasksExecutor.getAssignment(paramsWithoutJob, mockedState));
+    }
+
     /**
      * Convenience overload of {@code addJobTask} that forwards to the
      * five-argument variant, passing {@code false} as the last argument.
      */
     public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder) {
         addJobTask(jobId, nodeId, jobState, builder, false);
     }

+ 6 - 6
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java

@@ -121,10 +121,10 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
     }
 
     public void testMigrateConfigs() throws InterruptedException, IOException {
-        // and jobs and datafeeds clusterstate
         MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
         mlMetadata.putJob(buildJobBuilder("job-foo").build(), false);
         mlMetadata.putJob(buildJobBuilder("job-bar").build(), false);
+
         DatafeedConfig.Builder builder = new DatafeedConfig.Builder("df-1", "job-foo");
         builder.setIndices(Collections.singletonList("beats*"));
         mlMetadata.putDatafeed(builder.build(), Collections.emptyMap());
@@ -149,7 +149,7 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
         // do the migration
         MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService);
         // the first time this is called mlmetadata will be snap-shotted
-        blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener),
+        blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener),
                 responseHolder, exceptionHolder);
 
         assertNull(exceptionHolder.get());
@@ -214,7 +214,7 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
 
         // do the migration
         MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService);
-        blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener),
+        blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener),
             responseHolder, exceptionHolder);
 
         assertNull(exceptionHolder.get());
@@ -252,7 +252,7 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
 
         // do the migration
         MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService);
-        blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener),
+        blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener),
             responseHolder, exceptionHolder);
 
         assertNull(exceptionHolder.get());
@@ -285,7 +285,7 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
 
         // do the migration
         MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(settings, client(), clusterService);
-        blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener),
+        blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener),
                 responseHolder, exceptionHolder);
 
         assertNull(exceptionHolder.get());
@@ -361,7 +361,7 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
 
         // if the cluster state has a job config and the index does not
         // exist it should be created
-        blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener),
+        blockingCall(actionListener -> mlConfigMigrator.migrateConfigs(clusterState, actionListener),
                 responseHolder, exceptionHolder);
 
         assertBusy(() -> assertTrue(configIndexExists()));

+ 74 - 32
x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java

@@ -14,6 +14,7 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.upgrades.AbstractFullClusterRestartTestCase;
+import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
@@ -24,6 +25,7 @@ import org.junit.Before;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collections;
 import java.util.List;
@@ -35,6 +37,8 @@ import static org.hamcrest.Matchers.isEmptyOrNullString;
 
 public class MlMigrationFullClusterRestartIT extends AbstractFullClusterRestartTestCase {
 
+    private static final String OLD_CLUSTER_OPEN_JOB_ID = "migration-old-cluster-open-job";
+    private static final String OLD_CLUSTER_STARTED_DATAFEED_ID = "migration-old-cluster-started-datafeed";
     private static final String OLD_CLUSTER_CLOSED_JOB_ID = "migration-old-cluster-closed-job";
     private static final String OLD_CLUSTER_STOPPED_DATAFEED_ID = "migration-old-cluster-stopped-datafeed";
 
@@ -102,13 +106,42 @@ public class MlMigrationFullClusterRestartIT extends AbstractFullClusterRestartT
         Request putStoppedDatafeed = new Request("PUT", "/_xpack/ml/datafeeds/" + OLD_CLUSTER_STOPPED_DATAFEED_ID);
         putStoppedDatafeed.setJsonEntity(Strings.toString(stoppedDfBuilder.build()));
         client().performRequest(putStoppedDatafeed);
+
+        // open job and started datafeed
+        Job.Builder openJob = new Job.Builder(OLD_CLUSTER_OPEN_JOB_ID);
+        openJob.setAnalysisConfig(analysisConfig);
+        openJob.setDataDescription(new DataDescription.Builder());
+        Request putOpenJob = new Request("PUT", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID);
+        putOpenJob.setJsonEntity(Strings.toString(openJob));
+        client().performRequest(putOpenJob);
+
+        Request openOpenJob = new Request("POST", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID + "/_open");
+        client().performRequest(openOpenJob);
+
+        DatafeedConfig.Builder dfBuilder = new DatafeedConfig.Builder(OLD_CLUSTER_STARTED_DATAFEED_ID, OLD_CLUSTER_OPEN_JOB_ID);
+        if (getOldClusterVersion().before(Version.V_6_6_0)) {
+            dfBuilder.setDelayedDataCheckConfig(null);
+        }
+        dfBuilder.setIndices(Collections.singletonList("airline-data"));
+
+        Request putDatafeed = new Request("PUT", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID);
+        putDatafeed.setJsonEntity(Strings.toString(dfBuilder.build()));
+        client().performRequest(putDatafeed);
+
+        Request startDatafeed = new Request("POST", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID + "/_start");
+        client().performRequest(startDatafeed);
     }
 
     private void upgradedClusterTests() throws Exception {
-        // wait for the closed job and datafeed to be migrated
-        waitForMigration(Collections.singletonList(OLD_CLUSTER_CLOSED_JOB_ID),
-                Collections.singletonList(OLD_CLUSTER_STOPPED_DATAFEED_ID),
-                Collections.emptyList(), Collections.emptyList());
+        // wait for the closed and open jobs and the stopped and started datafeeds to be migrated
+        waitForMigration(Arrays.asList(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_OPEN_JOB_ID),
+                Arrays.asList(OLD_CLUSTER_STOPPED_DATAFEED_ID, OLD_CLUSTER_STARTED_DATAFEED_ID));
+
+        waitForJobToBeAssigned(OLD_CLUSTER_OPEN_JOB_ID);
+        waitForDatafeedToBeAssigned(OLD_CLUSTER_STARTED_DATAFEED_ID);
+        // The persistent task params for the job & datafeed left open
+        // during upgrade should be updated with new fields
+        checkTaskParamsAreUpdated(OLD_CLUSTER_OPEN_JOB_ID, OLD_CLUSTER_STARTED_DATAFEED_ID);
 
         // open the migrated job and datafeed
         Request openJob = new Request("POST", "_ml/anomaly_detectors/" + OLD_CLUSTER_CLOSED_JOB_ID + "/_open");
@@ -154,8 +187,7 @@ public class MlMigrationFullClusterRestartIT extends AbstractFullClusterRestartT
     }
 
     @SuppressWarnings("unchecked")
-    private void waitForMigration(List<String> expectedMigratedJobs, List<String> expectedMigratedDatafeeds,
-                                  List<String> unMigratedJobs, List<String> unMigratedDatafeeds) throws Exception {
+    private void waitForMigration(List<String> expectedMigratedJobs, List<String> expectedMigratedDatafeeds) throws Exception {
 
         // After v6.6.0 jobs are created in the index so no migration will take place
         if (getOldClusterVersion().onOrAfter(Version.V_6_6_0)) {
@@ -170,48 +202,58 @@ public class MlMigrationFullClusterRestartIT extends AbstractFullClusterRestartT
 
             List<Map<String, Object>> jobs =
                     (List<Map<String, Object>>) XContentMapValues.extractValue("metadata.ml.jobs", responseMap);
-            assertNotNull(jobs);
-
-            for (String jobId : expectedMigratedJobs) {
-                assertJob(jobId, jobs, false);
-            }
 
-            for (String jobId : unMigratedJobs) {
-                assertJob(jobId, jobs, true);
+            if (jobs != null) {
+                for (String jobId : expectedMigratedJobs) {
+                    assertJobNotPresent(jobId, jobs);
+                }
             }
 
             List<Map<String, Object>> datafeeds =
                     (List<Map<String, Object>>) XContentMapValues.extractValue("metadata.ml.datafeeds", responseMap);
-            assertNotNull(datafeeds);
 
-            for (String datafeedId : expectedMigratedDatafeeds) {
-                assertDatafeed(datafeedId, datafeeds, false);
+            if (datafeeds != null) {
+                for (String datafeedId : expectedMigratedDatafeeds) {
+                    assertDatafeedNotPresent(datafeedId, datafeeds);
+                }
             }
+        }, 30, TimeUnit.SECONDS);
+    }
 
-            for (String datafeedId : unMigratedDatafeeds) {
-                assertDatafeed(datafeedId, datafeeds, true);
+    /**
+     * Fetches the cluster state metadata and checks the persistent task params
+     * of the given job and datafeed: the job task must have a {@code job}
+     * param, the datafeed task must have {@code job_id} and {@code indices}
+     * params.
+     * <p>
+     * Fails if either task is missing from the cluster state, so the check
+     * cannot pass without having inspected both tasks.
+     *
+     * @param jobId      id of the job whose persistent task params are inspected
+     * @param datafeedId id of the datafeed whose persistent task params are inspected
+     * @throws Exception if the cluster state request fails
+     */
+    @SuppressWarnings("unchecked")
+    private void checkTaskParamsAreUpdated(String jobId, String datafeedId) throws Exception {
+        Request getClusterState = new Request("GET", "/_cluster/state/metadata");
+        Response response = client().performRequest(getClusterState);
+        Map<String, Object> responseMap = entityAsMap(response);
+
+        List<Map<String, Object>> tasks =
+                (List<Map<String, Object>>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", responseMap);
+        assertNotNull(tasks);
+        // Track which tasks were actually seen; without this the loop would
+        // succeed silently when neither task is present.
+        boolean jobTaskFound = false;
+        boolean datafeedTaskFound = false;
+        for (Map<String, Object> task : tasks) {
+            String id = (String) task.get("id");
+            assertNotNull(id);
+            if (id.equals(MlTasks.jobTaskId(jobId))) {
+                jobTaskFound = true;
+                Object jobParam = XContentMapValues.extractValue("task.xpack/ml/job.params.job", task);
+                assertNotNull(jobParam);
             }
-
-        }, 30, TimeUnit.SECONDS);
+            else if (id.equals(MlTasks.datafeedTaskId(datafeedId))) {
+                datafeedTaskFound = true;
+                Object jobIdParam = XContentMapValues.extractValue("task.xpack/ml/datafeed.params.job_id", task);
+                assertNotNull(jobIdParam);
+                Object indices = XContentMapValues.extractValue("task.xpack/ml/datafeed.params.indices", task);
+                assertNotNull(indices);
+            }
+        }
+        assertTrue("no persistent task found for job [" + jobId + "]", jobTaskFound);
+        assertTrue("no persistent task found for datafeed [" + datafeedId + "]", datafeedTaskFound);
     }
 
-    private void assertDatafeed(String datafeedId, List<Map<String, Object>> datafeeds, boolean expectedToBePresent) {
+    /**
+     * Asserts that none of the given datafeed maps has a {@code datafeed_id}
+     * value equal to {@code datafeedId}.
+     *
+     * @param datafeedId the datafeed id that must be absent
+     * @param datafeeds  the datafeed maps to search
+     */
+    private void assertDatafeedNotPresent(String datafeedId, List<Map<String, Object>> datafeeds) {
+        // NOTE(review): a map without a "datafeed_id" key yields a null id, and id.equals(...)
+        // would then throw a NullPointerException; presumably every entry has the key - confirm.
         Optional<Object> config = datafeeds.stream().map(map -> map.get("datafeed_id"))
                 .filter(id -> id.equals(datafeedId)).findFirst();
-        if (expectedToBePresent) {
-            assertTrue(config.isPresent());
-        } else {
-            assertFalse(config.isPresent());
-        }
+        assertFalse(config.isPresent());
     }
 
-    private void assertJob(String jobId, List<Map<String, Object>> jobs, boolean expectedToBePresent) {
+    /**
+     * Asserts that none of the given job maps has a {@code job_id} value
+     * equal to {@code jobId}.
+     *
+     * @param jobId the job id that must be absent
+     * @param jobs  the job maps to search
+     */
+    private void assertJobNotPresent(String jobId, List<Map<String, Object>> jobs) {
+        // NOTE(review): a map without a "job_id" key yields a null id, and id.equals(...)
+        // would then throw a NullPointerException; presumably every entry has the key - confirm.
         Optional<Object> config = jobs.stream().map(map -> map.get("job_id"))
                 .filter(id -> id.equals(jobId)).findFirst();
-        if (expectedToBePresent) {
-            assertTrue(config.isPresent());
-        } else {
-            assertFalse(config.isPresent());
-        }
+        assertFalse(config.isPresent());
     }
 }