
Fix issues with ReinitializingSourceProvider (#118370) (#118431)

The previous fix to ensure that each thread uses its own SourceProvider wasn't sufficient. The read from the `perThreadProvider` field could be stale and therefore return a previously created source provider. Instead, the source provider should be returned from the `provider` local variable.
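
To illustrate the stale-read hazard described above, here is a minimal, self-contained sketch; StaleReadSketch and PerThread are hypothetical names standing in for the real classes, and read() stands in for SourceProvider#getSource:

// Minimal sketch (hypothetical names, not the actual Elasticsearch classes).
final class StaleReadSketch {

    private PerThread perThreadProvider; // written by whichever thread needs a new provider

    Object getSource(int doc) {
        PerThread provider = perThreadProvider; // single read of the field into a local
        if (provider == null || provider.owner() != Thread.currentThread()) {
            provider = new PerThread(Thread.currentThread());
            this.perThreadProvider = provider; // publish for reuse on later calls by this thread
        }
        // Buggy variant: re-reading the field here may observe a provider that another
        // thread published in the meantime, handing this thread a foreign provider:
        //   return perThreadProvider.read(doc);
        // Fixed variant: use the local that this thread just validated or created:
        return provider.read(doc);
    }

    private record PerThread(Thread owner) {
        Object read(int doc) {
            return doc; // stand-in for SourceProvider#getSource
        }
    }
}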

This change also addresses another issue: sometimes the current docid goes backwards compared to the last seen docid, which causes a problem when the synthetic source provider is used, as doc values can't advance backwards. This change handles that by returning a new source provider when a backwards docid is detected.
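
The core of that guard, as it lands in ReinitializingSourceProvider#getSource in the diff below: track the last seen docid and hand out a freshly created provider whenever the docid moves backwards (or the calling thread changed).

if (provider == null || provider.creatingThread != currentThread || doc < lastSeenDocId) {
    provider = new PerThreadSourceProvider(sourceProviderFactory.get(), currentThread);
    this.perThreadProvider = provider;
}
lastSeenDocId = doc;
return provider.source.getSource(ctx, doc);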

Closes #118238

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
Martijn van Groningen, 10 months ago
commit e1d83e95be

+ 6 - 0
docs/changelog/118370.yaml

@@ -0,0 +1,6 @@
+pr: 118370
+summary: Fix concurrency issue with `ReinitializingSourceProvider`
+area: Mapping
+type: bug
+issues:
+ - 118238

+ 21 - 5
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java

@@ -37,6 +37,7 @@ import org.elasticsearch.xpack.esql.VerificationException;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.parser.ParsingException;
 import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
+import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 import org.junit.Before;
 
 import java.io.IOException;
@@ -1673,17 +1674,32 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         String sourceMode = randomBoolean() ? "stored" : "synthetic";
         Settings.Builder settings = indexSettings(1, 0).put(indexSettings()).put("index.mapping.source.mode", sourceMode);
         client().admin().indices().prepareCreate("test-script").setMapping(mapping).setSettings(settings).get();
-        for (int i = 0; i < 10; i++) {
+        int numDocs = 256;
+        for (int i = 0; i < numDocs; i++) {
             index("test-script", Integer.toString(i), Map.of("k1", i, "k2", "b-" + i, "meter", 10000 * i));
         }
         refresh("test-script");
-        try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 |  LIMIT 10")) {
+
+        var pragmas = randomPragmas();
+        if (canUseQueryPragmas()) {
+            Settings.Builder pragmaSettings = Settings.builder().put(pragmas.getSettings());
+            pragmaSettings.put("task_concurrency", 10);
+            pragmaSettings.put("data_partitioning", "doc");
+            pragmas = new QueryPragmas(pragmaSettings.build());
+        }
+        try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT " + numDocs, pragmas)) {
             List<Object> k1Column = Iterators.toList(resp.column(0));
-            assertThat(k1Column, contains(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L));
+            assertThat(k1Column, equalTo(LongStream.range(0L, numDocs).boxed().toList()));
             List<Object> k2Column = Iterators.toList(resp.column(1));
-            assertThat(k2Column, contains(null, null, null, null, null, null, null, null, null, null));
+            assertThat(k2Column, equalTo(Collections.nCopies(numDocs, null)));
             List<Object> meterColumn = Iterators.toList(resp.column(2));
-            assertThat(meterColumn, contains(0.0, 10000.0, 20000.0, 30000.0, 40000.0, 50000.0, 60000.0, 70000.0, 80000.0, 90000.0));
+            var expectedMeterColumn = new ArrayList<>(numDocs);
+            double val = 0.0;
+            for (int i = 0; i < numDocs; i++) {
+                expectedMeterColumn.add(val);
+                val += 10000.0;
+            }
+            assertThat(meterColumn, equalTo(expectedMeterColumn));
         }
     }
 

+ 14 - 3
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java

@@ -15,13 +15,23 @@ import java.io.IOException;
 import java.util.function.Supplier;
 
 /**
- * This is a workaround for when compute engine executes concurrently with data partitioning by docid.
+ * This class exists as a workaround for using SourceProvider in the compute engine.
+ * <p>
+ * The main issue is when compute engine executes concurrently with data partitioning by docid (inter segment parallelization).
+ * A {@link SourceProvider} can only be used by a single thread and this wrapping source provider ensures that each thread uses
+ * its own {@link SourceProvider}.
+ * <p>
+ * Additionally, this source provider protects against going backwards, which the synthetic source provider can't handle.
  */
 final class ReinitializingSourceProvider implements SourceProvider {
 
     private PerThreadSourceProvider perThreadProvider;
     private final Supplier<SourceProvider> sourceProviderFactory;
 
+    // Keeping track of last seen doc and if current doc is before last seen doc then source provider is initialized:
+    // (when source mode is synthetic then _source is read from doc values and doc values don't support going backwards)
+    private int lastSeenDocId;
+
     ReinitializingSourceProvider(Supplier<SourceProvider> sourceProviderFactory) {
         this.sourceProviderFactory = sourceProviderFactory;
     }
@@ -30,11 +40,12 @@ final class ReinitializingSourceProvider implements SourceProvider {
     public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
         var currentThread = Thread.currentThread();
         PerThreadSourceProvider provider = perThreadProvider;
-        if (provider == null || provider.creatingThread != currentThread) {
+        if (provider == null || provider.creatingThread != currentThread || doc < lastSeenDocId) {
             provider = new PerThreadSourceProvider(sourceProviderFactory.get(), currentThread);
             this.perThreadProvider = provider;
         }
-        return perThreadProvider.source.getSource(ctx, doc);
+        lastSeenDocId = doc;
+        return provider.source.getSource(ctx, doc);
     }
 
     private record PerThreadSourceProvider(SourceProvider source, Thread creatingThread) {