Browse Source

[ML] Avoid memory tracker race condition (#69290)

This change fixes a race condition that can occur if the
return value of memoryTracker.isRecentlyRefreshed() changes
between two calls that are assumed to return the same value.
The solution is to just call the method once and pass that
value to the other place where it is needed.  Then all related
code makes decisions based on the same view of whether the
memory tracker has been recently refreshed or not.

Fixes #69289
David Roberts 4 years ago
parent
commit
cf7e6b320f

+ 2 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

@@ -614,7 +614,8 @@ public class TransportStartDataFrameAnalyticsAction
         @Override
         public PersistentTasksCustomMetadata.Assignment getAssignment(TaskParams params, ClusterState clusterState) {
             boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
-            Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment = getPotentialAssignment(params, clusterState);
+            Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment =
+                getPotentialAssignment(params, clusterState, isMemoryTrackerRecentlyRefreshed);
             if (optionalAssignment.isPresent()) {
                 return optionalAssignment.get();
             }

+ 2 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java

@@ -78,7 +78,8 @@ public class SnapshotUpgradeTaskExecutor extends AbstractJobPersistentTasksExecu
     @Override
     public PersistentTasksCustomMetadata.Assignment getAssignment(SnapshotUpgradeTaskParams params, ClusterState clusterState) {
         boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
-        Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment = getPotentialAssignment(params, clusterState);
+        Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment =
+            getPotentialAssignment(params, clusterState, isMemoryTrackerRecentlyRefreshed);
         if (optionalAssignment.isPresent()) {
             return optionalAssignment.get();
         }

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java

@@ -114,7 +114,7 @@ public class OpenJobPersistentTasksExecutor extends AbstractJobPersistentTasksEx
             return AWAITING_MIGRATION;
         }
         boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
-        Optional<Assignment> optionalAssignment = getPotentialAssignment(params, clusterState);
+        Optional<Assignment> optionalAssignment = getPotentialAssignment(params, clusterState, isMemoryTrackerRecentlyRefreshed);
         if (optionalAssignment.isPresent()) {
             return optionalAssignment.get();
         }

+ 4 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java

@@ -152,7 +152,8 @@ public abstract class AbstractJobPersistentTasksExecutor<Params extends Persiste
         return true;
     }
 
-    public Optional<PersistentTasksCustomMetadata.Assignment> getPotentialAssignment(Params params, ClusterState clusterState) {
+    public Optional<PersistentTasksCustomMetadata.Assignment> getPotentialAssignment(Params params, ClusterState clusterState,
+                                                                                     boolean isMemoryTrackerRecentlyRefreshed) {
         // If we are waiting for an upgrade to complete, we should not assign to a node
         if (MlMetadata.getMlMetadata(clusterState).isUpgradeMode()) {
             return Optional.of(AWAITING_UPGRADE);
@@ -165,7 +166,7 @@ public abstract class AbstractJobPersistentTasksExecutor<Params extends Persiste
         if (missingIndices.isPresent()) {
             return missingIndices;
         }
-        Optional<PersistentTasksCustomMetadata.Assignment> staleMemory = checkMemoryFreshness(jobId);
+        Optional<PersistentTasksCustomMetadata.Assignment> staleMemory = checkMemoryFreshness(jobId, isMemoryTrackerRecentlyRefreshed);
         if (staleMemory.isPresent()) {
             return staleMemory;
         }
@@ -212,8 +213,7 @@ public abstract class AbstractJobPersistentTasksExecutor<Params extends Persiste
         return Optional.empty();
     }
 
-    public Optional<PersistentTasksCustomMetadata.Assignment> checkMemoryFreshness(String jobId) {
-        boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
+    public Optional<PersistentTasksCustomMetadata.Assignment> checkMemoryFreshness(String jobId, boolean isMemoryTrackerRecentlyRefreshed) {
         if (isMemoryTrackerRecentlyRefreshed == false) {
             boolean scheduledRefresh = memoryTracker.asyncRefresh();
             if (scheduledRefresh) {