1
0
Эх сурвалжийг харах

[ML] Return 408 when open/start APIs time out (#89775)

This changes the response status code from `500` to `408` when
the following ML APIs time out:

- open anomaly detection job
- start datafeed
- start data frame analytics

Closes #89585
Dimitris Athanasiou 3 жил өмнө
parent
commit
4bbe4eab2d

+ 6 - 0
docs/changelog/89775.yaml

@@ -0,0 +1,6 @@
+pr: 89775
+summary: Return 408 instead of 500 when open/start APIs time out
+area: Machine Learning
+type: bug
+issues:
+ - 89585

+ 36 - 0
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java

@@ -19,6 +19,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.core.CheckedRunnable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -31,6 +32,7 @@ import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
 import org.elasticsearch.xpack.core.ml.action.PutJobAction;
+import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
 import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
@@ -792,4 +794,38 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
             assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
         }, 30, TimeUnit.SECONDS);
     }
+
+    /**
+     * Verifies that starting a datafeed with a one-nanosecond timeout fails with an
+     * exception whose status is {@link RestStatus#REQUEST_TIMEOUT} (HTTP 408).
+     *
+     * @throws Exception if setting up the index, job or datafeed fails
+     */
+    public void testStartDatafeed_GivenTimeout_Returns408() throws Exception {
+        client().admin().indices().prepareCreate("data-1").setMapping("time", "type=date").get();
+        long numDocs = 100;
+        long now = System.currentTimeMillis();
+        long oneWeekAgo = now - TimeUnit.DAYS.toMillis(7);
+        indexDocs(logger, "data-1", numDocs, oneWeekAgo, now);
+
+        String jobId = "job-for-start-datafeed-timeout";
+        String datafeedId = jobId + "-datafeed";
+
+        Job.Builder job = createScheduledJob(jobId);
+        putJob(job);
+        openJob(job.getId());
+        // Expected value first, so that a failure message reports expected/actual the right way round.
+        assertBusy(() -> assertEquals(JobState.OPENED, getJobStats(job.getId()).get(0).getState()));
+
+        // Create the datafeed under the same id that the start request below refers to.
+        DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilder(
+            datafeedId,
+            job.getId(),
+            Collections.singletonList("data-1")
+        );
+        DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
+        putDatafeed(datafeedConfig);
+
+        // The test expects a one-nanosecond timeout to make the start request time out.
+        StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedId, oneWeekAgo);
+        request.getParams().setTimeout(TimeValue.timeValueNanos(1L));
+
+        ElasticsearchException e = expectThrows(
+            ElasticsearchException.class,
+            () -> client().execute(StartDatafeedAction.INSTANCE, request).actionGet()
+        );
+
+        assertThat(e.status(), equalTo(RestStatus.REQUEST_TIMEOUT));
+    }
 }

+ 23 - 4
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java

@@ -15,6 +15,8 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.SecuritySettingsSourceField;
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.xpack.core.ml.MlTasks;
@@ -31,6 +33,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
@@ -150,6 +153,15 @@ public class MlJobIT extends ESRestTestCase {
         assertEquals(1, XContentMapValues.extractValue("ml.jobs.opened.count", usage));
     }
 
+    public void testOpenJob_GivenTimeout_Returns408() throws IOException {
+        String jobId = "test-timeout-returns-408";
+        createFarequoteJob(jobId);
+
+        ResponseException e = expectThrows(ResponseException.class, () -> openJob(jobId, Optional.of(TimeValue.timeValueNanos(1L))));
+
+        assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.REQUEST_TIMEOUT.getStatus()));
+    }
+
     private Response createFarequoteJob(String jobId) throws IOException {
         return putJob(jobId, """
             {
@@ -960,10 +972,17 @@ public class MlJobIT extends ESRestTestCase {
     }
 
     private void openJob(String jobId) throws IOException {
-        Response openResponse = client().performRequest(
-            new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open")
-        );
-        assertThat(entityAsMap(openResponse), hasEntry("opened", true));
+        // No explicit timeout: the response is asserted to report opened=true.
+        assertThat(entityAsMap(openJob(jobId, Optional.empty())), hasEntry("opened", true));
+    }
+
+    /**
+     * Sends the open-job request for {@code jobId}, appending a {@code timeout} query
+     * parameter only when one is supplied, and returns the raw response.
+     */
+    private Response openJob(String jobId, Optional<TimeValue> timeout) throws IOException {
+        String endpoint = MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open";
+        String path = timeout.map(t -> endpoint + "?timeout=" + t.getStringRep()).orElse(endpoint);
+        return client().performRequest(new Request("POST", path));
     }
 
     private void closeJob(String jobId) throws IOException {

+ 43 - 0
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java

@@ -6,6 +6,7 @@
  */
 package org.elasticsearch.xpack.ml.integration;
 
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
 import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -15,12 +16,15 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
+import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
@@ -1043,6 +1047,45 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
         );
     }
 
+    /**
+     * Starts an outlier detection analytics job with a one-nanosecond timeout and expects
+     * the failure to carry status {@link RestStatus#REQUEST_TIMEOUT}.
+     */
+    public void testStart_GivenTimeout_Returns408() throws Exception {
+        String sourceIndex = "test-timeout-returns-408-data";
+
+        client().admin().indices().prepareCreate(sourceIndex).setMapping("numeric_1", "type=integer", "numeric_2", "type=integer").get();
+
+        // Bulk-index five documents with random integer fields, using an immediate refresh policy.
+        BulkRequestBuilder bulk = client().prepareBulk();
+        bulk.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+        for (int docId = 0; docId < 5; docId++) {
+            bulk.add(new IndexRequest(sourceIndex).id(String.valueOf(docId)).source("numeric_1", randomInt(), "numeric_2", randomInt()));
+        }
+        BulkResponse indexed = bulk.get();
+        if (indexed.hasFailures()) {
+            fail("Failed to index data: " + indexed.buildFailureMessage());
+        }
+
+        String id = "test-timeout-returns-408";
+        String destIndex = sourceIndex + "-results";
+        putAnalytics(buildAnalytics(id, sourceIndex, destIndex, null, new OutlierDetection.Builder().build()));
+
+        StartDataFrameAnalyticsAction.Request startRequest = new StartDataFrameAnalyticsAction.Request(id);
+        startRequest.setTimeout(TimeValue.timeValueNanos(1L));
+        ElasticsearchException e = expectThrows(
+            ElasticsearchException.class,
+            () -> client().execute(StartDataFrameAnalyticsAction.INSTANCE, startRequest).actionGet()
+        );
+
+        assertThat(e.status(), equalTo(RestStatus.REQUEST_TIMEOUT));
+    }
+
     @Override
     boolean supportsInference() {
         return false;

+ 8 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

@@ -254,7 +254,14 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
 
                 @Override
                 public void onTimeout(TimeValue timeout) {
-                    listener.onFailure(new ElasticsearchException("Opening job [{}] timed out after [{}]", jobParams.getJob(), timeout));
+                    listener.onFailure(
+                        new ElasticsearchStatusException(
+                            "Opening job [{}] timed out after [{}]",
+                            RestStatus.REQUEST_TIMEOUT,
+                            jobParams.getJob().getId(),
+                            timeout
+                        )
+                    );
                 }
             }
         );

+ 2 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

@@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.action;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.action.ActionListener;
@@ -507,8 +506,9 @@ public class TransportStartDataFrameAnalyticsAction extends TransportMasterNodeA
                         );
                     } else {
                         listener.onFailure(
-                            new ElasticsearchException(
+                            new ElasticsearchStatusException(
                                 "Starting data frame analytics [{}] timed out after [{}]",
+                                RestStatus.REQUEST_TIMEOUT,
                                 task.getParams().getId(),
                                 timeout
                             )

+ 6 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java

@@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.action;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.Version;
@@ -407,7 +406,12 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
                 @Override
                 public void onTimeout(TimeValue timeout) {
                     listener.onFailure(
-                        new ElasticsearchException("Starting datafeed [" + params.getDatafeedId() + "] timed out after [" + timeout + "]")
+                        new ElasticsearchStatusException(
+                            "Starting datafeed [{}] timed out after [{}]",
+                            RestStatus.REQUEST_TIMEOUT,
+                            params.getDatafeedId(),
+                            timeout
+                        )
                     );
                 }
             }