Browse Source

Early detection of circuit breaker exception in the coordinating node (#67431)

In #62884 we added support for the request circuit breaker in search coordinating nodes.
Today the circuit breaker is strictly checked only when a partial or final reduce occurs.
With this commit, we also check the circuit breaker strictly when a shard response
is received, and we cancel the request early if an exception is thrown at this point.
Jim Ferenczi 4 years ago
parent
commit
9114e8266d

+ 11 - 5
server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

@@ -316,6 +316,17 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhas
                         emptyResults.add(new SearchShard(target.getClusterAlias(), target.getShardId()));
                     }
                 } else {
+                    if (hasAggs) {
+                        long aggsSize = ramBytesUsedQueryResult(result);
+                        try {
+                            addEstimateAndMaybeBreak(aggsSize);
+                        } catch (Exception exc) {
+                            onMergeFailure(exc);
+                            next.run();
+                            return;
+                        }
+                        aggsCurrentBufferSize += aggsSize;
+                    }
                     // add one if a partial merge is pending
                     int size = buffer.size() + (hasPartialReduce ? 1 : 0);
                     if (size >= batchReduceSize) {
@@ -329,11 +340,6 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhas
                         queue.add(task);
                         tryExecuteNext();
                     }
-                    if (hasAggs) {
-                        long aggsSize = ramBytesUsedQueryResult(result);
-                        addWithoutBreaking(aggsSize);
-                        aggsCurrentBufferSize += aggsSize;
-                    }
                     buffer.add(result);
                 }
             }

+ 2 - 0
server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

@@ -93,6 +93,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
         if (queryResult.isNull() == false
                 // disable sort optims for scroll requests because they keep track of the last bottom doc locally (per shard)
                 && getRequest().scroll() == null
+                // top docs are already consumed if the query was cancelled or in error.
+                && queryResult.hasConsumedTopDocs() == false
                 && queryResult.topDocs() != null
                 && queryResult.topDocs().topDocs.getClass() == TopFieldDocs.class) {
             TopFieldDocs topDocs = (TopFieldDocs) queryResult.topDocs().topDocs;

+ 14 - 19
server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java

@@ -936,23 +936,16 @@ public class SearchPhaseControllerTests extends ESTestCase {
         }
     }
 
-    public void testPartialReduce() throws Exception {
-        for (int i = 0; i < 10; i++) {
-            testReduceCase(false);
-        }
-    }
-
-    public void testPartialReduceWithFailure() throws Exception {
-        for (int i = 0; i < 10; i++) {
-            testReduceCase(true);
-        }
+    public void testCoordCircuitBreaker() throws Exception {
+        int numShards = randomIntBetween(20, 200);
+        testReduceCase(numShards, numShards, true);
+        testReduceCase(numShards, numShards, false);
+        testReduceCase(numShards, randomIntBetween(2, numShards-1), true);
+        testReduceCase(numShards, randomIntBetween(2, numShards-1), false);
     }
 
-    private void testReduceCase(boolean shouldFail) throws Exception {
-        int expectedNumResults = randomIntBetween(20, 200);
-        int bufferSize = randomIntBetween(2, expectedNumResults - 1);
+    private void testReduceCase(int numShards, int bufferSize, boolean shouldFail) throws Exception {
         SearchRequest request = new SearchRequest();
-
         request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0));
         request.setBatchedReduceSize(bufferSize);
         AtomicBoolean hasConsumedFailure = new AtomicBoolean();
@@ -963,10 +956,10 @@ public class SearchPhaseControllerTests extends ESTestCase {
         }
         QueryPhaseResultConsumer consumer = searchPhaseController.newSearchPhaseResults(fixedExecutor,
             circuitBreaker, SearchProgressListener.NOOP,
-            request, expectedNumResults, exc -> hasConsumedFailure.set(true));
-        CountDownLatch latch = new CountDownLatch(expectedNumResults);
-        Thread[] threads = new Thread[expectedNumResults];
-        for (int i =  0; i < expectedNumResults; i++) {
+            request, numShards, exc -> hasConsumedFailure.set(true));
+        CountDownLatch latch = new CountDownLatch(numShards);
+        Thread[] threads = new Thread[numShards];
+        for (int i =  0; i < numShards; i++) {
             final int index = i;
             threads[index] = new Thread(() -> {
                 QuerySearchResult result = new QuerySearchResult(new ShardSearchContextId(UUIDs.randomBase64UUID(), index),
@@ -985,13 +978,15 @@ public class SearchPhaseControllerTests extends ESTestCase {
             });
             threads[index].start();
         }
-        for (int i = 0; i < expectedNumResults; i++) {
+        for (int i = 0; i < numShards; i++) {
             threads[i].join();
         }
         latch.await();
         if (shouldFail) {
             if (shouldFailPartial == false) {
                 circuitBreaker.shouldBreak.set(true);
+            } else {
+                circuitBreaker.shouldBreak.set(false);
             }
             CircuitBreakingException exc = expectThrows(CircuitBreakingException.class, () -> consumer.reduce());
             assertEquals(shouldFailPartial, hasConsumedFailure.get());