Browse Source

Merge upstream

ChrisHegarty 2 years ago
parent
commit
0a5e0d3c3a

+ 5 - 0
docs/changelog/98455.yaml

@@ -0,0 +1,5 @@
+pr: 98455
+summary: Add setting for search parallelism
+area: Search
+type: enhancement
+issues: []

+ 25 - 16
server/src/main/java/org/elasticsearch/TransportVersion.java

@@ -39,6 +39,7 @@ import java.util.TreeMap;
  * to be merged must be updated with the next free version first. Without the unique id string, git will happily merge the two versions
  * together, resulting in the same transport version being used across multiple commits, causing problems when you try to upgrade between
  * those two merged commits.
+ *
  * <h2>Version compatibility</h2>
  * The earliest compatible version is hardcoded in the {@link #MINIMUM_COMPATIBLE} field. Previously, this was dynamically calculated from
  * the major/minor versions of {@link Version}, but {@code TransportVersion} does not have separate major/minor version numbers. So the
@@ -47,15 +48,7 @@ import java.util.TreeMap;
  * <p>
  * The earliest CCS compatible version is hardcoded at {@link #MINIMUM_CCS_VERSION}, as the transport version used by the previous minor
  * release. This should be updated appropriately whenever a minor release happens.
- * <h2>Adding a new version</h2>
- * A new transport version should be added <em>every time</em> a change is made to the serialization protocol of one or more classes. Each
- * transport version should only be used in a single merged commit (apart from BwC versions copied from {@link Version}).
- * <p>
- * To add a new transport version, add a new constant at the bottom of the list that is one greater than the current highest version, ensure
- * it has a unique id, and update the {@link CurrentHolder#CURRENT} constant to point to the new version.
- * <h2>Reverting a transport version</h2>
- * If you revert a commit with a transport version change, you <em>must</em> ensure there is a <em>new</em> transport version representing
- * the reverted change. <em>Do not</em> let the transport version go backwards, it must <em>always</em> be incremented.
+ *
  * <h2>Scope of usefulness of {@link TransportVersion}</h2>
  * {@link TransportVersion} is a property of the transport connection between a pair of nodes, and should not be used as an indication of
  * the version of any single node. The {@link TransportVersion} of a connection is negotiated between the nodes via some logic that is not
@@ -71,6 +64,7 @@ public record TransportVersion(int id) implements VersionId<TransportVersion> {
      * This map is used during class construction, referenced by the registerTransportVersion method.
      * When all the transport version constants have been registered, the map is cleared & never touched again.
      */
+    @SuppressWarnings("UnusedAssignment")
     private static Map<String, Integer> IDS = new HashMap<>();
 
     private static TransportVersion registerTransportVersion(int id, String uniqueId) {
@@ -125,7 +119,7 @@ public record TransportVersion(int id) implements VersionId<TransportVersion> {
     public static final TransportVersion V_8_8_1 = registerTransportVersion(8_08_01_99, "291c71bb-5b0a-4b7e-a407-6e53bc128d0f");
 
     /*
-     * READ THE JAVADOC ABOVE BEFORE ADDING NEW TRANSPORT VERSIONS
+     * READ THE COMMENT BELOW THiS BLOCK OF DECLARATIONS BEFORE ADDING NEW TRANSPORT VERSIONS
      * Detached transport versions added below here.
      */
     public static final TransportVersion V_8_500_010 = registerTransportVersion(8_500_010, "9818C628-1EEC-439B-B943-468F61460675");
@@ -135,14 +129,11 @@ public record TransportVersion(int id) implements VersionId<TransportVersion> {
     public static final TransportVersion V_8_500_014 = registerTransportVersion(8_500_014, "D115A2E1-1739-4A02-AB7B-64F6EA157EFB");
     public static final TransportVersion V_8_500_015 = registerTransportVersion(8_500_015, "651216c9-d54f-4189-9fe1-48d82d276863");
     public static final TransportVersion V_8_500_016 = registerTransportVersion(8_500_016, "492C94FB-AAEA-4C9E-8375-BDB67A398584");
-
     public static final TransportVersion V_8_500_017 = registerTransportVersion(8_500_017, "0EDCB5BA-049C-443C-8AB1-5FA58FB996FB");
     public static final TransportVersion V_8_500_018 = registerTransportVersion(8_500_018, "827C32CE-33D9-4AC3-A773-8FB768F59EAF");
     public static final TransportVersion V_8_500_019 = registerTransportVersion(8_500_019, "09bae57f-cab8-423c-aab3-c9778509ffe3");
-    // 8.9.0
     public static final TransportVersion V_8_500_020 = registerTransportVersion(8_500_020, "ECB42C26-B258-42E5-A835-E31AF84A76DE");
     public static final TransportVersion V_8_500_021 = registerTransportVersion(8_500_021, "102e0d84-0c08-402c-a696-935f3a3da873");
-    // Introduced for stateless plugin
     public static final TransportVersion V_8_500_022 = registerTransportVersion(8_500_022, "4993c724-7a81-4955-84e7-403484610091");
     public static final TransportVersion V_8_500_023 = registerTransportVersion(8_500_023, "01b06435-5d73-42ff-a121-3b36b771375e");
     public static final TransportVersion V_8_500_024 = registerTransportVersion(8_500_024, "db337007-f823-4dbd-968e-375383814c17");
@@ -156,9 +147,7 @@ public record TransportVersion(int id) implements VersionId<TransportVersion> {
     public static final TransportVersion V_8_500_032 = registerTransportVersion(8_500_032, "a9a14bc6-c3f2-41d9-a3d8-c686bf2c901d");
     public static final TransportVersion V_8_500_033 = registerTransportVersion(8_500_033, "193ab7c4-a751-4cbd-a66a-2d7d56ccbc10");
     public static final TransportVersion V_8_500_034 = registerTransportVersion(8_500_034, "16871c8b-88ba-4432-980a-10fd9ecad2dc");
-
     public static final TransportVersion V_8_500_035 = registerTransportVersion(8_500_035, "664dd6ce-3487-4fbd-81a9-af778b28be45");
-    // Introduced for stateless plugin
     public static final TransportVersion V_8_500_036 = registerTransportVersion(8_500_036, "3343c64f-d7ac-4f02-9262-3e1acfc56f89");
     public static final TransportVersion V_8_500_037 = registerTransportVersion(8_500_037, "d76a4f22-8878-43e0-acfa-15e452195fa7");
     public static final TransportVersion V_8_500_038 = registerTransportVersion(8_500_038, "9ef93580-feae-409f-9989-b49e411ca7a9");
@@ -166,7 +155,7 @@ public record TransportVersion(int id) implements VersionId<TransportVersion> {
     public static final TransportVersion V_8_500_040 = registerTransportVersion(8_500_040, "8F3AA068-A608-4A16-9683-2412A75BF2DD");
     public static final TransportVersion V_8_500_041 = registerTransportVersion(8_500_041, "5b6a0fd0-ac0b-443f-baae-cffec140905c");
     public static final TransportVersion V_8_500_042 = registerTransportVersion(8_500_042, "763b4801-a4fc-47c4-aff5-7f5a757b8a07");
-    public static final TransportVersion V_8_500_043 = registerTransportVersion(8_500_043, "50baabd14-7f5c-4f8c-9351-94e0d397aabc");
+    public static final TransportVersion V_8_500_043 = registerTransportVersion(8_500_043, "50babd14-7f5c-4f8c-9351-94e0d397aabc");
     public static final TransportVersion V_8_500_044 = registerTransportVersion(8_500_044, "96b83320-2317-4e9d-b735-356f18c1d76a");
     public static final TransportVersion V_8_500_045 = registerTransportVersion(8_500_045, "24a596dd-c843-4c0a-90b3-759697d74026");
     public static final TransportVersion V_8_500_046 = registerTransportVersion(8_500_046, "61666d4c-a4f0-40db-8a3d-4806718247c5");
@@ -186,6 +175,26 @@ public record TransportVersion(int id) implements VersionId<TransportVersion> {
     public static final TransportVersion V_8_500_060 = registerTransportVersion(8_500_060, "ec065a44-b468-4f8a-aded-7b90ca8d792b");
     public static final TransportVersion V_8_500_061 = registerTransportVersion(8_500_061, "4e07f830-8be4-448c-851e-62b3d2f0bf0a");
     public static final TransportVersion V_8_500_062 = registerTransportVersion(8_500_062, "09CD9C9B-3207-4B40-8756-B7A12001A885");
+    /*
+     * STOP! READ THIS FIRST! No, really,
+     *        ____ _____ ___  ____  _        ____  _____    _    ____    _____ _   _ ___ ____    _____ ___ ____  ____ _____ _
+     *       / ___|_   _/ _ \|  _ \| |      |  _ \| ____|  / \  |  _ \  |_   _| | | |_ _/ ___|  |  ___|_ _|  _ \/ ___|_   _| |
+     *       \___ \ | || | | | |_) | |      | |_) |  _|   / _ \ | | | |   | | | |_| || |\___ \  | |_   | || |_) \___ \ | | | |
+     *        ___) || || |_| |  __/|_|      |  _ <| |___ / ___ \| |_| |   | | |  _  || | ___) | |  _|  | ||  _ < ___) || | |_|
+     *       |____/ |_| \___/|_|   (_)      |_| \_\_____/_/   \_\____/    |_| |_| |_|___|____/  |_|   |___|_| \_\____/ |_| (_)
+     *
+     * A new transport version should be added EVERY TIME a change is made to the serialization protocol of one or more classes. Each
+     * transport version should only be used in a single merged commit (apart from the BwC versions copied from o.e.Version, ≤V_8_8_1).
+     *
+     * To add a new transport version, add a new constant at the bottom of the list, above this comment, which is one greater than the
+     * current highest version, ensure it has a fresh UUID, and update CurrentHolder#CURRENT to point to the new version. Don't add other
+     * lines, comments, etc.
+     *
+     * REVERTING A TRANSPORT VERSION
+     *
+     * If you revert a commit with a transport version change, you MUST ensure there is a NEW transport version representing the reverted
+     * change. DO NOT let the transport version go backwards, it must ALWAYS be incremented.
+     */
 
     private static class CurrentHolder {
         private static final TransportVersion CURRENT = findCurrent(V_8_500_062);

+ 1 - 0
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -502,6 +502,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
         SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING,
         SearchModule.INDICES_MAX_NESTED_DEPTH_SETTING,
         SearchService.SEARCH_WORKER_THREADS_ENABLED,
+        SearchService.QUERY_PHASE_PARALLEL_COLLECTION_ENABLED,
         ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING,
         ThreadPool.LATE_TIME_INTERVAL_WARN_THRESHOLD_SETTING,
         ThreadPool.SLOW_SCHEDULER_TASK_WARN_THRESHOLD_SETTING,

+ 28 - 18
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -2174,21 +2174,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                 );
             }
         } else {
-            if (origin == Engine.Operation.Origin.PRIMARY) {
-                assert assertPrimaryMode();
-                // We only do indexing into primaries that are started since:
-                // * TransportReplicationAction.ReroutePhase only allows to index into active primaries.
-                // * A relocation will retry the reroute phase.
-                // * Allocation ids protect against spurious requests towards old allocations.
-                // * We apply the cluster state on IndexShard instances before making it available for routing
-                assert assertStartedForPrimaryIndexing();
-            } else if (origin == Engine.Operation.Origin.REPLICA) {
-                assert assertReplicationTarget();
-            } else {
-                assert origin == Engine.Operation.Origin.LOCAL_RESET;
-                assert getActiveOperationsCount() == OPERATIONS_BLOCKED
-                    : "locally resetting without blocking operations, active operations [" + getActiveOperationsCount() + "]";
-            }
+            assert assertWriteOriginInvariants(origin);
             if (writeAllowedStates.contains(state) == false) {
                 throw new IllegalIndexShardStateException(
                     shardId,
@@ -2199,12 +2185,36 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         }
     }
 
-    private boolean assertStartedForPrimaryIndexing() {
-        final var state = this.state;
-        assert state == IndexShardState.STARTED : "primary indexing unexpected in state [" + state + "]";
+    private boolean assertWriteOriginInvariants(Engine.Operation.Origin origin) {
+        switch (origin) {
+            case PRIMARY -> {
+                assertPrimaryMode();
+                assertExpectedStateForPrimaryIndexing(state);
+            }
+            case REPLICA -> {
+                assert assertReplicationTarget();
+            }
+            case LOCAL_RESET -> {
+                final var activeOperationsCount = getActiveOperationsCount();
+                assert activeOperationsCount == OPERATIONS_BLOCKED
+                    : "locally resetting without blocking operations, active operations [" + activeOperationsCount + "]";
+            }
+            default -> {
+                assert false : "unexpected origin: " + origin;
+            }
+        }
         return true;
     }
 
+    private void assertExpectedStateForPrimaryIndexing(IndexShardState state) {
+        // We do not do indexing into primaries that have not reached state STARTED since:
+        // * TransportReplicationAction.ReroutePhase only allows to index into active primaries.
+        // * A relocation will retry the reroute phase.
+        // * Allocation ids protect against spurious requests towards old allocations.
+        // * We apply the cluster state on IndexShard instances before making it available for routing
+        assert state == IndexShardState.STARTED || state == IndexShardState.CLOSED : "primary indexing unexpected in state [" + state + "]";
+    }
+
     private boolean assertPrimaryMode() {
         assert shardRouting.primary() && replicationTracker.isPrimaryMode()
             : "shard " + shardRouting + " is not a primary shard in primary mode";

+ 31 - 8
server/src/main/java/org/elasticsearch/search/SearchService.java

@@ -219,6 +219,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         Property.Dynamic
     );
 
+    public static final Setting<Boolean> QUERY_PHASE_PARALLEL_COLLECTION_ENABLED = Setting.boolSetting(
+        "search.query_phase_parallel_collection_enabled",
+        false,
+        Property.NodeScope,
+        Property.Dynamic
+    );
+
     public static final Setting<Integer> MAX_OPEN_SCROLL_CONTEXT = Setting.intSetting(
         "search.max_open_scroll_context",
         500,
@@ -262,6 +269,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
 
     private final FetchPhase fetchPhase;
     private volatile boolean enableSearchWorkerThreads;
+    private volatile boolean enableQueryPhaseParallelCollection;
 
     private volatile long defaultKeepAlive;
 
@@ -354,12 +362,20 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
 
         enableSearchWorkerThreads = SEARCH_WORKER_THREADS_ENABLED.get(settings);
         clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_WORKER_THREADS_ENABLED, this::setEnableSearchWorkerThreads);
+
+        enableQueryPhaseParallelCollection = QUERY_PHASE_PARALLEL_COLLECTION_ENABLED.get(settings);
+        clusterService.getClusterSettings()
+            .addSettingsUpdateConsumer(QUERY_PHASE_PARALLEL_COLLECTION_ENABLED, this::setEnableQueryPhaseParallelCollection);
     }
 
     private void setEnableSearchWorkerThreads(boolean enableSearchWorkerThreads) {
         this.enableSearchWorkerThreads = enableSearchWorkerThreads;
     }
 
+    private void setEnableQueryPhaseParallelCollection(boolean enableQueryPhaseParallelCollection) {
+        this.enableQueryPhaseParallelCollection = enableQueryPhaseParallelCollection;
+    }
+
     private static void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
         if (defaultKeepAlive.millis() > maxKeepAlive.millis()) {
             throw new IllegalArgumentException(
@@ -1065,8 +1081,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
                 request.getClusterAlias()
             );
             ExecutorService executor = this.enableSearchWorkerThreads ? threadPool.executor(Names.SEARCH_WORKER) : null;
-            int maximumNumberOfSlices = executor instanceof ThreadPoolExecutor tpe
-                && supportsParallelCollection(resultsType, request.source()) ? tpe.getMaximumPoolSize() : 1;
+            int maximumNumberOfSlices = determineMaximumNumberOfSlices(executor, request, resultsType);
             searchContext = new DefaultSearchContext(
                 reader,
                 request,
@@ -1097,16 +1112,24 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         return searchContext;
     }
 
-    static boolean supportsParallelCollection(ResultsType resultsType, SearchSourceBuilder source) {
+    int determineMaximumNumberOfSlices(ExecutorService executor, ShardSearchRequest request, ResultsType resultsType) {
+        return executor instanceof ThreadPoolExecutor tpe
+            && isParallelCollectionSupportedForResults(resultsType, request.source(), this.enableQueryPhaseParallelCollection)
+                ? tpe.getMaximumPoolSize()
+                : 1;
+    }
+
+    static boolean isParallelCollectionSupportedForResults(
+        ResultsType resultsType,
+        SearchSourceBuilder source,
+        boolean isQueryPhaseParallelismEnabled
+    ) {
         if (resultsType == ResultsType.DFS) {
-            return true; // only enable concurrent collection for DFS phase for now
+            return true;
         }
-        /*
-        //TODO uncomment this block to enable inter-segment concurrency for the query phase
-        if (resultsType == ResultsType.QUERY) {
+        if (resultsType == ResultsType.QUERY && isQueryPhaseParallelismEnabled) {
             return source == null || source.supportsParallelCollection();
         }
-        */
         return false;
     }
 

+ 77 - 4
server/src/test/java/org/elasticsearch/search/SearchServiceTests.java

@@ -47,6 +47,8 @@ import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.AbstractRefCounted;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.Index;
@@ -120,6 +122,8 @@ import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -133,6 +137,7 @@ import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
 import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
 import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED;
+import static org.elasticsearch.search.SearchService.QUERY_PHASE_PARALLEL_COLLECTION_ENABLED;
 import static org.elasticsearch.search.SearchService.SEARCH_WORKER_THREADS_ENABLED;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@@ -1946,6 +1951,66 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
         }
     }
 
+    public void testEnableQueryPhaseParallelCollection() throws IOException {
+        IndexService indexService = createIndex("index", Settings.EMPTY);
+        IndexShard indexShard = indexService.getShard(0);
+        ShardSearchRequest request = new ShardSearchRequest(
+            OriginalIndices.NONE,
+            new SearchRequest().allowPartialSearchResults(randomBoolean()),
+            indexShard.shardId(),
+            0,
+            indexService.numberOfShards(),
+            AliasFilter.EMPTY,
+            1f,
+            System.currentTimeMillis(),
+            null
+        );
+        int executorPoolSize = randomIntBetween(1, 100);
+        ExecutorService threadPoolExecutor = EsExecutors.newFixed(
+            "test",
+            executorPoolSize,
+            0,
+            Thread::new,
+            new ThreadContext(Settings.EMPTY),
+            EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
+        );
+        ExecutorService notThreadPoolExecutor = Executors.newWorkStealingPool();
+
+        SearchService service = getInstanceFromNode(SearchService.class);
+        {
+            assertEquals(executorPoolSize, service.determineMaximumNumberOfSlices(threadPoolExecutor, request, ResultsType.DFS));
+            assertEquals(1, service.determineMaximumNumberOfSlices(null, request, ResultsType.DFS));
+            assertEquals(1, service.determineMaximumNumberOfSlices(threadPoolExecutor, request, ResultsType.QUERY));
+            assertEquals(1, service.determineMaximumNumberOfSlices(notThreadPoolExecutor, request, ResultsType.DFS));
+        }
+        try {
+            ClusterUpdateSettingsResponse response = client().admin()
+                .cluster()
+                .prepareUpdateSettings()
+                .setPersistentSettings(Settings.builder().put(QUERY_PHASE_PARALLEL_COLLECTION_ENABLED.getKey(), true).build())
+                .get();
+            assertTrue(response.isAcknowledged());
+            {
+                assertEquals(executorPoolSize, service.determineMaximumNumberOfSlices(threadPoolExecutor, request, ResultsType.DFS));
+                assertEquals(1, service.determineMaximumNumberOfSlices(null, request, ResultsType.DFS));
+                assertEquals(executorPoolSize, service.determineMaximumNumberOfSlices(threadPoolExecutor, request, ResultsType.QUERY));
+                assertEquals(1, service.determineMaximumNumberOfSlices(null, request, ResultsType.QUERY));
+                assertEquals(1, service.determineMaximumNumberOfSlices(notThreadPoolExecutor, request, ResultsType.DFS));
+            }
+        } finally {
+            // reset original default setting
+            client().admin()
+                .cluster()
+                .prepareUpdateSettings()
+                .setPersistentSettings(Settings.builder().putNull(QUERY_PHASE_PARALLEL_COLLECTION_ENABLED.getKey()).build())
+                .get();
+            {
+                assertEquals(executorPoolSize, service.determineMaximumNumberOfSlices(threadPoolExecutor, request, ResultsType.DFS));
+                assertEquals(1, service.determineMaximumNumberOfSlices(threadPoolExecutor, request, ResultsType.QUERY));
+            }
+        }
+    }
+
     /**
      * Verify that a single slice is created for requests that don't support parallel collection, while computation
      * is still offloaded to the worker threads.
@@ -2015,16 +2080,24 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
         }
     }
 
-    public void testSupportsParallelCollection() {
+    public void testIsParallelCollectionSupportedForResults() {
         SearchSourceBuilder searchSourceBuilder = randomBoolean() ? null : new SearchSourceBuilder();
         if (searchSourceBuilder != null && randomBoolean()) {
             searchSourceBuilder.aggregation(new TermsAggregationBuilder("terms"));
         }
-        assertTrue(SearchService.supportsParallelCollection(ResultsType.DFS, searchSourceBuilder));
+        assertTrue(SearchService.isParallelCollectionSupportedForResults(ResultsType.DFS, searchSourceBuilder, true));
+        assertFalse(
+            SearchService.isParallelCollectionSupportedForResults(
+                randomFrom(randomFrom(ResultsType.QUERY, ResultsType.NONE, ResultsType.FETCH)),
+                searchSourceBuilder,
+                false
+            )
+        );
         assertFalse(
-            SearchService.supportsParallelCollection(
+            SearchService.isParallelCollectionSupportedForResults(
                 randomFrom(randomFrom(ResultsType.QUERY, ResultsType.NONE, ResultsType.FETCH)),
-                searchSourceBuilder
+                searchSourceBuilder,
+                true
             )
         );
     }