Cleanup logic around ML's ScrollDataExtractor and reactivate related test (#105826)

Dried up the logic that is leaking buffers and causing test failures
(and fixed a couple of trivial items along the way) and reactivated the
test. Should be easier to hunt this down now that the leak tracker is
much more sensitive and logs more context.
Armin Braun, 1 year ago
commit 0fb3a6ecd5
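
The duplicated blocks being consolidated all follow the same "release the response on failure" refcounting idiom. As a point of reference, here is a minimal sketch of that idiom; the Response interface below is illustrative, standing in for Elasticsearch's ref-counted SearchResponse, and only the ownership pattern is the point:

    // Sketch (not the actual Elasticsearch API) of the idiom this
    // commit dedupes across the data extractors.
    final class ReleaseOnFailureSketch {

        interface Response {
            void decRef(); // releases the pooled buffers backing the response
        }

        static Response validated(Response response, Runnable check) {
            boolean success = false;
            try {
                check.run(); // e.g. the skipped-clusters check; may throw
                success = true;
            } finally {
                if (success == false) {
                    response.decRef(); // never leak buffers on the error path
                }
            }
            return response; // on success, the caller owns the reference
        }
    }

decRef is called only on the failure path: on success the response is handed back to the caller, who takes over the reference and is responsible for releasing it once the data has been consumed.
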

+ 0 - 1
x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java

@@ -541,7 +541,6 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
         });
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/103108")
     public void testClusterWithTwoMlNodes_StopsDatafeed_GivenJobFailsOnReassign() throws Exception {
         internalCluster().ensureAtMostNumDataNodes(0);
         logger.info("Starting dedicated master node...");

+ 2 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java

@@ -75,7 +75,7 @@ class DatafeedJob {
     private volatile long lastDataCheckTimeMs;
     private volatile Tuple<String, Annotation> lastDataCheckAnnotationWithId;
     private volatile Long lastEndTimeMs;
-    private AtomicBoolean running = new AtomicBoolean(true);
+    private final AtomicBoolean running = new AtomicBoolean(true);
     private volatile boolean isIsolated;
     private volatile boolean haveEverSeenData;
     private volatile long consecutiveDelayedDataBuckets;
@@ -351,7 +351,7 @@ class DatafeedJob {
         return running.get();
     }
 
-    private void run(long start, long end, FlushJobAction.Request flushRequest) throws IOException {
+    private void run(long start, long end, FlushJobAction.Request flushRequest) {
         if (end <= start) {
             return;
         }

+ 9 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AbstractAggregationDataExtractor.java

@@ -22,6 +22,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfigUtils;
 import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
+import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorQueryContext;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorUtils;
 
 import java.io.ByteArrayInputStream;
@@ -121,7 +122,7 @@ abstract class AbstractAggregationDataExtractor implements DataExtractor {
         LOGGER.debug("[{}] Executing aggregated search", context.jobId);
         ActionRequestBuilder<SearchRequest, SearchResponse> searchRequest = buildSearchRequest(buildBaseSearchSource());
         assert searchRequest.request().allowPartialSearchResults() == false;
-        SearchResponse searchResponse = executeSearchRequest(searchRequest);
+        SearchResponse searchResponse = executeSearchRequest(client, context.queryContext, searchRequest);
         try {
             LOGGER.debug("[{}] Search response was obtained", context.jobId);
             timingStatsReporter.reportSearchDuration(searchResponse.getTook());
@@ -142,9 +143,13 @@ abstract class AbstractAggregationDataExtractor implements DataExtractor {
         aggregationToJsonProcessor.process(aggs);
     }
 
-    private SearchResponse executeSearchRequest(ActionRequestBuilder<SearchRequest, SearchResponse> searchRequestBuilder) {
+    static SearchResponse executeSearchRequest(
+        Client client,
+        DataExtractorQueryContext context,
+        ActionRequestBuilder<SearchRequest, SearchResponse> searchRequestBuilder
+    ) {
         SearchResponse searchResponse = ClientHelper.executeWithHeaders(
-            context.queryContext.headers,
+            context.headers,
             ClientHelper.ML_ORIGIN,
             client,
             searchRequestBuilder::get
@@ -216,7 +221,7 @@ abstract class AbstractAggregationDataExtractor implements DataExtractor {
         ActionRequestBuilder<SearchRequest, SearchResponse> searchRequestBuilder = buildSearchRequest(
             DataExtractorUtils.getSearchSourceBuilderForSummary(context.queryContext)
         );
-        SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder);
+        SearchResponse searchResponse = executeSearchRequest(client, context.queryContext, searchRequestBuilder);
         try {
             LOGGER.debug("[{}] Aggregating Data summary response was obtained", context.jobId);
             timingStatsReporter.reportSearchDuration(searchResponse.getTook());

+ 9 - 28
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractor.java

@@ -16,7 +16,6 @@ import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfigUtils;
 import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
 import org.elasticsearch.xpack.core.ml.utils.Intervals;
@@ -48,9 +47,6 @@ class CompositeAggregationDataExtractor implements DataExtractor {
 
     private static final Logger LOGGER = LogManager.getLogger(CompositeAggregationDataExtractor.class);
 
-    private static final String EARLIEST_TIME = "earliest_time";
-    private static final String LATEST_TIME = "latest_time";
-
     private volatile Map<String, Object> afterKey = null;
     private final CompositeAggregationBuilder compositeAggregationBuilder;
     private final Client client;
@@ -90,7 +86,7 @@ class CompositeAggregationDataExtractor implements DataExtractor {
 
     @Override
     public void cancel() {
-        LOGGER.debug(() -> "[" + context.jobId + "] Data extractor received cancel request");
+        LOGGER.debug("[{}] Data extractor received cancel request", context.jobId);
         isCancelled = true;
     }
 
@@ -113,7 +109,7 @@ class CompositeAggregationDataExtractor implements DataExtractor {
         SearchInterval searchInterval = new SearchInterval(context.queryContext.start, context.queryContext.end);
         InternalAggregations aggs = search();
         if (aggs == null) {
-            LOGGER.trace(() -> "[" + context.jobId + "] extraction finished");
+            LOGGER.trace("[{}] extraction finished", context.jobId);
             hasNext = false;
             afterKey = null;
             return new Result(searchInterval, Optional.empty());
@@ -153,9 +149,9 @@ class CompositeAggregationDataExtractor implements DataExtractor {
         }
         searchSourceBuilder.aggregation(compositeAggregationBuilder);
         ActionRequestBuilder<SearchRequest, SearchResponse> searchRequest = requestBuilder.build(searchSourceBuilder);
-        SearchResponse searchResponse = executeSearchRequest(searchRequest);
+        SearchResponse searchResponse = AbstractAggregationDataExtractor.executeSearchRequest(client, context.queryContext, searchRequest);
         try {
-            LOGGER.trace(() -> "[" + context.jobId + "] Search composite response was obtained");
+            LOGGER.trace("[{}] Search composite response was obtained", context.jobId);
             timingStatsReporter.reportSearchDuration(searchResponse.getTook());
             InternalAggregations aggregations = searchResponse.getAggregations();
             if (aggregations == null) {
@@ -171,25 +167,6 @@ class CompositeAggregationDataExtractor implements DataExtractor {
         }
     }
 
-    private SearchResponse executeSearchRequest(ActionRequestBuilder<SearchRequest, SearchResponse> searchRequestBuilder) {
-        SearchResponse searchResponse = ClientHelper.executeWithHeaders(
-            context.queryContext.headers,
-            ClientHelper.ML_ORIGIN,
-            client,
-            searchRequestBuilder::get
-        );
-        boolean success = false;
-        try {
-            DataExtractorUtils.checkForSkippedClusters(searchResponse);
-            success = true;
-        } finally {
-            if (success == false) {
-                searchResponse.decRef();
-            }
-        }
-        return searchResponse;
-    }
-
     private InputStream processAggs(InternalAggregations aggs) throws IOException {
         AggregationToJsonProcessor aggregationToJsonProcessor = new AggregationToJsonProcessor(
             context.queryContext.timeField,
@@ -262,7 +239,11 @@ class CompositeAggregationDataExtractor implements DataExtractor {
             client,
             context.queryContext
         );
-        SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder);
+        SearchResponse searchResponse = AbstractAggregationDataExtractor.executeSearchRequest(
+            client,
+            context.queryContext,
+            searchRequestBuilder
+        );
         try {
             LOGGER.debug("[{}] Aggregating Data summary response was obtained", context.jobId);
             timingStatsReporter.reportSearchDuration(searchResponse.getTook());

+ 8 - 25
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.ml.datafeed.extractor.scroll;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.search.ClearScrollRequest;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchRequestBuilder;
@@ -133,13 +134,13 @@ class ScrollDataExtractor implements DataExtractor {
         }
     }
 
-    protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
-        SearchResponse searchResponse = ClientHelper.executeWithHeaders(
-            context.queryContext.headers,
-            ClientHelper.ML_ORIGIN,
-            client,
-            searchRequestBuilder::get
+    protected SearchResponse executeSearchRequest(ActionRequestBuilder<?, SearchResponse> searchRequestBuilder) {
+        return checkForSkippedClusters(
+            ClientHelper.executeWithHeaders(context.queryContext.headers, ClientHelper.ML_ORIGIN, client, searchRequestBuilder::get)
         );
+    }
+
+    private SearchResponse checkForSkippedClusters(SearchResponse searchResponse) {
         boolean success = false;
         try {
             DataExtractorUtils.checkForSkippedClusters(searchResponse);
@@ -262,25 +263,7 @@ class ScrollDataExtractor implements DataExtractor {
 
     @SuppressWarnings("HiddenField")
     protected SearchResponse executeSearchScrollRequest(String scrollId) {
-        SearchResponse searchResponse = ClientHelper.executeWithHeaders(
-            context.queryContext.headers,
-            ClientHelper.ML_ORIGIN,
-            client,
-            () -> new SearchScrollRequestBuilder(client).setScroll(SCROLL_TIMEOUT).setScrollId(scrollId).get()
-        );
-        boolean success = false;
-        try {
-            DataExtractorUtils.checkForSkippedClusters(searchResponse);
-            success = true;
-        } catch (ResourceNotFoundException e) {
-            clearScrollLoggingExceptions(searchResponse.getScrollId());
-            throw e;
-        } finally {
-            if (success == false) {
-                searchResponse.decRef();
-            }
-        }
-        return searchResponse;
+        return executeSearchRequest(new SearchScrollRequestBuilder(client).setScroll(SCROLL_TIMEOUT).setScrollId(scrollId));
     }
 
     private void clearScroll() {

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/inference/InferenceRunner.java

@@ -153,7 +153,7 @@ public class InferenceRunner {
             config.getHeaders(),
             ClientHelper.ML_ORIGIN,
             client,
-            () -> client.search(searchRequest).actionGet()
+            client.search(searchRequest)::actionGet
         );
         try {
             Max maxIncrementalIdAgg = searchResponse.getAggregations().get(DestinationIndex.INCREMENTAL_ID);

+ 7 - 8
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java

@@ -9,11 +9,10 @@ package org.elasticsearch.xpack.ml.datafeed.extractor.scroll;
 import org.apache.lucene.search.TotalHits;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.search.ClearScrollRequest;
-import org.elasticsearch.action.search.ClearScrollResponse;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.action.search.TransportClearScrollAction;
@@ -77,7 +76,7 @@ import static org.mockito.Mockito.when;
 public class ScrollDataExtractorTests extends ESTestCase {
 
     private Client client;
-    private List<SearchRequestBuilder> capturedSearchRequests;
+    private List<ActionRequestBuilder<?, SearchResponse>> capturedSearchRequests;
     private List<String> capturedContinueScrollIds;
     private ArgumentCaptor<ClearScrollRequest> capturedClearScrollRequests;
     private String jobId;
@@ -87,12 +86,11 @@ public class ScrollDataExtractorTests extends ESTestCase {
     private List<SearchSourceBuilder.ScriptField> scriptFields;
     private int scrollSize;
     private long initScrollStartTime;
-    private ActionFuture<ClearScrollResponse> clearScrollFuture;
     private DatafeedTimingStatsReporter timingStatsReporter;
 
     private class TestDataExtractor extends ScrollDataExtractor {
 
-        private Queue<Tuple<SearchResponse, ElasticsearchException>> responses = new LinkedList<>();
+        private final Queue<Tuple<SearchResponse, ElasticsearchException>> responses = new LinkedList<>();
         private int numScrollReset;
 
         TestDataExtractor(long start, long end) {
@@ -110,7 +108,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
         }
 
         @Override
-        protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
+        protected SearchResponse executeSearchRequest(ActionRequestBuilder<?, SearchResponse> searchRequestBuilder) {
             capturedSearchRequests.add(searchRequestBuilder);
             Tuple<SearchResponse, ElasticsearchException> responseOrException = responses.remove();
             if (responseOrException.v2() != null) {
@@ -176,9 +174,10 @@ public class ScrollDataExtractorTests extends ESTestCase {
         scriptFields = Collections.emptyList();
         scrollSize = 1000;
 
-        clearScrollFuture = mock(ActionFuture.class);
         capturedClearScrollRequests = ArgumentCaptor.forClass(ClearScrollRequest.class);
-        when(client.execute(same(TransportClearScrollAction.TYPE), capturedClearScrollRequests.capture())).thenReturn(clearScrollFuture);
+        when(client.execute(same(TransportClearScrollAction.TYPE), capturedClearScrollRequests.capture())).thenReturn(
+            mock(ActionFuture.class)
+        );
         timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(DatafeedTimingStatsPersister.class));
     }