Browse Source

ML: creating ML State write alias and pointing writes there (#37483)

* ML: creating ML State write alias and pointing writes there

* Moving alias check to openJob method

* adjusting concrete index lookup for ml-state
Benjamin Trent 6 years ago
parent
commit
5384162a42
16 changed files with 265 additions and 121 deletions
  1. 83 4
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java
  2. 2 3
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java
  3. 3 2
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java
  4. 1 1
      x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java
  5. 5 4
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
  6. 10 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java
  7. 4 2
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java
  8. 30 17
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java
  9. 2 2
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java
  10. 51 47
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java
  11. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessor.java
  12. 5 1
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java
  13. 29 12
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java
  14. 32 19
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java
  15. 2 1
      x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java
  16. 5 0
      x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/30_ml_jobs_crud.yml

+ 83 - 4
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java

@@ -5,6 +5,24 @@
  */
 package org.elasticsearch.xpack.core.ml.job.persistence;
 
+import org.elasticsearch.ResourceAlreadyExistsException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.alias.Alias;
+import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
+import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
+
 /**
  * Methods for handling index naming related functions
  */
@@ -40,11 +58,11 @@ public final class AnomalyDetectorsIndex {
     }
 
     /**
-     * The name of the default index where a job's state is stored
-     * @return The index name
+     * The name of the alias pointing to the appropriate index for writing job state
+     * @return The write alias name
      */
-    public static String jobStateIndexName() {
-        return AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX;
+    public static String jobStateIndexWriteAlias() {
+        return AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-write";
     }
 
     /**
@@ -64,4 +82,65 @@ public final class AnomalyDetectorsIndex {
         return AnomalyDetectorsIndexFields.CONFIG_INDEX;
     }
 
+    /**
+     * Create the .ml-state index (if necessary)
+     * Create the .ml-state-write alias for the .ml-state index (if necessary)
+     */
+    public static void createStateIndexAndAliasIfNecessary(Client client, ClusterState state, final ActionListener<Boolean> finalListener) {
+
+        if (state.getMetaData().getAliasAndIndexLookup().containsKey(jobStateIndexWriteAlias())) {
+            finalListener.onResponse(false);
+            return;
+        }
+
+        final ActionListener<String> createAliasListener = ActionListener.wrap(
+            concreteIndexName -> {
+                final IndicesAliasesRequest request = client.admin()
+                    .indices()
+                    .prepareAliases()
+                    .addAlias(concreteIndexName, jobStateIndexWriteAlias())
+                    .request();
+                executeAsyncWithOrigin(client.threadPool().getThreadContext(),
+                    ML_ORIGIN,
+                    request,
+                    ActionListener.<AcknowledgedResponse>wrap(
+                        resp -> finalListener.onResponse(resp.isAcknowledged()),
+                        finalListener::onFailure),
+                    client.admin().indices()::aliases);
+            },
+            finalListener::onFailure
+        );
+
+        IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
+        String[] stateIndices = indexNameExpressionResolver.concreteIndexNames(state,
+            IndicesOptions.lenientExpandOpen(),
+            jobStateIndexPattern());
+        if (stateIndices.length > 0) {
+            Arrays.sort(stateIndices, Collections.reverseOrder());
+            createAliasListener.onResponse(stateIndices[0]);
+        } else {
+            CreateIndexRequest createIndexRequest = client.admin()
+                .indices()
+                .prepareCreate(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX)
+                .addAlias(new Alias(jobStateIndexWriteAlias()))
+                .request();
+            executeAsyncWithOrigin(client.threadPool().getThreadContext(),
+                ML_ORIGIN,
+                createIndexRequest,
+                ActionListener.<CreateIndexResponse>wrap(
+                    createIndexResponse -> finalListener.onResponse(true),
+                    createIndexFailure -> {
+                        // If it was created between our last check, and this request being handled, we should add the alias
+                        // Adding an alias that already exists is idempotent. So, no need to double check if the alias exists
+                        // as well.
+                        if (createIndexFailure instanceof ResourceAlreadyExistsException) {
+                            createAliasListener.onResponse(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX);
+                        } else {
+                            finalListener.onFailure(createIndexFailure);
+                        }
+                    }),
+                client.admin().indices()::create);
+        }
+    }
+
 }

+ 2 - 3
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java

@@ -73,7 +73,6 @@ import org.elasticsearch.xpack.core.ml.action.UpdateModelSnapshotAction;
 import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction;
 import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction;
 import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction;
-import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
 import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
 import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkAction;
@@ -762,7 +761,7 @@ public class ReservedRolesStoreTests extends ESTestCase {
 
         assertNoAccessAllowed(role, "foo");
         assertOnlyReadAllowed(role, MlMetaIndex.INDEX_NAME);
-        assertOnlyReadAllowed(role, AnomalyDetectorsIndex.jobStateIndexName());
+        assertOnlyReadAllowed(role, AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX);
         assertOnlyReadAllowed(role, AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT);
         assertOnlyReadAllowed(role, AuditorField.NOTIFICATIONS_INDEX);
     }
@@ -814,7 +813,7 @@ public class ReservedRolesStoreTests extends ESTestCase {
 
         assertNoAccessAllowed(role, "foo");
         assertNoAccessAllowed(role, MlMetaIndex.INDEX_NAME);
-        assertNoAccessAllowed(role, AnomalyDetectorsIndex.jobStateIndexName());
+        assertNoAccessAllowed(role, AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX);
         assertOnlyReadAllowed(role, AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT);
         assertOnlyReadAllowed(role, AuditorField.NOTIFICATIONS_INDEX);
     }

+ 3 - 2
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java

@@ -16,6 +16,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.ml.MlMetaIndex;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
 import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
 
 import java.io.IOException;
@@ -30,13 +31,13 @@ public final class XPackRestTestHelper {
     public static final List<String> ML_PRE_V660_TEMPLATES = Collections.unmodifiableList(
             Arrays.asList(AuditorField.NOTIFICATIONS_INDEX,
                     MlMetaIndex.INDEX_NAME,
-                    AnomalyDetectorsIndex.jobStateIndexName(),
+                    AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
                     AnomalyDetectorsIndex.jobResultsIndexPrefix()));
 
     public static final List<String> ML_POST_V660_TEMPLATES = Collections.unmodifiableList(
             Arrays.asList(AuditorField.NOTIFICATIONS_INDEX,
                     MlMetaIndex.INDEX_NAME,
-                    AnomalyDetectorsIndex.jobStateIndexName(),
+                    AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
                     AnomalyDetectorsIndex.jobResultsIndexPrefix(),
                     AnomalyDetectorsIndex.configIndexName()));
 

+ 1 - 1
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java

@@ -180,7 +180,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
         bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
         for (int i = 0; i < 10010; i++) {
             String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i);
-            IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexName(), "doc", docId);
+            IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), "doc", docId);
             indexRequest.source(Collections.emptyMap());
             bulkRequestBuilder.add(indexRequest);
         }

+ 5 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -109,6 +109,7 @@ import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction;
 import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction;
 import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
 import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.ml.notifications.AuditMessage;
 import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
@@ -701,7 +702,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
             }
 
             try (XContentBuilder stateMapping = ElasticsearchMappings.stateMapping()) {
-                IndexTemplateMetaData stateTemplate = IndexTemplateMetaData.builder(AnomalyDetectorsIndex.jobStateIndexName())
+                IndexTemplateMetaData stateTemplate = IndexTemplateMetaData.builder(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX)
                         .patterns(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexPattern()))
                         // TODO review these settings
                         .settings(Settings.builder()
@@ -710,9 +711,9 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
                         .putMapping(ElasticsearchMappings.DOC_TYPE, Strings.toString(stateMapping))
                         .version(Version.CURRENT.id)
                         .build();
-                templates.put(AnomalyDetectorsIndex.jobStateIndexName(), stateTemplate);
+                templates.put(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, stateTemplate);
             } catch (IOException e) {
-                logger.error("Error loading the template for the " + AnomalyDetectorsIndex.jobStateIndexName() + " index", e);
+                logger.error("Error loading the template for the " + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + " index", e);
             }
 
             try (XContentBuilder docMapping = ElasticsearchMappings.resultsMapping()) {
@@ -742,7 +743,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
     public static boolean allTemplatesInstalled(ClusterState clusterState) {
         boolean allPresent = true;
         List<String> templateNames = Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME,
-                AnomalyDetectorsIndex.jobStateIndexName(), AnomalyDetectorsIndex.jobResultsIndexPrefix());
+                AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, AnomalyDetectorsIndex.jobResultsIndexPrefix());
         for (String templateName : templateNames) {
             allPresent = allPresent && TemplateUtils.checkTemplateExistsAndVersionIsGTECurrentVersion(templateName, clusterState);
         }

+ 10 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java

@@ -439,7 +439,7 @@ public class MlConfigMigrator {
 
         logger.debug("taking a snapshot of ml_metadata");
         String documentId = "ml-config";
-        IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(),
+        IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.jobStateIndexWriteAlias(),
                 ElasticsearchMappings.DOC_TYPE, documentId)
                 .setOpType(DocWriteRequest.OpType.CREATE);
 
@@ -456,8 +456,10 @@ public class MlConfigMigrator {
             return;
         }
 
-        executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest.request(),
-                ActionListener.<IndexResponse>wrap(
+        AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterService.state(), ActionListener.wrap(
+            r -> {
+                executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest.request(),
+                    ActionListener.<IndexResponse>wrap(
                         indexResponse -> {
                             listener.onResponse(indexResponse.getResult() == DocWriteResponse.Result.CREATED);
                         },
@@ -469,8 +471,11 @@ public class MlConfigMigrator {
                                 listener.onFailure(e);
                             }
                         }),
-                client::index
-        );
+                    client::index
+                );
+            },
+            listener::onFailure
+        ));
     }
 
     private void createConfigIndex(ActionListener<Boolean> listener) {

+ 4 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

@@ -529,7 +529,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
             // Try adding state doc mapping
             ActionListener<Boolean> resultsPutMappingHandler = ActionListener.wrap(
                     response -> {
-                        addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings::stateMapping,
+                        addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), ElasticsearchMappings::stateMapping,
                                 state, jobUpdateListener);
                     }, listener::onFailure
             );
@@ -673,6 +673,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
         private volatile int maxConcurrentJobAllocations;
         private volatile int maxMachineMemoryPercent;
         private volatile int maxLazyMLNodes;
+        private volatile ClusterState clusterState;
 
         public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService,
                                               AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker,
@@ -689,6 +690,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
             clusterService.getClusterSettings()
                     .addSettingsUpdateConsumer(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, this::setMaxMachineMemoryPercent);
             clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes);
+            clusterService.addListener(event -> clusterState = event.state());
         }
 
         @Override
@@ -748,7 +750,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
             }
 
             String jobId = jobTask.getJobId();
-            autodetectProcessManager.openJob(jobTask, e2 -> {
+            autodetectProcessManager.openJob(jobTask, clusterState, e2 -> {
                 if (e2 == null) {
                     FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request(new String[]{jobId});
                     executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest,

+ 30 - 17
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java

@@ -24,6 +24,7 @@ import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
@@ -79,26 +80,38 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
         logger.debug("Received request to revert to snapshot id '{}' for job '{}', deleting intervening results: {}",
                 request.getSnapshotId(), request.getJobId(), request.getDeleteInterveningResults());
 
-        jobManager.jobExists(request.getJobId(), ActionListener.wrap(
-                exists -> {
-                    PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
-                    JobState jobState = MlTasks.getJobState(request.getJobId(), tasks);
 
-                    if (jobState.equals(JobState.CLOSED) == false) {
-                        throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT));
+        // 3. Revert the state
+        ActionListener<Boolean> jobExistsListener = ActionListener.wrap(
+            exists -> {
+                PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+                JobState jobState = MlTasks.getJobState(request.getJobId(), tasks);
+
+                if (jobState.equals(JobState.CLOSED) == false) {
+                    throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT));
+                }
+
+                getModelSnapshot(request, jobResultsProvider, modelSnapshot -> {
+                    ActionListener<RevertModelSnapshotAction.Response> wrappedListener = listener;
+                    if (request.getDeleteInterveningResults()) {
+                        wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, request.getJobId());
+                        wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, request.getJobId());
                     }
+                    jobManager.revertSnapshot(request, wrappedListener, modelSnapshot);
+                }, listener::onFailure);
+            },
+            listener::onFailure
+        );
+
+
+        // 2. Verify the job exists
+        ActionListener<Boolean> createStateIndexListener = ActionListener.wrap(
+            r -> jobManager.jobExists(request.getJobId(), jobExistsListener),
+            listener::onFailure
+        );
 
-                    getModelSnapshot(request, jobResultsProvider, modelSnapshot -> {
-                        ActionListener<RevertModelSnapshotAction.Response> wrappedListener = listener;
-                        if (request.getDeleteInterveningResults()) {
-                            wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, request.getJobId());
-                            wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, request.getJobId());
-                        }
-                        jobManager.revertSnapshot(request, wrappedListener, modelSnapshot);
-                    }, listener::onFailure);
-                },
-                listener::onFailure
-        ));
+        // 1. Verify/Create the state index and its alias exists
+        AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, state, createStateIndexListener);
     }
 
     private void getModelSnapshot(RevertModelSnapshotAction.Request request, JobResultsProvider provider, Consumer<ModelSnapshot> handler,

+ 2 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java

@@ -228,7 +228,7 @@ public class JobResultsPersister {
      */
     public void persistQuantiles(Quantiles quantiles) {
         Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.documentId(quantiles.getJobId()));
-        persistable.persist(AnomalyDetectorsIndex.jobStateIndexName()).actionGet();
+        persistable.persist(AnomalyDetectorsIndex.jobStateIndexWriteAlias()).actionGet();
     }
 
     /**
@@ -237,7 +237,7 @@ public class JobResultsPersister {
     public void persistQuantiles(Quantiles quantiles, WriteRequest.RefreshPolicy refreshPolicy, ActionListener<IndexResponse> listener) {
         Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.documentId(quantiles.getJobId()));
         persistable.setRefreshPolicy(refreshPolicy);
-        persistable.persist(AnomalyDetectorsIndex.jobStateIndexName(), listener);
+        persistable.persist(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), listener);
     }
 
     /**

+ 51 - 47
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

@@ -11,6 +11,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.collect.Tuple;
@@ -38,6 +39,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
 import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
@@ -412,68 +414,70 @@ public class AutodetectProcessManager {
         }
     }
 
-    public void openJob(JobTask jobTask, Consumer<Exception> closeHandler) {
+    public void openJob(JobTask jobTask, ClusterState clusterState, Consumer<Exception> closeHandler) {
         String jobId = jobTask.getJobId();
         logger.info("Opening job [{}]", jobId);
-
-        jobManager.getJob(jobId, ActionListener.wrap(
-                job -> {
-                    if (job.getJobVersion() == null) {
-                        closeHandler.accept(ExceptionsHelper.badRequestException("Cannot open job [" + jobId
+        AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, ActionListener.wrap(
+            r -> {
+                jobManager.getJob(jobId, ActionListener.wrap(
+                    job -> {
+                        if (job.getJobVersion() == null) {
+                            closeHandler.accept(ExceptionsHelper.badRequestException("Cannot open job [" + jobId
                                 + "] because jobs created prior to version 5.5 are not supported"));
-                        return;
-                    }
-
+                            return;
+                        }
 
-                    processByAllocation.putIfAbsent(jobTask.getAllocationId(), new ProcessContext(jobTask));
-                    jobResultsProvider.getAutodetectParams(job, params -> {
-                        // We need to fork, otherwise we restore model state from a network thread (several GET api calls):
-                        threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
-                            @Override
-                            public void onFailure(Exception e) {
-                                closeHandler.accept(e);
-                            }
 
-                            @Override
-                            protected void doRun() {
-                                ProcessContext processContext = processByAllocation.get(jobTask.getAllocationId());
-                                if (processContext == null) {
-                                    logger.debug("Aborted opening job [{}] as it has been closed", jobId);
-                                    return;
+                        processByAllocation.putIfAbsent(jobTask.getAllocationId(), new ProcessContext(jobTask));
+                        jobResultsProvider.getAutodetectParams(job, params -> {
+                            // We need to fork, otherwise we restore model state from a network thread (several GET api calls):
+                            threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() {
+                                @Override
+                                public void onFailure(Exception e) {
+                                    closeHandler.accept(e);
                                 }
-                                if (processContext.getState() !=  ProcessContext.ProcessStateName.NOT_RUNNING) {
-                                    logger.debug("Cannot open job [{}] when its state is [{}]",
+
+                                @Override
+                                protected void doRun() {
+                                    ProcessContext processContext = processByAllocation.get(jobTask.getAllocationId());
+                                    if (processContext == null) {
+                                        logger.debug("Aborted opening job [{}] as it has been closed", jobId);
+                                        return;
+                                    }
+                                    if (processContext.getState() !=  ProcessContext.ProcessStateName.NOT_RUNNING) {
+                                        logger.debug("Cannot open job [{}] when its state is [{}]",
                                             jobId, processContext.getState().getClass().getName());
-                                    return;
-                                }
+                                        return;
+                                    }
 
-                                try {
-                                    createProcessAndSetRunning(processContext, job, params, closeHandler);
-                                    processContext.getAutodetectCommunicator().init(params.modelSnapshot());
-                                    setJobState(jobTask, JobState.OPENED);
-                                } catch (Exception e1) {
-                                    // No need to log here as the persistent task framework will log it
                                     try {
-                                        // Don't leave a partially initialised process hanging around
-                                        processContext.newKillBuilder()
+                                        createProcessAndSetRunning(processContext, job, params, closeHandler);
+                                        processContext.getAutodetectCommunicator().init(params.modelSnapshot());
+                                        setJobState(jobTask, JobState.OPENED);
+                                    } catch (Exception e1) {
+                                        // No need to log here as the persistent task framework will log it
+                                        try {
+                                            // Don't leave a partially initialised process hanging around
+                                            processContext.newKillBuilder()
                                                 .setAwaitCompletion(false)
                                                 .setFinish(false)
                                                 .kill();
-                                        processByAllocation.remove(jobTask.getAllocationId());
-                                    } finally {
-                                        setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1));
+                                            processByAllocation.remove(jobTask.getAllocationId());
+                                        } finally {
+                                            setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1));
+                                        }
                                     }
                                 }
-                            }
+                            });
+                        }, e1 -> {
+                            logger.warn("Failed to gather information required to open job [" + jobId + "]", e1);
+                            setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1));
                         });
-                    }, e1 -> {
-                        logger.warn("Failed to gather information required to open job [" + jobId + "]", e1);
-                        setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1));
-                    });
-                },
-                closeHandler
-        ));
-
+                    },
+                    closeHandler
+                ));
+            },
+            closeHandler));
     }
 
     private void createProcessAndSetRunning(ProcessContext processContext, Job job, AutodetectParams params, Consumer<Exception> handler) {

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessor.java

@@ -98,7 +98,7 @@ public class AutodetectStateProcessor implements StateProcessor {
 
     void persist(BytesReference bytes) throws IOException {
         BulkRequest bulkRequest = new BulkRequest();
-        bulkRequest.add(bytes, AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, XContentType.JSON);
+        bulkRequest.add(bytes, AnomalyDetectorsIndex.jobStateIndexWriteAlias(), ElasticsearchMappings.DOC_TYPE, XContentType.JSON);
         if (bulkRequest.numberOfActions() > 0) {
             LOGGER.trace("[{}] Persisting job state document", jobId);
             try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {

+ 5 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java

@@ -12,6 +12,7 @@ import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.AliasMetaData;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
@@ -614,7 +615,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
 
     private void addIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable) {
         List<String> indices = new ArrayList<>();
-        indices.add(AnomalyDetectorsIndex.jobStateIndexName());
+        indices.add(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX);
         indices.add(MlMetaIndex.INDEX_NAME);
         indices.add(AuditorField.NOTIFICATIONS_INDEX);
         indices.add(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT);
@@ -625,6 +626,9 @@ public class TransportOpenJobActionTests extends ESTestCase {
                     .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
                     .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
             );
+            if (indexName.equals(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX)) {
+                indexMetaData.putAlias(new AliasMetaData.Builder(AnomalyDetectorsIndex.jobStateIndexWriteAlias()));
+            }
             metaData.put(indexMetaData);
             Index index = new Index(indexName, "_uuid");
             ShardId shardId = new ShardId(index, 0);

+ 29 - 12
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java

@@ -10,10 +10,12 @@ import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.metadata.AliasOrIndex;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
@@ -52,6 +54,8 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder;
@@ -74,7 +78,13 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
         clusterService = mock(ClusterService.class);
         ClusterSettings clusterSettings = new ClusterSettings(nodeSettings(), new HashSet<>(Collections.singletonList(
                 MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION)));
+        MetaData metaData = mock(MetaData.class);
+        SortedMap<String, AliasOrIndex> aliasOrIndexSortedMap = new TreeMap<>();
+        when(metaData.getAliasAndIndexLookup()).thenReturn(aliasOrIndexSortedMap);
+        ClusterState clusterState = mock(ClusterState.class);
+        when(clusterState.getMetaData()).thenReturn(metaData);
         when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
+        when(clusterService.state()).thenReturn(clusterState);
     }
 
     public void testWriteConfigToIndex() throws InterruptedException {
@@ -139,6 +149,7 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
                 .metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build()))
                 .routingTable(routingTable.build())
                 .build();
+        when(clusterService.state()).thenReturn(clusterState);
 
         doAnswer(invocation -> {
                 ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1];
@@ -184,15 +195,6 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
     }
 
     public void testExistingSnapshotDoesNotBlockMigration() throws InterruptedException {
-        // index a doc with the same Id as the config snapshot
-        IndexRequestBuilder indexRequest = client().prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(),
-                ElasticsearchMappings.DOC_TYPE, "ml-config")
-                .setSource(Collections.singletonMap("a_field", "a_value"))
-                .setOpType(DocWriteRequest.OpType.CREATE)
-                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-
-        indexRequest.execute().actionGet();
-
         // define the configs
         MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
         mlMetadata.putJob(buildJobBuilder("job-foo").build(), false);
@@ -201,9 +203,23 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
         RoutingTable.Builder routingTable = RoutingTable.builder();
         addMlConfigIndex(metaData, routingTable);
         ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
-                .metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build()))
-                .routingTable(routingTable.build())
-                .build();
+            .metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build()))
+            .routingTable(routingTable.build())
+            .build();
+        when(clusterService.state()).thenReturn(clusterState);
+
+        // index a doc with the same Id as the config snapshot
+        PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
+        AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client(), clusterService.state(), future);
+        future.actionGet();
+
+        IndexRequestBuilder indexRequest = client().prepareIndex(AnomalyDetectorsIndex.jobStateIndexWriteAlias(),
+                ElasticsearchMappings.DOC_TYPE, "ml-config")
+                .setSource(Collections.singletonMap("a_field", "a_value"))
+                .setOpType(DocWriteRequest.OpType.CREATE)
+                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+
+        indexRequest.execute().actionGet();
 
         doAnswer(invocation -> {
             ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1];
@@ -258,6 +274,7 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
                 .metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build()))
                 .routingTable(routingTable.build())
                 .build();
+        when(clusterService.state()).thenReturn(clusterState);
 
         doAnswer(invocation -> {
             ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1];

+ 32 - 19
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java

@@ -8,6 +8,9 @@ package org.elasticsearch.xpack.ml.job.process.autodetect;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.AliasOrIndex;
+import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -32,6 +35,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
 import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
 import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
 import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
@@ -63,6 +67,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -109,6 +115,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
     private JobDataCountsPersister jobDataCountsPersister;
     private NormalizerFactory normalizerFactory;
     private Auditor auditor;
+    private ClusterState clusterState;
 
     private DataCounts dataCounts = new DataCounts("foo");
     private ModelSizeStats modelSizeStats = new ModelSizeStats.Builder("foo").build();
@@ -128,6 +135,12 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         jobDataCountsPersister = mock(JobDataCountsPersister.class);
         normalizerFactory = mock(NormalizerFactory.class);
         auditor = mock(Auditor.class);
+        MetaData metaData = mock(MetaData.class);
+        SortedMap<String, AliasOrIndex> aliasOrIndexSortedMap = new TreeMap<>();
+        aliasOrIndexSortedMap.put(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), mock(AliasOrIndex.Alias.class));
+        when(metaData.getAliasAndIndexLookup()).thenReturn(aliasOrIndexSortedMap);
+        clusterState = mock(ClusterState.class);
+        when(clusterState.getMetaData()).thenReturn(metaData);
 
         doAnswer(invocationOnMock -> {
             @SuppressWarnings("unchecked")
@@ -170,7 +183,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         JobTask jobTask = mock(JobTask.class);
         when(jobTask.getJobId()).thenReturn("foo");
         when(jobTask.getAllocationId()).thenReturn(1L);
-        manager.openJob(jobTask, e -> {});
+        manager.openJob(jobTask, clusterState, e -> {});
         assertEquals(1, manager.numberOfOpenJobs());
         assertTrue(manager.jobHasActiveAutodetectProcess(jobTask));
         verify(jobTask).updatePersistentTaskState(eq(new JobTaskState(JobState.OPENED, 1L)), any());
@@ -196,7 +209,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         JobTask jobTask = mock(JobTask.class);
         when(jobTask.getJobId()).thenReturn(job.getId());
         AtomicReference<Exception> errorHolder = new AtomicReference<>();
-        manager.openJob(jobTask, errorHolder::set);
+        manager.openJob(jobTask, clusterState, errorHolder::set);
         Exception error = errorHolder.get();
         assertThat(error, is(notNullValue()));
         assertThat(error.getMessage(), equalTo("Cannot open job [no_version] because jobs created prior to version 5.5 are not supported"));
@@ -242,22 +255,22 @@ public class AutodetectProcessManagerTests extends ESTestCase {
 
         JobTask jobTask = mock(JobTask.class);
         when(jobTask.getJobId()).thenReturn("foo");
-        manager.openJob(jobTask, e -> {});
+        manager.openJob(jobTask, clusterState, e -> {});
         jobTask = mock(JobTask.class);
         when(jobTask.getJobId()).thenReturn("bar");
         when(jobTask.getAllocationId()).thenReturn(1L);
-        manager.openJob(jobTask, e -> {});
+        manager.openJob(jobTask, clusterState, e -> {});
         jobTask = mock(JobTask.class);
         when(jobTask.getJobId()).thenReturn("baz");
         when(jobTask.getAllocationId()).thenReturn(2L);
-        manager.openJob(jobTask, e -> {});
+        manager.openJob(jobTask, clusterState, e -> {});
         assertEquals(3, manager.numberOfOpenJobs());
 
         Exception[] holder = new Exception[1];
         jobTask = mock(JobTask.class);
         when(jobTask.getJobId()).thenReturn("foobar");
         when(jobTask.getAllocationId()).thenReturn(3L);
-        manager.openJob(jobTask, e -> holder[0] = e);
+        manager.openJob(jobTask, clusterState, e -> holder[0] = e);
         Exception e = holder[0];
         assertEquals("max running job capacity [3] reached", e.getMessage());
 
@@ -266,7 +279,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         when(jobTask.getJobId()).thenReturn("baz");
         manager.closeJob(jobTask, false, null);
         assertEquals(2, manager.numberOfOpenJobs());
-        manager.openJob(jobTask, e1 -> {});
+        manager.openJob(jobTask, clusterState, e1 -> {});
         assertEquals(3, manager.numberOfOpenJobs());
     }
 
@@ -278,7 +291,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         JobTask jobTask = mock(JobTask.class);
         when(jobTask.getJobId()).thenReturn("foo");
         DataLoadParams params = new DataLoadParams(TimeRange.builder().build(), Optional.empty());
-        manager.openJob(jobTask, e -> {});
+        manager.openJob(jobTask, clusterState, e -> {});
         manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()),
                 params, (dataCounts1, e) -> {});
         assertEquals(1, manager.numberOfOpenJobs());
@@ -301,7 +314,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
 
         JobTask jobTask = mock(JobTask.class);
         when(jobTask.getJobId()).thenReturn("foo");
-        manager.openJob(jobTask, e -> {});
+        manager.openJob(jobTask, clusterState, e -> {});
         Exception[] holder = new Exception[1];
         manager.processData(jobTask, analysisRegistry, inputStream, xContentType, params, (dataCounts1, e) -> holder[0] = e);
         assertNotNull(holder[0]);
@@ -314,7 +327,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
 
         JobTask jobTask = mock(JobTask.class);
         when(jobTask.getJobId()).thenReturn("foo");
-        manager.openJob(jobTask, e -> {});
+        manager.openJob(jobTask, clusterState, e -> {});
         manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()),
                 mock(DataLoadParams.class), (dataCounts1, e) -> {});
 
@@ -342,7 +355,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
 
         JobTask jobTask = mock(JobTask.class);
         when(jobTask.getJobId()).thenReturn("foo");
-        manager.openJob(jobTask, e -> {});
+        manager.openJob(jobTask, clusterState, e -> {});
         manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()),
                 mock(DataLoadParams.class), (dataCounts1, e) -> {});
 
@@ -390,7 +403,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
 
         JobTask jobTask = mock(JobTask.class);
         when(jobTask.getJobId()).thenReturn("foo");
-        manager.openJob(jobTask, e -> {});
+        manager.openJob(jobTask, clusterState, e -> {});
         manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()),
                 mock(DataLoadParams.class), (dataCounts1, e) -> {});
 
@@ -419,7 +432,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         InputStream inputStream = createInputStream("");
         JobTask jobTask = mock(JobTask.class);
         when(jobTask.getJobId()).thenReturn("foo");
-        manager.openJob(jobTask, e -> {});
+        manager.openJob(jobTask, clusterState, e -> {});
         manager.processData(jobTask, analysisRegistry, inputStream, xContentType, params, (dataCounts1, e) -> {});
         verify(communicator).writeToJob(same(inputStream), same(analysisRegistry), same(xContentType), same(params), any());
     }
@@ -431,7 +444,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         JobTask jobTask = mock(JobTask.class);
         when(jobTask.getJobId()).thenReturn("foo");
         InputStream inputStream = createInputStream("");
-        manager.openJob(jobTask, e -> {});
+        manager.openJob(jobTask, clusterState, e -> {});
         manager.processData(jobTask, analysisRegistry, inputStream, randomFrom(XContentType.values()),
                 mock(DataLoadParams.class), (dataCounts1, e) -> {});
 
@@ -471,7 +484,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         // create a jobtask
         JobTask jobTask = mock(JobTask.class);
         when(jobTask.getJobId()).thenReturn("foo");
-        manager.openJob(jobTask, e -> {});
+        manager.openJob(jobTask, clusterState, e -> {});
         manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class),
                 (dataCounts1, e) -> {
                 });
@@ -511,7 +524,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         when(jobTask.getJobId()).thenReturn("foo");
         assertFalse(manager.jobHasActiveAutodetectProcess(jobTask));
 
-        manager.openJob(jobTask, e -> {});
+        manager.openJob(jobTask, clusterState, e -> {});
         manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()),
                 mock(DataLoadParams.class), (dataCounts1, e) -> {});
 
@@ -529,7 +542,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         when(jobTask.getJobId()).thenReturn("foo");
         assertFalse(manager.jobHasActiveAutodetectProcess(jobTask));
 
-        manager.openJob(jobTask, e -> {});
+        manager.openJob(jobTask, clusterState, e -> {});
         manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()),
                 mock(DataLoadParams.class), (dataCounts1, e) -> {});
 
@@ -563,7 +576,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
 
         JobTask jobTask = mock(JobTask.class);
         when(jobTask.getJobId()).thenReturn("foo");
-        manager.openJob(jobTask, e -> {});
+        manager.openJob(jobTask, clusterState, e -> {});
         InputStream inputStream = createInputStream("");
         DataCounts[] dataCounts = new DataCounts[1];
         manager.processData(jobTask, analysisRegistry, inputStream,
@@ -728,7 +741,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         AutodetectProcessManager manager = createManager(communicator);
         JobTask jobTask = mock(JobTask.class);
         when(jobTask.getJobId()).thenReturn(jobId);
-        manager.openJob(jobTask, e -> {});
+        manager.openJob(jobTask, clusterState, e -> {});
         manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()),
                 mock(DataLoadParams.class), (dataCounts, e) -> {});
         return manager;

+ 2 - 1
x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java

@@ -25,6 +25,7 @@ import org.elasticsearch.test.rest.yaml.ObjectPath;
 import org.elasticsearch.xpack.core.ml.MlMetaIndex;
 import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
 import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
 import org.elasticsearch.xpack.core.rollup.job.RollupJob;
 import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField;
@@ -87,7 +88,7 @@ public class XPackRestIT extends ESClientYamlSuiteTestCase {
         if (installTemplates()) {
             List<String> templates = new ArrayList<>();
             templates.addAll(Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME,
-                    AnomalyDetectorsIndex.jobStateIndexName(),
+                    AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
                     AnomalyDetectorsIndex.jobResultsIndexPrefix(),
                     AnomalyDetectorsIndex.configIndexName()));
 

+ 5 - 0
x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/30_ml_jobs_crud.yml

@@ -72,6 +72,11 @@ setup:
       ml.get_jobs:
         job_id: mixed-cluster-job
 
+  - do:
+      indices.exists_alias:
+        name: ".ml-state-write"
+  - is_true: ''
+
 ---
 "Test job with no model memory limit has established model memory after reopening":
   - do: