Browse Source

Reload Lucene in time series source when thread changes (#106727)

The time series source operator should reload Lucene structures,
such as doc values and scorers, when the executing thread changes.
Nhat Nguyen 1 year ago
parent
commit
c1467e02c8

+ 29 - 6
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorFactory.java

@@ -228,6 +228,9 @@ public record TimeSeriesSortedSourceOperatorFactory(int limit, int maxPageSize,
             void consume() throws IOException {
                 if (queue != null) {
                     currentTsid = BytesRef.deepCopyOf(queue.top().timeSeriesHash);
+                    if (queue.size() > 0) {
+                        queue.top().reinitializeIfNeeded(Thread.currentThread());
+                    }
                     while (queue.size() > 0) {
                         if (remainingDocs <= 0 || currentPagePos >= maxPageSize) {
                             break;
@@ -249,12 +252,14 @@ public record TimeSeriesSortedSourceOperatorFactory(int limit, int maxPageSize,
                             newTop = queue.size() > 0 ? queue.top() : null;
                         }
                         if (newTop != null && newTop.timeSeriesHash.equals(currentTsid) == false) {
+                            newTop.reinitializeIfNeeded(Thread.currentThread());
                             globalTsidOrd++;
                             currentTsid = BytesRef.deepCopyOf(newTop.timeSeriesHash);
                         }
                     }
                 } else {
                     // Only one segment, so no need to use priority queue and use segment ordinals as tsid ord.
+                    leaf.reinitializeIfNeeded(Thread.currentThread());
                     while (leaf.nextDoc()) {
                         tsOrdBuilder.appendInt(leaf.timeSeriesHashOrd);
                         timestampIntervalBuilder.appendLong(leaf.timestamp);
@@ -280,37 +285,55 @@ public record TimeSeriesSortedSourceOperatorFactory(int limit, int maxPageSize,
             static class Leaf {
 
                 private final int segmentOrd;
-                private final SortedDocValues tsids;
-                private final SortedNumericDocValues timestamps;
-                private final DocIdSetIterator iterator;
+                private final Weight weight;
+                private final LeafReaderContext leaf;
+                private SortedDocValues tsids;
+                private SortedNumericDocValues timestamps;
+                private DocIdSetIterator iterator;
+                private Thread createdThread;
 
                 private long timestamp;
                 private int timeSeriesHashOrd;
                 private BytesRef timeSeriesHash;
+                private int docID = -1;
 
                 Leaf(Weight weight, LeafReaderContext leaf) throws IOException {
                     this.segmentOrd = leaf.ord;
+                    this.weight = weight;
+                    this.leaf = leaf;
+                    this.createdThread = Thread.currentThread();
                     tsids = leaf.reader().getSortedDocValues("_tsid");
                     timestamps = leaf.reader().getSortedNumericDocValues("@timestamp");
                     iterator = weight.scorer(leaf).iterator();
                 }
 
                 boolean nextDoc() throws IOException {
-                    int docID = iterator.nextDoc();
+                    docID = iterator.nextDoc();
                     if (docID == DocIdSetIterator.NO_MORE_DOCS) {
                         return false;
                     }
 
-                    boolean advanced = tsids.advanceExact(iterator.docID());
+                    boolean advanced = tsids.advanceExact(docID);
                     assert advanced;
                     timeSeriesHashOrd = tsids.ordValue();
                     timeSeriesHash = tsids.lookupOrd(timeSeriesHashOrd);
-                    advanced = timestamps.advanceExact(iterator.docID());
+                    advanced = timestamps.advanceExact(docID);
                     assert advanced;
                     timestamp = timestamps.nextValue();
                     return true;
                 }
 
+                void reinitializeIfNeeded(Thread executingThread) throws IOException {
+                    if (executingThread != createdThread) {
+                        tsids = leaf.reader().getSortedDocValues("_tsid");
+                        timestamps = leaf.reader().getSortedNumericDocValues("@timestamp");
+                        iterator = weight.scorer(leaf).iterator();
+                        if (docID != -1) {
+                            iterator.advance(docID);
+                        }
+                        createdThread = executingThread;
+                    }
+                }
             }
 
         }

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java

@@ -144,7 +144,7 @@ public class TimeSeriesSortedSourceOperatorTests extends AnyOperatorTestCase {
 
     public void testRandom() {
         record Doc(int host, long timestamp, long metric) {}
-        int numDocs = between(1, 1000);
+        int numDocs = between(1, 5000);
         List<Doc> docs = new ArrayList<>();
         Map<Integer, Long> timestamps = new HashMap<>();
         long t0 = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");