Browse Source

[ML] Refresh state index before completing data frame analytics job (#50322)

To ensure that any persisted model state is searchable by the time
the job reports itself as `stopped`, we need to refresh the state index
before completing.

This should fix the occasional failures we see in #50168 and #50313 where
the model state appears missing.

Closes #50168
Closes #50313
Dimitris Athanasiou 5 years ago
parent
commit
4d8d8d93d6

+ 13 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java

@@ -12,6 +12,7 @@ import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
@@ -159,6 +160,7 @@ public class AnalyticsProcessManager {
             processContext.setFailureReason(resultProcessor.getFailure());
 
             refreshDest(config);
+            refreshStateIndex(config.getId());
             LOGGER.info("[{}] Result processor has completed", config.getId());
         } catch (Exception e) {
             if (task.isStopping()) {
@@ -288,6 +290,17 @@ public class AnalyticsProcessManager {
             () -> client.execute(RefreshAction.INSTANCE, new RefreshRequest(config.getDest().getIndex())).actionGet());
     }
 
+    private void refreshStateIndex(String jobId) {
+        String indexName = AnomalyDetectorsIndex.jobStateIndexPattern();
+        LOGGER.debug("[{}] Refresh index {}", jobId, indexName);
+
+        RefreshRequest refreshRequest = new RefreshRequest(indexName);
+        refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
+        try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
+            client.admin().indices().refresh(refreshRequest).actionGet();
+        }
+    }
+
     private void closeProcess(DataFrameAnalyticsTask task) {
         String configId = task.getParams().getId();
         LOGGER.info("[{}] Closing process", configId);