Browse Source

[ML] Report index unavailable instead of waiting for lazy node (#38423)

If a job cannot be assigned to a node because an index it
requires is unavailable and there are lazy ML nodes then
index unavailable should be reported as the assignment
explanation rather than waiting for a lazy ML node.
David Roberts 6 years ago
parent
commit
92bc681705

+ 28 - 26
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

@@ -89,7 +89,6 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
 
     private final XPackLicenseState licenseState;
     private final PersistentTasksService persistentTasksService;
-    private final Client client;
     private final JobConfigProvider jobConfigProvider;
     private final MlMemoryTracker memoryTracker;
     private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
@@ -98,13 +97,12 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
     public TransportOpenJobAction(Settings settings, TransportService transportService, ThreadPool threadPool,
                                   XPackLicenseState licenseState, ClusterService clusterService,
                                   PersistentTasksService persistentTasksService, ActionFilters actionFilters,
-                                  IndexNameExpressionResolver indexNameExpressionResolver, Client client,
+                                  IndexNameExpressionResolver indexNameExpressionResolver,
                                   JobConfigProvider jobConfigProvider, MlMemoryTracker memoryTracker) {
         super(OpenJobAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
                 OpenJobAction.Request::new);
         this.licenseState = licenseState;
         this.persistentTasksService = persistentTasksService;
-        this.client = client;
         this.jobConfigProvider = jobConfigProvider;
         this.memoryTracker = memoryTracker;
         this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
@@ -136,32 +134,15 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
                                                                             int maxConcurrentJobAllocations,
                                                                             int maxMachineMemoryPercent,
                                                                             MlMemoryTracker memoryTracker,
+                                                                            boolean isMemoryTrackerRecentlyRefreshed,
                                                                             Logger logger) {
-        String resultsWriteAlias = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
-        List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsWriteAlias, clusterState);
-        if (unavailableIndices.size() != 0) {
-            String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" +
-                    String.join(",", unavailableIndices) + "]";
-            logger.debug(reason);
-            return new PersistentTasksCustomMetaData.Assignment(null, reason);
-        }
 
         // Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe
         // because of some weird OS problem) then fall back to the old mechanism of only considering numbers of assigned jobs
-        boolean allocateByMemory = true;
-
-        if (memoryTracker.isRecentlyRefreshed() == false) {
-
-            boolean scheduledRefresh = memoryTracker.asyncRefresh();
-            if (scheduledRefresh) {
-                String reason = "Not opening job [" + jobId + "] because job memory requirements are stale - refresh requested";
-                logger.debug(reason);
-                return new PersistentTasksCustomMetaData.Assignment(null, reason);
-            } else {
-                allocateByMemory = false;
-                logger.warn("Falling back to allocating job [{}] by job counts because a memory requirement refresh could not be scheduled",
-                    jobId);
-            }
+        boolean allocateByMemory = isMemoryTrackerRecentlyRefreshed;
+        if (isMemoryTrackerRecentlyRefreshed == false) {
+            logger.warn("Falling back to allocating job [{}] by job counts because a memory requirement refresh could not be scheduled",
+                jobId);
         }
 
         List<String> reasons = new LinkedList<>();
@@ -592,12 +573,33 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
                 return AWAITING_UPGRADE;
             }
 
-            PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(params.getJobId(),
+            String jobId = params.getJobId();
+            String resultsWriteAlias = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
+            List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsWriteAlias, clusterState);
+            if (unavailableIndices.size() != 0) {
+                String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" +
+                    String.join(",", unavailableIndices) + "]";
+                logger.debug(reason);
+                return new PersistentTasksCustomMetaData.Assignment(null, reason);
+            }
+
+            boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
+            if (isMemoryTrackerRecentlyRefreshed == false) {
+                boolean scheduledRefresh = memoryTracker.asyncRefresh();
+                if (scheduledRefresh) {
+                    String reason = "Not opening job [" + jobId + "] because job memory requirements are stale - refresh requested";
+                    logger.debug(reason);
+                    return new PersistentTasksCustomMetaData.Assignment(null, reason);
+                }
+            }
+
+            PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(jobId,
                 params.getJob(),
                 clusterState,
                 maxConcurrentJobAllocations,
                 maxMachineMemoryPercent,
                 memoryTracker,
+                isMemoryTrackerRecentlyRefreshed,
                 logger);
             if (assignment.getExecutorNode() == null) {
                 int numMlNodes = 0;

+ 56 - 20
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java

@@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
@@ -60,11 +61,9 @@ import org.junit.Before;
 
 import java.net.InetAddress;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
@@ -79,11 +78,13 @@ import static org.mockito.Mockito.when;
 public class TransportOpenJobActionTests extends ESTestCase {
 
     private MlMemoryTracker memoryTracker;
+    private boolean isMemoryTrackerRecentlyRefreshed;
 
     @Before
     public void setup() {
         memoryTracker = mock(MlMemoryTracker.class);
-        when(memoryTracker.isRecentlyRefreshed()).thenReturn(true);
+        isMemoryTrackerRecentlyRefreshed = true;
+        when(memoryTracker.isRecentlyRefreshed()).thenReturn(isMemoryTrackerRecentlyRefreshed);
     }
 
     public void testValidate_jobMissing() {
@@ -141,7 +142,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
         jobBuilder.setJobVersion(Version.CURRENT);
 
         Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id4", jobBuilder.build(),
-                cs.build(), 2, 30, memoryTracker, logger);
+                cs.build(), 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
         assertEquals("", result.getExplanation());
         assertEquals("_node_id3", result.getExecutorNode());
     }
@@ -178,7 +179,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
         Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id0", new ByteSizeValue(150, ByteSizeUnit.MB)).build(new Date());
 
         Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id0", job, cs.build(), 2,
-                30, memoryTracker, logger);
+                30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
         assertNull(result.getExecutorNode());
         assertTrue(result.getExplanation().contains("because this node is full. Number of opened jobs [" + maxRunningJobsPerNode
                 + "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]"));
@@ -204,7 +205,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
 
         Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id2", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date());
 
-        Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 2, 30, memoryTracker, logger);
+        Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 2, 30, memoryTracker,
+            isMemoryTrackerRecentlyRefreshed, logger);
         assertTrue(result.getExplanation().contains("because this node isn't a ml node"));
         assertNull(result.getExecutorNode());
     }
@@ -239,7 +241,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
         Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id6", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date());
 
         ClusterState cs = csBuilder.build();
-        Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 2, 30, memoryTracker, logger);
+        Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 2, 30, memoryTracker,
+            isMemoryTrackerRecentlyRefreshed, logger);
         assertEquals("_node_id3", result.getExecutorNode());
 
         tasksBuilder = PersistentTasksCustomMetaData.builder(tasks);
@@ -249,7 +252,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
         csBuilder = ClusterState.builder(cs);
         csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
         cs = csBuilder.build();
-        result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, logger);
+        result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed,
+            logger);
         assertNull("no node selected, because OPENING state", result.getExecutorNode());
         assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
 
@@ -260,7 +264,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
         csBuilder = ClusterState.builder(cs);
         csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
         cs = csBuilder.build();
-        result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, logger);
+        result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed,
+            logger);
         assertNull("no node selected, because stale task", result.getExecutorNode());
         assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
 
@@ -271,7 +276,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
         csBuilder = ClusterState.builder(cs);
         csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
         cs = csBuilder.build();
-        result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, logger);
+        result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed,
+            logger);
         assertNull("no node selected, because null state", result.getExecutorNode());
         assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
     }
@@ -310,7 +316,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
         Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id7", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date());
 
         // Allocation won't be possible if the stale failed job is treated as opening
-        Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, logger);
+        Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker,
+            isMemoryTrackerRecentlyRefreshed, logger);
         assertEquals("_node_id1", result.getExecutorNode());
 
         tasksBuilder = PersistentTasksCustomMetaData.builder(tasks);
@@ -320,7 +327,8 @@ public class TransportOpenJobActionTests extends ESTestCase {
         csBuilder = ClusterState.builder(cs);
         csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
         cs = csBuilder.build();
-        result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 2, 30, memoryTracker, logger);
+        result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed,
+            logger);
         assertNull("no node selected, because OPENING state", result.getExecutorNode());
         assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
     }
@@ -353,7 +361,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
         metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
         cs.metaData(metaData);
         Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 2, 30,
-            memoryTracker, logger);
+            memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
         assertThat(result.getExplanation(), containsString("because this node does not support jobs of type [incompatible_type]"));
         assertNull(result.getExecutorNode());
     }
@@ -384,7 +392,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
         metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
         cs.metaData(metaData);
         Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_incompatible_model_snapshot", job, cs.build(),
-                2, 30, memoryTracker, logger);
+                2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
         assertThat(result.getExplanation(), containsString(
                 "because the job's model snapshot requires a node of version [6.3.0] or higher"));
         assertNull(result.getExecutorNode());
@@ -413,7 +421,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
 
         Job job = jobWithRules("job_with_rules");
         Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 30, memoryTracker,
-            logger);
+            isMemoryTrackerRecentlyRefreshed, logger);
         assertThat(result.getExplanation(), containsString(
                 "because jobs using custom_rules require a node of version [6.4.0] or higher"));
         assertNull(result.getExecutorNode());
@@ -442,7 +450,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
 
         Job job = jobWithRules("job_with_rules");
         Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 30, memoryTracker,
-            logger);
+            isMemoryTrackerRecentlyRefreshed, logger);
         assertNotNull(result.getExecutorNode());
     }
 
@@ -529,10 +537,10 @@ public class TransportOpenJobActionTests extends ESTestCase {
 
     public void testGetAssignment_GivenJobThatRequiresMigration() {
         ClusterService clusterService = mock(ClusterService.class);
-        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, new HashSet<>(
-                Arrays.asList(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT,
-                        MachineLearning.MAX_LAZY_ML_NODES)
-        ));
+        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY,
+            Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT,
+                MachineLearning.MAX_LAZY_ML_NODES)
+        );
         when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
 
         TransportOpenJobAction.OpenJobPersistentTasksExecutor executor = new TransportOpenJobAction.OpenJobPersistentTasksExecutor(
@@ -542,6 +550,34 @@ public class TransportOpenJobActionTests extends ESTestCase {
         assertEquals(TransportOpenJobAction.AWAITING_MIGRATION, executor.getAssignment(params, mock(ClusterState.class)));
     }
 
+    // An index being unavailable should take precedence over waiting for a lazy node
+    public void testGetAssignment_GivenUnavailableIndicesWithLazyNode() {
+        Settings settings = Settings.builder().put(MachineLearning.MAX_LAZY_ML_NODES.getKey(), 1).build();
+        ClusterService clusterService = mock(ClusterService.class);
+        ClusterSettings clusterSettings = new ClusterSettings(settings,
+            Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT,
+                MachineLearning.MAX_LAZY_ML_NODES)
+        );
+        when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
+
+        ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
+        MetaData.Builder metaData = MetaData.builder();
+        RoutingTable.Builder routingTable = RoutingTable.builder();
+        addIndices(metaData, routingTable);
+        routingTable.remove(".ml-state");
+        csBuilder.metaData(metaData);
+        csBuilder.routingTable(routingTable.build());
+
+        TransportOpenJobAction.OpenJobPersistentTasksExecutor executor = new TransportOpenJobAction.OpenJobPersistentTasksExecutor(
+            settings, clusterService, mock(AutodetectProcessManager.class), mock(MlMemoryTracker.class), mock(Client.class));
+
+        OpenJobAction.JobParams params = new OpenJobAction.JobParams("unavailable_index_with_lazy_node");
+        params.setJob(mock(Job.class));
+        assertEquals("Not opening job [unavailable_index_with_lazy_node], " +
+            "because not all primary shards are active for the following indices [.ml-state]",
+            executor.getAssignment(params, csBuilder.build()).getExplanation());
+    }
+
     public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder) {
         addJobTask(jobId, nodeId, jobState, builder, false);
     }