Browse Source

Fix leak in DfsQueryPhase and introduce search disconnect stress test (#116060) (#117384)

Fixing an obvious leak and finally adding a stress test for search
disconnects.
Armin Braun 10 months ago
parent
commit
b9940e02ee

+ 6 - 0
docs/changelog/116060.yaml

@@ -0,0 +1,6 @@
+pr: 116060
+summary: Fix leak in `DfsQueryPhase` and introduce search disconnect stress test
+area: Search
+type: bug
+issues:
+ - 115056

+ 103 - 0
server/src/internalClusterTest/java/org/elasticsearch/search/basic/SearchWithRandomDisconnectsIT.java

@@ -0,0 +1,103 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+package org.elasticsearch.search.basic;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.discovery.AbstractDisruptionTestCase;
+import org.elasticsearch.index.IndexModule;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.query.MatchAllQueryBuilder;
+import org.elasticsearch.test.disruption.NetworkDisruption;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.IntStream;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+
+public class SearchWithRandomDisconnectsIT extends AbstractDisruptionTestCase {
+
+    public void testSearchWithRandomDisconnects() throws InterruptedException, ExecutionException {
+        // make sure we have a couple data nodes
+        int minDataNodes = randomIntBetween(3, 7);
+        internalCluster().ensureAtLeastNumDataNodes(minDataNodes);
+        final int indexCount = randomIntBetween(minDataNodes, 10 * minDataNodes);
+        final String[] indexNames = IntStream.range(0, indexCount).mapToObj(i -> "test-" + i).toArray(String[]::new);
+        final Settings indexSettings = indexSettings(1, 0).put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
+            .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), false)
+            .build();
+        for (String indexName : indexNames) {
+            createIndex(indexName, indexSettings);
+        }
+        BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
+        for (String indexName : indexNames) {
+            for (int i = 0; i < randomIntBetween(1, 10); i++) {
+                bulkRequestBuilder = bulkRequestBuilder.add(prepareIndex(indexName).setCreate(false).setSource("foo", "bar-" + i));
+            }
+        }
+        assertFalse(bulkRequestBuilder.get().hasFailures());
+        final AtomicBoolean done = new AtomicBoolean();
+        final int concurrentSearches = randomIntBetween(2, 5);
+        final List<PlainActionFuture<Void>> futures = new ArrayList<>(concurrentSearches);
+        for (int i = 0; i < concurrentSearches; i++) {
+            final PlainActionFuture<Void> finishFuture = new PlainActionFuture<>();
+            futures.add(finishFuture);
+            prepareRandomSearch().execute(new ActionListener<>() {
+                @Override
+                public void onResponse(SearchResponse searchResponse) {
+                    runMoreSearches();
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    runMoreSearches();
+                }
+
+                private void runMoreSearches() {
+                    if (done.get() == false) {
+                        prepareRandomSearch().execute(this);
+                    } else {
+                        finishFuture.onResponse(null);
+                    }
+                }
+            });
+        }
+        for (int i = 0, n = randomIntBetween(50, 100); i < n; i++) {
+            NetworkDisruption networkDisruption = new NetworkDisruption(
+                isolateNode(internalCluster().getRandomNodeName()),
+                NetworkDisruption.DISCONNECT
+            );
+            setDisruptionScheme(networkDisruption);
+            networkDisruption.startDisrupting();
+            networkDisruption.stopDisrupting();
+            internalCluster().clearDisruptionScheme();
+            ensureFullyConnectedCluster();
+        }
+        done.set(true);
+        for (PlainActionFuture<Void> future : futures) {
+            future.get();
+        }
+        ensureGreen(DISRUPTION_HEALING_OVERHEAD, indexNames);
+        assertAcked(indicesAdmin().prepareDelete(indexNames));
+    }
+
+    private static SearchRequestBuilder prepareRandomSearch() {
+        return prepareSearch("*").setQuery(new MatchAllQueryBuilder())
+            .setSize(9999)
+            .setFetchSource(true)
+            .setAllowPartialSearchResults(randomBoolean());
+    }
+}

+ 1 - 1
server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java

@@ -96,7 +96,7 @@ final class DfsQueryPhase extends SearchPhase {
                 connection = context.getConnection(shardTarget.getClusterAlias(), shardTarget.getNodeId());
             } catch (Exception e) {
                 shardFailure(e, querySearchRequest, shardIndex, shardTarget, counter);
-                return;
+                continue;
             }
             searchTransportService.sendExecuteQuery(
                 connection,

+ 2 - 2
server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java

@@ -48,7 +48,7 @@ import static org.hamcrest.Matchers.not;
 
 public abstract class AbstractDisruptionTestCase extends ESIntegTestCase {
 
-    static final TimeValue DISRUPTION_HEALING_OVERHEAD = TimeValue.timeValueSeconds(40); // we use 30s as timeout in many places.
+    public static final TimeValue DISRUPTION_HEALING_OVERHEAD = TimeValue.timeValueSeconds(40); // we use 30s as timeout in many places.
 
     @Override
     protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
@@ -220,7 +220,7 @@ public abstract class AbstractDisruptionTestCase extends ESIntegTestCase {
         return partition;
     }
 
-    TwoPartitions isolateNode(String isolatedNode) {
+    protected TwoPartitions isolateNode(String isolatedNode) {
         Set<String> side1 = new HashSet<>();
         Set<String> side2 = new HashSet<>(Arrays.asList(internalCluster().getNodeNames()));
         side1.add(isolatedNode);