Browse Source

[ML][Data Frame] adding dynamic cluster setting for failure retries (#44577)

This adds a new dynamic cluster setting `xpack.data_frame.num_transform_failure_retries`.

This setting indicates how many times non-critical failures should be retried before a data frame transform is marked as failed and should stop executing. At the time of this commit; Min: 0, Max: 100, Default: 10
Benjamin Trent 6 years ago
parent
commit
187dc5a029

+ 40 - 0
docs/reference/settings/data-frames-settings.asciidoc

@@ -0,0 +1,40 @@
+
+[role="xpack"]
+[[data-frames-settings]]
+=== {dataframe-transforms-cap}  settings in Elasticsearch
+[subs="attributes"]
+++++
+<titleabbrev>{dataframe-transforms-cap} settings</titleabbrev>
+++++
+
+You do not need to configure any settings to use {dataframe-transforms}. It is enabled by default.
+
+All of these settings can be added to the `elasticsearch.yml` configuration file. 
+The dynamic settings can also be updated across a cluster with the 
+<<cluster-update-settings,cluster update settings API>>.
+
+TIP: Dynamic settings take precedence over settings in the `elasticsearch.yml` 
+file.
+
+[float]
+[[general-data-frames-settings]]
+==== General {dataframe-transforms} settings
+
+`xpack.data_frame.enabled`::
+Set to `true` (default) to enable {dataframe-transforms} on the node. +
++
+If set to `false` in `elasticsearch.yml`, the {dataframe-transform} APIs are disabled on the node.
+Therefore the node cannot start or administrate transforms or receive transport (internal)
+communication requests related to {dataframe-transform} APIs.
++
+IMPORTANT: If you want to use {dataframe-transform} features in your cluster, you must have
+`xpack.data_frame.enabled` set to `true` on all master-eligible nodes. This is the
+default behavior.
+
+`xpack.data_frame.num_transform_failure_retries` (<<cluster-update-settings,Dynamic>>)::
+The number of times that a {dataframe-transform} retries when it experiences a
+non-fatal error. Once the number of retries is exhausted, the {dataframe-transform}
+task will be marked as `failed`. The default value is `10` with a valid minimum of `0`
+and maximum of `100`.
+If a {dataframe-transform} is already running, it will have to be restarted
+to use the changed setting.

+ 2 - 0
docs/reference/setup.asciidoc

@@ -51,6 +51,8 @@ include::settings/audit-settings.asciidoc[]
 
 include::settings/ccr-settings.asciidoc[]
 
+include::settings/data-frames-settings.asciidoc[]
+
 include::settings/ilm-settings.asciidoc[]
 
 include::settings/license-settings.asciidoc[]

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackSettings.java

@@ -57,7 +57,7 @@ public class XPackSettings {
     /** Setting for enabling or disabling graph. Defaults to true. */
     public static final Setting<Boolean> GRAPH_ENABLED = Setting.boolSetting("xpack.graph.enabled", true, Setting.Property.NodeScope);
 
-    /** Setting for enabling or disabling machine learning. Defaults to false. */
+    /** Setting for enabling or disabling machine learning. Defaults to true. */
     public static final Setting<Boolean> MACHINE_LEARNING_ENABLED = Setting.boolSetting("xpack.ml.enabled", true,
             Setting.Property.NodeScope);
 

+ 6 - 0
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java

@@ -19,6 +19,7 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.IndexScopedSettings;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsFilter;
 import org.elasticsearch.common.settings.SettingsModule;
@@ -72,6 +73,7 @@ import org.elasticsearch.xpack.dataframe.rest.action.RestPutDataFrameTransformAc
 import org.elasticsearch.xpack.dataframe.rest.action.RestStartDataFrameTransformAction;
 import org.elasticsearch.xpack.dataframe.rest.action.RestStopDataFrameTransformAction;
 import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformPersistentTasksExecutor;
+import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
 
 import java.io.IOException;
 import java.time.Clock;
@@ -209,6 +211,10 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
                 dataFrameTransformsCheckpointService.get(), schedulerEngine.get(), dataFrameAuditor.get(), threadPool));
     }
 
+    public List<Setting<?>> getSettings() {
+        return Collections.singletonList(DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING);
+    }
+
     @Override
     public void close() {
         if (schedulerEngine.get() != null) {

+ 8 - 1
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java

@@ -32,6 +32,7 @@ public class TransportStartDataFrameTransformTaskAction extends
     TransportTasksAction<DataFrameTransformTask, StartDataFrameTransformTaskAction.Request,
         StartDataFrameTransformTaskAction.Response, StartDataFrameTransformTaskAction.Response> {
 
+    private volatile int numFailureRetries;
     private final XPackLicenseState licenseState;
 
     @Inject
@@ -41,6 +42,8 @@ public class TransportStartDataFrameTransformTaskAction extends
             StartDataFrameTransformTaskAction.Request::new, StartDataFrameTransformTaskAction.Response::new,
             StartDataFrameTransformTaskAction.Response::new, ThreadPool.Names.SAME);
         this.licenseState = licenseState;
+        clusterService.getClusterSettings()
+            .addSettingsUpdateConsumer(DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING, this::setNumFailureRetries);
     }
 
     @Override
@@ -59,7 +62,7 @@ public class TransportStartDataFrameTransformTaskAction extends
     protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask,
                                  ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
         if (transformTask.getTransformId().equals(request.getId())) {
-            transformTask.start(null, listener);
+            transformTask.setNumFailureRetries(numFailureRetries).start(null, listener);
         } else {
             listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId()
                 + "] does not match request's ID [" + request.getId() + "]"));
@@ -90,4 +93,8 @@ public class TransportStartDataFrameTransformTaskAction extends
         boolean allStarted = tasks.stream().allMatch(StartDataFrameTransformTaskAction.Response::isStarted);
         return new StartDataFrameTransformTaskAction.Response(allStarted);
     }
+
+    void setNumFailureRetries(int numFailureRetries) {
+        this.numFailureRetries = numFailureRetries;
+    }
 }

+ 22 - 4
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

@@ -20,6 +20,7 @@ import org.elasticsearch.action.search.SearchAction;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.persistent.AllocatedPersistentTask;
@@ -63,8 +64,16 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
     // Default interval the scheduler sends an event if the config does not specify a frequency
     private static final long SCHEDULER_NEXT_MILLISECONDS = 60000;
     private static final Logger logger = LogManager.getLogger(DataFrameTransformTask.class);
-    // TODO consider moving to dynamic cluster setting
-    private static final int MAX_CONTINUOUS_FAILURES = 10;
+    private static final int DEFAULT_FAILURE_RETRIES = 10;
+    private volatile int numFailureRetries = DEFAULT_FAILURE_RETRIES;
+    // How many times the transform task can retry on an non-critical failure
+    public static final Setting<Integer> NUM_FAILURE_RETRIES_SETTING = Setting.intSetting(
+        "xpack.data_frame.num_transform_failure_retries",
+        DEFAULT_FAILURE_RETRIES,
+        0,
+        100,
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic);
     private static final IndexerState[] RUNNING_STATES = new IndexerState[]{IndexerState.STARTED, IndexerState.INDEXING};
     public static final String SCHEDULE_NAME = DataFrameField.TASK_NAME + "/schedule";
 
@@ -351,6 +360,15 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         }
     }
 
+    public DataFrameTransformTask setNumFailureRetries(int numFailureRetries) {
+        this.numFailureRetries = numFailureRetries;
+        return this;
+    }
+
+    public int getNumFailureRetries() {
+        return numFailureRetries;
+    }
+
     private void registerWithSchedulerJob() {
         schedulerEngine.register(this);
         final SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(schedulerJobName(), next());
@@ -832,10 +850,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 return;
             }
 
-            if (isIrrecoverableFailure(e) || failureCount.incrementAndGet() > MAX_CONTINUOUS_FAILURES) {
+            if (isIrrecoverableFailure(e) || failureCount.incrementAndGet() > transformTask.getNumFailureRetries()) {
                 String failureMessage = isIrrecoverableFailure(e) ?
                     "task encountered irrecoverable failure: " + e.getMessage() :
-                    "task encountered more than " + MAX_CONTINUOUS_FAILURES + " failures; latest failure: " + e.getMessage();
+                    "task encountered more than " + transformTask.getNumFailureRetries() + " failures; latest failure: " + e.getMessage();
                 failIndexer(failureMessage);
             }
         }