Преглед изворни кода

ESQL: Check for circuit breaker in async (#111070)

This adds a test to make sure that circuit breakers exceptions thrown
from the async ESQL task are properly recorded to the index.
Nik Everett пре 1 година
родитељ
комит
0710a4495f

+ 86 - 4
test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java

@@ -16,7 +16,9 @@ import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.WarningsHandler;
+import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -48,8 +50,10 @@ import static org.elasticsearch.test.MapMatcher.assertMap;
 import static org.elasticsearch.test.MapMatcher.matchesMap;
 import static org.hamcrest.Matchers.any;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.matchesRegex;
 
 /**
  * Tests that run ESQL queries that have, in the past, used so much memory they
@@ -97,6 +101,77 @@ public class HeapAttackIT extends ESRestTestCase {
         assertCircuitBreaks(() -> sortByManyLongs(5000));
     }
 
+    /**
+     * This should record an async response with a {@link CircuitBreakingException}.
+     */
+    public void testSortByManyLongsTooMuchMemoryAsync() throws IOException {
+        initManyLongs();
+        Request request = new Request("POST", "/_query/async");
+        request.addParameter("error_trace", "");
+        request.setJsonEntity(makeSortByManyLongs(5000).toString().replace("\n", "\\n"));
+        request.setOptions(
+            RequestOptions.DEFAULT.toBuilder()
+                .setRequestConfig(RequestConfig.custom().setSocketTimeout(Math.toIntExact(TimeValue.timeValueMinutes(6).millis())).build())
+                .setWarningsHandler(WarningsHandler.PERMISSIVE)
+        );
+        logger.info("--> test {} started async", getTestName());
+        Response response = runQuery(() -> {
+            Response r = client().performRequest(request);
+            Map<?, ?> map = responseAsMap(r);
+            assertMap(map, matchesMap().extraOk().entry("is_running", true).entry("id", any(String.class)));
+            String id = map.get("id").toString();
+            Request fetch = new Request("GET", "/_query/async/" + id);
+            long endTime = System.nanoTime() + TimeValue.timeValueMinutes(5).nanos();
+            while (System.nanoTime() < endTime) {
+                Response resp;
+                try {
+                    resp = client().performRequest(fetch);
+                } catch (ResponseException e) {
+                    if (e.getResponse().getStatusLine().getStatusCode() == 404) {
+                        logger.error("polled for results got 404");
+                        continue;
+                    }
+                    if (e.getResponse().getStatusLine().getStatusCode() == 503) {
+                        logger.error("polled for results got 503");
+                        continue;
+                    }
+                    if (e.getResponse().getStatusLine().getStatusCode() == 429) {
+                        // This is what we were going to for - a CircuitBreakerException
+                        return e.getResponse();
+                    }
+                    throw e;
+                }
+                Map<?, ?> m = responseAsMap(resp);
+                logger.error("polled for results {}", m);
+                boolean isRunning = (boolean) m.get("is_running");
+                if (isRunning == false) {
+                    return resp;
+                }
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+            throw new IOException("timed out");
+        });
+        assertThat(response.getStatusLine().getStatusCode(), equalTo(429));
+        Map<?, ?> map = responseAsMap(response);
+        assertMap(
+            map,
+            matchesMap().entry("status", 429)
+                .entry(
+                    "error",
+                    matchesMap().entry("bytes_wanted", greaterThan(1000))
+                        .entry("reason", matchesRegex("\\[request] Data too large, data for \\[topn] would .+"))
+                        .entry("durability", "TRANSIENT")
+                        .entry("type", "circuit_breaking_exception")
+                        .entry("bytes_limit", greaterThan(1000))
+                        .entry("root_cause", matchesList().item(any(Map.class)))
+                )
+        );
+    }
+
     private void assertCircuitBreaks(ThrowingRunnable r) throws IOException {
         ResponseException e = expectThrows(ResponseException.class, r);
         Map<?, ?> map = responseAsMap(e.getResponse());
@@ -109,13 +184,17 @@ public class HeapAttackIT extends ESRestTestCase {
 
     private Response sortByManyLongs(int count) throws IOException {
         logger.info("sorting by {} longs", count);
+        return query(makeSortByManyLongs(count).toString(), null);
+    }
+
+    private StringBuilder makeSortByManyLongs(int count) {
         StringBuilder query = makeManyLongs(count);
         query.append("| SORT a, b, i0");
         for (int i = 1; i < count; i++) {
             query.append(", i").append(i);
         }
         query.append("\\n| KEEP a, b | LIMIT 10000\"}");
-        return query(query.toString(), null);
+        return query;
     }
 
     /**
@@ -295,12 +374,16 @@ public class HeapAttackIT extends ESRestTestCase {
         if (filterPath != null) {
             request.addParameter("filter_path", filterPath);
         }
-        request.setJsonEntity(query.toString().replace("\n", "\\n"));
+        request.setJsonEntity(query.replace("\n", "\\n"));
         request.setOptions(
             RequestOptions.DEFAULT.toBuilder()
                 .setRequestConfig(RequestConfig.custom().setSocketTimeout(Math.toIntExact(TimeValue.timeValueMinutes(6).millis())).build())
                 .setWarningsHandler(WarningsHandler.PERMISSIVE)
         );
+        return runQuery(() -> client().performRequest(request));
+    }
+
+    private Response runQuery(CheckedSupplier<Response, IOException> run) throws IOException {
         logger.info("--> test {} started querying", getTestName());
         final ThreadPool testThreadPool = new TestThreadPool(getTestName());
         final long startedTimeInNanos = System.nanoTime();
@@ -321,11 +404,10 @@ public class HeapAttackIT extends ESRestTestCase {
                     RequestConfig requestConfig = RequestConfig.custom()
                         .setSocketTimeout(Math.toIntExact(TimeValue.timeValueMinutes(2).millis()))
                         .build();
-                    request.setOptions(RequestOptions.DEFAULT.toBuilder().setRequestConfig(requestConfig));
                     client().performRequest(triggerOOM);
                 }
             }, TimeValue.timeValueMinutes(5), testThreadPool.executor(ThreadPool.Names.GENERIC));
-            Response resp = client().performRequest(request);
+            Response resp = run.get();
             logger.info("--> test {} completed querying", getTestName());
             return resp;
         } finally {