Ver Fonte

Fix more search response leaks in tests (#103957)

Some more fixes as part of https://github.com/elastic/elasticsearch/issues/102030
Armin Braun há 1 ano atrás
pai
commit
1a848eb52e

+ 5 - 4
test/external-modules/latency-simulating-directory/src/internalClusterTest/java/org/elasticsearch/test/simulatedlatencyrepo/LatencySimulatingBlobStoreRepositoryTests.java

@@ -36,6 +36,7 @@ import java.util.Map;
 import java.util.concurrent.atomic.LongAdder;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 
@@ -136,9 +137,9 @@ public class LatencySimulatingBlobStoreRepositoryTests extends AbstractSnapshotI
         assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
 
         logger.info("--> run a search");
-        var searchResponse = client.prepareSearch("test-idx").setQuery(QueryBuilders.termQuery("text", "sometext")).get();
-
-        assertThat(searchResponse.getHits().getTotalHits().value, greaterThan(0L));
-        assertThat(COUNTS.intValue(), greaterThan(0));
+        assertResponse(client.prepareSearch("test-idx").setQuery(QueryBuilders.termQuery("text", "sometext")), searchResponse -> {
+            assertThat(searchResponse.getHits().getTotalHits().value, greaterThan(0L));
+            assertThat(COUNTS.intValue(), greaterThan(0));
+        });
     }
 }

+ 8 - 3
x-pack/plugin/async-search/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/search/AsyncSearchSecurityIT.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.xpack.search;
 
 import org.apache.http.util.EntityUtils;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Response;
@@ -182,15 +183,19 @@ public class AsyncSearchSecurityIT extends ESRestTestCase {
     private SearchHit[] getSearchHits(String asyncId, String user) throws IOException {
         final Response resp = getAsyncSearch(asyncId, user);
         assertOK(resp);
-        AsyncSearchResponse searchResponse = AsyncSearchResponse.fromXContent(
+        SearchResponse searchResponse = AsyncSearchResponse.fromXContent(
             XContentHelper.createParser(
                 NamedXContentRegistry.EMPTY,
                 LoggingDeprecationHandler.INSTANCE,
                 new BytesArray(EntityUtils.toByteArray(resp.getEntity())),
                 XContentType.JSON
             )
-        );
-        return searchResponse.getSearchResponse().getHits().getHits();
+        ).getSearchResponse();
+        try {
+            return searchResponse.getHits().getHits();
+        } finally {
+            searchResponse.decRef();
+        }
     }
 
     public void testAuthorizationOfPointInTime() throws Exception {

+ 18 - 16
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java

@@ -865,23 +865,25 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase {
             } catch (IOException e) {
                 listener.onFailure(e);
             }
-            SearchResponse response = new SearchResponse(
-                null,
-                new Aggregations(Collections.singletonList(result)),
-                null,
-                false,
-                null,
-                null,
-                1,
-                null,
-                1,
-                1,
-                0,
-                0,
-                ShardSearchFailure.EMPTY_ARRAY,
-                null
+            ActionListener.respondAndRelease(
+                listener,
+                new SearchResponse(
+                    null,
+                    new Aggregations(Collections.singletonList(result)),
+                    null,
+                    false,
+                    null,
+                    null,
+                    1,
+                    null,
+                    1,
+                    1,
+                    0,
+                    0,
+                    ShardSearchFailure.EMPTY_ARRAY,
+                    null
+                )
             );
-            listener.onResponse(response);
         }
 
         @Override

+ 18 - 16
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java

@@ -105,23 +105,25 @@ public class RollupIndexerStateTests extends ESTestCase {
                     return null;
                 }
             }));
-            final SearchResponse response = new SearchResponse(
-                new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0),
-                aggs,
-                null,
-                false,
-                null,
-                null,
-                1,
-                null,
-                1,
-                1,
-                0,
-                0,
-                new ShardSearchFailure[0],
-                null
+            ActionListener.respondAndRelease(
+                nextPhase,
+                new SearchResponse(
+                    new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0),
+                    aggs,
+                    null,
+                    false,
+                    null,
+                    null,
+                    1,
+                    null,
+                    1,
+                    1,
+                    0,
+                    0,
+                    new ShardSearchFailure[0],
+                    null
+                )
             );
-            nextPhase.onResponse(response);
         }
 
         @Override

+ 32 - 27
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java

@@ -178,36 +178,41 @@ public class TimeBasedCheckpointProviderTests extends ESTestCase {
         TimeValue delay,
         Tuple<Long, Long> expectedRangeQueryBounds
     ) throws InterruptedException {
-        doAnswer(withResponse(newSearchResponse(totalHits))).when(client).execute(eq(TransportSearchAction.TYPE), any(), any());
-        String transformId = getTestName();
-        TransformConfig transformConfig = newTransformConfigWithDateHistogram(
-            transformId,
-            transformVersion,
-            dateHistogramField,
-            dateHistogramInterval,
-            delay
-        );
-        TimeBasedCheckpointProvider provider = newCheckpointProvider(transformConfig);
+        final SearchResponse searchResponse = newSearchResponse(totalHits);
+        try {
+            doAnswer(withResponse(searchResponse)).when(client).execute(eq(TransportSearchAction.TYPE), any(), any());
+            String transformId = getTestName();
+            TransformConfig transformConfig = newTransformConfigWithDateHistogram(
+                transformId,
+                transformVersion,
+                dateHistogramField,
+                dateHistogramInterval,
+                delay
+            );
+            TimeBasedCheckpointProvider provider = newCheckpointProvider(transformConfig);
 
-        SetOnce<Boolean> hasChangedHolder = new SetOnce<>();
-        SetOnce<Exception> exceptionHolder = new SetOnce<>();
-        CountDownLatch latch = new CountDownLatch(1);
-        provider.sourceHasChanged(
-            lastCheckpoint,
-            new LatchedActionListener<>(ActionListener.wrap(hasChangedHolder::set, exceptionHolder::set), latch)
-        );
-        assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true));
+            SetOnce<Boolean> hasChangedHolder = new SetOnce<>();
+            SetOnce<Exception> exceptionHolder = new SetOnce<>();
+            CountDownLatch latch = new CountDownLatch(1);
+            provider.sourceHasChanged(
+                lastCheckpoint,
+                new LatchedActionListener<>(ActionListener.wrap(hasChangedHolder::set, exceptionHolder::set), latch)
+            );
+            assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true));
 
-        ArgumentCaptor<SearchRequest> searchRequestArgumentCaptor = ArgumentCaptor.forClass(SearchRequest.class);
-        verify(client).execute(eq(TransportSearchAction.TYPE), searchRequestArgumentCaptor.capture(), any());
-        SearchRequest searchRequest = searchRequestArgumentCaptor.getValue();
-        BoolQueryBuilder boolQuery = (BoolQueryBuilder) searchRequest.source().query();
-        RangeQueryBuilder rangeQuery = (RangeQueryBuilder) boolQuery.filter().get(1);
-        assertThat(rangeQuery.from(), is(equalTo(expectedRangeQueryBounds.v1())));
-        assertThat(rangeQuery.to(), is(equalTo(expectedRangeQueryBounds.v2())));
+            ArgumentCaptor<SearchRequest> searchRequestArgumentCaptor = ArgumentCaptor.forClass(SearchRequest.class);
+            verify(client).execute(eq(TransportSearchAction.TYPE), searchRequestArgumentCaptor.capture(), any());
+            SearchRequest searchRequest = searchRequestArgumentCaptor.getValue();
+            BoolQueryBuilder boolQuery = (BoolQueryBuilder) searchRequest.source().query();
+            RangeQueryBuilder rangeQuery = (RangeQueryBuilder) boolQuery.filter().get(1);
+            assertThat(rangeQuery.from(), is(equalTo(expectedRangeQueryBounds.v1())));
+            assertThat(rangeQuery.to(), is(equalTo(expectedRangeQueryBounds.v2())));
 
-        assertThat(hasChangedHolder.get(), is(equalTo(expectedHasChangedValue)));
-        assertThat(exceptionHolder.get(), is(nullValue()));
+            assertThat(hasChangedHolder.get(), is(equalTo(expectedHasChangedValue)));
+            assertThat(exceptionHolder.get(), is(nullValue()));
+        } finally {
+            searchResponse.decRef();
+        }
     }
 
     public void testCreateNextCheckpoint_NoDelay() throws InterruptedException {

+ 21 - 19
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java

@@ -541,26 +541,28 @@ public class ClientTransformIndexerTests extends ESTestCase {
                     && "the_pit_id+++".equals(searchRequest.pointInTimeBuilder().getEncodedId())) {
                     listener.onFailure(new SearchContextMissingException(new ShardSearchContextId("sc_missing", 42)));
                 } else {
-                    SearchResponse response = new SearchResponse(
-                        new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f),
-                        // Simulate completely null aggs
-                        null,
-                        new Suggest(Collections.emptyList()),
-                        false,
-                        false,
-                        new SearchProfileResults(Collections.emptyMap()),
-                        1,
-                        null,
-                        1,
-                        1,
-                        0,
-                        0,
-                        ShardSearchFailure.EMPTY_ARRAY,
-                        SearchResponse.Clusters.EMPTY,
-                        // copy the pit from the request
-                        searchRequest.pointInTimeBuilder() != null ? searchRequest.pointInTimeBuilder().getEncodedId() + "+" : null
+                    ActionListener.respondAndRelease(
+                        listener,
+                        (Response) new SearchResponse(
+                            new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f),
+                            // Simulate completely null aggs
+                            null,
+                            new Suggest(Collections.emptyList()),
+                            false,
+                            false,
+                            new SearchProfileResults(Collections.emptyMap()),
+                            1,
+                            null,
+                            1,
+                            1,
+                            0,
+                            0,
+                            ShardSearchFailure.EMPTY_ARRAY,
+                            SearchResponse.Clusters.EMPTY,
+                            // copy the pit from the request
+                            searchRequest.pointInTimeBuilder() != null ? searchRequest.pointInTimeBuilder().getEncodedId() + "+" : null
+                        )
                     );
-                    listener.onResponse((Response) response);
 
                 }
                 return;

+ 191 - 177
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java

@@ -221,7 +221,8 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
 
         @Override
         void doGetInitialProgress(SearchRequest request, ActionListener<SearchResponse> responseListener) {
-            responseListener.onResponse(
+            ActionListener.respondAndRelease(
+                responseListener,
                 new SearchResponse(
                     new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0.0f),
                     // Simulate completely null aggs
@@ -388,29 +389,33 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
             ShardSearchFailure.EMPTY_ARRAY,
             SearchResponse.Clusters.EMPTY
         );
-        AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-        Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> searchResponse;
-        Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
-
-        TransformAuditor auditor = mock(TransformAuditor.class);
-        TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
-
-        MockedTransformIndexer indexer = createMockIndexer(
-            config,
-            state,
-            searchFunction,
-            bulkFunction,
-            null,
-            threadPool,
-            ThreadPool.Names.GENERIC,
-            auditor,
-            context
-        );
+        try {
+            AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
+            Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> searchResponse;
+            Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
+
+            TransformAuditor auditor = mock(TransformAuditor.class);
+            TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
+
+            MockedTransformIndexer indexer = createMockIndexer(
+                config,
+                state,
+                searchFunction,
+                bulkFunction,
+                null,
+                threadPool,
+                ThreadPool.Names.GENERIC,
+                auditor,
+                context
+            );
 
-        IterationResult<TransformIndexerPosition> newPosition = indexer.doProcess(searchResponse);
-        assertThat(newPosition.getToIndex().collect(Collectors.toList()), is(empty()));
-        assertThat(newPosition.getPosition(), is(nullValue()));
-        assertThat(newPosition.isDone(), is(true));
+            IterationResult<TransformIndexerPosition> newPosition = indexer.doProcess(searchResponse);
+            assertThat(newPosition.getToIndex().collect(Collectors.toList()), is(empty()));
+            assertThat(newPosition.getPosition(), is(nullValue()));
+            assertThat(newPosition.isDone(), is(true));
+        } finally {
+            searchResponse.decRef();
+        }
     }
 
     public void testScriptError() throws Exception {
@@ -524,58 +529,61 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
             ShardSearchFailure.EMPTY_ARRAY,
             SearchResponse.Clusters.EMPTY
         );
-
-        AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-        Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> searchResponse;
-
-        Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
-
-        Function<DeleteByQueryRequest, BulkByScrollResponse> deleteByQueryFunction = deleteByQueryRequest -> {
-            throw new SearchPhaseExecutionException(
-                "query",
-                "Partial shards failure",
-                new ShardSearchFailure[] {
-                    new ShardSearchFailure(
-                        new ElasticsearchParseException("failed to parse date field", new IllegalArgumentException("illegal format"))
-                    ) }
+        try {
+            AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
+            Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> searchResponse;
+
+            Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
+
+            Function<DeleteByQueryRequest, BulkByScrollResponse> deleteByQueryFunction = deleteByQueryRequest -> {
+                throw new SearchPhaseExecutionException(
+                    "query",
+                    "Partial shards failure",
+                    new ShardSearchFailure[] {
+                        new ShardSearchFailure(
+                            new ElasticsearchParseException("failed to parse date field", new IllegalArgumentException("illegal format"))
+                        ) }
+                );
+            };
+
+            final AtomicBoolean failIndexerCalled = new AtomicBoolean(false);
+            final AtomicReference<String> failureMessage = new AtomicReference<>();
+
+            MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
+            TransformContext.Listener contextListener = createContextListener(failIndexerCalled, failureMessage);
+            TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
+
+            MockedTransformIndexer indexer = createMockIndexer(
+                config,
+                state,
+                searchFunction,
+                bulkFunction,
+                deleteByQueryFunction,
+                threadPool,
+                ThreadPool.Names.GENERIC,
+                auditor,
+                context
             );
-        };
 
-        final AtomicBoolean failIndexerCalled = new AtomicBoolean(false);
-        final AtomicReference<String> failureMessage = new AtomicReference<>();
+            final CountDownLatch latch = indexer.newLatch(1);
 
-        MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
-        TransformContext.Listener contextListener = createContextListener(failIndexerCalled, failureMessage);
-        TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
+            indexer.start();
+            assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
+            assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+            assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
 
-        MockedTransformIndexer indexer = createMockIndexer(
-            config,
-            state,
-            searchFunction,
-            bulkFunction,
-            deleteByQueryFunction,
-            threadPool,
-            ThreadPool.Names.GENERIC,
-            auditor,
-            context
-        );
-
-        final CountDownLatch latch = indexer.newLatch(1);
-
-        indexer.start();
-        assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
-        assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
-        assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
-
-        latch.countDown();
-        assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
-        assertTrue(failIndexerCalled.get());
-        assertThat(
-            failureMessage.get(),
-            matchesRegex(
-                "task encountered irrecoverable failure: org.elasticsearch.ElasticsearchParseException: failed to parse date field;.*"
-            )
-        );
+            latch.countDown();
+            assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
+            assertTrue(failIndexerCalled.get());
+            assertThat(
+                failureMessage.get(),
+                matchesRegex(
+                    "task encountered irrecoverable failure: org.elasticsearch.ElasticsearchParseException: failed to parse date field;.*"
+                )
+            );
+        } finally {
+            searchResponse.decRef();
+        }
     }
 
     public void testRetentionPolicyDeleteByQueryThrowsTemporaryProblem() throws Exception {
@@ -614,61 +622,64 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
             ShardSearchFailure.EMPTY_ARRAY,
             SearchResponse.Clusters.EMPTY
         );
-
-        AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-        Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> searchResponse;
-
-        Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
-
-        Function<DeleteByQueryRequest, BulkByScrollResponse> deleteByQueryFunction = deleteByQueryRequest -> {
-            throw new SearchPhaseExecutionException(
-                "query",
-                "Partial shards failure",
-                new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchTimeoutException("timed out during dbq")) }
+        try {
+            AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
+            Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> searchResponse;
+
+            Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
+
+            Function<DeleteByQueryRequest, BulkByScrollResponse> deleteByQueryFunction = deleteByQueryRequest -> {
+                throw new SearchPhaseExecutionException(
+                    "query",
+                    "Partial shards failure",
+                    new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchTimeoutException("timed out during dbq")) }
+                );
+            };
+
+            final AtomicBoolean failIndexerCalled = new AtomicBoolean(false);
+            final AtomicReference<String> failureMessage = new AtomicReference<>();
+
+            MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
+            auditor.addExpectation(
+                new MockTransformAuditor.SeenAuditExpectation(
+                    "timed out during dbq",
+                    Level.WARNING,
+                    transformId,
+                    "Transform encountered an exception: [org.elasticsearch.ElasticsearchTimeoutException: timed out during dbq];"
+                        + " Will automatically retry [1/10]"
+                )
+            );
+            TransformContext.Listener contextListener = createContextListener(failIndexerCalled, failureMessage);
+            TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
+
+            MockedTransformIndexer indexer = createMockIndexer(
+                config,
+                state,
+                searchFunction,
+                bulkFunction,
+                deleteByQueryFunction,
+                threadPool,
+                ThreadPool.Names.GENERIC,
+                auditor,
+                context
             );
-        };
-
-        final AtomicBoolean failIndexerCalled = new AtomicBoolean(false);
-        final AtomicReference<String> failureMessage = new AtomicReference<>();
-
-        MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
-        auditor.addExpectation(
-            new MockTransformAuditor.SeenAuditExpectation(
-                "timed out during dbq",
-                Level.WARNING,
-                transformId,
-                "Transform encountered an exception: [org.elasticsearch.ElasticsearchTimeoutException: timed out during dbq];"
-                    + " Will automatically retry [1/10]"
-            )
-        );
-        TransformContext.Listener contextListener = createContextListener(failIndexerCalled, failureMessage);
-        TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
-
-        MockedTransformIndexer indexer = createMockIndexer(
-            config,
-            state,
-            searchFunction,
-            bulkFunction,
-            deleteByQueryFunction,
-            threadPool,
-            ThreadPool.Names.GENERIC,
-            auditor,
-            context
-        );
 
-        final CountDownLatch latch = indexer.newLatch(1);
+            final CountDownLatch latch = indexer.newLatch(1);
 
-        indexer.start();
-        assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
-        assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
-        assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
+            indexer.start();
+            assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
+            assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+            assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
 
-        latch.countDown();
-        assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
-        assertFalse(failIndexerCalled.get());
-        assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
-        auditor.assertAllExpectationsMatched();
-        assertEquals(1, context.getFailureCount());
+            latch.countDown();
+            assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
+            assertFalse(failIndexerCalled.get());
+            assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
+            auditor.assertAllExpectationsMatched();
+            assertEquals(1, context.getFailureCount());
+        } finally {
+            searchResponse.decRef();
+        }
     }
 
     public void testFailureCounterIsResetOnSuccess() throws Exception {
@@ -707,72 +718,75 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
             ShardSearchFailure.EMPTY_ARRAY,
             SearchResponse.Clusters.EMPTY
         );
-
-        AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-        Function<SearchRequest, SearchResponse> searchFunction = new Function<>() {
-            final AtomicInteger calls = new AtomicInteger(0);
-
-            @Override
-            public SearchResponse apply(SearchRequest searchRequest) {
-                int call = calls.getAndIncrement();
-                if (call == 0) {
-                    throw new SearchPhaseExecutionException(
-                        "query",
-                        "Partial shards failure",
-                        new ShardSearchFailure[] { new ShardSearchFailure(new Exception()) }
-                    );
+        try {
+            AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
+            Function<SearchRequest, SearchResponse> searchFunction = new Function<>() {
+                final AtomicInteger calls = new AtomicInteger(0);
+
+                @Override
+                public SearchResponse apply(SearchRequest searchRequest) {
+                    int call = calls.getAndIncrement();
+                    if (call == 0) {
+                        throw new SearchPhaseExecutionException(
+                            "query",
+                            "Partial shards failure",
+                            new ShardSearchFailure[] { new ShardSearchFailure(new Exception()) }
+                        );
+                    }
+                    return searchResponse;
                 }
-                return searchResponse;
-            }
-        };
+            };
 
-        Function<BulkRequest, BulkResponse> bulkFunction = request -> new BulkResponse(new BulkItemResponse[0], 1);
+            Function<BulkRequest, BulkResponse> bulkFunction = request -> new BulkResponse(new BulkItemResponse[0], 1);
 
-        final AtomicBoolean failIndexerCalled = new AtomicBoolean(false);
-        final AtomicReference<String> failureMessage = new AtomicReference<>();
+            final AtomicBoolean failIndexerCalled = new AtomicBoolean(false);
+            final AtomicReference<String> failureMessage = new AtomicReference<>();
 
-        MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
-        TransformContext.Listener contextListener = createContextListener(failIndexerCalled, failureMessage);
-        TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
+            MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
+            TransformContext.Listener contextListener = createContextListener(failIndexerCalled, failureMessage);
+            TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
 
-        MockedTransformIndexer indexer = createMockIndexer(
-            config,
-            state,
-            searchFunction,
-            bulkFunction,
-            null,
-            threadPool,
-            ThreadPool.Names.GENERIC,
-            auditor,
-            context
-        );
+            MockedTransformIndexer indexer = createMockIndexer(
+                config,
+                state,
+                searchFunction,
+                bulkFunction,
+                null,
+                threadPool,
+                ThreadPool.Names.GENERIC,
+                auditor,
+                context
+            );
 
-        final CountDownLatch latch = indexer.newLatch(1);
+            final CountDownLatch latch = indexer.newLatch(1);
 
-        indexer.start();
-        assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
-        assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
-        assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
+            indexer.start();
+            assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
+            assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+            assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
 
-        latch.countDown();
-        assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
-        assertFalse(failIndexerCalled.get());
-        assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
-        assertEquals(1, context.getFailureCount());
+            latch.countDown();
+            assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
+            assertFalse(failIndexerCalled.get());
+            assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
+            assertEquals(1, context.getFailureCount());
 
-        final CountDownLatch secondLatch = indexer.newLatch(1);
+            final CountDownLatch secondLatch = indexer.newLatch(1);
 
-        indexer.start();
-        assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
-        assertBusy(() -> assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())));
-        assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
+            indexer.start();
+            assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
+            assertBusy(() -> assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())));
+            assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
 
-        secondLatch.countDown();
-        assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
-        assertFalse(failIndexerCalled.get());
-        assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
-        auditor.assertAllExpectationsMatched();
-        assertEquals(0, context.getFailureCount());
+            secondLatch.countDown();
+            assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS);
+            assertFalse(failIndexerCalled.get());
+            assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
+            auditor.assertAllExpectationsMatched();
+            assertEquals(0, context.getFailureCount());
+        } finally {
+            searchResponse.decRef();
+        }
     }
 
     // tests throttling of audits on logs based on repeated exception types