Explorar el Código

Possible source of leaked delayable writables (#80166)

In a couple of error paths, it's possible that QueryPhaseResultsConsumer#PendingMerges might not have consumed all the aggregations it's trying to merge. It is never the less important to release those aggregations so that we don't leak memory or hold references to them. This PR achieves that by using the Releasables.close() mechanism, which will execute each close action, even if earlier actions had exceptions. This ensures that all of the aggregations get released and that the circuit breaker gets cleaned up.

Co-authored-by: Henning Andersen <henning.andersen@elastic.co>
Co-authored-by: David Turner <david.turner@elastic.co>
Mark Tozzi hace 4 años
padre
commit
8e4351342f

+ 16 - 5
server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

@@ -36,6 +36,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.action.search.SearchPhaseController.getTopDocsSize;
 import static org.elasticsearch.action.search.SearchPhaseController.mergeTopDocs;
@@ -249,14 +250,24 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhas
 
         @Override
         public synchronized void close() {
-            assert hasPendingMerges() == false : "cannot close with partial reduce in-flight";
             if (hasFailure()) {
                 assert circuitBreakerBytes == 0;
-                return;
+            } else {
+                assert circuitBreakerBytes >= 0;
+            }
+
+            List<Releasable> toRelease = new ArrayList<>(buffer.stream().<Releasable>map(b -> b::releaseAggs).collect(Collectors.toList()));
+            toRelease.add(() -> {
+                circuitBreaker.addWithoutBreaking(-circuitBreakerBytes);
+                circuitBreakerBytes = 0;
+            });
+
+            Releasables.close(toRelease);
+
+            if (hasPendingMerges()) {
+                // This is a theoretically unreachable exception.
+                throw new IllegalStateException("Attempted to close with partial reduce in-flight");
             }
-            assert circuitBreakerBytes >= 0;
-            circuitBreaker.addWithoutBreaking(-circuitBreakerBytes);
-            circuitBreakerBytes = 0;
         }
 
         synchronized Exception getFailure() {

+ 3 - 1
server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java

@@ -377,7 +377,9 @@ public final class QuerySearchResult extends SearchPhaseResult {
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         // we do not know that it is being sent over transport, but this at least protects all writes from happening, including sending.
-        assert aggregations == null || aggregations.isSerialized() == false : "cannot send serialized version since it will leak";
+        if (aggregations != null && aggregations.isSerialized()) {
+            throw new IllegalStateException("cannot send serialized version since it will leak");
+        }
         if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
             out.writeBoolean(isNull);
         }

+ 22 - 0
x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java

@@ -37,6 +37,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.elasticsearch.search.SearchService.MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -454,6 +455,27 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
         ensureTaskNotRunning(response.getId());
     }
 
+    public void testSearchPhaseFailureLeak() throws Exception {
+        SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName);
+        request.setKeepOnCompletion(true);
+        request.setWaitForCompletionTimeout(TimeValue.timeValueMinutes(10));
+        request.getSearchRequest().allowPartialSearchResults(false);
+        request.getSearchRequest()
+            .source(
+                new SearchSourceBuilder().query(
+                    new ThrowingQueryBuilder(randomLong(), new AlreadyClosedException("boom"), between(0, numShards - 1))
+                )
+            );
+        request.getSearchRequest().source().aggregation(terms("f").field("f").size(between(1, 10)));
+
+        AsyncSearchResponse response = submitAsyncSearch(request);
+        assertFalse(response.isRunning());
+        assertTrue(response.isPartial());
+        assertThat(response.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
+        assertNotNull(response.getFailure());
+        ensureTaskNotRunning(response.getId());
+    }
+
     public void testMaxResponseSize() {
         SearchSourceBuilder source = new SearchSourceBuilder().query(new MatchAllQueryBuilder())
             .aggregation(AggregationBuilders.terms("terms").field("terms.keyword").size(numKeywords));