
Enhance empty queue conditional in slicing logic (#114911) (#114940)

With recent changes in Lucene 9.12 around not forking execution when not necessary
(see https://github.com/apache/lucene/pull/13472), we removed the search
worker thread pool in #111099. The worker thread pool had an unlimited queue, and we
feared that we could see much more queueing on the search thread pool if we executed
segment-level searches on the same thread pool as shard-level searches, because
every shard search would take up to a thread per slice when executing the query phase.

We then introduced an additional conditional to stop parallelizing when there
is a queue. That is perhaps a bit extreme, as the decision is made when creating the
searcher, while the queue may no longer be there once the search executes.
This has caused some benchmark regressions, given that a queue may be a transient
state, especially with short-lived segment searches being queued up. We may end
up disabling inter-segment concurrency more aggressively than we would want, penalizing
requests that do benefit from concurrency. At the same time, we do want some kind
of protection against rejections of shard searches that would be caused by excessive slicing.
When the queue is above a certain size, we can turn off slicing and effectively disable
inter-segment concurrency. With this commit we set that threshold to the number of
threads in the search pool.
Luca Cavanna · 1 year ago · commit 4be9122150
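
For illustration, here is a minimal sketch of the decision this commit adjusts. The method name mirrors DefaultSearchContext.determineMaximumNumberOfSlices from the diff below, but the signature is simplified: the boolean parallelSupported stands in for the results-type and request checks that the real method performs.

    import java.util.concurrent.Executor;
    import java.util.concurrent.ThreadPoolExecutor;

    class SlicingSketch {
        // Simplified slicing decision after this change: allow parallel
        // collection as long as the backlog does not exceed one queued
        // task per search thread, rather than requiring an empty queue.
        static int determineMaximumNumberOfSlices(Executor executor, boolean parallelSupported) {
            return executor instanceof ThreadPoolExecutor tpe
                && tpe.getQueue().size() <= tpe.getMaximumPoolSize()
                && parallelSupported
                    ? tpe.getMaximumPoolSize() // up to one slice per search thread
                    : 1;                       // single slice: inter-segment concurrency disabled
        }
    }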

+ 1 - 1
server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java

@@ -291,7 +291,7 @@ final class DefaultSearchContext extends SearchContext {
         ToLongFunction<String> fieldCardinality
     ) {
         return executor instanceof ThreadPoolExecutor tpe
-            && tpe.getQueue().isEmpty()
+            && tpe.getQueue().size() <= tpe.getMaximumPoolSize()
             && isParallelCollectionSupportedForResults(resultsType, request.source(), fieldCardinality, enableQueryPhaseParallelCollection)
                 ? tpe.getMaximumPoolSize()
                 : 1;
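
As a quick check of the boundary, the following hypothetical standalone snippet (not part of the commit) shows the condition flipping once the queue holds more than one task per thread. It uses a plain fixed-size ThreadPoolExecutor, where core and maximum pool size are equal, just as the pools built via EsExecutors.newFixed in the tests below:

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    class ThresholdDemo {
        public static void main(String[] args) {
            // Fixed pool of 4 threads (core == max), with an unbounded queue.
            ThreadPoolExecutor tpe = new ThreadPoolExecutor(
                4, 4, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
            for (int i = 0; i < 4; i++) {
                tpe.getQueue().offer(() -> {});
            }
            // 4 queued tasks <= 4 max pool size: slicing stays enabled.
            System.out.println(tpe.getQueue().size() <= tpe.getMaximumPoolSize()); // true
            tpe.getQueue().offer(() -> {});
            // 5 queued tasks > 4 max pool size: fall back to a single slice.
            System.out.println(tpe.getQueue().size() <= tpe.getMaximumPoolSize()); // false
            tpe.shutdownNow();
        }
    }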

+ 147 - 62
server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java

@@ -81,6 +81,7 @@ import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.function.ToLongFunction;
@@ -507,10 +508,10 @@ public class DefaultSearchContextTests extends MapperServiceTestCase {
         }
     }
 
-    public void testDetermineMaximumNumberOfSlices() {
+    private static ShardSearchRequest createParallelRequest() {
         IndexShard indexShard = mock(IndexShard.class);
         when(indexShard.shardId()).thenReturn(new ShardId("index", "uuid", 0));
-        ShardSearchRequest parallelReq = new ShardSearchRequest(
+        return new ShardSearchRequest(
             OriginalIndices.NONE,
             new SearchRequest().allowPartialSearchResults(randomBoolean()),
             indexShard.shardId(),
@@ -521,69 +522,74 @@ public class DefaultSearchContextTests extends MapperServiceTestCase {
             System.currentTimeMillis(),
             null
         );
-        ShardSearchRequest singleSliceReq = new ShardSearchRequest(
-            OriginalIndices.NONE,
-            new SearchRequest().allowPartialSearchResults(randomBoolean())
-                .source(new SearchSourceBuilder().sort(SortBuilders.fieldSort(FieldSortBuilder.DOC_FIELD_NAME))),
-            indexShard.shardId(),
-            0,
-            1,
-            AliasFilter.EMPTY,
-            1f,
-            System.currentTimeMillis(),
-            null
-        );
-        int executorPoolSize = randomIntBetween(1, 100);
-        ExecutorService threadPoolExecutor = EsExecutors.newFixed(
-            "test",
-            executorPoolSize,
-            0,
-            Thread::new,
-            new ThreadContext(Settings.EMPTY),
-            EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
-        );
-        ExecutorService notThreadPoolExecutor = Executors.newWorkStealingPool();
-        ToLongFunction<String> fieldCardinality = name -> -1;
 
+    }
+
+    public void testDetermineMaximumNumberOfSlicesNoExecutor() {
+        ToLongFunction<String> fieldCardinality = name -> { throw new UnsupportedOperationException(); };
         assertEquals(
-            executorPoolSize,
+            1,
             DefaultSearchContext.determineMaximumNumberOfSlices(
-                threadPoolExecutor,
-                parallelReq,
+                null,
+                createParallelRequest(),
                 SearchService.ResultsType.DFS,
-                true,
+                randomBoolean(),
                 fieldCardinality
             )
         );
         assertEquals(
-            executorPoolSize,
+            1,
             DefaultSearchContext.determineMaximumNumberOfSlices(
-                threadPoolExecutor,
-                singleSliceReq,
-                SearchService.ResultsType.DFS,
-                true,
+                null,
+                createParallelRequest(),
+                SearchService.ResultsType.QUERY,
+                randomBoolean(),
                 fieldCardinality
             )
         );
+    }
+
+    public void testDetermineMaximumNumberOfSlicesNotThreadPoolExecutor() {
+        ExecutorService notThreadPoolExecutor = Executors.newWorkStealingPool();
+        ToLongFunction<String> fieldCardinality = name -> { throw new UnsupportedOperationException(); };
         assertEquals(
             1,
-            DefaultSearchContext.determineMaximumNumberOfSlices(null, parallelReq, SearchService.ResultsType.DFS, true, fieldCardinality)
+            DefaultSearchContext.determineMaximumNumberOfSlices(
+                notThreadPoolExecutor,
+                createParallelRequest(),
+                SearchService.ResultsType.DFS,
+                randomBoolean(),
+                fieldCardinality
+            )
         );
         assertEquals(
-            executorPoolSize,
+            1,
             DefaultSearchContext.determineMaximumNumberOfSlices(
-                threadPoolExecutor,
-                parallelReq,
+                notThreadPoolExecutor,
+                createParallelRequest(),
                 SearchService.ResultsType.QUERY,
-                true,
+                randomBoolean(),
                 fieldCardinality
             )
         );
+    }
+
+    public void testDetermineMaximumNumberOfSlicesEnableQueryPhaseParallelCollection() {
+        int executorPoolSize = randomIntBetween(1, 100);
+        ThreadPoolExecutor threadPoolExecutor = EsExecutors.newFixed(
+            "test",
+            executorPoolSize,
+            0,
+            Thread::new,
+            new ThreadContext(Settings.EMPTY),
+            EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
+        );
+        ToLongFunction<String> fieldCardinality = name -> -1;
         assertEquals(
-            1,
+            executorPoolSize,
             DefaultSearchContext.determineMaximumNumberOfSlices(
                 threadPoolExecutor,
-                singleSliceReq,
+                createParallelRequest(),
                 SearchService.ResultsType.QUERY,
                 true,
                 fieldCardinality
@@ -592,54 +598,133 @@ public class DefaultSearchContextTests extends MapperServiceTestCase {
         assertEquals(
             1,
             DefaultSearchContext.determineMaximumNumberOfSlices(
-                notThreadPoolExecutor,
-                parallelReq,
-                SearchService.ResultsType.DFS,
-                true,
+                threadPoolExecutor,
+                createParallelRequest(),
+                SearchService.ResultsType.QUERY,
+                false,
                 fieldCardinality
             )
         );
-
         assertEquals(
             executorPoolSize,
             DefaultSearchContext.determineMaximumNumberOfSlices(
                 threadPoolExecutor,
-                parallelReq,
+                createParallelRequest(),
                 SearchService.ResultsType.DFS,
-                false,
+                randomBoolean(),
                 fieldCardinality
             )
         );
-        assertEquals(
+    }
+
+    public void testDetermineMaximumNumberOfSlicesSingleSortByField() {
+        IndexShard indexShard = mock(IndexShard.class);
+        when(indexShard.shardId()).thenReturn(new ShardId("index", "uuid", 0));
+        ShardSearchRequest singleSliceReq = new ShardSearchRequest(
+            OriginalIndices.NONE,
+            new SearchRequest().allowPartialSearchResults(randomBoolean())
+                .source(new SearchSourceBuilder().sort(SortBuilders.fieldSort(FieldSortBuilder.DOC_FIELD_NAME))),
+            indexShard.shardId(),
+            0,
             1,
-            DefaultSearchContext.determineMaximumNumberOfSlices(null, parallelReq, SearchService.ResultsType.DFS, false, fieldCardinality)
+            AliasFilter.EMPTY,
+            1f,
+            System.currentTimeMillis(),
+            null
         );
+        ToLongFunction<String> fieldCardinality = name -> { throw new UnsupportedOperationException(); };
+        int executorPoolSize = randomIntBetween(1, 100);
+        ThreadPoolExecutor threadPoolExecutor = EsExecutors.newFixed(
+            "test",
+            executorPoolSize,
+            0,
+            Thread::new,
+            new ThreadContext(Settings.EMPTY),
+            EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
+        );
+        // DFS concurrency does not rely on slices, hence it kicks in regardless of the request (supportsParallelCollection is not called)
         assertEquals(
-            1,
+            executorPoolSize,
             DefaultSearchContext.determineMaximumNumberOfSlices(
                 threadPoolExecutor,
-                parallelReq,
-                SearchService.ResultsType.QUERY,
-                false,
+                singleSliceReq,
+                SearchService.ResultsType.DFS,
+                true,
                 fieldCardinality
             )
         );
-        assertEquals(
-            1,
-            DefaultSearchContext.determineMaximumNumberOfSlices(null, parallelReq, SearchService.ResultsType.QUERY, false, fieldCardinality)
-        );
         assertEquals(
             1,
             DefaultSearchContext.determineMaximumNumberOfSlices(
-                notThreadPoolExecutor,
-                parallelReq,
-                SearchService.ResultsType.DFS,
-                false,
+                threadPoolExecutor,
+                singleSliceReq,
+                SearchService.ResultsType.QUERY,
+                true,
                 fieldCardinality
             )
         );
     }
 
+    public void testDetermineMaximumNumberOfSlicesWithQueue() {
+        int executorPoolSize = randomIntBetween(1, 100);
+        ThreadPoolExecutor threadPoolExecutor = EsExecutors.newFixed(
+            "test",
+            executorPoolSize,
+            1000,
+            Thread::new,
+            new ThreadContext(Settings.EMPTY),
+            EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
+        );
+        ToLongFunction<String> fieldCardinality = name -> { throw new UnsupportedOperationException(); };
+
+        for (int i = 0; i < executorPoolSize; i++) {
+            assertTrue(threadPoolExecutor.getQueue().offer(() -> {}));
+            assertEquals(
+                executorPoolSize,
+                DefaultSearchContext.determineMaximumNumberOfSlices(
+                    threadPoolExecutor,
+                    createParallelRequest(),
+                    SearchService.ResultsType.DFS,
+                    true,
+                    fieldCardinality
+                )
+            );
+            assertEquals(
+                executorPoolSize,
+                DefaultSearchContext.determineMaximumNumberOfSlices(
+                    threadPoolExecutor,
+                    createParallelRequest(),
+                    SearchService.ResultsType.QUERY,
+                    true,
+                    fieldCardinality
+                )
+            );
+        }
+        for (int i = 0; i < 100; i++) {
+            assertTrue(threadPoolExecutor.getQueue().offer(() -> {}));
+            assertEquals(
+                1,
+                DefaultSearchContext.determineMaximumNumberOfSlices(
+                    threadPoolExecutor,
+                    createParallelRequest(),
+                    SearchService.ResultsType.DFS,
+                    true,
+                    fieldCardinality
+                )
+            );
+            assertEquals(
+                1,
+                DefaultSearchContext.determineMaximumNumberOfSlices(
+                    threadPoolExecutor,
+                    createParallelRequest(),
+                    SearchService.ResultsType.QUERY,
+                    true,
+                    fieldCardinality
+                )
+            );
+        }
+    }
+
     public void testIsParallelCollectionSupportedForResults() {
         SearchSourceBuilder searchSourceBuilderOrNull = randomBoolean() ? null : new SearchSourceBuilder();
         ToLongFunction<String> fieldCardinality = name -> -1;