Browse Source

ESQL: Fix Driver status iterations and cpuTime (#123290)

Fixes https://github.com/elastic/elasticsearch/issues/122967

When the Driver reported status, if the "report at least every X time" report was triggered, it was re-adding the same iterations and cpuTime again, as it wasn't clearing it before the next iteration.
Now, there are separated variables for the last updated iterations and reported time.
Iván Cea Fontenla 7 months ago
parent
commit
e8b01ff7c0

+ 6 - 0
docs/changelog/123290.yaml

@@ -0,0 +1,6 @@
+pr: 123290
+summary: Fix Driver status iterations and `cpuTime`
+area: ES|QL
+type: enhancement
+issues:
+ - 122967

+ 21 - 10
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

@@ -171,9 +171,16 @@ public class Driver implements Releasable, Describable {
     SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplier nowSupplier) {
         updateStatus(0, 0, DriverStatus.Status.RUNNING, "driver running");
         long maxTimeNanos = maxTime.nanos();
+        // Start time, used to stop the calculations after maxTime has passed.
         long startTime = nowSupplier.getAsLong();
+        // The time of the next forced status update.
         long nextStatus = startTime + statusNanos;
-        int iter = 0;
+        // Total executed iterations this run, used to stop the calculations after maxIterations have passed.
+        int totalIterationsThisRun = 0;
+        // The iterations to be reported on the next status update.
+        int iterationsSinceLastStatusUpdate = 0;
+        // The time passed since the last status update.
+        long lastStatusUpdateTime = startTime;
         while (true) {
             IsBlockedResult isBlocked = Operator.NOT_BLOCKED;
             try {
@@ -182,29 +189,33 @@ public class Driver implements Releasable, Describable {
                 closeEarlyFinishedOperators();
                 assert isFinished() : "not finished after early termination";
             }
-            iter++;
+            totalIterationsThisRun++;
+            iterationsSinceLastStatusUpdate++;
+
+            long now = nowSupplier.getAsLong();
             if (isBlocked.listener().isDone() == false) {
-                updateStatus(nowSupplier.getAsLong() - startTime, iter, DriverStatus.Status.ASYNC, isBlocked.reason());
+                updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.ASYNC, isBlocked.reason());
                 return isBlocked.listener();
             }
             if (isFinished()) {
-                finishNanos = nowSupplier.getAsLong();
-                updateStatus(finishNanos - startTime, iter, DriverStatus.Status.DONE, "driver done");
+                finishNanos = now;
+                updateStatus(finishNanos - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.DONE, "driver done");
                 driverContext.finish();
                 Releasables.close(releasable, driverContext.getSnapshot());
                 return Operator.NOT_BLOCKED.listener();
             }
-            long now = nowSupplier.getAsLong();
-            if (iter >= maxIterations) {
-                updateStatus(now - startTime, iter, DriverStatus.Status.WAITING, "driver iterations");
+            if (totalIterationsThisRun >= maxIterations) {
+                updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver iterations");
                 return Operator.NOT_BLOCKED.listener();
             }
             if (now - startTime >= maxTimeNanos) {
-                updateStatus(now - startTime, iter, DriverStatus.Status.WAITING, "driver time");
+                updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver time");
                 return Operator.NOT_BLOCKED.listener();
             }
             if (now > nextStatus) {
-                updateStatus(now - startTime, iter, DriverStatus.Status.RUNNING, "driver running");
+                updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.RUNNING, "driver running");
+                iterationsSinceLastStatusUpdate = 0;
+                lastStatusUpdateTime = now;
                 nextStatus = now + statusNanos;
             }
         }

+ 38 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java

@@ -167,6 +167,44 @@ public class DriverTests extends ESTestCase {
         assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
     }
 
+    public void testProfileAndStatusInterval() {
+        DriverContext driverContext = driverContext();
+        List<Page> inPages = randomList(2, 100, DriverTests::randomPage);
+        List<Page> outPages = new ArrayList<>();
+
+        long startEpoch = randomNonNegativeLong();
+        long startNanos = randomLong();
+        long waitTime = randomLongBetween(10000, 100000);
+        long tickTime = randomLongBetween(10000, 100000);
+        long statusInterval = randomLongBetween(1, 10);
+
+        Driver driver = createDriver(startEpoch, startNanos, driverContext, inPages, outPages, TimeValue.timeValueNanos(statusInterval));
+
+        NowSupplier nowSupplier = new NowSupplier(startNanos, waitTime, tickTime);
+
+        int iterationsPerTick = randomIntBetween(1, 10);
+
+        for (int i = 0; i < inPages.size(); i += iterationsPerTick) {
+            logger.info("status {} {}", i, driver.status());
+            assertThat(driver.status().status(), equalTo(i == 0 ? DriverStatus.Status.QUEUED : DriverStatus.Status.WAITING));
+            assertThat(driver.status().started(), equalTo(startEpoch));
+            assertThat(driver.status().iterations(), equalTo((long) i));
+            assertThat(driver.status().cpuNanos(), equalTo(tickTime * i));
+            driver.run(TimeValue.timeValueDays(10), iterationsPerTick, nowSupplier);
+        }
+
+        logger.info("status {}", driver.status());
+        assertThat(driver.status().status(), equalTo(DriverStatus.Status.DONE));
+        assertThat(driver.status().started(), equalTo(startEpoch));
+        assertThat(driver.status().iterations(), equalTo((long) inPages.size()));
+        assertThat(driver.status().cpuNanos(), equalTo(tickTime * inPages.size()));
+
+        logger.info("profile {}", driver.profile());
+        assertThat(driver.profile().tookNanos(), equalTo(waitTime + tickTime * (nowSupplier.callCount - 1)));
+        assertThat(driver.profile().cpuNanos(), equalTo(tickTime * inPages.size()));
+        assertThat(driver.profile().iterations(), equalTo((long) inPages.size()));
+    }
+
     private static Driver createDriver(
         long startEpoch,
         long startNanos,