Browse Source

Minimum frequency for transforms (#103515)

* Minimum frequency for transforms

* Fix existing tests

* Default implementation of TransformExtension::getMinFrequency

* Unit test for min transform frequency

* Clean up code

* Lint fixes
Jan Kuipers 1 year ago
parent
commit
e015a7c383
15 changed files with 87 additions and 48 deletions
  1. 8 0
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/DefaultTransformExtension.java
  2. 6 1
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java
  3. 7 0
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformExtension.java
  4. 1 2
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTask.java
  5. 16 5
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java
  6. 6 5
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java
  7. 1 1
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java
  8. 4 3
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java
  9. 2 2
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java
  10. 1 1
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java
  11. 2 1
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java
  12. 4 4
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java
  13. 2 1
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskQueueTests.java
  14. 0 5
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskTests.java
  15. 27 17
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java

+ 8 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/DefaultTransformExtension.java

@@ -9,9 +9,12 @@ package org.elasticsearch.xpack.transform;
 
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
 
 public class DefaultTransformExtension implements TransformExtension {
 
+    private static final TimeValue MIN_FREQUENCY = TimeValue.timeValueSeconds(1);
+
     @Override
     public boolean includeNodeInfo() {
         return true;
@@ -33,4 +36,9 @@ public class DefaultTransformExtension implements TransformExtension {
             .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
             .build();
     }
+
+    @Override
+    public TimeValue getMinFrequency() {
+        return MIN_FREQUENCY;
+    }
 }

+ 6 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java

@@ -243,7 +243,12 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
             configManager,
             auditor
         );
-        TransformScheduler scheduler = new TransformScheduler(clock, services.threadPool(), settings);
+        TransformScheduler scheduler = new TransformScheduler(
+            clock,
+            services.threadPool(),
+            settings,
+            getTransformExtension().getMinFrequency()
+        );
         scheduler.start();
 
         transformServices.set(new TransformServices(configManager, checkpointService, auditor, scheduler));

+ 7 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformExtension.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.xpack.transform;
 
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
 
 public interface TransformExtension {
 
@@ -20,4 +21,10 @@ public interface TransformExtension {
      * source settings.
      */
     Settings getTransformDestinationIndexSettings();
+
+    // TODO(jkuipers): remove this default implementation after the ServerlessTransformPlugin
+    // in the elasticsearch-serverless project is updated.
+    default TimeValue getMinFrequency() {
+        return TimeValue.timeValueSeconds(1);
+    }
 }

+ 1 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTask.java

@@ -8,7 +8,6 @@
 package org.elasticsearch.xpack.transform.transforms.scheduling;
 
 import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.xpack.transform.Transform;
 
 import java.util.Objects;
 
@@ -37,7 +36,7 @@ final class TransformScheduledTask {
         TransformScheduler.Listener listener
     ) {
         this.transformId = Objects.requireNonNull(transformId);
-        this.frequency = frequency != null ? frequency : Transform.DEFAULT_TRANSFORM_FREQUENCY;
+        this.frequency = Objects.requireNonNull(frequency);
         this.lastTriggeredTimeMillis = lastTriggeredTimeMillis;
         this.failureCount = failureCount;
         this.nextScheduledTimeMillis = nextScheduledTimeMillis;

+ 16 - 5
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java

@@ -73,17 +73,21 @@ public final class TransformScheduler {
      * Set to {@code false} after processing (doesn't matter whether successful or not).
      */
     private final AtomicBoolean isProcessingActive;
+
+    private final TimeValue minFrequency;
+
     /**
      * Stored the scheduled execution for future cancellation.
      */
     private Scheduler.Cancellable scheduledFuture;
 
-    public TransformScheduler(Clock clock, ThreadPool threadPool, Settings settings) {
+    public TransformScheduler(Clock clock, ThreadPool threadPool, Settings settings, TimeValue minFrequency) {
         this.clock = new MonotonicClock(Objects.requireNonNull(clock));
         this.threadPool = Objects.requireNonNull(threadPool);
         this.schedulerFrequency = Transform.SCHEDULER_FREQUENCY.get(settings);
         this.scheduledTasks = new TransformScheduledTaskQueue();
         this.isProcessingActive = new AtomicBoolean();
+        this.minFrequency = minFrequency;
     }
 
     /**
@@ -165,7 +169,7 @@ public final class TransformScheduler {
             }
             return new TransformScheduledTask(
                 task.getTransformId(),
-                task.getFrequency(),
+                getFrequency(task.getFrequency()),
                 currentTimeMillis,
                 task.getFailureCount(),
                 task.getListener()
@@ -198,7 +202,7 @@ public final class TransformScheduler {
         long currentTimeMillis = clock.millis();
         TransformScheduledTask transformScheduledTask = new TransformScheduledTask(
             transformId,
-            transformTaskParams.getFrequency(),
+            getFrequency(transformTaskParams.getFrequency()),
             null,  // this task has not been triggered yet
             0,  // this task has not failed yet
             currentTimeMillis,  // we schedule this task at current clock time so that it is processed ASAP
@@ -223,7 +227,7 @@ public final class TransformScheduler {
             transformId,
             task -> new TransformScheduledTask(
                 task.getTransformId(),
-                task.getFrequency(),
+                getFrequency(task.getFrequency()),
                 task.getLastTriggeredTimeMillis(),
                 failureCount,
                 task.getListener()
@@ -245,7 +249,7 @@ public final class TransformScheduler {
             transformId,
             task -> new TransformScheduledTask(
                 task.getTransformId(),
-                task.getFrequency(),
+                getFrequency(task.getFrequency()),
                 task.getLastTriggeredTimeMillis(),
                 task.getFailureCount(),
                 currentTimeMillis,  // we schedule this task at current clock time so that it is processed ASAP
@@ -273,4 +277,11 @@ public final class TransformScheduler {
     List<TransformScheduledTask> getTransformScheduledTasks() {
         return scheduledTasks.listScheduledTasks();
     }
+
+    private TimeValue getFrequency(TimeValue frequency) {
+        if (frequency == null) {
+            frequency = Transform.DEFAULT_TRANSFORM_FREQUENCY;
+        }
+        return frequency.compareTo(minFrequency) >= 0 ? frequency : minFrequency;
+    }
 }

+ 6 - 5
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java

@@ -23,6 +23,7 @@ import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.client.internal.ParentTaskAssigningClient;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.search.SearchContextMissingException;
@@ -135,7 +136,7 @@ public class ClientTransformIndexerTests extends ESTestCase {
                     mock(IndexBasedTransformConfigManager.class),
                     mock(TransformCheckpointService.class),
                     mock(TransformAuditor.class),
-                    new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY)
+                    new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO)
                 ),
                 mock(CheckpointProvider.class),
                 new AtomicReference<>(IndexerState.STOPPED),
@@ -229,7 +230,7 @@ public class ClientTransformIndexerTests extends ESTestCase {
                     mock(IndexBasedTransformConfigManager.class),
                     mock(TransformCheckpointService.class),
                     mock(TransformAuditor.class),
-                    new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY)
+                    new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO)
                 ),
                 mock(CheckpointProvider.class),
                 new AtomicReference<>(IndexerState.STOPPED),
@@ -306,7 +307,7 @@ public class ClientTransformIndexerTests extends ESTestCase {
                     mock(IndexBasedTransformConfigManager.class),
                     mock(TransformCheckpointService.class),
                     mock(TransformAuditor.class),
-                    new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY)
+                    new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO)
                 ),
                 mock(CheckpointProvider.class),
                 new AtomicReference<>(IndexerState.STOPPED),
@@ -370,7 +371,7 @@ public class ClientTransformIndexerTests extends ESTestCase {
                     mock(IndexBasedTransformConfigManager.class),
                     mock(TransformCheckpointService.class),
                     mock(TransformAuditor.class),
-                    new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY)
+                    new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO)
                 ),
                 mock(CheckpointProvider.class),
                 new AtomicReference<>(IndexerState.STOPPED),
@@ -599,7 +600,7 @@ public class ClientTransformIndexerTests extends ESTestCase {
                 mock(IndexBasedTransformConfigManager.class),
                 mock(TransformCheckpointService.class),
                 mock(TransformAuditor.class),
-                new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY)
+                new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO)
             ),
             mock(CheckpointProvider.class),
             new AtomicReference<>(IndexerState.STOPPED),

+ 1 - 1
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java

@@ -129,7 +129,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
                     transformsConfigManager,
                     mock(TransformCheckpointService.class),
                     auditor,
-                    new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY)
+                    new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO)
                 ),
                 checkpointProvider,
                 initialState,

+ 4 - 3
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java

@@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.LatchedActionListener;
 import org.elasticsearch.client.internal.ParentTaskAssigningClient;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
 import org.elasticsearch.index.shard.ShardId;
@@ -217,7 +218,7 @@ public class TransformIndexerFailureOnStatePersistenceTests extends ESTestCase {
                         configManager,
                         mock(TransformCheckpointService.class),
                         mock(TransformAuditor.class),
-                        new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY)
+                        new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO)
                     ),
                     mock(CheckpointProvider.class),
                     new AtomicReference<>(IndexerState.STOPPED),
@@ -299,7 +300,7 @@ public class TransformIndexerFailureOnStatePersistenceTests extends ESTestCase {
                         configManager,
                         mock(TransformCheckpointService.class),
                         mock(TransformAuditor.class),
-                        new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY)
+                        new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO)
                     ),
                     mock(CheckpointProvider.class),
                     new AtomicReference<>(IndexerState.STOPPED),
@@ -430,7 +431,7 @@ public class TransformIndexerFailureOnStatePersistenceTests extends ESTestCase {
                     configManager,
                     mock(TransformCheckpointService.class),
                     mock(TransformAuditor.class),
-                    new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY)
+                    new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO)
                 ),
                 mock(CheckpointProvider.class),
                 new AtomicReference<>(IndexerState.STOPPED),

+ 2 - 2
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java

@@ -806,7 +806,7 @@ public class TransformIndexerStateTests extends ESTestCase {
             transformConfigManager,
             mock(TransformCheckpointService.class),
             transformAuditor,
-            new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY)
+            new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO)
         );
 
         MockedTransformIndexer indexer = new MockedTransformIndexer(
@@ -840,7 +840,7 @@ public class TransformIndexerStateTests extends ESTestCase {
             transformConfigManager,
             mock(TransformCheckpointService.class),
             transformAuditor,
-            new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY)
+            new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO)
         );
 
         MockedTransformIndexerForStatePersistenceTesting indexer = new MockedTransformIndexerForStatePersistenceTesting(

+ 1 - 1
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java

@@ -452,7 +452,7 @@ public class TransformIndexerTests extends ESTestCase {
             transformConfigManager,
             mock(TransformCheckpointService.class),
             transformAuditor,
-            new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY)
+            new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO)
         );
 
         MockedTransformIndexer indexer = new MockedTransformIndexer(

+ 2 - 1
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java

@@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.shard.ShardId;
@@ -444,7 +445,7 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
             transformsConfigManager,
             transformCheckpointService,
             mockAuditor,
-            new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY)
+            new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO)
         );
 
         ClusterSettings cSettings = new ClusterSettings(Settings.EMPTY, Collections.singleton(Transform.NUM_FAILURE_RETRIES_SETTING));

+ 4 - 4
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java

@@ -112,7 +112,7 @@ public class TransformTaskTests extends ESTestCase {
             transformsConfigManager,
             transformsCheckpointService,
             auditor,
-            new TransformScheduler(clock, threadPool, Settings.EMPTY)
+            new TransformScheduler(clock, threadPool, Settings.EMPTY, TimeValue.ZERO)
         );
 
         TransformState transformState = new TransformState(
@@ -134,7 +134,7 @@ public class TransformTaskTests extends ESTestCase {
             TaskId.EMPTY_TASK_ID,
             createTransformTaskParams(transformConfig.getId()),
             transformState,
-            new TransformScheduler(clock, threadPool, Settings.EMPTY),
+            new TransformScheduler(clock, threadPool, Settings.EMPTY, TimeValue.ZERO),
             auditor,
             threadPool,
             Collections.emptyMap()
@@ -212,7 +212,7 @@ public class TransformTaskTests extends ESTestCase {
             TaskId.EMPTY_TASK_ID,
             createTransformTaskParams(transformConfig.getId()),
             transformState,
-            new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY),
+            new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO),
             auditor,
             threadPool,
             Collections.emptyMap()
@@ -431,7 +431,7 @@ public class TransformTaskTests extends ESTestCase {
             TaskId.EMPTY_TASK_ID,
             createTransformTaskParams(transformConfig.getId()),
             transformState,
-            new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY),
+            new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY, TimeValue.ZERO),
             auditor,
             threadPool,
             Collections.emptyMap()

+ 2 - 1
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskQueueTests.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.transform.transforms.scheduling;
 
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.xpack.transform.Transform;
 import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler.Event;
 import org.hamcrest.Matchers;
 import org.junit.After;
@@ -197,7 +198,7 @@ public class TransformScheduledTaskQueueTests extends ESTestCase {
     private static TransformScheduledTask createTask(String transformId, long nextScheduledTimeMillis) {
         return new TransformScheduledTask(
             transformId,
-            null,
+            Transform.DEFAULT_SCHEDULER_FREQUENCY,
             null,
             0,
             nextScheduledTimeMillis,

+ 0 - 5
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskTests.java

@@ -32,11 +32,6 @@ public class TransformScheduledTaskTests extends ESTestCase {
         assertThat(task.getListener(), is(equalTo(LISTENER)));
     }
 
-    public void testDefaultFrequency() {
-        TransformScheduledTask task = new TransformScheduledTask(TRANSFORM_ID, null, LAST_TRIGGERED_TIME_MILLIS, 0, 0, LISTENER);
-        assertThat(task.getFrequency(), is(equalTo(DEFAULT_FREQUENCY)));
-    }
-
     public void testNextScheduledTimeMillis() {
         {
             TransformScheduledTask task = new TransformScheduledTask(TRANSFORM_ID, FREQUENCY, LAST_TRIGGERED_TIME_MILLIS, 0, 123, LISTENER);

+ 27 - 17
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java

@@ -61,49 +61,59 @@ public class TransformSchedulerTests extends ESTestCase {
     }
 
     public void testScheduling() {
+        testScheduling(5, 0);
+    }
+
+    public void testScheduling_withMinFrequency() {
+        testScheduling(1, 5);
+    }
+
+    // Note: frequencySeconds and minFrequencySeconds together should lead to an expected frequency of 5 seconds.
+    private void testScheduling(int frequencySeconds, int minFreqencySeconds) {
         String transformId = "test-with-fake-clock";
-        int frequencySeconds = 5;
         TimeValue frequency = TimeValue.timeValueSeconds(frequencySeconds);
+        TimeValue minFrequency = TimeValue.timeValueSeconds(minFreqencySeconds);
+        TimeValue fiveSeconds = TimeValue.timeValueSeconds(5);
         TransformTaskParams transformTaskParams = new TransformTaskParams(transformId, TransformConfigVersion.CURRENT, frequency, false);
         FakeClock clock = new FakeClock(Instant.ofEpochMilli(0));
         CopyOnWriteArrayList<TransformScheduler.Event> events = new CopyOnWriteArrayList<>();
         TransformScheduler.Listener listener = events::add;
 
-        TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS);
+        TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS, minFrequency);
         transformScheduler.registerTransform(transformTaskParams, listener);
         assertThat(
             transformScheduler.getTransformScheduledTasks(),
-            contains(new TransformScheduledTask(transformId, frequency, 0L, 0, 5000, listener))
+            contains(new TransformScheduledTask(transformId, fiveSeconds, 0L, 0, 5000, listener))
         );
         assertThat(events, hasSize(1));
 
-        for (int i = 0; i < frequencySeconds; ++i) {
+        for (int i = 0; i < 5; ++i) {
             transformScheduler.processScheduledTasks();
             assertThat(
                 transformScheduler.getTransformScheduledTasks(),
-                contains(new TransformScheduledTask(transformId, frequency, 0L, 0, 5000, listener))
+                contains(new TransformScheduledTask(transformId, fiveSeconds, 0L, 0, 5000, listener))
             );
             assertThat(events, hasSize(1));
             clock.advanceTimeBy(Duration.ofMillis(1001));
         }
         assertThat(clock.instant(), is(equalTo(Instant.ofEpochMilli(5005))));
 
-        for (int i = 0; i < frequencySeconds; ++i) {
+        for (int i = 0; i < 5; ++i) {
             transformScheduler.processScheduledTasks();
             assertThat(
                 transformScheduler.getTransformScheduledTasks(),
-                contains(new TransformScheduledTask(transformId, frequency, 5005L, 0, 10005, listener))
+                contains(new TransformScheduledTask(transformId, fiveSeconds, 5005L, 0, 10005, listener))
             );
             assertThat(events, hasSize(2));
             clock.advanceTimeBy(Duration.ofMillis(1001));
         }
         assertThat(clock.instant(), is(equalTo(Instant.ofEpochMilli(10010))));
 
-        for (int i = 0; i < frequencySeconds; ++i) {
+        for (int i = 0; i < 5; ++i) {
             transformScheduler.processScheduledTasks();
             assertThat(
                 transformScheduler.getTransformScheduledTasks(),
-                contains(new TransformScheduledTask(transformId, frequency, 10010L, 0, 15010, listener))
+                contains(new TransformScheduledTask(transformId, fiveSeconds, 10010L, 0, 15010, listener))
             );
             assertThat(events, hasSize(3));
             clock.advanceTimeBy(Duration.ofMillis(1001));
@@ -128,7 +138,7 @@ public class TransformSchedulerTests extends ESTestCase {
         CopyOnWriteArrayList<TransformScheduler.Event> events = new CopyOnWriteArrayList<>();
         TransformScheduler.Listener listener = events::add;
 
-        TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS);
+        TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS, TimeValue.ZERO);
         transformScheduler.registerTransform(transformTaskParams, listener);
         assertThat(
             transformScheduler.getTransformScheduledTasks(),
@@ -180,7 +190,7 @@ public class TransformSchedulerTests extends ESTestCase {
         CopyOnWriteArrayList<TransformScheduler.Event> events = new CopyOnWriteArrayList<>();
         TransformScheduler.Listener listener = events::add;
 
-        TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS);
+        TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS, TimeValue.ZERO);
         transformScheduler.registerTransform(transformTaskParams, listener);
         assertThat(
             transformScheduler.getTransformScheduledTasks(),
@@ -230,7 +240,7 @@ public class TransformSchedulerTests extends ESTestCase {
         CopyOnWriteArrayList<TransformScheduler.Event> events = new CopyOnWriteArrayList<>();
         TransformScheduler.Listener listener = events::add;
 
-        TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS);
+        TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS, TimeValue.ZERO);
         transformScheduler.registerTransform(transformTaskParams, listener);
         assertThat(
             transformScheduler.getTransformScheduledTasks(),
@@ -267,7 +277,7 @@ public class TransformSchedulerTests extends ESTestCase {
         FakeClock clock = new FakeClock(Instant.ofEpochMilli(0));
         CopyOnWriteArrayList<TransformScheduler.Event> events = new CopyOnWriteArrayList<>();
 
-        TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS);
+        TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS, TimeValue.ZERO);
         TransformScheduler.Listener taskModifyingListener = new TransformScheduler.Listener() {
             private boolean firstTime = true;
 
@@ -309,7 +319,7 @@ public class TransformSchedulerTests extends ESTestCase {
         Clock clock = Clock.systemUTC();
         CopyOnWriteArrayList<TransformScheduler.Event> events = new CopyOnWriteArrayList<>();
 
-        TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS);
+        TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS, TimeValue.ZERO);
         transformScheduler.start();
         transformScheduler.registerTransform(transformTaskParams, events::add);
         assertThat(events, hasSize(1));
@@ -334,7 +344,7 @@ public class TransformSchedulerTests extends ESTestCase {
         Clock clock = Clock.systemUTC();
         CopyOnWriteArrayList<TransformScheduler.Event> events = new CopyOnWriteArrayList<>();
 
-        TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS);
+        TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS, TimeValue.ZERO);
         transformScheduler.start();
         transformScheduler.registerTransform(transformTaskParams, events::add);
         assertThat(events, hasSize(1));
@@ -391,7 +401,7 @@ public class TransformSchedulerTests extends ESTestCase {
         CopyOnWriteArrayList<TransformScheduler.Event> events = new CopyOnWriteArrayList<>();
         TransformScheduler.Listener listener = events::add;
 
-        TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS);
+        TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS, TimeValue.ZERO);
         transformScheduler.registerTransform(transformTaskParams1, listener);
         transformScheduler.registerTransform(transformTaskParams2, listener);
         transformScheduler.registerTransform(transformTaskParams3, listener);
@@ -421,7 +431,7 @@ public class TransformSchedulerTests extends ESTestCase {
         CopyOnWriteArrayList<TransformScheduler.Event> events = new CopyOnWriteArrayList<>();
         TransformScheduler.Listener listener = events::add;
 
-        TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS);
+        TransformScheduler transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS, TimeValue.ZERO);
         transformScheduler.registerTransform(transformTaskParams1, listener);
         transformScheduler.registerTransform(transformTaskParams2, listener);
         transformScheduler.registerTransform(transformTaskParams3, listener);