Browse Source

[Transform] Finetune Schedule to be less noisy on retry and retry slower (#88531)

reduce amount of log and audits if the same failure happens in a row
and change the mininimum wait time for retrying to 5s
Hendrik Muhs 3 years ago
parent
commit
9a0f05f955

+ 6 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/notifications/TransformAuditMessage.java

@@ -6,6 +6,7 @@
  */
 package org.elasticsearch.xpack.core.transform.notifications;
 
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.xcontent.ConstructingObjectParser;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
@@ -37,4 +38,9 @@ public class TransformAuditMessage extends AbstractAuditMessage {
     protected Optional<String> getResourceField() {
         return Optional.of(TRANSFORM_ID.getPreferredName());
     }
+
+    @Override
+    public String toString() {
+        return Strings.toString(this, true, true);
+    }
 }

+ 9 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java

@@ -31,6 +31,8 @@ class TransformContext {
     private final Listener taskListener;
     private volatile int numFailureRetries = Transform.DEFAULT_FAILURE_RETRIES;
     private final AtomicInteger failureCount;
+    // Keeps track of the last failure that occured, used for throttling logs and audit
+    private final AtomicReference<String> lastFailure = new AtomicReference<>();
     private volatile Instant changesLastDetectedAt;
     private volatile Instant lastSearchTime;
     private volatile boolean shouldStopAtCheckpoint = false;
@@ -68,6 +70,7 @@ class TransformContext {
     void resetReasonAndFailureCounter() {
         stateReason.set(null);
         failureCount.set(0);
+        lastFailure.set(null);
         taskListener.failureCountChanged();
     }
 
@@ -99,12 +102,17 @@ class TransformContext {
         return failureCount.get();
     }
 
-    int incrementAndGetFailureCount() {
+    int incrementAndGetFailureCount(String failure) {
         int newFailureCount = failureCount.incrementAndGet();
+        lastFailure.set(failure);
         taskListener.failureCountChanged();
         return newFailureCount;
     }
 
+    String getLastFailure() {
+        return lastFailure.get();
+    }
+
     void setChangesLastDetectedAt(Instant time) {
         changesLastDetectedAt = time;
     }

+ 18 - 11
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

@@ -120,8 +120,6 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
     private volatile TransformCheckpoint lastCheckpoint;
     private volatile TransformCheckpoint nextCheckpoint;
 
-    // Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index
-    private volatile String lastAuditedExceptionMessage = null;
     private volatile RunState runState;
 
     private volatile long lastCheckpointCleanup = 0L;
@@ -924,7 +922,8 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
      * (Note: originally this method was synchronized, which is not necessary)
      */
     void handleFailure(Exception e) {
-        logger.warn(() -> "[" + getJobId() + "] transform encountered an exception: ", e);
+        // more detailed reporting in the handlers and below
+        logger.debug(() -> "[" + getJobId() + "] transform encountered an exception: ", e);
         Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e);
 
         if (unwrappedException instanceof CircuitBreakingException) {
@@ -957,7 +956,13 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
 
         int numFailureRetries = Optional.ofNullable(transformConfig.getSettings().getNumFailureRetries())
             .orElse(context.getNumFailureRetries());
-        if (numFailureRetries != -1 && context.incrementAndGetFailureCount() > numFailureRetries) {
+
+        // group failures to decide whether to report it below
+        final String thisFailureClass = unwrappedException.getClass().toString();
+        final String lastFailureClass = context.getLastFailure();
+        final int failureCount = context.incrementAndGetFailureCount(thisFailureClass);
+
+        if (numFailureRetries != -1 && failureCount > numFailureRetries) {
             failIndexer(
                 "task encountered more than "
                     + numFailureRetries
@@ -969,14 +974,16 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
 
         // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
         // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
-        if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
-            String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException);
-
-            auditor.warning(
-                getJobId(),
-                "Transform encountered an exception: " + message + "; Will attempt again at next scheduled trigger."
+        // and if the number of retries is about to exceed
+        if (thisFailureClass.equals(lastFailureClass) == false || failureCount == numFailureRetries) {
+            String message = format(
+                "Transform encountered an exception: [%s]; Will automatically retry [%d/%d]",
+                ExceptionRootCauseFinder.getDetailedMessage(unwrappedException),
+                failureCount,
+                numFailureRetries
             );
-            lastAuditedExceptionMessage = message;
+            logger.warn(() -> "[" + getJobId() + "] " + message);
+            auditor.warning(getJobId(), message);
         }
     }
 

+ 1 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTask.java

@@ -23,7 +23,7 @@ final class TransformScheduledTask {
     /**
      * Minimum delay that can be applied after a failure.
      */
-    private static final long MIN_DELAY_MILLIS = Duration.ofSeconds(1).toMillis();
+    private static final long MIN_DELAY_MILLIS = Duration.ofSeconds(5).toMillis();
     /**
      * Maximum delay that can be applied after a failure.
      */

+ 66 - 14
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/notifications/MockTransformAuditor.java

@@ -7,6 +7,8 @@
 
 package org.elasticsearch.xpack.transform.notifications;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
@@ -14,7 +16,9 @@ import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.xpack.core.common.notifications.Level;
+import org.elasticsearch.xpack.core.transform.notifications.TransformAuditMessage;
 
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -35,6 +39,7 @@ import static org.mockito.Mockito.when;
 public class MockTransformAuditor extends TransformAuditor {
 
     private static final String MOCK_NODE_NAME = "mock_node_name";
+    private static final Logger logger = LogManager.getLogger(MockTransformAuditor.class);
 
     @SuppressWarnings("unchecked")
     public static MockTransformAuditor createMockAuditor() {
@@ -97,18 +102,26 @@ public class MockTransformAuditor extends TransformAuditor {
     }
 
     public abstract static class AbstractAuditExpectation implements AuditExpectation {
-        protected final String expectedName;
+        protected final String name;
         protected final Level expectedLevel;
         protected final String expectedResourceId;
         protected final String expectedMessage;
-        volatile boolean saw;
-
-        public AbstractAuditExpectation(String expectedName, Level expectedLevel, String expectedResourceId, String expectedMessage) {
-            this.expectedName = expectedName;
+        protected final int expectedCount;
+        volatile int count;
+
+        public AbstractAuditExpectation(
+            String name,
+            Level expectedLevel,
+            String expectedResourceId,
+            String expectedMessage,
+            int expectedCount
+        ) {
+            this.name = name;
             this.expectedLevel = expectedLevel;
             this.expectedResourceId = expectedResourceId;
             this.expectedMessage = expectedMessage;
-            this.saw = false;
+            this.expectedCount = expectedCount;
+            this.count = 0;
         }
 
         @Override
@@ -116,11 +129,11 @@ public class MockTransformAuditor extends TransformAuditor {
             if (level.equals(expectedLevel) && resourceId.equals(expectedResourceId) && innerMatch(level, resourceId, message)) {
                 if (Regex.isSimpleMatchPattern(expectedMessage)) {
                     if (Regex.simpleMatch(expectedMessage, message)) {
-                        saw = true;
+                        ++count;
                     }
                 } else {
                     if (message.contains(expectedMessage)) {
-                        saw = true;
+                        ++count;
                     }
                 }
             }
@@ -131,31 +144,70 @@ public class MockTransformAuditor extends TransformAuditor {
         }
     }
 
+    /**
+     * Expectation to assert a certain audit message has been issued once or multiple times.
+     */
     public static class SeenAuditExpectation extends AbstractAuditExpectation {
 
-        public SeenAuditExpectation(String expectedName, Level expectedLevel, String expectedResourceId, String expectedMessage) {
-            super(expectedName, expectedLevel, expectedResourceId, expectedMessage);
+        /**
+         * Expectation to match an audit exactly once.
+         *
+         * @param name name of the expected audit, free of choice, used for the assert message
+         * @param expectedLevel The expected level of the audit
+         * @param expectedResourceId The expected resource id
+         * @param expectedMessage Expected message of the audit, supports simple wildcard matching
+         */
+        public SeenAuditExpectation(String name, Level expectedLevel, String expectedResourceId, String expectedMessage) {
+            super(name, expectedLevel, expectedResourceId, expectedMessage, 1);
+        }
+
+        /**
+         * Expectation to match an audit a certain number of times.
+         *
+         * @param name name of the expected audit, free of choice, used for the assert message
+         * @param expectedLevel The expected level of the audit
+         * @param expectedResourceId The expected resource id
+         * @param expectedMessage Expected message of the audit, supports simple wildcard matching
+         * @param expectedCount Expected number of times the audit should be matched
+         */
+        public SeenAuditExpectation(
+            String name,
+            Level expectedLevel,
+            String expectedResourceId,
+            String expectedMessage,
+            int expectedCount
+        ) {
+            super(name, expectedLevel, expectedResourceId, expectedMessage, expectedCount);
         }
 
         @Override
         public void assertMatched() {
-            assertThat("expected to see " + expectedName + " but did not", saw, equalTo(true));
+            assertThat(
+                "expected to see " + name + " " + expectedCount + " times but saw it " + count + " times ",
+                count,
+                equalTo(expectedCount)
+            );
         }
     }
 
+    /**
+     * Expectation to assert a certain audit message is not issued.
+     */
     public static class UnseenAuditExpectation extends AbstractAuditExpectation {
 
-        public UnseenAuditExpectation(String expectedName, Level expectedLevel, String expectedResourceId, String expectedMessage) {
-            super(expectedName, expectedLevel, expectedResourceId, expectedMessage);
+        public UnseenAuditExpectation(String name, Level expectedLevel, String expectedResourceId, String expectedMessage) {
+            super(name, expectedLevel, expectedResourceId, expectedMessage, 0);
         }
 
         @Override
         public void assertMatched() {
-            assertThat("expected not to see " + expectedName + " but did", saw, equalTo(false));
+            assertThat("expected not to see " + name + " but did", count, equalTo(expectedCount));
         }
     }
 
     private void audit(Level level, String resourceId, String message) {
+        logger.info("AUDIT: {}", new TransformAuditMessage(resourceId, message, level, new Date(), MOCK_NODE_NAME));
+
         for (AuditExpectation expectation : expectations) {
             expectation.match(level, resourceId, message);
         }

+ 3 - 2
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformContextTests.java

@@ -36,12 +36,13 @@ public class TransformContextTests extends ESTestCase {
 
     public void testFailureCount() {
         TransformContext context = new TransformContext(null, null, 0, listener);
-        assertThat(context.incrementAndGetFailureCount(), is(equalTo(1)));
+        assertThat(context.incrementAndGetFailureCount("some_exception"), is(equalTo(1)));
         assertThat(context.getFailureCount(), is(equalTo(1)));
-        assertThat(context.incrementAndGetFailureCount(), is(equalTo(2)));
+        assertThat(context.incrementAndGetFailureCount("some_other_exception"), is(equalTo(2)));
         assertThat(context.getFailureCount(), is(equalTo(2)));
         context.resetReasonAndFailureCounter();
         assertThat(context.getFailureCount(), is(equalTo(0)));
+        assertThat(context.getLastFailure(), is(nullValue()));
 
         // Verify that the listener is notified every time the failure count is incremented or reset
         verify(listener, times(3)).failureCountChanged();

+ 174 - 18
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java

@@ -23,6 +23,7 @@ import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.common.breaker.CircuitBreaker.Durability;
 import org.elasticsearch.common.breaker.CircuitBreakingException;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.elasticsearch.index.reindex.DeleteByQueryRequest;
@@ -689,8 +690,8 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
                 "timed out during dbq",
                 Level.WARNING,
                 transformId,
-                "Transform encountered an exception: org.elasticsearch.ElasticsearchTimeoutException: timed out during dbq;"
-                    + " Will attempt again at next scheduled trigger."
+                "Transform encountered an exception: [org.elasticsearch.ElasticsearchTimeoutException: timed out during dbq];"
+                    + " Will automatically retry [1/10]"
             )
         );
         TransformContext.Listener contextListener = mock(TransformContext.Listener.class);
@@ -835,28 +836,170 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
         assertEquals(0, context.getFailureCount());
     }
 
+    // tests throttling of audits on logs based on repeated exception types
+    public void testHandleFailureAuditing() {
+        String transformId = randomAlphaOfLength(10);
+        TransformConfig config = new TransformConfig.Builder().setId(transformId)
+            .setSource(randomSourceConfig())
+            .setDest(randomDestConfig())
+            .setSyncConfig(new TimeSyncConfig("time", TimeSyncConfig.DEFAULT_DELAY))
+            .setPivotConfig(randomPivotConfig())
+            .build();
+
+        AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
+        Function<SearchRequest, SearchResponse> searchFunction = request -> mock(SearchResponse.class);
+        Function<BulkRequest, BulkResponse> bulkFunction = request -> mock(BulkResponse.class);
+
+        final AtomicBoolean failIndexerCalled = new AtomicBoolean(false);
+        final AtomicReference<String> failureMessage = new AtomicReference<>();
+        Consumer<String> failureConsumer = message -> {
+            failIndexerCalled.compareAndSet(false, true);
+            failureMessage.compareAndSet(null, message);
+        };
+
+        MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
+        TransformContext.Listener contextListener = mock(TransformContext.Listener.class);
+        TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
+
+        auditor.addExpectation(
+            new MockTransformAuditor.SeenAuditExpectation(
+                "timeout_exception_1",
+                Level.WARNING,
+                transformId,
+                "Transform encountered an exception: [*ElasticsearchTimeoutException: timeout_1]; Will automatically retry [1/"
+                    + Transform.DEFAULT_FAILURE_RETRIES
+                    + "]"
+            )
+        );
+
+        auditor.addExpectation(
+            new MockTransformAuditor.SeenAuditExpectation(
+                "bulk_exception_1",
+                Level.WARNING,
+                transformId,
+                "Transform encountered an exception: [*BulkIndexingException: bulk_exception_1*]; Will automatically retry [2/"
+                    + Transform.DEFAULT_FAILURE_RETRIES
+                    + "]"
+            )
+        );
+
+        auditor.addExpectation(
+            new MockTransformAuditor.SeenAuditExpectation(
+                "timeout_exception_2",
+                Level.WARNING,
+                transformId,
+                "Transform encountered an exception: [*ElasticsearchTimeoutException: timeout_2]; Will automatically retry [3/"
+                    + Transform.DEFAULT_FAILURE_RETRIES
+                    + "]"
+            )
+        );
+
+        auditor.addExpectation(
+            new MockTransformAuditor.SeenAuditExpectation(
+                "bulk_exception_2",
+                Level.WARNING,
+                transformId,
+                "Transform encountered an exception: [*BulkIndexingException: bulk_exception_2*]; Will automatically retry [6/"
+                    + Transform.DEFAULT_FAILURE_RETRIES
+                    + "]"
+            )
+        );
+
+        MockedTransformIndexer indexer = createMockIndexer(
+            config,
+            state,
+            searchFunction,
+            bulkFunction,
+            null,
+            failureConsumer,
+            threadPool,
+            ThreadPool.Names.GENERIC,
+            auditor,
+            context
+        );
+
+        indexer.handleFailure(
+            new SearchPhaseExecutionException(
+                "query",
+                "Partial shards failure",
+                new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchTimeoutException("timeout_1")) }
+            )
+        );
+
+        indexer.handleFailure(
+            new SearchPhaseExecutionException(
+                "query",
+                "Partial shards failure",
+                new ShardSearchFailure[] {
+                    new ShardSearchFailure(
+                        new BulkIndexingException("bulk_exception_1", new EsRejectedExecutionException("full queue"), false)
+                    ) }
+            )
+        );
+
+        indexer.handleFailure(
+            new SearchPhaseExecutionException(
+                "query",
+                "Partial shards failure",
+                new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchTimeoutException("timeout_2")) }
+            )
+        );
+
+        // not logged
+        indexer.handleFailure(
+            new SearchPhaseExecutionException(
+                "query",
+                "Partial shards failure",
+                new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchTimeoutException("timeout_2")) }
+            )
+        );
+
+        // not logged
+        indexer.handleFailure(
+            new SearchPhaseExecutionException(
+                "query",
+                "Partial shards failure",
+                new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchTimeoutException("timeout_2")) }
+            )
+        );
+
+        indexer.handleFailure(
+            new SearchPhaseExecutionException(
+                "query",
+                "Partial shards failure",
+                new ShardSearchFailure[] {
+                    new ShardSearchFailure(
+                        new BulkIndexingException("bulk_exception_2", new EsRejectedExecutionException("full queue"), false)
+                    ) }
+            )
+        );
+
+        auditor.assertAllExpectationsMatched();
+    }
+
     public void testHandleFailure() {
-        testHandleFailure(0, 5, 0);
-        testHandleFailure(5, 0, 5);
-        testHandleFailure(3, 5, 3);
-        testHandleFailure(5, 3, 5);
-        testHandleFailure(0, null, 0);
-        testHandleFailure(3, null, 3);
-        testHandleFailure(5, null, 5);
-        testHandleFailure(7, null, 7);
-        testHandleFailure(Transform.DEFAULT_FAILURE_RETRIES, null, Transform.DEFAULT_FAILURE_RETRIES);
-        testHandleFailure(null, 0, 0);
-        testHandleFailure(null, 3, 3);
-        testHandleFailure(null, 5, 5);
-        testHandleFailure(null, 7, 7);
-        testHandleFailure(null, Transform.DEFAULT_FAILURE_RETRIES, Transform.DEFAULT_FAILURE_RETRIES);
-        testHandleFailure(null, null, Transform.DEFAULT_FAILURE_RETRIES);
+        testHandleFailure(0, 5, 0, 0);
+        testHandleFailure(5, 0, 5, 2);
+        testHandleFailure(3, 5, 3, 2);
+        testHandleFailure(5, 3, 5, 2);
+        testHandleFailure(0, null, 0, 0);
+        testHandleFailure(3, null, 3, 2);
+        testHandleFailure(5, null, 5, 2);
+        testHandleFailure(7, null, 7, 2);
+        testHandleFailure(Transform.DEFAULT_FAILURE_RETRIES, null, Transform.DEFAULT_FAILURE_RETRIES, 2);
+        testHandleFailure(null, 0, 0, 0);
+        testHandleFailure(null, 3, 3, 2);
+        testHandleFailure(null, 5, 5, 2);
+        testHandleFailure(null, 7, 7, 2);
+        testHandleFailure(null, Transform.DEFAULT_FAILURE_RETRIES, Transform.DEFAULT_FAILURE_RETRIES, 2);
+        testHandleFailure(null, null, Transform.DEFAULT_FAILURE_RETRIES, 2);
     }
 
     private void testHandleFailure(
         Integer configNumFailureRetries,
         Integer contextNumFailureRetries,
-        int expectedEffectiveNumFailureRetries
+        int expectedEffectiveNumFailureRetries,
+        int expecedNumberOfRetryAudits
     ) {
         String transformId = randomAlphaOfLength(10);
         TransformConfig config = new TransformConfig.Builder().setId(transformId)
@@ -885,6 +1028,19 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
             context.setNumFailureRetries(contextNumFailureRetries);
         }
 
+        int indexerRetries = configNumFailureRetries != null ? configNumFailureRetries
+            : contextNumFailureRetries != null ? contextNumFailureRetries
+            : Transform.DEFAULT_FAILURE_RETRIES;
+        auditor.addExpectation(
+            new MockTransformAuditor.SeenAuditExpectation(
+                getTestName(),
+                Level.WARNING,
+                transformId,
+                "Transform encountered an exception: [*]; Will automatically retry [*/" + indexerRetries + "]",
+                expecedNumberOfRetryAudits
+            )
+        );
+
         MockedTransformIndexer indexer = createMockIndexer(
             config,
             state,

+ 4 - 4
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskTests.java

@@ -58,16 +58,16 @@ public class TransformScheduledTaskTests extends ESTestCase {
         {
             TransformScheduledTask task = new TransformScheduledTask(TRANSFORM_ID, FREQUENCY, LAST_TRIGGERED_TIME_MILLIS, 1, LISTENER);
             // Verify that the next scheduled time is calculated properly when failure count is greater than 0
-            assertThat(task.getNextScheduledTimeMillis(), is(equalTo(102000L)));
+            assertThat(task.getNextScheduledTimeMillis(), is(equalTo(105000L)));
         }
     }
 
     public void testCalculateNextScheduledTimeAfterFailure() {
         long lastTriggeredTimeMillis = Instant.now().toEpochMilli();
         long[] expectedDelayMillis = {
-            1000,    // 1s
-            2000,    // 2s
-            4000,    // 4s
+            5000,    // 5s
+            5000,    // 5s
+            5000,    // 5s
             8000,    // 8s
             16000,   // 16s
             32000,   // 32s

+ 4 - 3
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java

@@ -149,20 +149,20 @@ public class TransformSchedulerTests extends ESTestCase {
         transformScheduler.handleTransformFailureCountChanged(transformId, 1);
         assertThat(
             transformScheduler.getTransformScheduledTasks(),
-            contains(new TransformScheduledTask(transformId, frequency, 0L, 1, 2 * 1000, listener))
+            contains(new TransformScheduledTask(transformId, frequency, 0L, 1, 5 * 1000, listener))
         );
         assertThat(events, hasSize(1));
 
         transformScheduler.processScheduledTasks();
         assertThat(
             transformScheduler.getTransformScheduledTasks(),
-            contains(new TransformScheduledTask(transformId, frequency, 60 * 1000L, 1, 62 * 1000, listener))
+            contains(new TransformScheduledTask(transformId, frequency, 60 * 1000L, 1, 65 * 1000, listener))
         );
         assertThat(events, hasSize(2));
 
         assertThat(
             events,
-            contains(new TransformScheduler.Event(transformId, 0, 0), new TransformScheduler.Event(transformId, 2 * 1000, 60 * 1000))
+            contains(new TransformScheduler.Event(transformId, 0, 0), new TransformScheduler.Event(transformId, 5 * 1000, 60 * 1000))
         );
 
         transformScheduler.deregisterTransform(transformId);
@@ -396,6 +396,7 @@ public class TransformSchedulerTests extends ESTestCase {
             setCurrentTime(currentTime.plus(duration));
         }
 
+        @Override
         public Instant instant() {
             return currentTime;
         }