|
@@ -237,7 +237,10 @@ public class TransformContinuousIT extends ESRestTestCase {
|
|
|
|
|
|
// start all transforms, wait until the processed all data and stop them
|
|
|
startTransforms();
|
|
|
- waitUntilTransformsReachedUpperBound(runDate.getEpochSecond() * 1000 + 1);
|
|
|
+
|
|
|
+ // at random we added between 0 and 999_999ns == (1ms - 1ns) to every data point, so we add 1ms, so every data point is before
|
|
|
+ // the checkpoint
|
|
|
+ waitUntilTransformsReachedUpperBound(runDate.toEpochMilli() + 1, run);
|
|
|
stopTransforms();
|
|
|
|
|
|
// TODO: the transform dest index requires a refresh, see gh#51154
|
|
@@ -387,11 +390,12 @@ public class TransformContinuousIT extends ESRestTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void waitUntilTransformsReachedUpperBound(long timeStampUpperBoundMillis) throws Exception {
|
|
|
+ private void waitUntilTransformsReachedUpperBound(long timeStampUpperBoundMillis, int iteration) throws Exception {
|
|
|
logger.info(
|
|
|
- "wait until transform reaches timestamp_millis: {}",
|
|
|
+ "wait until transform reaches timestamp_millis: {} iteration: {}",
|
|
|
ContinuousTestCase.STRICT_DATE_OPTIONAL_TIME_PRINTER_NANOS.withZone(ZoneId.of("UTC"))
|
|
|
- .format(Instant.ofEpochMilli(timeStampUpperBoundMillis))
|
|
|
+ .format(Instant.ofEpochMilli(timeStampUpperBoundMillis)),
|
|
|
+ iteration
|
|
|
);
|
|
|
for (ContinuousTestCase testCase : transformTestCases) {
|
|
|
assertBusy(() -> {
|