Browse Source

Wire concurrent top docs collector managers when size is 0 (#97755)

We recently introduced a collector manager for PartialHitCountCollector.
This commit enables top docs collection to use it (when size is set to 0). This effectively makes collection support concurrency when size is  set to 0, provided we have yet to set an executor to the index searcher.
Luca Cavanna 2 years ago
parent
commit
08bb1fe902

+ 5 - 0
docs/changelog/97755.yaml

@@ -0,0 +1,5 @@
+pr: 97755
+summary: Wire concurrent top docs collector managers when size is 0
+area: Search
+type: enhancement
+issues: []

+ 33 - 11
server/src/main/java/org/elasticsearch/search/query/TopDocsCollectorManagerFactory.java

@@ -39,6 +39,7 @@ import org.apache.lucene.search.TopFieldCollector;
 import org.apache.lucene.search.TopFieldDocs;
 import org.apache.lucene.search.TopScoreDocCollector;
 import org.apache.lucene.search.TotalHitCountCollector;
+import org.apache.lucene.search.TotalHitCountCollectorManager;
 import org.apache.lucene.search.TotalHits;
 import org.elasticsearch.action.search.MaxScoreCollector;
 import org.elasticsearch.common.lucene.Lucene;
@@ -58,6 +59,7 @@ import org.elasticsearch.search.rescore.RescoreContext;
 import org.elasticsearch.search.sort.SortAndFormats;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Objects;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -81,7 +83,7 @@ abstract class TopDocsCollectorManagerFactory {
     /**
      * Returns the collector manager used to collect top hits, created depending on the incoming request options
      */
-    CollectorManager<Collector, Void> collectorManager() {
+    CollectorManager<? extends Collector, Void> collectorManager() {
         return new SingleThreadCollectorManager(collector());
     }
 
@@ -98,7 +100,7 @@ abstract class TopDocsCollectorManagerFactory {
 
     static class EmptyTopDocsCollectorManagerFactory extends TopDocsCollectorManagerFactory {
         private final Sort sort;
-        private final Collector collector;
+        private final CollectorManager<? extends Collector, Void> collectorManager;
         private final Supplier<TotalHits> hitCountSupplier;
 
         /**
@@ -110,28 +112,48 @@ abstract class TopDocsCollectorManagerFactory {
             super(REASON_SEARCH_COUNT, null);
             this.sort = sortAndFormats == null ? null : sortAndFormats.sort;
             if (trackTotalHitsUpTo == SearchContext.TRACK_TOTAL_HITS_DISABLED) {
-                this.collector = new PartialHitCountCollector(0);
+                this.collectorManager = new PartialHitCountCollector.CollectorManager(0);
                 // for bwc hit count is set to 0, it will be converted to -1 by the coordinating node
                 this.hitCountSupplier = () -> new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
             } else {
                 if (trackTotalHitsUpTo == SearchContext.TRACK_TOTAL_HITS_ACCURATE) {
-                    TotalHitCountCollector hitCountCollector = new TotalHitCountCollector();
-                    this.collector = hitCountCollector;
-                    this.hitCountSupplier = () -> new TotalHits(hitCountCollector.getTotalHits(), TotalHits.Relation.EQUAL_TO);
+                    final int[] totalHits = new int[1];
+                    TotalHitCountCollectorManager totalHitCountCollectorManager = new TotalHitCountCollectorManager();
+                    // wrapping total hit count collector manager is needed to bridge the type mismatch between <Void> and <Integer>.
+                    this.collectorManager = new CollectorManager<>() {
+                        @Override
+                        public Collector newCollector() throws IOException {
+                            return totalHitCountCollectorManager.newCollector();
+                        }
+
+                        @Override
+                        public Void reduce(Collection<Collector> collectors) throws IOException {
+                            totalHits[0] = totalHitCountCollectorManager.reduce(
+                                collectors.stream().map(c -> (TotalHitCountCollector) c).toList()
+                            );
+                            return null;
+                        }
+                    };
+                    this.hitCountSupplier = () -> new TotalHits(totalHits[0], TotalHits.Relation.EQUAL_TO);
                 } else {
-                    PartialHitCountCollector col = new PartialHitCountCollector(trackTotalHitsUpTo);
-                    this.collector = col;
+                    PartialHitCountCollector.CollectorManager cm = new PartialHitCountCollector.CollectorManager(trackTotalHitsUpTo);
+                    this.collectorManager = cm;
                     this.hitCountSupplier = () -> new TotalHits(
-                        col.getTotalHits(),
-                        col.hasEarlyTerminated() ? TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO : TotalHits.Relation.EQUAL_TO
+                        cm.getTotalHits(),
+                        cm.hasEarlyTerminated() ? TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO : TotalHits.Relation.EQUAL_TO
                     );
                 }
             }
         }
 
+        @Override
+        CollectorManager<? extends Collector, Void> collectorManager() {
+            return collectorManager;
+        }
+
         @Override
         Collector collector() {
-            return collector;
+            throw new UnsupportedOperationException();
         }
 
         @Override