
[ML] Make autoscaling and task assignment use same memory staleness definition (#86632)

Previously autoscaling used a staleness definition of ~7 minutes
and task assignment used a staleness definition of ~90 seconds.

This could lead to the assignment explanation of tasks not being
"awaiting lazy assignment" at the moment when autoscaling ran,
thus preventing an autoscaling decision from being made.

The solution is to always use the same definition of staleness in
the memory tracker. This is set to the maximum of the intervals
suggested by the two interested parties (autoscaling and task
assignment).

Fixes #86616
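
For illustration only, the unified staleness rule introduced by this change reduces to roughly the sketch below. It mirrors the getStalenessDuration() and isRecentlyRefreshed() methods in the MlMemoryTracker diff; the field and constant names are those shown there.

    // Sketch of the shared staleness definition used by both autoscaling and task assignment.
    // reassignmentRecheckInterval comes from the task assignment recheck setting;
    // autoscalingCheckInterval tracks the observed gap between autoscaling checks (default 5 minutes);
    // RECENT_UPDATE_THRESHOLD is a fixed 1-minute grace period.
    Duration staleness = max(reassignmentRecheckInterval, autoscalingCheckInterval).plus(RECENT_UPDATE_THRESHOLD);
    boolean recentlyRefreshed = isMaster
        && lastUpdateTime != null
        && lastUpdateTime.plus(staleness).isAfter(Instant.now());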
David Roberts 3 years ago
parent commit 4a95624ab7

+ 6 - 0
docs/changelog/86632.yaml

@@ -0,0 +1,6 @@
+pr: 86632
+summary: Make autoscaling and task assignment use same memory staleness definition
+area: Machine Learning
+type: bug
+issues:
+ - 86616

+ 13 - 19
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java

@@ -367,13 +367,10 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L
         if (isMaster == false) {
             throw new IllegalArgumentException("request for scaling information is only allowed on the master node");
         }
-        final Duration memoryTrackingStale;
-        long previousTimeStamp = this.lastTimeToScale;
-        this.lastTimeToScale = this.timeSupplier.getAsLong();
-        if (previousTimeStamp == 0L) {
-            memoryTrackingStale = DEFAULT_MEMORY_REFRESH_RATE;
-        } else {
-            memoryTrackingStale = Duration.ofMillis(TimeValue.timeValueMinutes(1).millis() + this.lastTimeToScale - previousTimeStamp);
+        long previousTimeStamp = lastTimeToScale;
+        lastTimeToScale = timeSupplier.getAsLong();
+        if (previousTimeStamp > 0L && lastTimeToScale > previousTimeStamp) {
+            mlMemoryTracker.setAutoscalingCheckInterval(Duration.ofMillis(lastTimeToScale - previousTimeStamp));
         }
 
         final ClusterState clusterState = context.state();
@@ -466,8 +463,11 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L
             );
         }
 
-        if (mlMemoryTracker.isRecentlyRefreshed(memoryTrackingStale) == false) {
-            logger.debug("view of job memory is stale given duration [{}]. Not attempting to make scaling decision", memoryTrackingStale);
+        if (mlMemoryTracker.isRecentlyRefreshed() == false) {
+            logger.debug(
+                "view of job memory is stale given duration [{}]. Not attempting to make scaling decision",
+                mlMemoryTracker.getStalenessDuration()
+            );
             return buildDecisionAndRequestRefresh(reasonBuilder);
         }
         // We need the current node loads to determine if we need to scale up or down
@@ -508,12 +508,7 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L
             );
         }
 
-        Optional<NativeMemoryCapacity> futureFreedCapacity = calculateFutureAvailableCapacity(
-            tasks,
-            memoryTrackingStale,
-            mlNodes,
-            clusterState
-        );
+        Optional<NativeMemoryCapacity> futureFreedCapacity = calculateFutureAvailableCapacity(tasks, mlNodes, clusterState);
 
         final Optional<AutoscalingDeciderResult> scaleUpDecision = checkForScaleUp(
             numAnomalyJobsInQueue,
@@ -539,7 +534,7 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L
             resetScaleDownCoolDown();
             return noScaleResultOrRefresh(
                 reasonBuilder,
-                mlMemoryTracker.isRecentlyRefreshed(memoryTrackingStale) == false,
+                mlMemoryTracker.isRecentlyRefreshed() == false,
                 new AutoscalingDeciderResult(
                     context.currentCapacity(),
                     reasonBuilder.setSimpleReason(
@@ -687,7 +682,7 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L
 
         return noScaleResultOrRefresh(
             reasonBuilder,
-            mlMemoryTracker.isRecentlyRefreshed(memoryTrackingStale) == false,
+            mlMemoryTracker.isRecentlyRefreshed() == false,
             new AutoscalingDeciderResult(
                 context.currentCapacity(),
                 reasonBuilder.setSimpleReason("Passing currently perceived capacity as no scaling changes were detected to be possible")
@@ -964,11 +959,10 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L
      */
     Optional<NativeMemoryCapacity> calculateFutureAvailableCapacity(
         PersistentTasksCustomMetadata tasks,
-        Duration jobMemoryExpiry,
         Collection<DiscoveryNode> mlNodes,
         ClusterState clusterState
     ) {
-        if (mlMemoryTracker.isRecentlyRefreshed(jobMemoryExpiry) == false) {
+        if (mlMemoryTracker.isRecentlyRefreshed() == false) {
             return Optional.empty();
         }
         final List<PersistentTask<DatafeedParams>> jobsWithLookbackDatafeeds = datafeedTasks(tasks).stream()

+ 21 - 10
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java

@@ -43,6 +43,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
@@ -67,6 +68,7 @@ import java.util.stream.Stream;
 public class MlMemoryTracker implements LocalNodeMasterListener {
 
     private static final Duration RECENT_UPDATE_THRESHOLD = Duration.ofMinutes(1);
+    private static final Duration DEFAULT_AUTOSCALING_CHECK_INTERVAL = Duration.ofMinutes(5);
 
     private final Logger logger = LogManager.getLogger(MlMemoryTracker.class);
     private final Map<String, Long> memoryRequirementByAnomalyDetectorJob = new ConcurrentHashMap<>();
@@ -85,6 +87,7 @@ public class MlMemoryTracker implements LocalNodeMasterListener {
     private volatile boolean stopped;
     private volatile Instant lastUpdateTime;
     private volatile Duration reassignmentRecheckInterval;
+    private volatile Duration autoscalingCheckInterval = DEFAULT_AUTOSCALING_CHECK_INTERVAL;
 
     public MlMemoryTracker(
         Settings settings,
@@ -121,6 +124,10 @@ public class MlMemoryTracker implements LocalNodeMasterListener {
         reassignmentRecheckInterval = Duration.ofNanos(recheckInterval.getNanos());
     }
 
+    public void setAutoscalingCheckInterval(Duration autoscalingCheckInterval) {
+        this.autoscalingCheckInterval = Objects.requireNonNull(autoscalingCheckInterval);
+    }
+
     @Override
     public void onMaster() {
         isMaster = true;
@@ -196,18 +203,21 @@ public class MlMemoryTracker implements LocalNodeMasterListener {
      * for valid task assignment decisions to be made using it?
      */
     public boolean isRecentlyRefreshed() {
-        return isRecentlyRefreshed(reassignmentRecheckInterval);
+        Instant localLastUpdateTime = lastUpdateTime;
+        return isMaster && localLastUpdateTime != null && localLastUpdateTime.plus(getStalenessDuration()).isAfter(Instant.now());
     }
 
     /**
-     * Is the information in this object sufficiently up to date
-     * for valid task assignment decisions to be made using it?
+     * @return The definition of "staleness" used by {@link #isRecentlyRefreshed()}. This method is intended only as
+     *         a debugging aid, as calling it separately to {@link #isRecentlyRefreshed()} could return a different
+     *         number if settings were modified in between the two calls.
      */
-    public boolean isRecentlyRefreshed(Duration customDuration) {
-        Instant localLastUpdateTime = lastUpdateTime;
-        return isMaster
-            && localLastUpdateTime != null
-            && localLastUpdateTime.plus(RECENT_UPDATE_THRESHOLD).plus(customDuration).isAfter(Instant.now());
+    public Duration getStalenessDuration() {
+        return max(reassignmentRecheckInterval, autoscalingCheckInterval).plus(RECENT_UPDATE_THRESHOLD);
+    }
+
+    static Duration max(Duration first, Duration second) {
+        return first.compareTo(second) > 0 ? first : second;
     }
 
     /**
@@ -404,12 +414,13 @@ public class MlMemoryTracker implements LocalNodeMasterListener {
                     for (ActionListener<Void> listener : fullRefreshCompletionListeners) {
                         listener.onResponse(null);
                     }
-                    logger.trace("ML memory tracker last update time now [{}] and listeners called", lastUpdateTime);
+                    logger.debug("ML memory tracker last update time now [{}] and listeners called", lastUpdateTime);
                 } else {
                     Exception e = new NotMasterException("Node ceased to be master during ML memory tracker refresh");
                     for (ActionListener<Void> listener : fullRefreshCompletionListeners) {
                         listener.onFailure(e);
                     }
+                    logger.debug(e.getMessage());
                 }
                 fullRefreshCompletionListeners.clear();
             }
@@ -514,7 +525,7 @@ public class MlMemoryTracker implements LocalNodeMasterListener {
         if (stopPhaser.register() != phase.get()) {
             // Phases above not equal to `phase` mean we've been stopped, so don't do any operations that involve external interaction
             stopPhaser.arriveAndDeregister();
-            logger.info(() -> new ParameterizedMessage("[{}] not refreshing anomaly detector memory as node is shutting down", jobId));
+            logger.info("[{}] not refreshing anomaly detector memory as node is shutting down", jobId);
             listener.onFailure(new EsRejectedExecutionException("Couldn't run ML memory update - node is shutting down"));
             return;
         }

+ 1 - 3
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderServiceTests.java

@@ -44,7 +44,6 @@ import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
 import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator;
 import org.junit.Before;
 
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -137,7 +136,7 @@ public class MlAutoscalingDeciderServiceTests extends ESTestCase {
     @Before
     public void setup() {
         mlMemoryTracker = mock(MlMemoryTracker.class);
-        when(mlMemoryTracker.isRecentlyRefreshed(any())).thenReturn(true);
+        when(mlMemoryTracker.isRecentlyRefreshed()).thenReturn(true);
         when(mlMemoryTracker.asyncRefresh()).thenReturn(true);
         when(mlMemoryTracker.getAnomalyDetectorJobMemoryRequirement(any())).thenReturn(TEST_JOB_SIZE);
         when(mlMemoryTracker.getDataFrameAnalyticsJobMemoryRequirement(any())).thenReturn(TEST_JOB_SIZE);
@@ -1138,7 +1137,6 @@ public class MlAutoscalingDeciderServiceTests extends ESTestCase {
         Collection<DiscoveryNode> nodesInCluster = clusterState.getNodes().getNodes().values();
         Optional<NativeMemoryCapacity> nativeMemoryCapacity = service.calculateFutureAvailableCapacity(
             clusterState.metadata().custom(PersistentTasksCustomMetadata.TYPE),
-            Duration.ofMillis(randomIntBetween(1, 10000)), // exact value shouldn't matter
             nodesInCluster,
             clusterState
         );

+ 8 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java

@@ -30,6 +30,7 @@ import org.elasticsearch.xpack.ml.job.JobManager;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.junit.Before;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -42,6 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -379,6 +381,12 @@ public class MlMemoryTrackerTests extends ESTestCase {
         assertEquals("Couldn't run ML memory update - node is shutting down", exception.get().getMessage());
     }
 
+    public void testMaxDuration() {
+        assertThat(MlMemoryTracker.max(Duration.ofMinutes(1), Duration.ofMinutes(2)), equalTo(Duration.ofMinutes(2)));
+        assertThat(MlMemoryTracker.max(Duration.ofMinutes(4), Duration.ofMinutes(3)), equalTo(Duration.ofMinutes(4)));
+        assertThat(MlMemoryTracker.max(Duration.ofMinutes(5), Duration.ofMinutes(5)), equalTo(Duration.ofMinutes(5)));
+    }
+
     private PersistentTasksCustomMetadata.PersistentTask<OpenJobAction.JobParams> makeTestAnomalyDetectorTask(String jobId) {
         return new PersistentTasksCustomMetadata.PersistentTask<>(
             MlTasks.jobTaskId(jobId),