Browse Source

ESQL: Fix Driver creating status with a live list of operators (#132260) (#132340)

Manual 8.19 backport of https://github.com/elastic/elasticsearch/pull/132260
Iván Cea Fontenla 2 months ago
parent
commit
261da78e6a

+ 6 - 0
docs/changelog/132260.yaml

@@ -0,0 +1,6 @@
+pr: 132260
+summary: FIx Driver creating status with a live list of operators
+area: ES|QL
+type: bug
+issues:
+ - 131564

+ 1 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

@@ -571,7 +571,7 @@ public class Driver implements Releasable, Describable {
                 prev.cpuNanos() + extraCpuNanos,
                 prev.iterations() + extraIterations,
                 status,
-                statusOfCompletedOperators,
+                List.copyOf(statusOfCompletedOperators),
                 activeOperators.stream().map(op -> new OperatorStatus(op.toString(), op.status())).toList(),
                 sleeps
             );

+ 55 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

@@ -49,6 +49,8 @@ import java.util.function.LongSupplier;
 
 import static org.hamcrest.Matchers.either;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.sameInstance;
 
 public class DriverTests extends ESTestCase {
     /**
@@ -252,6 +254,59 @@ public class DriverTests extends ESTestCase {
         assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
     }
 
+    public void testUnchangedStatus() {
+        DriverContext driverContext = driverContext();
+        List<Page> inPages = randomList(2, 100, DriverTests::randomPage);
+        List<Page> outPages = new ArrayList<>();
+
+        long startEpoch = randomNonNegativeLong();
+        long startNanos = randomLong();
+        long waitTime = randomLongBetween(10000, 100000);
+        long tickTime = randomLongBetween(10000, 100000);
+        long statusInterval = randomLongBetween(1, 10);
+
+        Driver driver = new Driver(
+            "unset",
+            "test",
+            startEpoch,
+            startNanos,
+            driverContext,
+            () -> "unset",
+            new CannedSourceOperator(inPages.iterator()),
+            List.of(),
+            new TestResultPageSinkOperator(outPages::add),
+            TimeValue.timeValueNanos(statusInterval),
+            () -> {}
+        );
+
+        NowSupplier nowSupplier = new NowSupplier(startNanos, waitTime, tickTime);
+
+        int iterationsPerTick = randomIntBetween(1, 10);
+
+        for (int i = 0; i < inPages.size(); i += iterationsPerTick) {
+            DriverStatus initialStatus = driver.status();
+            long completedOperatorsHash = initialStatus.completedOperators().hashCode();
+            long activeOperatorsHash = initialStatus.activeOperators().hashCode();
+            long sleepsHash = initialStatus.sleeps().hashCode();
+
+            driver.run(TimeValue.timeValueDays(10), iterationsPerTick, nowSupplier);
+
+            DriverStatus newStatus = driver.status();
+            assertThat(newStatus, not(sameInstance(initialStatus)));
+            assertThat(
+                newStatus.completedOperators() != initialStatus.completedOperators()
+                    || newStatus.completedOperators().hashCode() == completedOperatorsHash,
+                equalTo(true)
+            );
+            assertThat(
+                newStatus.activeOperators() != initialStatus.activeOperators()
+                    || newStatus.activeOperators().hashCode() == activeOperatorsHash,
+                equalTo(true)
+            );
+            assertThat(newStatus.sleeps() != initialStatus.sleeps() || newStatus.sleeps().hashCode() == sleepsHash, equalTo(true));
+        }
+    }
+
     class NowSupplier implements LongSupplier {
         private final long startNanos;
         private final long waitTime;