Bläddra i källkod

[Transform] Use monotonic time in `TransformScheduler` (#95456)

Przemysław Witek 2 år sedan
förälder
incheckning
6b8d1ef7b5

+ 6 - 0
docs/changelog/95456.yaml

@@ -0,0 +1,6 @@
+pr: 95456
+summary: Use monotonic time in `TransformScheduler`
+area: Transform
+type: bug
+issues:
+ - 95445

+ 46 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/MonotonicClock.java

@@ -0,0 +1,46 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.transform.transforms.scheduling;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * {@link MonotonicClock} class ensures that subsequent calls of {@link Clock#instant} yield non-decreasing sequence of instants.
+ */
+class MonotonicClock extends Clock {
+
+    private final Clock internalClock;
+    // Keeps track of what was the latest time seen so far.
+    private final AtomicReference<Instant> latestInstant;
+
+    MonotonicClock(Clock clock) {
+        this.internalClock = Objects.requireNonNull(clock);
+        this.latestInstant = new AtomicReference<>(clock.instant());
+    }
+
+    @Override
+    public ZoneId getZone() {
+        return internalClock.getZone();
+    }
+
+    @Override
+    public Clock withZone(ZoneId zone) {
+        return internalClock.withZone(zone);
+    }
+
+    @Override
+    public Instant instant() {
+        Instant current = internalClock.instant();
+        // Either return the just-fetched current time or the previously recorded latest time, whichever is later.
+        return latestInstant.updateAndGet(previousLatest -> current.isAfter(previousLatest) ? current : previousLatest);
+    }
+}

+ 1 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java

@@ -79,7 +79,7 @@ public final class TransformScheduler {
     private Scheduler.Cancellable scheduledFuture;
 
     public TransformScheduler(Clock clock, ThreadPool threadPool, Settings settings) {
-        this.clock = Objects.requireNonNull(clock);
+        this.clock = new MonotonicClock(Objects.requireNonNull(clock));
         this.threadPool = Objects.requireNonNull(threadPool);
         this.schedulerFrequency = Transform.SCHEDULER_FREQUENCY.get(settings);
         this.scheduledTasks = new TransformScheduledTaskQueue();

+ 58 - 0
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/FakeClock.java

@@ -0,0 +1,58 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.transform.transforms.scheduling;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Objects;
+
+/**
+ * {@link FakeClock} class in a test implementation of {@link Clock} and provides the possibility to set arbitrary current time.
+ */
+class FakeClock extends Clock {
+
+    private Instant currentTime;
+
+    FakeClock(Instant time) {
+        currentTime = Objects.requireNonNull(time);
+    }
+
+    /**
+     * Sets the current time to an arbitrary time.
+     * @param time arbitrary time, may be earlier than the previously-set current time
+     */
+    public void setCurrentTime(Instant time) {
+        currentTime = Objects.requireNonNull(time);
+    }
+
+    /**
+     * Advances the current time by an arbitrary duration.
+     * @param duration arbitrary duration, may be negative
+     */
+    public void advanceTimeBy(Duration duration) {
+        Objects.requireNonNull(duration);
+        setCurrentTime(currentTime.plus(duration));
+    }
+
+    @Override
+    public Instant instant() {
+        return currentTime;
+    }
+
+    @Override
+    public ZoneId getZone() {
+        return ZoneId.systemDefault();
+    }
+
+    @Override
+    public Clock withZone(ZoneId zone) {
+        return this;
+    }
+}

+ 44 - 0
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/MonotonicClockTests.java

@@ -0,0 +1,44 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.transform.transforms.scheduling;
+
+import org.elasticsearch.test.ESTestCase;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+
+public class MonotonicClockTests extends ESTestCase {
+
+    public void testMonotonicityWithFakeClock() {
+        FakeClock fakeClock = new FakeClock(Instant.now());
+        Clock clock = new MonotonicClock(fakeClock);
+        long lastTime = clock.millis();
+        for (int i = 0; i < 100_000_000; ++i) {
+            long currentTime = clock.millis();
+            assertThat("At iteration " + i, currentTime, is(greaterThanOrEqualTo(lastTime)));
+            lastTime = currentTime;
+            // -1 is included in order to simulate that the clock can sometimes go back in time
+            fakeClock.advanceTimeBy(Duration.ofMillis(randomLongBetween(-1, 100)));
+        }
+    }
+
+    public void testMonotonicityWithSystemClock() {
+        Clock systemClock = Clock.systemUTC();
+        Clock clock = new MonotonicClock(systemClock);
+        long lastTime = clock.millis();
+        for (int i = 0; i < 100_000_000; ++i) {
+            long currentTime = clock.millis();
+            assertThat("At iteration " + i, currentTime, is(greaterThanOrEqualTo(lastTime)));
+            lastTime = currentTime;
+        }
+    }
+}

+ 4 - 42
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java

@@ -20,7 +20,6 @@ import org.junit.Before;
 import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
-import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -39,7 +38,6 @@ import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.notNullValue;
 
 public class TransformSchedulerTests extends ESTestCase {
 
@@ -88,7 +86,7 @@ public class TransformSchedulerTests extends ESTestCase {
             assertThat(events, hasSize(1));
             clock.advanceTimeBy(Duration.ofMillis(1001));
         }
-        assertThat(clock.currentTime, is(equalTo(Instant.ofEpochMilli(5005))));
+        assertThat(clock.instant(), is(equalTo(Instant.ofEpochMilli(5005))));
 
         for (int i = 0; i < frequencySeconds; ++i) {
             transformScheduler.processScheduledTasks();
@@ -99,7 +97,7 @@ public class TransformSchedulerTests extends ESTestCase {
             assertThat(events, hasSize(2));
             clock.advanceTimeBy(Duration.ofMillis(1001));
         }
-        assertThat(clock.currentTime, is(equalTo(Instant.ofEpochMilli(10010))));
+        assertThat(clock.instant(), is(equalTo(Instant.ofEpochMilli(10010))));
 
         for (int i = 0; i < frequencySeconds; ++i) {
             transformScheduler.processScheduledTasks();
@@ -110,7 +108,7 @@ public class TransformSchedulerTests extends ESTestCase {
             assertThat(events, hasSize(3));
             clock.advanceTimeBy(Duration.ofMillis(1001));
         }
-        assertThat(clock.currentTime, is(equalTo(Instant.ofEpochMilli(15015))));
+        assertThat(clock.instant(), is(equalTo(Instant.ofEpochMilli(15015))));
 
         assertThat(events.get(0), is(equalTo(new TransformScheduler.Event(transformId, 0, 0))));
         assertThat(events.get(1), is(equalTo(new TransformScheduler.Event(transformId, 5000, 5005))));
@@ -147,7 +145,7 @@ public class TransformSchedulerTests extends ESTestCase {
             assertThat(events, hasSize(1));
             clock.advanceTimeBy(Duration.ofMillis(TEST_SCHEDULER_FREQUENCY.millis()));
         }
-        assertThat(clock.currentTime, is(equalTo(Instant.ofEpochSecond(60))));
+        assertThat(clock.instant(), is(equalTo(Instant.ofEpochSecond(60))));
 
         transformScheduler.handleTransformFailureCountChanged(transformId, 1);
         assertThat(
@@ -451,40 +449,4 @@ public class TransformSchedulerTests extends ESTestCase {
         assertThat(events.get(4).transformId(), is(equalTo(transformId2)));
         assertThat(events.get(5).transformId(), is(equalTo(transformId3)));
     }
-
-    private static class FakeClock extends Clock {
-
-        private Instant currentTime;
-
-        FakeClock(Instant time) {
-            assertThat(time, is(notNullValue()));
-            currentTime = time;
-        }
-
-        public void setCurrentTime(Instant time) {
-            // We cannot go back in time.
-            assertThat(time, is(greaterThanOrEqualTo(currentTime)));
-            currentTime = time;
-        }
-
-        public void advanceTimeBy(Duration duration) {
-            assertThat(duration, is(notNullValue()));
-            setCurrentTime(currentTime.plus(duration));
-        }
-
-        @Override
-        public Instant instant() {
-            return currentTime;
-        }
-
-        @Override
-        public ZoneId getZone() {
-            return ZoneId.systemDefault();
-        }
-
-        @Override
-        public Clock withZone(ZoneId zone) {
-            return this;
-        }
-    }
 }