@@ -36,6 +36,8 @@ import org.apache.lucene.search.ConstantScoreWeight;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.Explanation;
 import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.IndexSearcher.LeafSlice;
+import org.apache.lucene.search.KnnFloatVectorQuery;
 import org.apache.lucene.search.LeafCollector;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.Query;
@@ -58,6 +60,9 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.lucene.index.SequentialStoredFieldsLeafReader;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
@@ -78,6 +83,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.elasticsearch.search.internal.ContextIndexSearcher.intersectScorerAndBitSet;
@@ -175,6 +181,116 @@ public class ContextIndexSearcherTests extends ESTestCase {
         directory.close();
     }
 
+    /**
+     * Checks that the rewrite of knn queries parallelizes on the number of segments
+     * if there are enough threads available.
+     */
+    public void testConcurrentKnnRewrite() throws Exception {
+        final Directory directory = newDirectory();
+        try (
+            IndexWriter iw = new IndexWriter(
+                directory,
+                new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(NoMergePolicy.INSTANCE)
+            )
+        ) {
+            final int numDocs = randomIntBetween(100, 200);
+            for (int i = 0; i < numDocs; i++) {
+                Document document = new Document();
+                document.add(new StringField("field", "value", Field.Store.NO));
+                iw.addDocument(document);
+                if (rarely()) {
+                    iw.commit();
+                }
+            }
+        }
+
+        // make sure we have more threads than segments available so we can verify the calls to the execute method later
+        int nThreads = randomIntBetween(2, 5);
+
+        // use an executor that counts calls to its "execute" method
+        AtomicInteger executeCalls = new AtomicInteger(0);
+        ThreadPoolExecutor executor = null;
+        DirectoryReader directoryReader = null;
+        try {
+            executor = new PrioritizedEsThreadPoolExecutor(
+                "test",
+                nThreads,
+                Integer.MAX_VALUE,
+                0L,
+                TimeUnit.MILLISECONDS,
+                EsExecutors.daemonThreadFactory("queuetest"),
+                new ThreadContext(Settings.EMPTY),
+                null
+            ) {
+                @Override
+                public void execute(Runnable command) {
+                    executeCalls.incrementAndGet();
+                    super.execute(command);
+                }
+            };
+
+            directoryReader = DirectoryReader.open(directory);
+            ContextIndexSearcher searcher = new ContextIndexSearcher(
+                directoryReader,
+                IndexSearcher.getDefaultSimilarity(),
+                IndexSearcher.getDefaultQueryCache(),
+                IndexSearcher.getDefaultQueryCachingPolicy(),
+                1,
+                randomBoolean(),
+                executor
+            );
+            // check that we calculate one slice per segment
+            int numSegments = directoryReader.getContext().leaves().size();
+            assertEquals(numSegments, searcher.slices(directoryReader.getContext().leaves()).length);
+
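+            // rewriting the knn query should fan out the per-segment vector searches to the counting executor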
+            KnnFloatVectorQuery vectorQuery = new KnnFloatVectorQuery("float_vector", new float[] { 0, 0, 0 }, 10, null);
+            Query rewrittenQuery = vectorQuery.rewrite(searcher);
+            // Note: we expect one execute call fewer than there are segments, since the last task is
+            // executed on the caller thread. For details see QueueSizeBasedExecutor#processTask
+            assertEquals(numSegments - 1, executeCalls.get());
+
+            AtomicInteger collectorCalls = new AtomicInteger(0);
+            searcher.search(rewrittenQuery, new CollectorManager<Collector, Object>() {
+
+                @Override
+                public Collector newCollector() {
+                    collectorCalls.incrementAndGet();
+                    return new Collector() {
+                        @Override
+                        public LeafCollector getLeafCollector(LeafReaderContext context) {
+                            return new LeafBucketCollector() {
+                                @Override
+                                public void collect(int doc, long owningBucketOrd) throws IOException {
+                                    // noop
+                                }
+                            };
+                        }
+
+                        @Override
+                        public ScoreMode scoreMode() {
+                            return ScoreMode.COMPLETE;
+                        }
+                    };
+                }
+
+                @Override
+                public Object reduce(Collection<Collector> collectors) throws IOException {
+                    return null;
+                }
+            });
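+            // the searcher asks the collector manager for one collector per slice, so the number of
+            // newCollector calls should match the number of computed slices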
+            LeafSlice[] leafSlices = ContextIndexSearcher.computeSlices(
+                directoryReader.getContext().leaves(),
+                executor.getMaximumPoolSize(),
+                1
+            );
+            assertEquals(leafSlices.length, collectorCalls.get());
+        } finally {
+            directoryReader.close();
+            directory.close();
+            executor.shutdown();
+        }
+    }
+
     public void testConcurrentSearchAllThreadsFinish() throws Exception {
         final Directory directory = newDirectory();
         IndexWriter iw = new IndexWriter(directory, new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(NoMergePolicy.INSTANCE));
@@ -195,17 +311,41 @@ public class ContextIndexSearcherTests extends ESTestCase {
         AtomicInteger missingDocs = new AtomicInteger();
         AtomicInteger visitDocs = new AtomicInteger(0);
 
+        // determine how many docs are in the first slice; this is how many docs we are missing by
+        // throwing the exception in the first collector
+        int minDocsPerSlice = 1;
+
+        ContextIndexSearcher searcher = new ContextIndexSearcher(
+            directoryReader,
+            IndexSearcher.getDefaultSimilarity(),
+            IndexSearcher.getDefaultQueryCache(),
+            IndexSearcher.getDefaultQueryCachingPolicy(),
+            minDocsPerSlice,
+            randomBoolean(),
+            executor
+        );
+
+        LeafSlice[] leafSlices = ContextIndexSearcher.computeSlices(
+            directoryReader.getContext().leaves(),
+            executor.getMaximumPoolSize(),
+            minDocsPerSlice
+        );
+        // The test collector manager throws an exception when the first segment gets collected.
+        // All documents in that slice count towards the "missing" docs in the later assertion.
+        int docsFirstSlice = Arrays.stream(leafSlices[0].leaves).map(LeafReaderContext::reader).mapToInt(LeafReader::maxDoc).sum();
+        AtomicInteger collectorCalls = new AtomicInteger(0);
         CollectorManager<Collector, Void> collectorManager = new CollectorManager<>() {
             boolean first = true;
 
             @Override
             public Collector newCollector() {
+                collectorCalls.incrementAndGet();
                 if (first) {
                     first = false;
                     return new Collector() {
                         @Override
                         public LeafCollector getLeafCollector(LeafReaderContext context) {
-                            missingDocs.set(context.reader().numDocs());
+                            missingDocs.set(docsFirstSlice);
                             throw new IllegalArgumentException("fake exception");
                         }
@@ -245,26 +385,12 @@ public class ContextIndexSearcherTests extends ESTestCase {
             }
         };
 
-        ContextIndexSearcher searcher = new ContextIndexSearcher(
-            directoryReader,
-            IndexSearcher.getDefaultSimilarity(),
-            IndexSearcher.getDefaultQueryCache(),
-            IndexSearcher.getDefaultQueryCachingPolicy(),
-            1,
-            randomBoolean(),
-            executor
-        ) {
-            @Override
-            protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
-                return slices(leaves, 1, 1);
-            }
-        };
-
         IllegalArgumentException exception = expectThrows(
             IllegalArgumentException.class,
             () -> searcher.search(new MatchAllDocsQuery(), collectorManager)
         );
         assertThat(exception.getMessage(), equalTo("fake exception"));
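+        // even though the first collector fails, the manager should still have been asked for one collector per slice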
+        assertEquals(leafSlices.length, collectorCalls.get());
         assertThat(visitDocs.get() + missingDocs.get(), equalTo(numDocs));
         directoryReader.close();
         directory.close();
@@ -396,7 +522,7 @@ public class ContextIndexSearcherTests extends ESTestCase {
         int iter = randomIntBetween(16, 64);
         for (int i = 0; i < iter; i++) {
             int numThreads = randomIntBetween(1, 16);
-            IndexSearcher.LeafSlice[] slices = ContextIndexSearcher.computeSlices(contexts, numThreads, 1);
+            LeafSlice[] slices = ContextIndexSearcher.computeSlices(contexts, numThreads, 1);
             assertSlices(slices, numDocs, numThreads);
         }
         // expect exception for numThreads < 1
@@ -409,12 +535,12 @@ public class ContextIndexSearcherTests extends ESTestCase {
         IOUtils.close(reader, w, dir);
     }
 
-    private void assertSlices(IndexSearcher.LeafSlice[] slices, int numDocs, int numThreads) {
+    private void assertSlices(LeafSlice[] slices, int numDocs, int numThreads) {
         // checks that the number of slices is not bigger than the number of available threads
         // and each slice contains at least 10% of the data (which means the max number of slices is 10)
         int sumDocs = 0;
         assertThat(slices.length, lessThanOrEqualTo(numThreads));
-        for (IndexSearcher.LeafSlice slice : slices) {
+        for (LeafSlice slice : slices) {
             int sliceDocs = Arrays.stream(slice.leaves).mapToInt(l -> l.reader().maxDoc()).sum();
             assertThat(sliceDocs, greaterThanOrEqualTo((int) (0.1 * numDocs)));
             sumDocs += sliceDocs;