Browse Source

Fix SearchResponse leaks in REST and Enrich tests (#103533)

It's in the title, fix a couple of tests here and there. Also, fix REST
tests in particular and add a utility to cleanly read a SearchResponse
from a REST response without leaking the parser.

for #102030
Armin Braun 1 year ago
parent
commit
41d1665122

+ 11 - 0
test/framework/src/main/java/org/elasticsearch/search/SearchResponseUtils.java

@@ -9,6 +9,11 @@ package org.elasticsearch.search;
 
 import org.apache.lucene.search.TotalHits;
 import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.test.rest.ESRestTestCase;
+
+import java.io.IOException;
 
 public enum SearchResponseUtils {
     ;
@@ -25,4 +30,10 @@ public enum SearchResponseUtils {
     public static long getTotalHitsValue(SearchRequestBuilder request) {
         return getTotalHits(request).value;
     }
+
+    public static SearchResponse responseAsSearchResponse(Response searchResponse) throws IOException {
+        try (var parser = ESRestTestCase.responseAsParser(searchResponse)) {
+            return SearchResponse.fromXContent(parser);
+        }
+    }
 }

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

@@ -1824,7 +1824,7 @@ public abstract class ESRestTestCase extends ESTestCase {
         return responseEntity;
     }
 
-    protected static XContentParser responseAsParser(Response response) throws IOException {
+    public static XContentParser responseAsParser(Response response) throws IOException {
         return XContentHelper.createParser(XContentParserConfiguration.EMPTY, responseAsBytes(response), XContentType.JSON);
     }
 

+ 5 - 1
x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java

@@ -193,7 +193,11 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
         assertBusy(() -> {
             try {
                 AsyncSearchResponse resp = getAsyncSearch(id);
-                assertFalse(resp.isRunning());
+                try {
+                    assertFalse(resp.isRunning());
+                } finally {
+                    resp.decRef();
+                }
             } catch (Exception exc) {
                 if (ExceptionsHelper.unwrapCause(exc.getCause()) instanceof ResourceNotFoundException == false) {
                     throw exc;

+ 3 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java

@@ -216,11 +216,13 @@ public class AsyncTaskServiceTests extends ESSingleNodeTestCase {
         // To begin with, the results index should be auto-created.
         AsyncExecutionId id = new AsyncExecutionId("0", new TaskId("N/A", 0));
         AsyncSearchResponse resp = new AsyncSearchResponse(id.getEncoded(), true, true, 0L, 0L);
-        {
+        try {
             PlainActionFuture<DocWriteResponse> future = new PlainActionFuture<>();
             indexService.createResponse(id.getDocId(), Collections.emptyMap(), resp, future);
             future.get();
             assertSettings();
+        } finally {
+            resp.decRef();
         }
 
         // Delete the index, so we can test subsequent auto-create behaviour

+ 21 - 19
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java

@@ -257,26 +257,28 @@ public class EnrichProcessorFactoryTests extends ESTestCase {
                     ActionListener<Response> listener
                 ) {
                     assert EnrichCoordinatorProxyAction.NAME.equals(action.name());
-                    var emptyResponse = new SearchResponse(
-                        new InternalSearchResponse(
-                            new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0.0f),
-                            InternalAggregations.EMPTY,
-                            new Suggest(Collections.emptyList()),
-                            new SearchProfileResults(Collections.emptyMap()),
-                            false,
-                            false,
-                            1
-                        ),
-                        "",
-                        1,
-                        1,
-                        0,
-                        0,
-                        ShardSearchFailure.EMPTY_ARRAY,
-                        SearchResponse.Clusters.EMPTY
-                    );
                     requestCounter[0]++;
-                    listener.onResponse((Response) emptyResponse);
+                    ActionListener.respondAndRelease(
+                        listener,
+                        (Response) new SearchResponse(
+                            new InternalSearchResponse(
+                                new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0.0f),
+                                InternalAggregations.EMPTY,
+                                new Suggest(Collections.emptyList()),
+                                new SearchProfileResults(Collections.emptyMap()),
+                                false,
+                                false,
+                                1
+                            ),
+                            "",
+                            1,
+                            1,
+                            0,
+                            0,
+                            ShardSearchFailure.EMPTY_ARRAY,
+                            SearchResponse.Clusters.EMPTY
+                        )
+                    );
                 }
             };
             EnrichProcessorFactory factory = new EnrichProcessorFactory(client, scriptService, enrichCache);

+ 3 - 2
x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java

@@ -34,6 +34,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -153,7 +154,7 @@ public class EnrichResiliencyTests extends ESSingleNodeTestCase {
         assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));
 
         client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet();
-        assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value);
+        assertHitCount(client().search(new SearchRequest(enrichedIndexName)), successfulItems);
     }
 
     public void testWriteThreadLivenessWithPipeline() throws Exception {
@@ -276,6 +277,6 @@ public class EnrichResiliencyTests extends ESSingleNodeTestCase {
         assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));
 
         client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet();
-        assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value);
+        assertHitCount(client().search(new SearchRequest(enrichedIndexName)), successfulItems);
     }
 }

+ 5 - 4
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java

@@ -222,8 +222,7 @@ public class RollupIndexerStateTests extends ESTestCase {
             }
 
             try {
-                SearchResponse response = searchFunction.apply(buildSearchRequest());
-                nextPhase.onResponse(response);
+                ActionListener.respondAndRelease(nextPhase, searchFunction.apply(buildSearchRequest()));
             } catch (Exception e) {
                 nextPhase.onFailure(e);
             }
@@ -482,8 +481,10 @@ public class RollupIndexerStateTests extends ESTestCase {
                         null,
                         1
                     );
-                    final SearchResponse response = new SearchResponse(sections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, null);
-                    nextPhase.onResponse(response);
+                    ActionListener.respondAndRelease(
+                        nextPhase,
+                        new SearchResponse(sections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, null)
+                    );
                 }
 
                 @Override

+ 26 - 15
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/AbstractRemoteClusterSecurityDlsAndFlsRestIT.java

@@ -14,6 +14,7 @@ import org.elasticsearch.client.Response;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchResponseUtils;
 import org.elasticsearch.test.rest.ObjectPath;
 
 import java.io.IOException;
@@ -170,14 +171,19 @@ public abstract class AbstractRemoteClusterSecurityDlsAndFlsRestIT extends Abstr
         String[] expectedRemoteIndices,
         String[] expectedFields
     ) {
-        try (var parser = responseAsParser(searchResponse)) {
+        try {
             assertOK(searchResponse);
-            final var searchResult = Arrays.stream(SearchResponse.fromXContent(parser).getHits().getHits())
-                .collect(Collectors.toMap(SearchHit::getIndex, SearchHit::getSourceAsMap));
+            var response = SearchResponseUtils.responseAsSearchResponse(searchResponse);
+            try {
+                final var searchResult = Arrays.stream(response.getHits().getHits())
+                    .collect(Collectors.toMap(SearchHit::getIndex, SearchHit::getSourceAsMap));
 
-            assertThat(searchResult.keySet(), containsInAnyOrder(expectedRemoteIndices));
-            for (String remoteIndex : expectedRemoteIndices) {
-                assertThat(searchResult.get(remoteIndex).keySet(), containsInAnyOrder(expectedFields));
+                assertThat(searchResult.keySet(), containsInAnyOrder(expectedRemoteIndices));
+                for (String remoteIndex : expectedRemoteIndices) {
+                    assertThat(searchResult.get(remoteIndex).keySet(), containsInAnyOrder(expectedFields));
+                }
+            } finally {
+                response.decRef();
             }
         } catch (IOException e) {
             throw new UncheckedIOException(e);
@@ -193,15 +199,20 @@ public abstract class AbstractRemoteClusterSecurityDlsAndFlsRestIT extends Abstr
         Response searchResponse,
         Map<String, Set<String>> expectedRemoteIndicesAndFields
     ) {
-        try (var parser = responseAsParser(searchResponse)) {
+        try {
             assertOK(searchResponse);
-            final var searchResult = Arrays.stream(SearchResponse.fromXContent(parser).getHits().getHits())
-                .collect(Collectors.toMap(SearchHit::getIndex, SearchHit::getSourceAsMap));
+            var response = SearchResponseUtils.responseAsSearchResponse(searchResponse);
+            try {
+                final var searchResult = Arrays.stream(response.getHits().getHits())
+                    .collect(Collectors.toMap(SearchHit::getIndex, SearchHit::getSourceAsMap));
 
-            assertThat(searchResult.keySet(), equalTo(expectedRemoteIndicesAndFields.keySet()));
-            for (String remoteIndex : expectedRemoteIndicesAndFields.keySet()) {
-                Set<String> expectedFields = expectedRemoteIndicesAndFields.get(remoteIndex);
-                assertThat(searchResult.get(remoteIndex).keySet(), equalTo(expectedFields));
+                assertThat(searchResult.keySet(), equalTo(expectedRemoteIndicesAndFields.keySet()));
+                for (String remoteIndex : expectedRemoteIndicesAndFields.keySet()) {
+                    Set<String> expectedFields = expectedRemoteIndicesAndFields.get(remoteIndex);
+                    assertThat(searchResult.get(remoteIndex).keySet(), equalTo(expectedFields));
+                }
+            } finally {
+                response.decRef();
             }
         } catch (IOException e) {
             throw new UncheckedIOException(e);
@@ -209,9 +220,9 @@ public abstract class AbstractRemoteClusterSecurityDlsAndFlsRestIT extends Abstr
     }
 
     protected void assertSearchResponseContainsEmptyResult(Response response) {
-        try (var parser = responseAsParser(response)) {
+        try {
             assertOK(response);
-            SearchResponse searchResponse = SearchResponse.fromXContent(parser);
+            SearchResponse searchResponse = SearchResponseUtils.responseAsSearchResponse(response);
             try {
                 assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L));
             } finally {

+ 2 - 2
x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/ReloadRemoteClusterCredentialsIT.java

@@ -126,7 +126,7 @@ public class ReloadRemoteClusterCredentialsIT extends SecuritySingleNodeTestCase
             configureRemoteCluster(remoteAddress);
 
             // Run search to trigger header capturing on the receiving side
-            client().search(new SearchRequest(CLUSTER_ALIAS + ":index-a")).get();
+            client().search(new SearchRequest(CLUSTER_ALIAS + ":index-a")).get().decRef();
 
             assertHeadersContainCredentialsThenClear(credentials, capturedHeaders);
 
@@ -135,7 +135,7 @@ public class ReloadRemoteClusterCredentialsIT extends SecuritySingleNodeTestCase
             writeCredentialsToKeyStore(updatedCredentials);
             reloadSecureSettings();
 
-            client().search(new SearchRequest(CLUSTER_ALIAS + ":index-a")).get();
+            client().search(new SearchRequest(CLUSTER_ALIAS + ":index-a")).get().decRef();
 
             assertHeadersContainCredentialsThenClear(updatedCredentials, capturedHeaders);
         }

+ 17 - 22
x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java

@@ -182,19 +182,21 @@ public class WatcherServiceTests extends ESTestCase {
             null,
             1
         );
-        SearchResponse scrollSearchResponse = new SearchResponse(
-            scrollSearchSections,
-            "scrollId",
-            1,
-            1,
-            0,
-            10,
-            ShardSearchFailure.EMPTY_ARRAY,
-            SearchResponse.Clusters.EMPTY
-        );
         doAnswer(invocation -> {
             ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocation.getArguments()[2];
-            listener.onResponse(scrollSearchResponse);
+            ActionListener.respondAndRelease(
+                listener,
+                new SearchResponse(
+                    scrollSearchSections,
+                    "scrollId",
+                    1,
+                    1,
+                    0,
+                    10,
+                    ShardSearchFailure.EMPTY_ARRAY,
+                    SearchResponse.Clusters.EMPTY
+                )
+            );
             return null;
         }).when(client).execute(eq(TransportSearchScrollAction.TYPE), any(SearchScrollRequest.class), anyActionListener());
 
@@ -222,19 +224,12 @@ public class WatcherServiceTests extends ESTestCase {
         }
         SearchHits searchHits = new SearchHits(hits, new TotalHits(count, TotalHits.Relation.EQUAL_TO), 1.0f);
         SearchResponseSections sections = new SearchResponseSections(searchHits, null, null, false, false, null, 1);
-        SearchResponse searchResponse = new SearchResponse(
-            sections,
-            "scrollId",
-            1,
-            1,
-            0,
-            10,
-            ShardSearchFailure.EMPTY_ARRAY,
-            SearchResponse.Clusters.EMPTY
-        );
         doAnswer(invocation -> {
             ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocation.getArguments()[2];
-            listener.onResponse(searchResponse);
+            ActionListener.respondAndRelease(
+                listener,
+                new SearchResponse(sections, "scrollId", 1, 1, 0, 10, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY)
+            );
             return null;
         }).when(client).execute(eq(TransportSearchAction.TYPE), any(SearchRequest.class), anyActionListener());
 

+ 18 - 13
x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/TokenBackwardsCompatibilityIT.java

@@ -8,7 +8,6 @@ package org.elasticsearch.upgrades;
 
 import org.apache.http.HttpHeaders;
 import org.apache.http.HttpHost;
-import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Response;
@@ -17,6 +16,7 @@ import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.WarningsHandler;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.SearchResponseUtils;
 import org.elasticsearch.test.rest.ObjectPath;
 import org.junit.After;
 import org.junit.Before;
@@ -440,17 +440,22 @@ public class TokenBackwardsCompatibilityIT extends AbstractUpgradeTestCase {
             }""");
         final Response searchResponse = client().performRequest(searchRequest);
         assertOK(searchResponse);
-        final SearchHits searchHits = SearchResponse.fromXContent(responseAsParser(searchResponse)).getHits();
-        assertThat(
-            "Search request used with size parameter that was too small to fetch all tokens.",
-            searchHits.getTotalHits().value,
-            lessThanOrEqualTo(searchSize)
-        );
-        final List<String> tokenIds = Arrays.stream(searchHits.getHits()).map(searchHit -> {
-            assertNotNull(searchHit.getId());
-            return searchHit.getId();
-        }).toList();
-        assertThat(tokenIds, not(empty()));
-        return tokenIds;
+        var response = SearchResponseUtils.responseAsSearchResponse(searchResponse);
+        try {
+            final SearchHits searchHits = response.getHits();
+            assertThat(
+                "Search request used with size parameter that was too small to fetch all tokens.",
+                searchHits.getTotalHits().value,
+                lessThanOrEqualTo(searchSize)
+            );
+            final List<String> tokenIds = Arrays.stream(searchHits.getHits()).map(searchHit -> {
+                assertNotNull(searchHit.getId());
+                return searchHit.getId();
+            }).toList();
+            assertThat(tokenIds, not(empty()));
+            return tokenIds;
+        } finally {
+            response.decRef();
+        }
     }
 }