Browse Source

[ML][Data Frame] Fixes failure state tests and failure setting handling (#44645)

* [ML][Data Frame] fixing flaky test

* adjusting frequency

* fixing tests

* addressing PR comments
Benjamin Trent 6 years ago
parent
commit
73f8f1f46f

+ 0 - 46
x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java

@@ -6,12 +6,7 @@
 
 package org.elasticsearch.xpack.dataframe.integration;
 
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.StringEntity;
 import org.elasticsearch.client.Request;
-import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.junit.Before;
 
@@ -22,9 +17,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -730,45 +723,6 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
         assertEquals(4.47169811, actual.doubleValue(), 0.000001);
     }
 
-    @AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/pull/44583")
-    public void testBulkIndexFailuresCauseTaskToFail() throws Exception {
-        String transformId = "bulk-failure-pivot";
-        String dataFrameIndex = "pivot-failure-index";
-        createPivotReviewsTransform(transformId, dataFrameIndex, null, null, null);
-
-        try (XContentBuilder builder = jsonBuilder()) {
-            builder.startObject();
-            {
-                builder.startObject("mappings")
-                    .startObject("properties")
-                    .startObject("reviewer")
-                    // This type should cause mapping coercion type conflict on bulk index
-                    .field("type", "long")
-                    .endObject()
-                    .endObject()
-                    .endObject();
-            }
-            builder.endObject();
-            final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON);
-            Request req = new Request("PUT", dataFrameIndex);
-            req.setEntity(entity);
-            client().performRequest(req);
-        }
-        startDataframeTransform(transformId, false, null);
-
-        assertBusy(() -> assertEquals(DataFrameTransformTaskState.FAILED.value(), getDataFrameTaskState(transformId)),
-            120,
-            TimeUnit.SECONDS);
-
-        Map<?, ?> state = getDataFrameState(transformId);
-        assertThat((String) XContentMapValues.extractValue("state.reason", state),
-            containsString("task encountered more than 10 failures; latest failure: Bulk index experienced failures."));
-
-        // Force stop the transform as bulk indexing caused it to go into a failed state
-        stopDataFrameTransform(transformId, true);
-        deleteIndex(dataFrameIndex);
-    }
-
     private void assertOnePivotValue(String query, double expected) throws IOException {
         Map<String, Object> searchResult = getAsMap(query);
 

+ 7 - 4
x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java

@@ -51,11 +51,9 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
         return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", BASIC_AUTH_VALUE_SUPER_USER).build();
     }
 
-    protected void createReviewsIndex(String indexName) throws IOException {
+    protected void createReviewsIndex(String indexName, int numDocs) throws IOException {
         int[] distributionTable = {5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 4, 3, 3, 2, 1, 1, 1};
 
-        final int numDocs = 1000;
-
         // create mapping
         try (XContentBuilder builder = jsonBuilder()) {
             builder.startObject();
@@ -146,6 +144,10 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
         createReviewsIndex(REVIEWS_INDEX_NAME);
     }
 
+    protected void createReviewsIndex(String indexName) throws IOException {
+        createReviewsIndex(indexName, 1000);
+    }
+
     protected void createPivotReviewsTransform(String transformId, String dataFrameIndex, String query) throws IOException {
         createPivotReviewsTransform(transformId, dataFrameIndex, query, null);
     }
@@ -162,7 +164,8 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
         String config = "{ \"dest\": {\"index\":\"" + dataFrameIndex + "\"},"
             + " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
             //Set frequency high for testing
-            + " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"15m\", \"frequency\": \"1s\"}},"
+            + " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"15m\"}},"
+            + " \"frequency\": \"1s\","
             + " \"pivot\": {"
             + "   \"group_by\": {"
             + "     \"reviewer\": {"

+ 21 - 22
x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java

@@ -8,7 +8,6 @@ package org.elasticsearch.xpack.dataframe.integration;
 
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
-import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.common.Strings;
@@ -17,6 +16,7 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
 import org.junit.After;
+import org.junit.Before;
 
 import java.io.IOException;
 import java.util.List;
@@ -27,13 +27,21 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.oneOf;
 
-@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/44583")
 public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
 
     private static final String TRANSFORM_ID = "failure_pivot_1";
 
+    @Before
+    public void setClusterSettings() throws IOException {
+        // Make sure we never retry on failure to speed up the test
+        Request addFailureRetrySetting = new Request("PUT", "/_cluster/settings");
+        addFailureRetrySetting.setJsonEntity(
+            "{\"persistent\": {\"xpack.data_frame.num_transform_failure_retries\": \"" + 0 + "\"}}");
+        client().performRequest(addFailureRetrySetting);
+    }
+
     @After
     public void cleanUpPotentiallyFailedTransform() throws Exception {
         // If the tests failed in the middle, we should force stop it. This prevents other transform tests from failing due
@@ -43,14 +51,14 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
     }
 
     public void testForceStopFailedTransform() throws Exception {
-        createReviewsIndex();
+        createReviewsIndex(REVIEWS_INDEX_NAME, 10);
         String dataFrameIndex = "failure_pivot_reviews";
         createDestinationIndexWithBadMapping(dataFrameIndex);
-        createPivotReviewsTransform(TRANSFORM_ID, dataFrameIndex, null);
+        createContinuousPivotReviewsTransform(TRANSFORM_ID, dataFrameIndex, null);
         startDataframeTransform(TRANSFORM_ID, false);
         awaitState(TRANSFORM_ID, DataFrameTransformTaskState.FAILED);
         Map<?, ?> fullState = getDataFrameState(TRANSFORM_ID);
-        final String failureReason = "task encountered more than 10 failures; latest failure: " +
+        final String failureReason = "task encountered more than 0 failures; latest failure: " +
             "Bulk index experienced failures. See the logs of the node running the transform for details.";
         // Verify we have failed for the expected reason
         assertThat(XContentMapValues.extractValue("state.reason", fullState),
@@ -69,21 +77,20 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
 
         awaitState(TRANSFORM_ID, DataFrameTransformTaskState.STOPPED);
         fullState = getDataFrameState(TRANSFORM_ID);
-        // Verify we have failed for the expected reason
         assertThat(XContentMapValues.extractValue("state.reason", fullState),
             is(nullValue()));
     }
 
 
     public void testForceStartFailedTransform() throws Exception {
-        createReviewsIndex();
+        createReviewsIndex(REVIEWS_INDEX_NAME, 10);
         String dataFrameIndex = "failure_pivot_reviews";
         createDestinationIndexWithBadMapping(dataFrameIndex);
-        createPivotReviewsTransform(TRANSFORM_ID, dataFrameIndex, null);
+        createContinuousPivotReviewsTransform(TRANSFORM_ID, dataFrameIndex, null);
         startDataframeTransform(TRANSFORM_ID, false);
         awaitState(TRANSFORM_ID, DataFrameTransformTaskState.FAILED);
         Map<?, ?> fullState = getDataFrameState(TRANSFORM_ID);
-        final String failureReason = "task encountered more than 10 failures; latest failure: " +
+        final String failureReason = "task encountered more than 0 failures; latest failure: " +
             "Bulk index experienced failures. See the logs of the node running the transform for details.";
         // Verify we have failed for the expected reason
         assertThat(XContentMapValues.extractValue("state.reason", fullState),
@@ -101,23 +108,15 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
         deleteIndex(dataFrameIndex);
         // Force start the data frame to indicate failure correction
         startDataframeTransform(TRANSFORM_ID, true);
-        // Wait for data to be indexed appropriately and refresh for search
-        waitForDataFrameCheckpoint(TRANSFORM_ID);
-        refreshIndex(dataFrameIndex);
 
         // Verify that we have started and that our reason is cleared
         fullState = getDataFrameState(TRANSFORM_ID);
         assertThat(XContentMapValues.extractValue("state.reason", fullState), is(nullValue()));
         assertThat(XContentMapValues.extractValue("state.task_state", fullState), equalTo("started"));
-        assertThat(XContentMapValues.extractValue("state.indexer_state", fullState), equalTo("started"));
-        assertThat((int)XContentMapValues.extractValue("stats.index_failures", fullState), greaterThan(0));
-
-        // get and check some users to verify we restarted
-        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_0", 3.776978417);
-        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_5", 3.72);
-        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_11", 3.846153846);
-        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_20", 3.769230769);
-        assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918);
+        assertThat(XContentMapValues.extractValue("state.indexer_state", fullState), is(oneOf("started", "indexing")));
+        assertThat(XContentMapValues.extractValue("stats.index_failures", fullState), equalTo(1));
+
+        stopDataFrameTransform(TRANSFORM_ID, true);
     }
 
     private void awaitState(String transformId, DataFrameTransformTaskState state) throws Exception {

+ 9 - 2
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java

@@ -207,8 +207,15 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
         assert dataFrameAuditor.get() != null;
         assert dataFrameTransformsCheckpointService.get() != null;
 
-        return Collections.singletonList(new DataFrameTransformPersistentTasksExecutor(client, dataFrameTransformsConfigManager.get(),
-                dataFrameTransformsCheckpointService.get(), schedulerEngine.get(), dataFrameAuditor.get(), threadPool));
+        return Collections.singletonList(
+            new DataFrameTransformPersistentTasksExecutor(client,
+                dataFrameTransformsConfigManager.get(),
+                dataFrameTransformsCheckpointService.get(),
+                schedulerEngine.get(),
+                dataFrameAuditor.get(),
+                threadPool,
+                clusterService,
+                settingsModule.getSettings()));
     }
 
     public List<Setting<?>> getSettings() {

+ 1 - 8
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java

@@ -32,7 +32,6 @@ public class TransportStartDataFrameTransformTaskAction extends
     TransportTasksAction<DataFrameTransformTask, StartDataFrameTransformTaskAction.Request,
         StartDataFrameTransformTaskAction.Response, StartDataFrameTransformTaskAction.Response> {
 
-    private volatile int numFailureRetries;
     private final XPackLicenseState licenseState;
 
     @Inject
@@ -42,8 +41,6 @@ public class TransportStartDataFrameTransformTaskAction extends
             StartDataFrameTransformTaskAction.Request::new, StartDataFrameTransformTaskAction.Response::new,
             StartDataFrameTransformTaskAction.Response::new, ThreadPool.Names.SAME);
         this.licenseState = licenseState;
-        clusterService.getClusterSettings()
-            .addSettingsUpdateConsumer(DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING, this::setNumFailureRetries);
     }
 
     @Override
@@ -62,7 +59,7 @@ public class TransportStartDataFrameTransformTaskAction extends
     protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask,
                                  ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
         if (transformTask.getTransformId().equals(request.getId())) {
-            transformTask.setNumFailureRetries(numFailureRetries).start(null, listener);
+            transformTask.start(null, listener);
         } else {
             listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId()
                 + "] does not match request's ID [" + request.getId() + "]"));
@@ -93,8 +90,4 @@ public class TransportStartDataFrameTransformTaskAction extends
         boolean allStarted = tasks.stream().allMatch(StartDataFrameTransformTaskAction.Response::isStarted);
         return new StartDataFrameTransformTaskAction.Response(allStarted);
     }
-
-    void setNumFailureRetries(int numFailureRetries) {
-        this.numFailureRetries = numFailureRetries;
-    }
 }

+ 14 - 2
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java

@@ -18,7 +18,9 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.persistent.AllocatedPersistentTask;
 import org.elasticsearch.persistent.PersistentTaskState;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
@@ -61,13 +63,16 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
     private final SchedulerEngine schedulerEngine;
     private final ThreadPool threadPool;
     private final DataFrameAuditor auditor;
+    private volatile int numFailureRetries;
 
     public DataFrameTransformPersistentTasksExecutor(Client client,
                                                      DataFrameTransformsConfigManager transformsConfigManager,
                                                      DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService,
                                                      SchedulerEngine schedulerEngine,
                                                      DataFrameAuditor auditor,
-                                                     ThreadPool threadPool) {
+                                                     ThreadPool threadPool,
+                                                     ClusterService clusterService,
+                                                     Settings settings) {
         super(DataFrameField.TASK_NAME, DataFrame.TASK_THREAD_POOL_NAME);
         this.client = client;
         this.transformsConfigManager = transformsConfigManager;
@@ -75,6 +80,9 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
         this.schedulerEngine = schedulerEngine;
         this.auditor = auditor;
         this.threadPool = threadPool;
+        this.numFailureRetries = DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING.get(settings);
+        clusterService.getClusterSettings()
+            .addSettingsUpdateConsumer(DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING, this::setNumFailureRetries);
     }
 
     @Override
@@ -285,7 +293,11 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
                            Long previousCheckpoint,
                            ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
         buildTask.initializeIndexer(indexerBuilder);
-        buildTask.start(previousCheckpoint, listener);
+        buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, listener);
+    }
+
+    private void setNumFailureRetries(int numFailureRetries) {
+        this.numFailureRetries = numFailureRetries;
     }
 
     @Override

+ 19 - 5
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

@@ -329,19 +329,33 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
     }
 
     synchronized void markAsFailed(String reason, ActionListener<Void> listener) {
-        taskState.set(DataFrameTransformTaskState.FAILED);
-        stateReason.set(reason);
         auditor.error(transform.getId(), reason);
         // We should not keep retrying. Either the task will be stopped, or started
         // If it is started again, it is registered again.
         deregisterSchedulerJob();
+        DataFrameTransformState newState = new DataFrameTransformState(
+            DataFrameTransformTaskState.FAILED,
+            initialIndexerState,
+            initialPosition,
+            currentCheckpoint.get(),
+            reason,
+            getIndexer() == null ? null : getIndexer().getProgress());
         // Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate
         // This keeps track of STARTED, FAILED, STOPPED
         // This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that
         //   we could not read the previous state information from said index.
-        persistStateToClusterState(getState(), ActionListener.wrap(
-            r -> listener.onResponse(null),
-            listener::onFailure
+        persistStateToClusterState(newState, ActionListener.wrap(
+            r -> {
+                taskState.set(DataFrameTransformTaskState.FAILED);
+                stateReason.set(reason);
+                listener.onResponse(null);
+            },
+            e -> {
+                logger.error("Failed to set task state as failed to cluster state", e);
+                taskState.set(DataFrameTransformTaskState.FAILED);
+                stateReason.set(reason);
+                listener.onFailure(e);
+            }
         ));
     }
 

+ 10 - 2
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java

@@ -21,6 +21,8 @@ import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
@@ -41,6 +43,7 @@ import java.util.Set;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class DataFrameTransformPersistentTasksExecutorTests extends ESTestCase {
 
@@ -98,12 +101,17 @@ public class DataFrameTransformPersistentTasksExecutorTests extends ESTestCase {
         DataFrameTransformsConfigManager transformsConfigManager = new DataFrameTransformsConfigManager(client, xContentRegistry());
         DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService = new DataFrameTransformsCheckpointService(client,
             transformsConfigManager);
-
+        ClusterSettings cSettings = new ClusterSettings(Settings.EMPTY,
+            Collections.singleton(DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING));
+        ClusterService clusterService = mock(ClusterService.class);
+        when(clusterService.getClusterSettings()).thenReturn(cSettings);
         DataFrameTransformPersistentTasksExecutor executor = new DataFrameTransformPersistentTasksExecutor(client,
             transformsConfigManager,
             dataFrameTransformsCheckpointService, mock(SchedulerEngine.class),
             new DataFrameAuditor(client, ""),
-            mock(ThreadPool.class));
+            mock(ThreadPool.class),
+            clusterService,
+            Settings.EMPTY);
 
         assertThat(executor.getAssignment(new DataFrameTransform("new-task-id", Version.CURRENT, null), cs).getExecutorNode(),
             equalTo("current-data-node-with-1-tasks"));