Bläddra i källkod

[ML] Better error when aggregate_metric_double used in scrolling data… (#92232)

Fields of type `aggregate_metric_double` cannot be used with anomaly detection
datafeeds unless they are used in aggregations. This commit improves the error
message in this scenario so that the user is better informed about why they
cannot use such fields.

Closes #90592

Co-authored-by: David Roberts <dave.roberts@elastic.co>
Dimitris Athanasiou 2 år sedan
förälder
incheckning
e3f76c5bea

+ 6 - 0
docs/changelog/92232.yaml

@@ -0,0 +1,6 @@
+pr: 92232
+summary: Better error when `aggregate_metric_double` used in scrolling datafeeds
+area: Machine Learning
+type: enhancement
+issues:
+ - 90592

+ 53 - 0
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java

@@ -7,6 +7,7 @@
 package org.elasticsearch.xpack.ml.integration;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
@@ -64,6 +65,7 @@ import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDataf
 import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
 import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDataCounts;
 import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.indexDocs;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasSize;
@@ -828,4 +830,55 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
 
         assertThat(e.status(), equalTo(RestStatus.REQUEST_TIMEOUT));
     }
+
+    /**
+     * Verifies that starting a datafeed configured without aggregations fails with a
+     * {@code 400 Bad Request} when the detector field is mapped as {@code aggregate_metric_double},
+     * and that the error message names both the offending field and its type.
+     *
+     * @throws Exception if waiting for the job to reach the opened state fails
+     */
+    public void testStart_GivenAggregateMetricDoubleWithoutAggs() throws Exception {
+        final String index = "index-with-aggregate-metric-double";
+        // "presum" is the field the detector below analyses; it is mapped with the unsupported type.
+        String mapping = """
+            {
+              "properties": {
+                "time": {
+                  "type": "date"
+                },
+                "presum": {
+                  "type": "aggregate_metric_double",
+                  "metrics": [ "min", "max", "sum", "value_count" ],
+                  "default_metric": "max"
+                }
+              }
+            }""";
+        client().admin().indices().prepareCreate(index).setMapping(mapping).get();
+
+        DataDescription.Builder dataDescription = new DataDescription.Builder();
+
+        Detector.Builder d = new Detector.Builder("avg", "presum");
+        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
+        analysisConfig.setBucketSpan(TimeValue.timeValueHours(1));
+
+        Job.Builder jobBuilder = new Job.Builder();
+        jobBuilder.setId("job-with-aggregate-metric-double");
+        jobBuilder.setAnalysisConfig(analysisConfig);
+        jobBuilder.setDataDescription(dataDescription);
+
+        putJob(jobBuilder);
+        openJob(jobBuilder.getId());
+        // Expected value goes first so that a failure reports "expected OPENED but was <actual state>".
+        assertBusy(() -> assertEquals(JobState.OPENED, getJobStats(jobBuilder.getId()).get(0).getState()));
+
+        // The datafeed only has indices set; no aggregations are configured on it.
+        DatafeedConfig.Builder dfBuilder = new DatafeedConfig.Builder(jobBuilder.getId() + "-datafeed", jobBuilder.getId());
+        dfBuilder.setIndices(Collections.singletonList(index));
+
+        DatafeedConfig datafeedConfig = dfBuilder.build();
+
+        putDatafeed(datafeedConfig);
+
+        // Creating the datafeed succeeds; the failure is expected when the datafeed is started.
+        ElasticsearchStatusException e = expectThrows(
+            ElasticsearchStatusException.class,
+            () -> startDatafeed(datafeedConfig.getId(), 0L, null)
+        );
+
+        assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
+        assertThat(
+            e.getMessage(),
+            containsString("field [presum] is of type [aggregate_metric_double] and cannot be used in a datafeed without aggregations")
+        );
+    }
 }

+ 30 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.datafeed.extractor.scroll;
 
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.fieldcaps.FieldCapabilities;
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
@@ -25,10 +26,16 @@ import org.elasticsearch.xpack.core.ml.utils.MlStrings;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
 
+import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 
 public class ScrollDataExtractorFactory implements DataExtractorFactory {
+
+    // This field type is not supported for scrolling datafeeds.
+    private static final String AGGREGATE_METRIC_DOUBLE = "aggregate_metric_double";
+
     private final Client client;
     private final DatafeedConfig datafeedConfig;
     private final Job job;
@@ -104,6 +111,17 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory {
                 );
                 return;
             }
+            Optional<String> optionalAggregatedMetricDouble = findFirstAggregatedMetricDoubleField(fieldCapabilitiesResponse);
+            if (optionalAggregatedMetricDouble.isPresent()) {
+                listener.onFailure(
+                    ExceptionsHelper.badRequestException(
+                        "field [{}] is of type [{}] and cannot be used in a datafeed without aggregations",
+                        optionalAggregatedMetricDouble.get(),
+                        AGGREGATE_METRIC_DOUBLE
+                    )
+                );
+                return;
+            }
             TimeBasedExtractedFields fields = TimeBasedExtractedFields.build(job, datafeed, fieldCapabilitiesResponse);
             listener.onResponse(new ScrollDataExtractorFactory(client, datafeed, job, fields, xContentRegistry, timingStatsReporter));
         }, e -> {
@@ -142,4 +160,16 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory {
             return null;
         });
     }
+
+    /**
+     * Looks for a field that the given field capabilities response reports as being of type
+     * {@code aggregate_metric_double}.
+     *
+     * @param fieldCapabilitiesResponse the field capabilities response to inspect
+     * @return the name of the first such field encountered, or an empty {@link Optional} if there is none
+     */
+    private static Optional<String> findFirstAggregatedMetricDoubleField(FieldCapabilitiesResponse fieldCapabilitiesResponse) {
+        for (Map<String, FieldCapabilities> capsByType : fieldCapabilitiesResponse.get().values()) {
+            for (Map.Entry<String, FieldCapabilities> typeAndCaps : capsByType.entrySet()) {
+                // The inner map's key is compared against the type name, so a match identifies an unsupported field.
+                if (AGGREGATE_METRIC_DOUBLE.equals(typeAndCaps.getKey())) {
+                    return Optional.of(typeAndCaps.getValue().getName());
+                }
+            }
+        }
+        return Optional.empty();
+    }
 }