Browse Source

[ML] Extract creation of DFA field extractor into a factory (#49315)

This commit moves the async calls required to retrieve the components
that make up `ExtractedFieldsExtractor` out of `DataFrameDataExtractorFactory`
and into a dedicated `ExtractorFieldsExtractorFactory` class.

A few more refactorings are performed:

  - The detector no longer needs the results field. Instead, it knows
  whether to use it or not based on whether the task is restarting.
  - We pass more accurately whether the task is restarting or not.
  - The validation of whether fields that have a cardinality limit
  are valid is now performed in the detector after retrieving the
  respective cardinalities.
Dimitris Athanasiou 6 years ago
parent
commit
1dd816f030

+ 1 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportEstimateMemoryUsageAction.java

@@ -81,6 +81,7 @@ public class TransportEstimateMemoryUsageAction
         DataFrameDataExtractorFactory.createForSourceIndices(
             client,
             taskId,
+            true, // We are not interested in first-time run validations here
             request.getConfig(),
             ActionListener.wrap(
                 dataExtractorFactory -> {

+ 47 - 33
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

@@ -65,6 +65,7 @@ import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
 import org.elasticsearch.xpack.ml.dataframe.MappingsMerger;
 import org.elasticsearch.xpack.ml.dataframe.SourceDestValidator;
 import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
+import org.elasticsearch.xpack.ml.dataframe.extractor.ExtractedFieldsDetectorFactory;
 import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
 import org.elasticsearch.xpack.ml.job.JobNodeSelector;
 import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
@@ -229,33 +230,7 @@ public class TransportStartDataFrameAnalyticsAction
 
         // Step 6. Validate that there are analyzable data in the source index
         ActionListener<StartContext> validateMappingsMergeListener = ActionListener.wrap(
-            startContext -> DataFrameDataExtractorFactory.createForSourceIndices(client,
-                "validate_source_index_has_rows-" + id,
-                startContext.config,
-                ActionListener.wrap(
-                    dataFrameDataExtractorFactory ->
-                        dataFrameDataExtractorFactory
-                            .newExtractor(false)
-                            .collectDataSummaryAsync(ActionListener.wrap(
-                                dataSummary -> {
-                                    if (dataSummary.rows == 0) {
-                                        finalListener.onFailure(ExceptionsHelper.badRequestException(
-                                            "Unable to start {} as no documents in the source indices [{}] contained all the fields "
-                                                + "selected for analysis. If you are relying on automatic field selection then there are "
-                                                + "currently mapped fields that do not exist in any indexed documents, and you will have "
-                                                + "to switch to explicit field selection and include only fields that exist in indexed "
-                                                + "documents.",
-                                            id, Strings.arrayToCommaDelimitedString(startContext.config.getSource().getIndex())
-                                        ));
-                                    } else {
-                                        finalListener.onResponse(startContext);
-                                    }
-                                },
-                                finalListener::onFailure
-                            )),
-                    finalListener::onFailure
-                ))
-            ,
+            startContext -> validateSourceIndexHasRows(startContext, finalListener),
             finalListener::onFailure
         );
 
@@ -270,9 +245,7 @@ public class TransportStartDataFrameAnalyticsAction
         // Step 4. Validate dest index is empty if task is starting for first time
         ActionListener<StartContext> toValidateDestEmptyListener = ActionListener.wrap(
             startContext -> {
-                DataFrameAnalyticsTask.StartingState startingState = DataFrameAnalyticsTask.determineStartingState(
-                    startContext.config.getId(), startContext.progressOnStart);
-                switch (startingState) {
+                switch (startContext.startingState) {
                     case FIRST_TIME:
                         checkDestIndexIsEmptyIfExists(startContext, toValidateMappingsListener);
                         break;
@@ -286,7 +259,7 @@ public class TransportStartDataFrameAnalyticsAction
                             "Cannot start because the job has already finished"));
                         break;
                     default:
-                        finalListener.onFailure(ExceptionsHelper.serverError("Unexpected starting state " + startingState));
+                        finalListener.onFailure(ExceptionsHelper.serverError("Unexpected starting state " + startContext.startingState));
                         break;
                 }
             },
@@ -296,9 +269,16 @@ public class TransportStartDataFrameAnalyticsAction
         // Step 3. Validate source and dest; check data extraction is possible
         ActionListener<StartContext> startContextListener = ActionListener.wrap(
             startContext -> {
+                // Validate the query parses
+                startContext.config.getSource().getParsedQuery();
+
+                // Validate source/dest are valid
                 new SourceDestValidator(clusterService.state(), indexNameExpressionResolver).check(startContext.config);
-                DataFrameDataExtractorFactory.validateConfigAndSourceIndex(client, startContext.config, ActionListener.wrap(
-                    config -> toValidateDestEmptyListener.onResponse(startContext), finalListener::onFailure));
+
+                // Validate extraction is possible
+                boolean isTaskRestarting = startContext.startingState != DataFrameAnalyticsTask.StartingState.FIRST_TIME;
+                new ExtractedFieldsDetectorFactory(client).createFromSource(startContext.config, isTaskRestarting, ActionListener.wrap(
+                    extractedFieldsDetector -> toValidateDestEmptyListener.onResponse(startContext), finalListener::onFailure));
             },
             finalListener::onFailure
         );
@@ -314,6 +294,38 @@ public class TransportStartDataFrameAnalyticsAction
         configProvider.get(id, getConfigListener);
     }
 
+    private void validateSourceIndexHasRows(StartContext startContext, ActionListener<StartContext> listener) {
+        boolean isTaskRestarting = startContext.startingState != DataFrameAnalyticsTask.StartingState.FIRST_TIME;
+        DataFrameDataExtractorFactory.createForSourceIndices(client,
+            "validate_source_index_has_rows-" + startContext.config.getId(),
+            isTaskRestarting,
+            startContext.config,
+            ActionListener.wrap(
+                dataFrameDataExtractorFactory ->
+                    dataFrameDataExtractorFactory
+                        .newExtractor(false)
+                        .collectDataSummaryAsync(ActionListener.wrap(
+                            dataSummary -> {
+                                if (dataSummary.rows == 0) {
+                                    listener.onFailure(ExceptionsHelper.badRequestException(
+                                        "Unable to start {} as no documents in the source indices [{}] contained all the fields "
+                                            + "selected for analysis. If you are relying on automatic field selection then there are "
+                                            + "currently mapped fields that do not exist in any indexed documents, and you will have "
+                                            + "to switch to explicit field selection and include only fields that exist in indexed "
+                                            + "documents.",
+                                        startContext.config.getId(),
+                                        Strings.arrayToCommaDelimitedString(startContext.config.getSource().getIndex())
+                                    ));
+                                } else {
+                                    listener.onResponse(startContext);
+                                }
+                            },
+                            listener::onFailure
+                        )),
+                listener::onFailure
+            ));
+    }
+
     private void getProgress(DataFrameAnalyticsConfig config, ActionListener<List<PhaseProgress>> listener) {
         GetDataFrameAnalyticsStatsAction.Request getStatsRequest = new GetDataFrameAnalyticsStatsAction.Request(config.getId());
         executeAsyncWithOrigin(client, ML_ORIGIN, GetDataFrameAnalyticsStatsAction.INSTANCE, getStatsRequest, ActionListener.wrap(
@@ -390,10 +402,12 @@ public class TransportStartDataFrameAnalyticsAction
     private static class StartContext {
         private final DataFrameAnalyticsConfig config;
         private final List<PhaseProgress> progressOnStart;
+        private final DataFrameAnalyticsTask.StartingState startingState;
 
         private StartContext(DataFrameAnalyticsConfig config, List<PhaseProgress> progressOnStart) {
             this.config = config;
             this.progressOnStart = progressOnStart;
+            this.startingState = DataFrameAnalyticsTask.determineStartingState(config.getId(), progressOnStart);
         }
     }
 

+ 24 - 199
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java

@@ -5,43 +5,20 @@
  */
 package org.elasticsearch.xpack.ml.dataframe.extractor;
 
-import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
-import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
-import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
-import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
-import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
-import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
-import org.elasticsearch.action.search.SearchAction;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.IndexNotFoundException;
-import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.aggregations.AggregationBuilders;
-import org.elasticsearch.search.aggregations.Aggregations;
-import org.elasticsearch.search.aggregations.metrics.Cardinality;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
-import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.extractor.ExtractedField;
 import org.elasticsearch.xpack.ml.extractor.ExtractedFields;
 
 import java.util.Arrays;
-import java.util.Iterator;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class DataFrameDataExtractorFactory {
 
@@ -94,27 +71,27 @@ public class DataFrameDataExtractorFactory {
      * The source index must exist and contain at least 1 compatible field or validations will fail.
      *
      * @param client ES Client used to make calls against the cluster
+     * @param taskId The task id
+     * @param isTaskRestarting Whether the task is restarting or it is running for the first time
      * @param config The config from which to create the extractor factory
      * @param listener The listener to notify on creation or failure
      */
     public static void createForSourceIndices(Client client,
                                               String taskId,
+                                              boolean isTaskRestarting,
                                               DataFrameAnalyticsConfig config,
                                               ActionListener<DataFrameDataExtractorFactory> listener) {
-        validateIndexAndExtractFields(
-            client,
-            config.getSource().getIndex(),
-            config,
-            null,
-            false,
-            ActionListener.wrap(
-                extractedFields -> listener.onResponse(
-                    new DataFrameDataExtractorFactory(
-                        client, taskId, Arrays.asList(config.getSource().getIndex()), extractedFields, config.getHeaders(),
-                        config.getAnalysis().supportsMissingValues())),
-                listener::onFailure
-            )
-        );
+        ExtractedFieldsDetectorFactory extractedFieldsDetectorFactory = new ExtractedFieldsDetectorFactory(client);
+        extractedFieldsDetectorFactory.createFromSource(config, isTaskRestarting, ActionListener.wrap(
+            extractedFieldsDetector -> {
+                ExtractedFields extractedFields = extractedFieldsDetector.detect();
+                DataFrameDataExtractorFactory extractorFactory = new DataFrameDataExtractorFactory(client, taskId,
+                    Arrays.asList(config.getSource().getIndex()), extractedFields, config.getHeaders(),
+                    config.getAnalysis().supportsMissingValues());
+                listener.onResponse(extractorFactory);
+            },
+            listener::onFailure
+        ));
     }
 
     /**
@@ -131,168 +108,16 @@ public class DataFrameDataExtractorFactory {
                                                  DataFrameAnalyticsConfig config,
                                                  boolean isTaskRestarting,
                                                  ActionListener<DataFrameDataExtractorFactory> listener) {
-        validateIndexAndExtractFields(
-            client,
-            new String[] {config.getDest().getIndex()},
-            config,
-            config.getDest().getResultsField(),
-            isTaskRestarting,
-            ActionListener.wrap(
-                extractedFields -> listener.onResponse(
-                    new DataFrameDataExtractorFactory(
-                        client, config.getId(), Arrays.asList(config.getDest().getIndex()), extractedFields, config.getHeaders(),
-                        config.getAnalysis().supportsMissingValues())),
-                listener::onFailure
-            )
-        );
-    }
-
-    /**
-     * Validates the source index and analytics config
-     *
-     * @param client ES Client to make calls
-     * @param config Analytics config to validate
-     * @param listener The listener to notify on failure or completion
-     */
-    public static void validateConfigAndSourceIndex(Client client,
-                                                    DataFrameAnalyticsConfig config,
-                                                    ActionListener<DataFrameAnalyticsConfig> listener) {
-        validateIndexAndExtractFields(
-            client,
-            config.getSource().getIndex(),
-            config,
-            config.getDest().getResultsField(),
-            false,
-            ActionListener.wrap(
-                fields -> {
-                    config.getSource().getParsedQuery(); // validate query is acceptable
-                    listener.onResponse(config);
-                },
-                listener::onFailure
-            )
-        );
-    }
-
-    private static void validateIndexAndExtractFields(Client client,
-                                                      String[] index,
-                                                      DataFrameAnalyticsConfig config,
-                                                      String resultsField,
-                                                      boolean isTaskRestarting,
-                                                      ActionListener<ExtractedFields> listener) {
-        AtomicInteger docValueFieldsLimitHolder = new AtomicInteger();
-        AtomicReference<ExtractedFields> extractedFieldsHolder = new AtomicReference<>();
-
-        // Step 4. Check fields cardinality vs limits and notify listener
-        ActionListener<SearchResponse> checkCardinalityHandler = ActionListener.wrap(
-            searchResponse -> {
-                if (searchResponse != null) {
-                    Aggregations aggs = searchResponse.getAggregations();
-                    if (aggs == null) {
-                        listener.onFailure(ExceptionsHelper.serverError("Unexpected null response when gathering field cardinalities"));
-                        return;
-                    }
-                    for (Map.Entry<String, Long> entry : config.getAnalysis().getFieldCardinalityLimits().entrySet()) {
-                        String fieldName = entry.getKey();
-                        Long limit = entry.getValue();
-                        Cardinality cardinality = aggs.get(fieldName);
-                        if (cardinality == null) {
-                            listener.onFailure(ExceptionsHelper.serverError("Unexpected null response when gathering field cardinalities"));
-                            return;
-                        }
-                        if (cardinality.getValue() > limit) {
-                            listener.onFailure(
-                                ExceptionsHelper.badRequestException(
-                                    "Field [{}] must have at most [{}] distinct values but there were at least [{}]",
-                                    fieldName, limit, cardinality.getValue()));
-                            return;
-                        }
-                    }
-                }
-                listener.onResponse(extractedFieldsHolder.get());
-            },
-            listener::onFailure
-        );
-
-        // Step 3. Extract fields (if possible)
-        ActionListener<FieldCapabilitiesResponse> fieldCapabilitiesHandler = ActionListener.wrap(
-            fieldCapabilitiesResponse -> {
-                extractedFieldsHolder.set(
-                    new ExtractedFieldsDetector(
-                            index, config, resultsField, isTaskRestarting, docValueFieldsLimitHolder.get(), fieldCapabilitiesResponse)
-                        .detect());
-
-                Map<String, Long> fieldCardinalityLimits = config.getAnalysis().getFieldCardinalityLimits();
-                if (fieldCardinalityLimits.isEmpty()) {
-                    checkCardinalityHandler.onResponse(null);
-                } else {
-                    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0);
-                    for (Map.Entry<String, Long> entry : fieldCardinalityLimits.entrySet()) {
-                        String fieldName = entry.getKey();
-                        Long limit = entry.getValue();
-                        searchSourceBuilder.aggregation(
-                            AggregationBuilders.cardinality(fieldName)
-                                .field(fieldName)
-                                .precisionThreshold(limit + 1));
-                    }
-                    SearchRequest searchRequest = new SearchRequest(config.getSource().getIndex()).source(searchSourceBuilder);
-                    ClientHelper.executeWithHeadersAsync(
-                        config.getHeaders(), ClientHelper.ML_ORIGIN, client, SearchAction.INSTANCE, searchRequest, checkCardinalityHandler);
-                }
-            },
-            listener::onFailure
-        );
-
-        // Step 2. Get field capabilities necessary to build the information of how to extract fields
-        ActionListener<Integer> docValueFieldsLimitListener = ActionListener.wrap(
-            docValueFieldsLimit -> {
-                docValueFieldsLimitHolder.set(docValueFieldsLimit);
-
-                FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest();
-                fieldCapabilitiesRequest.indices(index);
-                fieldCapabilitiesRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
-                fieldCapabilitiesRequest.fields("*");
-                ClientHelper.executeWithHeaders(config.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> {
-                    client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler);
-                    // This response gets discarded - the listener handles the real response
-                    return null;
-                });
+        ExtractedFieldsDetectorFactory extractedFieldsDetectorFactory = new ExtractedFieldsDetectorFactory(client);
+        extractedFieldsDetectorFactory.createFromDest(config, isTaskRestarting, ActionListener.wrap(
+            extractedFieldsDetector -> {
+                ExtractedFields extractedFields = extractedFieldsDetector.detect();
+                DataFrameDataExtractorFactory extractorFactory = new DataFrameDataExtractorFactory(client, config.getId(),
+                    Collections.singletonList(config.getDest().getIndex()), extractedFields, config.getHeaders(),
+                    config.getAnalysis().supportsMissingValues());
+                listener.onResponse(extractorFactory);
             },
             listener::onFailure
-        );
-
-        // Step 1. Get doc value fields limit
-        getDocValueFieldsLimit(client, index, docValueFieldsLimitListener);
-    }
-
-    private static void getDocValueFieldsLimit(Client client, String[] index, ActionListener<Integer> docValueFieldsLimitListener) {
-        ActionListener<GetSettingsResponse> settingsListener = ActionListener.wrap(getSettingsResponse -> {
-                Integer minDocValueFieldsLimit = Integer.MAX_VALUE;
-
-                ImmutableOpenMap<String, Settings> indexToSettings = getSettingsResponse.getIndexToSettings();
-                Iterator<ObjectObjectCursor<String, Settings>> iterator = indexToSettings.iterator();
-                while (iterator.hasNext()) {
-                    ObjectObjectCursor<String, Settings> indexSettings = iterator.next();
-                    Integer indexMaxDocValueFields = IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.get(indexSettings.value);
-                    if (indexMaxDocValueFields < minDocValueFieldsLimit) {
-                        minDocValueFieldsLimit = indexMaxDocValueFields;
-                    }
-                }
-                docValueFieldsLimitListener.onResponse(minDocValueFieldsLimit);
-            },
-            e -> {
-                if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
-                    docValueFieldsLimitListener.onFailure(new ResourceNotFoundException("cannot retrieve data because index "
-                        + ((IndexNotFoundException) e).getIndex() + " does not exist"));
-                } else {
-                    docValueFieldsLimitListener.onFailure(e);
-                }
-            }
-        );
-
-        GetSettingsRequest getSettingsRequest = new GetSettingsRequest();
-        getSettingsRequest.indices(index);
-        getSettingsRequest.includeDefaults(true);
-        getSettingsRequest.names(IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.getKey());
-        client.admin().indices().getSettings(getSettingsRequest, settingsListener);
+        ));
     }
 }

+ 26 - 13
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java

@@ -52,19 +52,20 @@ public class ExtractedFieldsDetector {
 
     private final String[] index;
     private final DataFrameAnalyticsConfig config;
-    private final String resultsField;
     private final boolean isTaskRestarting;
     private final int docValueFieldsLimit;
     private final FieldCapabilitiesResponse fieldCapabilitiesResponse;
+    private final Map<String, Long> fieldCardinalities;
 
-    ExtractedFieldsDetector(String[] index, DataFrameAnalyticsConfig config, String resultsField, boolean isTaskRestarting,
-                            int docValueFieldsLimit, FieldCapabilitiesResponse fieldCapabilitiesResponse) {
+    ExtractedFieldsDetector(String[] index, DataFrameAnalyticsConfig config, boolean isTaskRestarting,
+                            int docValueFieldsLimit, FieldCapabilitiesResponse fieldCapabilitiesResponse,
+                            Map<String, Long> fieldCardinalities) {
         this.index = Objects.requireNonNull(index);
         this.config = Objects.requireNonNull(config);
-        this.resultsField = resultsField;
         this.isTaskRestarting = isTaskRestarting;
         this.docValueFieldsLimit = docValueFieldsLimit;
         this.fieldCapabilitiesResponse = Objects.requireNonNull(fieldCapabilitiesResponse);
+        this.fieldCardinalities = Objects.requireNonNull(fieldCardinalities);
     }
 
     public ExtractedFields detect() {
@@ -79,11 +80,13 @@ public class ExtractedFieldsDetector {
         checkNoIgnoredFields(fields);
         checkFieldsHaveCompatibleTypes(fields);
         checkRequiredFields(fields);
+        checkFieldsWithCardinalityLimit();
         return detectExtractedFields(fields);
     }
 
     private Set<String> getIncludedFields() {
         Set<String> fields = new HashSet<>(fieldCapabilitiesResponse.get().keySet());
+        checkResultsFieldIsNotPresent();
         removeFieldsUnderResultsField(fields);
         FetchSourceContext analyzedFields = config.getAnalyzedFields();
 
@@ -96,21 +99,13 @@ public class ExtractedFieldsDetector {
         return fields;
     }
 
-    private void removeFieldsUnderResultsField(Set<String> fields) {
-        if (resultsField == null) {
-            return;
-        }
-        checkResultsFieldIsNotPresent();
-        // Ignore fields under the results object
-        fields.removeIf(field -> field.startsWith(resultsField + "."));
-    }
-
     private void checkResultsFieldIsNotPresent() {
         // If the task is restarting we do not mind the index containing the results field, we will overwrite all docs
         if (isTaskRestarting) {
             return;
         }
 
+        String resultsField = config.getDest().getResultsField();
         Map<String, FieldCapabilities> indexToFieldCaps = fieldCapabilitiesResponse.getField(resultsField);
         if (indexToFieldCaps != null && indexToFieldCaps.isEmpty() == false) {
             throw ExceptionsHelper.badRequestException(
@@ -122,6 +117,11 @@ public class ExtractedFieldsDetector {
         }
     }
 
+    private void removeFieldsUnderResultsField(Set<String> fields) {
+        // Ignore fields under the results object
+        fields.removeIf(field -> field.startsWith(config.getDest().getResultsField() + "."));
+    }
+
     private void removeFieldsWithIncompatibleTypes(Set<String> fields) {
         Iterator<String> fieldsIterator = fields.iterator();
         while (fieldsIterator.hasNext()) {
@@ -234,6 +234,19 @@ public class ExtractedFieldsDetector {
         }
     }
 
+    private void checkFieldsWithCardinalityLimit() {
+        for (Map.Entry<String, Long> entry : config.getAnalysis().getFieldCardinalityLimits().entrySet()) {
+            String fieldName = entry.getKey();
+            long limit = entry.getValue();
+            long cardinality = fieldCardinalities.get(fieldName);
+            if (cardinality > limit) {
+                throw ExceptionsHelper.badRequestException(
+                        "Field [{}] must have at most [{}] distinct values but there were at least [{}]",
+                        fieldName, limit, cardinality);
+            }
+        }
+    }
+
     private ExtractedFields detectExtractedFields(Set<String> fields) {
         List<String> sortedFields = new ArrayList<>(fields);
         // We sort the fields to ensure the checksum for each document is deterministic

+ 189 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorFactory.java

@@ -0,0 +1,189 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.dataframe.extractor;
+
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
+import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
+import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
+import org.elasticsearch.action.search.SearchAction;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.metrics.Cardinality;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
+import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A factory that retrieves all the parts necessary to build a {@link ExtractedFieldsDetector}.
+ */
+public class ExtractedFieldsDetectorFactory {
+
+    private final Client client;
+
+    public ExtractedFieldsDetectorFactory(Client client) {
+        this.client = Objects.requireNonNull(client);
+    }
+
+    public void createFromSource(DataFrameAnalyticsConfig config, boolean isTaskRestarting,
+                                 ActionListener<ExtractedFieldsDetector> listener) {
+        create(config.getSource().getIndex(), config, isTaskRestarting, listener);
+    }
+
+    public void createFromDest(DataFrameAnalyticsConfig config, boolean isTaskRestarting,
+                               ActionListener<ExtractedFieldsDetector> listener) {
+        create(new String[] {config.getDest().getIndex()}, config, isTaskRestarting, listener);
+    }
+
+    private void create(String[] index, DataFrameAnalyticsConfig config, boolean isTaskRestarting,
+                        ActionListener<ExtractedFieldsDetector> listener) {
+        AtomicInteger docValueFieldsLimitHolder = new AtomicInteger();
+        AtomicReference<FieldCapabilitiesResponse> fieldCapsResponseHolder = new AtomicReference<>();
+
+        // Step 4. Create cardinality by field map and build detector
+        ActionListener<Map<String, Long>> fieldCardinalitiesHandler = ActionListener.wrap(
+            fieldCardinalities -> {
+                ExtractedFieldsDetector detector = new ExtractedFieldsDetector(index, config, isTaskRestarting,
+                    docValueFieldsLimitHolder.get(), fieldCapsResponseHolder.get(), fieldCardinalities);
+                listener.onResponse(detector);
+            },
+            listener::onFailure
+        );
+
+        // Step 3. Get cardinalities for fields with limits
+        ActionListener<FieldCapabilitiesResponse> fieldCapabilitiesHandler = ActionListener.wrap(
+            fieldCapabilitiesResponse -> {
+                fieldCapsResponseHolder.set(fieldCapabilitiesResponse);
+                getCardinalitiesForFieldsWithLimit(index, config, fieldCardinalitiesHandler);
+            },
+            listener::onFailure
+        );
+
+        // Step 2. Get field capabilities necessary to build the information of how to extract fields
+        ActionListener<Integer> docValueFieldsLimitListener = ActionListener.wrap(
+            docValueFieldsLimit -> {
+                docValueFieldsLimitHolder.set(docValueFieldsLimit);
+                getFieldCaps(index, config, fieldCapabilitiesHandler);
+            },
+            listener::onFailure
+        );
+
+        // Step 1. Get doc value fields limit
+        getDocValueFieldsLimit(index, docValueFieldsLimitListener);
+    }
+
+    private void getCardinalitiesForFieldsWithLimit(String[] index, DataFrameAnalyticsConfig config,
+                                                    ActionListener<Map<String, Long>> listener) {
+        Map<String, Long> fieldCardinalityLimits = config.getAnalysis().getFieldCardinalityLimits();
+        if (fieldCardinalityLimits.isEmpty()) {
+            listener.onResponse(Collections.emptyMap());
+            return;
+        }
+
+        ActionListener<SearchResponse> searchListener = ActionListener.wrap(
+            searchResponse -> buildFieldCardinalitiesMap(config, searchResponse, listener),
+            listener::onFailure
+        );
+
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0);
+        for (Map.Entry<String, Long> entry : fieldCardinalityLimits.entrySet()) {
+            String fieldName = entry.getKey();
+            Long limit = entry.getValue();
+            searchSourceBuilder.aggregation(
+                AggregationBuilders.cardinality(fieldName)
+                    .field(fieldName)
+                    .precisionThreshold(limit + 1));
+        }
+        SearchRequest searchRequest = new SearchRequest(index).source(searchSourceBuilder);
+        ClientHelper.executeWithHeadersAsync(
+            config.getHeaders(), ClientHelper.ML_ORIGIN, client, SearchAction.INSTANCE, searchRequest, searchListener);
+    }
+
+    private void buildFieldCardinalitiesMap(DataFrameAnalyticsConfig config, SearchResponse searchResponse,
+                                            ActionListener<Map<String, Long>> listener) {
+        Aggregations aggs = searchResponse.getAggregations();
+        if (aggs == null) {
+            listener.onFailure(ExceptionsHelper.serverError("Unexpected null response when gathering field cardinalities"));
+            return;
+        }
+
+        Map<String, Long> fieldCardinalities = new HashMap<>(config.getAnalysis().getFieldCardinalityLimits().size());
+        for (String field : config.getAnalysis().getFieldCardinalityLimits().keySet()) {
+            Cardinality cardinality = aggs.get(field);
+            if (cardinality == null) {
+                listener.onFailure(ExceptionsHelper.serverError("Unexpected null response when gathering field cardinalities"));
+                return;
+            }
+            fieldCardinalities.put(field, cardinality.getValue());
+        }
+        listener.onResponse(fieldCardinalities);
+    }
+
+    private void getFieldCaps(String[] index, DataFrameAnalyticsConfig config, ActionListener<FieldCapabilitiesResponse> listener) {
+        FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest();
+        fieldCapabilitiesRequest.indices(index);
+        fieldCapabilitiesRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
+        fieldCapabilitiesRequest.fields("*");
+        ClientHelper.executeWithHeaders(config.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> {
+            client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, listener);
+            // This response gets discarded - the listener handles the real response
+            return null;
+        });
+    }
+
+    private void getDocValueFieldsLimit(String[] index, ActionListener<Integer> docValueFieldsLimitListener) {
+        ActionListener<GetSettingsResponse> settingsListener = ActionListener.wrap(getSettingsResponse -> {
+                Integer minDocValueFieldsLimit = Integer.MAX_VALUE;
+
+                ImmutableOpenMap<String, Settings> indexToSettings = getSettingsResponse.getIndexToSettings();
+                Iterator<ObjectObjectCursor<String, Settings>> iterator = indexToSettings.iterator();
+                while (iterator.hasNext()) {
+                    ObjectObjectCursor<String, Settings> indexSettings = iterator.next();
+                    Integer indexMaxDocValueFields = IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.get(indexSettings.value);
+                    if (indexMaxDocValueFields < minDocValueFieldsLimit) {
+                        minDocValueFieldsLimit = indexMaxDocValueFields;
+                    }
+                }
+                docValueFieldsLimitListener.onResponse(minDocValueFieldsLimit);
+            },
+            e -> {
+                if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
+                    docValueFieldsLimitListener.onFailure(new ResourceNotFoundException("cannot retrieve data because index "
+                        + ((IndexNotFoundException) e).getIndex() + " does not exist"));
+                } else {
+                    docValueFieldsLimitListener.onFailure(e);
+                }
+            }
+        );
+
+        GetSettingsRequest getSettingsRequest = new GetSettingsRequest();
+        getSettingsRequest.indices(index);
+        getSettingsRequest.includeDefaults(true);
+        getSettingsRequest.names(IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.getKey());
+        client.admin().indices().getSettings(getSettingsRequest, settingsListener);
+    }
+}

+ 52 - 53
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetectorTests.java

@@ -47,7 +47,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .addAggregatableField("some_float", "float").build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         List<ExtractedField> allFields = extractedFields.getAllFields();
@@ -62,7 +62,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         List<ExtractedField> allFields = extractedFields.getAllFields();
@@ -76,7 +76,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .addAggregatableField("some_keyword", "keyword").build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
         ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
 
         assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]." +
@@ -88,7 +88,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .addAggregatableField("indecisive_field", "float", "keyword").build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
         ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
 
         assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]. " +
@@ -104,7 +104,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         List<ExtractedField> allFields = extractedFields.getAllFields();
@@ -125,7 +125,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildRegressionConfig("foo"), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities, Collections.emptyMap());
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         List<ExtractedField> allFields = extractedFields.getAllFields();
@@ -144,7 +144,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildRegressionConfig("foo"), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities, Collections.emptyMap());
         ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
 
         assertThat(e.getMessage(), equalTo("required field [foo] is missing; analysis requires fields [foo]"));
@@ -160,7 +160,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
         FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[0], new String[] {"foo"});
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildRegressionConfig("foo", analyzedFields), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildRegressionConfig("foo", analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
         ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
 
         assertThat(e.getMessage(), equalTo("required field [foo] is missing; analysis requires fields [foo]"));
@@ -176,7 +176,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
         FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]  {"some_float", "some_keyword"}, new String[0]);
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildRegressionConfig("foo", analyzedFields), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildRegressionConfig("foo", analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
         ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
 
         assertThat(e.getMessage(), equalTo("required field [foo] is missing; analysis requires fields [foo]"));
@@ -190,7 +190,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
         FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]  {"foo", "bar"}, new String[] {"foo"});
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         List<ExtractedField> allFields = extractedFields.getAllFields();
@@ -207,7 +207,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildRegressionConfig("foo"), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities, Collections.emptyMap());
         ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
 
         assertThat(e.getMessage(), equalTo("invalid types [keyword] for required field [foo]; " +
@@ -223,19 +223,33 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildClassificationConfig("some_float"), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildClassificationConfig("some_float"), false, 100, fieldCapabilities, Collections.emptyMap());
         ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
 
         assertThat(e.getMessage(), equalTo("invalid types [float] for required field [some_float]; " +
             "expected types are [boolean, byte, integer, ip, keyword, long, short, text]"));
     }
 
+    public void testDetect_GivenClassificationAndDependentVariableHasInvalidCardinality() {
+        FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder()
+            .addAggregatableField("some_long", "long")
+            .addAggregatableField("some_keyword", "keyword")
+            .addAggregatableField("foo", "keyword")
+            .build();
+
+        ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(SOURCE_INDEX,
+            buildClassificationConfig("some_keyword"), false, 100, fieldCapabilities, Collections.singletonMap("some_keyword", 3L));
+        ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
+
+        assertThat(e.getMessage(), equalTo("Field [some_keyword] must have at most [2] distinct values but there were at least [3]"));
+    }
+
     public void testDetect_GivenIgnoredField() {
         FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder()
             .addAggregatableField("_id", "float").build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
         ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
 
         assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]. " +
@@ -248,7 +262,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
         FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]{"_id"}, new String[0]);
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
         ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
 
         assertThat(e.getMessage(), equalTo("field [_id] cannot be analyzed"));
@@ -270,7 +284,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
         FieldCapabilitiesResponse fieldCapabilities = mockFieldCapsResponseBuilder.build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         List<String> extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName)
@@ -287,7 +301,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
         FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your_field1", "my*"}, new String[0]);
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap());
         ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
 
         assertThat(e.getMessage(), equalTo("No field [your_field1] could be detected"));
@@ -302,7 +316,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
         FetchSourceContext desiredFields = new FetchSourceContext(true, new String[0], new String[]{"my_*"});
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap());
         ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
         assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]. " +
             "Supported types are [boolean, byte, double, float, half_float, integer, long, scaled_float, short]."));
@@ -318,7 +332,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
         FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your*", "my_*"}, new String[]{"*nope"});
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap());
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         List<String> extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName)
@@ -337,7 +351,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
         FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your*", "my_*"}, new String[]{"*nope"});
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap());
         ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
 
         assertThat(e.getMessage(), equalTo("field [your_keyword] has unsupported type [keyword]. " +
@@ -353,7 +367,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
         ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
 
         assertThat(e.getMessage(), equalTo("A field that matches the dest.results_field [ml] already exists; " +
@@ -369,7 +383,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, true, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(), true, 100, fieldCapabilities, Collections.emptyMap());
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         List<String> extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName)
@@ -387,7 +401,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
         FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]{RESULTS_FIELD}, new String[0]);
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
         ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
 
         assertThat(e.getMessage(), equalTo("A field that matches the dest.results_field [ml] already exists; " +
@@ -404,29 +418,12 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
         FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]{RESULTS_FIELD}, new String[0]);
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), RESULTS_FIELD, true, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), true, 100, fieldCapabilities, Collections.emptyMap());
         ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
 
         assertThat(e.getMessage(), equalTo("No field [ml] could be detected"));
     }
 
-    public void testDetect_NullResultsField() {
-        FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder()
-            .addAggregatableField(RESULTS_FIELD, "float")
-            .addAggregatableField("my_field1", "float")
-            .addAggregatableField("your_field2", "float")
-            .addAggregatableField("your_keyword", "keyword")
-            .build();
-
-        ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(), null, false, 100, fieldCapabilities);
-        ExtractedFields extractedFields = extractedFieldsDetector.detect();
-
-        List<String> extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName)
-            .collect(Collectors.toList());
-        assertThat(extractedFieldNames, equalTo(Arrays.asList(RESULTS_FIELD, "my_field1", "your_field2")));
-    }
-
     public void testDetect_GivenLessFieldsThanDocValuesLimit() {
         FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder()
             .addAggregatableField("field_1", "float")
@@ -436,7 +433,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, true, 4, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(), true, 4, fieldCapabilities, Collections.emptyMap());
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         List<String> extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName)
@@ -455,7 +452,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, true, 3, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(), true, 3, fieldCapabilities, Collections.emptyMap());
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         List<String> extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName)
@@ -474,7 +471,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, true, 2, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(), true, 2, fieldCapabilities, Collections.emptyMap());
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         List<String> extractedFieldNames = extractedFields.getAllFields().stream().map(ExtractedField::getName)
@@ -490,7 +487,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildOutlierDetectionConfig(), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         List<ExtractedField> allFields = extractedFields.getAllFields();
@@ -515,7 +512,8 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildClassificationConfig("some_boolean"), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildClassificationConfig("some_boolean"), false, 100, fieldCapabilities,
+            Collections.singletonMap("some_boolean", 2L));
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         List<ExtractedField> allFields = extractedFields.getAllFields();
@@ -547,7 +545,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildRegressionConfig("a_float"), RESULTS_FIELD, true, 100, fieldCapabilities);
+            SOURCE_INDEX, buildRegressionConfig("a_float"), true, 100, fieldCapabilities, Collections.emptyMap());
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         assertThat(extractedFields.getAllFields().size(), equalTo(5));
@@ -564,7 +562,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildClassificationConfig("field_1"), RESULTS_FIELD, true, 100, fieldCapabilities);
+            SOURCE_INDEX, buildClassificationConfig("field_1"), true, 100, fieldCapabilities, Collections.singletonMap("field_1", 2L));
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         assertThat(extractedFields.getAllFields().size(), equalTo(2));
@@ -581,7 +579,8 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildClassificationConfig("field_1.keyword"), RESULTS_FIELD, true, 100, fieldCapabilities);
+            SOURCE_INDEX, buildClassificationConfig("field_1.keyword"), true, 100, fieldCapabilities,
+            Collections.singletonMap("field_1.keyword", 2L));
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         assertThat(extractedFields.getAllFields().size(), equalTo(2));
@@ -600,7 +599,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildRegressionConfig("field_2"), RESULTS_FIELD, true, 100, fieldCapabilities);
+            SOURCE_INDEX, buildRegressionConfig("field_2"), true, 100, fieldCapabilities, Collections.emptyMap());
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         assertThat(extractedFields.getAllFields().size(), equalTo(2));
@@ -617,7 +616,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildRegressionConfig("field_2"), RESULTS_FIELD, true, 0, fieldCapabilities);
+            SOURCE_INDEX, buildRegressionConfig("field_2"), true, 0, fieldCapabilities, Collections.emptyMap());
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         assertThat(extractedFields.getAllFields().size(), equalTo(2));
@@ -635,7 +634,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildRegressionConfig("field_2.double"), RESULTS_FIELD, true, 100, fieldCapabilities);
+            SOURCE_INDEX, buildRegressionConfig("field_2.double"), true, 100, fieldCapabilities, Collections.emptyMap());
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         assertThat(extractedFields.getAllFields().size(), equalTo(2));
@@ -652,7 +651,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
             .build();
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildRegressionConfig("field_2"), RESULTS_FIELD, true, 100, fieldCapabilities);
+            SOURCE_INDEX, buildRegressionConfig("field_2"), true, 100, fieldCapabilities, Collections.emptyMap());
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         assertThat(extractedFields.getAllFields().size(), equalTo(2));
@@ -670,7 +669,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
         FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] { "field_1", "field_2" }, new String[0]);
 
         ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
-            SOURCE_INDEX, buildRegressionConfig("field_2", analyzedFields), RESULTS_FIELD, false, 100, fieldCapabilities);
+            SOURCE_INDEX, buildRegressionConfig("field_2", analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
         ExtractedFields extractedFields = extractedFieldsDetector.detect();
 
         assertThat(extractedFields.getAllFields().size(), equalTo(2));