Browse Source

Fix and unmute testSetUpgradeMode_ExistingTaskGetsUnassigned (#55368)

Przemysław Witek 5 years ago
parent
commit
16a141b764

+ 2 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java

@@ -132,6 +132,7 @@ public final class MlIndexAndAlias {
                                                  String alias,
                                                  boolean addAlias,
                                                  ActionListener<Boolean> listener) {
+        logger.info("About to create first concrete index [{}] with alias [{}]", index, alias);
         CreateIndexRequestBuilder requestBuilder = client.admin()
             .indices()
             .prepareCreate(index);
@@ -163,6 +164,7 @@ public final class MlIndexAndAlias {
                                          @Nullable String currentIndex,
                                          String newIndex,
                                          ActionListener<Boolean> listener) {
+        logger.info("About to move write alias [{}] from index [{}] to index [{}]", alias, currentIndex, newIndex);
         IndicesAliasesRequestBuilder requestBuilder = client.admin()
             .indices()
             .prepareAliases()

+ 22 - 6
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java

@@ -5,6 +5,8 @@
  */
 package org.elasticsearch.xpack.ml.integration;
 
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -501,7 +503,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
         assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds));
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55221")
     public void testSetUpgradeMode_ExistingTaskGetsUnassigned() throws Exception {
         initialize("classification_set_upgrade_mode");
         indexData(sourceIndex, 300, 0, KEYWORD_FIELD);
@@ -519,18 +520,33 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
         assertThat(analyticsTaskList(), hasSize(1));
         assertThat(analyticsAssignedTaskList(), is(empty()));
 
-        GetDataFrameAnalyticsStatsAction.Response.Stats analyticsStats = getAnalyticsStats(jobId);
-        assertThat(analyticsStats.getAssignmentExplanation(), is(equalTo(AWAITING_UPGRADE.getExplanation())));
-        assertThat(analyticsStats.getNode(), is(nullValue()));
+        assertBusy(() -> {
+            try {
+                GetDataFrameAnalyticsStatsAction.Response.Stats analyticsStats = getAnalyticsStats(jobId);
+                assertThat(analyticsStats.getAssignmentExplanation(), is(equalTo(AWAITING_UPGRADE.getExplanation())));
+                assertThat(analyticsStats.getNode(), is(nullValue()));
+            } catch (ElasticsearchException e) {
+                logger.error(new ParameterizedMessage("[{}] Encountered exception while fetching analytics stats", jobId), e);
+                fail(e.getDetailedMessage());
+            }
+        });
 
         setUpgradeModeTo(false);
         assertThat(analyticsTaskList(), hasSize(1));
         assertBusy(() -> assertThat(analyticsAssignedTaskList(), hasSize(1)));
 
-        analyticsStats = getAnalyticsStats(jobId);
-        assertThat(analyticsStats.getAssignmentExplanation(), is(not(equalTo(AWAITING_UPGRADE.getExplanation()))));
+        assertBusy(() -> {
+            try {
+                GetDataFrameAnalyticsStatsAction.Response.Stats analyticsStats = getAnalyticsStats(jobId);
+                assertThat(analyticsStats.getAssignmentExplanation(), is(not(equalTo(AWAITING_UPGRADE.getExplanation()))));
+            } catch (ElasticsearchException e) {
+                logger.error(new ParameterizedMessage("[{}] Encountered exception while fetching analytics stats", jobId), e);
+                fail(e.getDetailedMessage());
+            }
+        });
 
         waitUntilAnalyticsIsStopped(jobId);
+        assertProgress(jobId, 100, 100, 100, 100);
     }
 
     public void testSetUpgradeMode_NewTaskDoesNotStart() throws Exception {

+ 6 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java

@@ -212,11 +212,15 @@ public class TransportGetDataFrameAnalyticsStatsAction
 
         executeAsyncWithOrigin(client, ML_ORIGIN, MultiSearchAction.INSTANCE, multiSearchRequest, ActionListener.wrap(
             multiSearchResponse -> {
-                for (MultiSearchResponse.Item itemResponse : multiSearchResponse.getResponses()) {
+                MultiSearchResponse.Item[] itemResponses = multiSearchResponse.getResponses();
+                for (int i = 0; i < itemResponses.length; ++i) {
+                    MultiSearchResponse.Item itemResponse = itemResponses[i];
                     if (itemResponse.isFailure()) {
+                        SearchRequest itemRequest = multiSearchRequest.requests().get(i);
                         logger.error(
                             new ParameterizedMessage(
-                                "[{}] Item failure encountered during multi search: {}", configId, itemResponse.getFailureMessage()),
+                                "[{}] Item failure encountered during multi search for request [indices={}, source={}]: {}",
+                                configId, itemRequest.indices(), itemRequest.source(), itemResponse.getFailureMessage()),
                             itemResponse.getFailure());
                         listener.onFailure(ExceptionsHelper.serverError(itemResponse.getFailureMessage(), itemResponse.getFailure()));
                         return;

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

@@ -643,9 +643,9 @@ public class TransportStartDataFrameAnalyticsAction
         @Override
         protected void nodeOperation(AllocatedPersistentTask task, StartDataFrameAnalyticsAction.TaskParams params,
                                      PersistentTaskState state) {
-            logger.info("[{}] Starting data frame analytics", params.getId());
             DataFrameAnalyticsTaskState analyticsTaskState = (DataFrameAnalyticsTaskState) state;
             DataFrameAnalyticsState analyticsState = analyticsTaskState == null ? null : analyticsTaskState.getState();
+            logger.info("[{}] Starting data frame analytics from state [{}]", params.getId(), analyticsState);
 
             // If we are "stopping" there is nothing to do and we should stop
             if (DataFrameAnalyticsState.STOPPING.equals(analyticsState)) {

+ 0 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java

@@ -11,7 +11,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
-import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
@@ -267,10 +266,6 @@ public class AnalyticsProcessManager {
         process.writeRecord(headerRecord);
     }
 
-    private void indexDataCounts(DataCounts dataCounts) {
-        IndexRequest indexRequest = new IndexRequest(MlStatsIndex.writeAlias());
-    }
-
     private void restoreState(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, @Nullable BytesReference state,
                               AnalyticsProcess<AnalyticsResult> process) {
         if (config.getAnalysis().persistsState() == false) {

+ 4 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java

@@ -105,6 +105,7 @@ public class TrainedModelStatsService {
     }
 
     void stop() {
+        logger.info("About to stop TrainedModelStatsService");
         stopped = true;
         statsQueue.clear();
 
@@ -115,6 +116,7 @@ public class TrainedModelStatsService {
     }
 
     void start() {
+        logger.info("About to start TrainedModelStatsService");
         stopped = false;
         scheduledFuture = threadPool.scheduleWithFixedDelay(this::updateStats,
             PERSISTENCE_INTERVAL,
@@ -126,11 +128,13 @@ public class TrainedModelStatsService {
             return;
         }
         if (verifiedStatsIndexCreated == false) {
+            logger.info("About to create the stats index as it does not exist yet");
             try {
                 PlainActionFuture<Boolean> listener = new PlainActionFuture<>();
                 MlStatsIndex.createStatsIndexAndAliasIfNecessary(client, clusterState, indexNameExpressionResolver, listener);
                 listener.actionGet();
                 verifiedStatsIndexCreated = true;
+                logger.info("Created stats index");
             } catch (Exception e) {
                 logger.error("failure creating ml stats index for storing model stats", e);
                 return;