Browse Source

[ML] data frame, verify primary shards are active for configs index before task start (#41551)

Benjamin Trent 6 years ago
parent
commit
df777fee21

+ 35 - 0
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java

@@ -11,7 +11,11 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.LatchedActionListener;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.persistent.AllocatedPersistentTask;
 import org.elasticsearch.persistent.PersistentTaskState;
@@ -32,9 +36,12 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
 import org.elasticsearch.xpack.dataframe.DataFrame;
 import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
 import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
+import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
 import org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -67,6 +74,34 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
         this.threadPool = threadPool;
     }
 
+    @Override
+    public PersistentTasksCustomMetaData.Assignment getAssignment(DataFrameTransform params, ClusterState clusterState) {
+        List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(clusterState);
+        if (unavailableIndices.size() != 0) {
+            String reason = "Not starting data frame transform [" + params.getId() + "], " +
+                "because not all primary shards are active for the following indices [" +
+                String.join(",", unavailableIndices) + "]";
+            logger.debug(reason);
+            return new PersistentTasksCustomMetaData.Assignment(null, reason);
+        }
+        return super.getAssignment(params, clusterState);
+    }
+
+    static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterState) {
+        IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
+        String[] indices = resolver.concreteIndexNames(clusterState,
+            IndicesOptions.lenientExpandOpen(),
+            DataFrameInternalIndex.INDEX_TEMPLATE_PATTERN + "*");
+        List<String> unavailableIndices = new ArrayList<>(indices.length);
+        for (String index : indices) {
+            IndexRoutingTable routingTable = clusterState.getRoutingTable().index(index);
+            if (routingTable == null || routingTable.allPrimaryShardsActive() == false) {
+                unavailableIndices.add(index);
+            }
+        }
+        return unavailableIndices;
+    }
+
     @Override
     protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTransform params, PersistentTaskState state) {
         final String transformId = params.getId();

+ 88 - 0
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java

@@ -0,0 +1,88 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.dataframe.transforms;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DataFrameTransformPersistentTasksExecutorTests extends ESTestCase {
+
+    public void testVerifyIndicesPrimaryShardsAreActive() {
+        MetaData.Builder metaData = MetaData.builder();
+        RoutingTable.Builder routingTable = RoutingTable.builder();
+        addIndices(metaData, routingTable);
+
+        ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
+        csBuilder.routingTable(routingTable.build());
+        csBuilder.metaData(metaData);
+
+        ClusterState cs = csBuilder.build();
+        assertEquals(0, DataFrameTransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive(cs).size());
+
+        metaData = new MetaData.Builder(cs.metaData());
+        routingTable = new RoutingTable.Builder(cs.routingTable());
+        String indexToRemove = DataFrameInternalIndex.INDEX_NAME;
+        if (randomBoolean()) {
+            routingTable.remove(indexToRemove);
+        } else {
+            Index index = new Index(indexToRemove, "_uuid");
+            ShardId shardId = new ShardId(index, 0);
+            ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE,
+                new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""));
+            shardRouting = shardRouting.initialize("node_id", null, 0L);
+            routingTable.add(IndexRoutingTable.builder(index)
+                .addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build()));
+        }
+
+        csBuilder.routingTable(routingTable.build());
+        csBuilder.metaData(metaData);
+        List<String> result = DataFrameTransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive(csBuilder.build());
+        assertEquals(1, result.size());
+        assertEquals(indexToRemove, result.get(0));
+    }
+
+    private void addIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable) {
+        List<String> indices = new ArrayList<>();
+        indices.add(DataFrameInternalIndex.AUDIT_INDEX);
+        indices.add(DataFrameInternalIndex.INDEX_NAME);
+        for (String indexName : indices) {
+            IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName);
+            indexMetaData.settings(Settings.builder()
+                .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+            );
+            metaData.put(indexMetaData);
+            Index index = new Index(indexName, "_uuid");
+            ShardId shardId = new ShardId(index, 0);
+            ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE,
+                new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, ""));
+            shardRouting = shardRouting.initialize("node_id", null, 0L);
+            shardRouting = shardRouting.moveToStarted();
+            routingTable.add(IndexRoutingTable.builder(index)
+                .addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build()));
+        }
+    }
+
+}