|
@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.job.process.normalizer;
|
|
|
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
+import org.elasticsearch.common.util.CancellableThreads;
|
|
|
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
|
|
|
|
|
|
import java.util.Date;
|
|
@@ -94,8 +95,15 @@ public class ShortCircuitingRenormalizer implements Renormalizer {
|
|
|
try {
|
|
|
taskToWaitFor.get();
|
|
|
} catch (ExecutionException e) {
|
|
|
- // This shouldn't happen, because we catch normalization errors inside the normalization loop
|
|
|
+ if (e.getCause() instanceof CancellableThreads.ExecutionCancelledException) {
|
|
|
+ // This happens if reset mode is enabled while result writes are being retried.
|
|
|
+ // Reset mode means the job whose results were being normalized will shortly
|
|
|
+ // cease to exist, so it's fine to consider the wait for renormalization complete.
|
|
|
+ break;
|
|
|
+ }
|
|
|
logger.error("[" + jobId + "] Error propagated from normalization", e);
|
|
|
+ // Don't loop again for the same task that caused the error
|
|
|
+ taskToWaitFor = null;
|
|
|
} catch (CancellationException e) {
|
|
|
// Convert cancellations to interruptions to simplify the interface
|
|
|
throw new InterruptedException("Normalization cancelled");
|