Put a fake allocation id on allocate stale primary command (#34140)

Removes the fake allocation id after recovery is done.

Relates to #33432
Vladimir Dolzhenko, 7 years ago
parent commit f789d49fb3
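
For context, the command in question is the allocate_stale_primary reroute command; a typical invocation looks like this (index, shard, and node names are illustrative):

    POST /_cluster/reroute
    {
      "commands" : [
        {
          "allocate_stale_primary" : {
            "index" : "index42", "shard" : 0,
            "node" : "node1", "accept_data_loss" : true
          }
        }
      ]
    }

Before this change, a successful reroute immediately wrote the new primary's real allocation id into the in-sync set; the diffs below instead keep a fake id there until recovery actually completes.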

+ 13 - 6
server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java

@@ -140,12 +140,19 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> imple
                 }
 
                 if (shardRouting.primary() && shardRouting.initializing() &&
-                    shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE &&
-                    inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false)
-                    throw new IllegalStateException("a primary shard routing " + shardRouting + " is a primary that is recovering from " +
-                        "a known allocation id but has no corresponding entry in the in-sync " +
-                        "allocation set " + inSyncAllocationIds);
-
+                    shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE) {
+                    if (inSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID)) {
+                        if (inSyncAllocationIds.size() != 1) {
+                            throw new IllegalStateException("a primary shard routing " + shardRouting
+                                + " is a primary that is recovering from a stale primary but has unexpected allocation ids in the in-sync " +
+                                "allocation set " + inSyncAllocationIds);
+                        }
+                    } else if (inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false) {
+                        throw new IllegalStateException("a primary shard routing " + shardRouting
+                            + " is a primary that is recovering from a known allocation id but has no corresponding entry in the in-sync " +
+                            "allocation set " + inSyncAllocationIds);
+                    }
+                }
             }
         }
         return true;
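
The reshaped check splits the EXISTING_STORE case in two: if the in-sync set carries the fake id, it must be the sole entry; otherwise the recovering primary's real allocation id must already be in the set. A minimal standalone sketch of that invariant (a hypothetical helper, not the actual validate() method; java.util.Set assumed imported):

    // Sketch only: "_forced_allocation_" is
    // RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID (introduced in the next file).
    static void checkInSyncInvariant(Set<String> inSyncIds, String primaryAllocationId) {
        if (inSyncIds.contains("_forced_allocation_")) {
            // a forced stale-primary allocation is in flight: the fake id must stand alone
            if (inSyncIds.size() != 1) {
                throw new IllegalStateException("unexpected ids next to the fake id: " + inSyncIds);
            }
        } else if (inSyncIds.contains(primaryAllocationId) == false) {
            // a regular existing-store recovery must reuse a known in-sync id
            throw new IllegalStateException("primary " + primaryAllocationId + " not in-sync: " + inSyncIds);
        }
    }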

+ 5 - 0
server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java

@@ -132,6 +132,11 @@ public abstract class RecoverySource implements Writeable, ToXContentObject {
      * Recovery from an existing on-disk store
      */
     public static final class ExistingStoreRecoverySource extends RecoverySource {
+        /**
+         * Special allocation id that a shard carries while it initializes after an allocate_stale_primary command
+         */
+        public static final String FORCED_ALLOCATION_ID = "_forced_allocation_";
+
         public static final ExistingStoreRecoverySource INSTANCE = new ExistingStoreRecoverySource(false);
         public static final ExistingStoreRecoverySource FORCE_STALE_PRIMARY_INSTANCE = new ExistingStoreRecoverySource(true);
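
Worth noting why the underscore-wrapped literal is safe: real allocation ids are randomly generated base64 UUIDs (via UUIDs.randomBase64UUID(), as AllocationId.newInitializing does, if memory serves), so no genuine id can ever equal this constant. A hedged illustration:

    import org.elasticsearch.common.UUIDs;

    // Real ids look like "tISzkUyPRNePvCy8C2FKSg" -- never underscore-wrapped --
    // so the fake marker cannot collide with an id generated at runtime.
    String realId = UUIDs.randomBase64UUID();
    assert realId.equals(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) == false;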
 

+ 25 - 11
server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java

@@ -39,6 +39,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -68,7 +69,16 @@ public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRouting
 
     @Override
     public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) {
-        addAllocationId(startedShard);
+        assert Objects.equals(initializingShard.allocationId().getId(), startedShard.allocationId().getId())
+            : "initializingShard.allocationId [" + initializingShard.allocationId().getId()
+            + "] and startedShard.allocationId [" + startedShard.allocationId().getId() + "] have to be the same";
+        Updates updates = changes(startedShard.shardId());
+        updates.addedAllocationIds.add(startedShard.allocationId().getId());
+        if (startedShard.primary()
+            // the started shard has a null recoverySource; pick the recoverySource up from its initializing state
+            && (initializingShard.recoverySource() == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE)) {
+            updates.removedAllocationIds.add(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID);
+        }
     }
 
     @Override
@@ -144,7 +154,8 @@ public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRouting
             oldInSyncAllocationIds.contains(updates.initializedPrimary.allocationId().getId()) == false) {
             // we're not reusing an existing in-sync allocation id to initialize a primary, which means that we're either force-allocating
             // an empty or a stale primary (see AllocateEmptyPrimaryAllocationCommand or AllocateStalePrimaryAllocationCommand).
-            RecoverySource.Type recoverySourceType = updates.initializedPrimary.recoverySource().getType();
+            RecoverySource recoverySource = updates.initializedPrimary.recoverySource();
+            RecoverySource.Type recoverySourceType = recoverySource.getType();
             boolean emptyPrimary = recoverySourceType == RecoverySource.Type.EMPTY_STORE;
             assert updates.addedAllocationIds.isEmpty() : (emptyPrimary ? "empty" : "stale") +
                 " primary is not force-initialized in same allocation round where shards are started";
@@ -156,9 +167,15 @@ public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRouting
                 // forcing an empty primary resets the in-sync allocations to the empty set (ShardRouting.allocatedPostIndexCreate)
                 indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(), Collections.emptySet());
             } else {
+                final String allocationId;
+                if (recoverySource == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE) {
+                    allocationId = RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID;
+                } else {
+                    assert recoverySource instanceof RecoverySource.SnapshotRecoverySource : recoverySource;
+                    allocationId = updates.initializedPrimary.allocationId().getId();
+                }
                 // forcing a stale primary resets the in-sync allocations to the singleton set with the stale id
-                indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(),
-                    Collections.singleton(updates.initializedPrimary.allocationId().getId()));
+                indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(), Collections.singleton(allocationId));
             }
         } else {
             // standard path for updating in-sync ids
@@ -166,6 +183,10 @@ public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRouting
             inSyncAllocationIds.addAll(updates.addedAllocationIds);
             inSyncAllocationIds.removeAll(updates.removedAllocationIds);
 
+            assert oldInSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) == false
+                || inSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) == false :
+                "fake allocation id has to be removed, inSyncAllocationIds: " + inSyncAllocationIds;
+
             // Prevent set of inSyncAllocationIds to grow unboundedly. This can happen for example if we don't write to a primary
             // but repeatedly shut down nodes that have active replicas.
             // We use number_of_replicas + 1 (= possible active shard copies) to bound the inSyncAllocationIds set
@@ -287,13 +308,6 @@ public class IndexMetaDataUpdater extends RoutingChangesObserver.AbstractRouting
         }
     }
 
-    /**
-     * Add allocation id of this shard to the set of in-sync shard copies
-     */
-    private void addAllocationId(ShardRouting shardRouting) {
-        changes(shardRouting.shardId()).addedAllocationIds.add(shardRouting.allocationId().getId());
-    }
-
     /**
      * Increase primary term for this shard id
      */
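
Taken together, the updater changes give the fake id a closed lifecycle: the forced reroute seeds the in-sync set with it, a failed recovery leaves it in place (so no on-disk copy can match and silently rejoin), and a successful start swaps the real id in. A self-contained simulation of that set arithmetic, deliberately using plain Java collections instead of Elasticsearch types:

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    public class FakeAllocationIdLifecycle {
        static final String FAKE_ID = "_forced_allocation_"; // FORCED_ALLOCATION_ID from the diff above

        public static void main(String[] args) {
            // 1. AllocateStalePrimary reroute: in-sync set is reset to the singleton fake id
            Set<String> inSync = new HashSet<>(Collections.singleton(FAKE_ID));

            // 2. if recovery failed here, FAKE_ID would match no real shard copy and the
            //    shard would stay unassigned until another AllocateStalePrimary is issued

            // 3. recovery succeeds and shardStarted fires: real id in, fake id out
            String realId = "tISzkUyPRNePvCy8C2FKSg"; // illustrative; real ids are random UUIDs
            inSync.add(realId);
            inSync.remove(FAKE_ID);

            if (inSync.equals(Collections.singleton(realId)) == false) {
                throw new AssertionError("fake id must be fully cleaned up: " + inSync);
            }
        }
    }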

+ 238 - 0
server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java

@@ -0,0 +1,238 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.routing;
+
+import org.apache.lucene.store.SimpleFSDirectory;
+import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplanation;
+import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.routing.allocation.AllocationDecision;
+import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
+import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.MockEngineFactoryPlugin;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.shard.RemoveCorruptedShardDataCommandIT;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardPath;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.DummyShardLock;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.InternalSettingsPlugin;
+import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.test.transport.MockTransportService;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0)
+public class AllocationIdIT extends ESIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Arrays.asList(MockTransportService.TestPlugin.class, MockEngineFactoryPlugin.class, InternalSettingsPlugin.class);
+    }
+
+    public void testFailedRecoveryOnAllocateStalePrimaryRequiresAnotherAllocateStalePrimary() throws Exception {
+        /*
+         * The allocation id is put into the in-sync set when the shard starts, while the historyUUID is only adjusted after recovery is done.
+         *
+         * If, during execution of AllocateStalePrimary, a proper allocation id is stored in the in-sync set and recovery then fails,
+         * the shard restart skips the stage where the historyUUID is changed.
+         *
+         * That leads to a situation where the allocated stale primary and its replica belong to the same historyUUID, so the
+         * replica would receive operations after the local checkpoint although the documents before the checkpoints could be significantly different.
+         *
+         * Therefore, on AllocateStalePrimary we put a fake allocation id (no real one can ever look like that)
+         * and any failure during recovery requires an extra AllocateStalePrimary command to be executed.
+         */
+
+        // initial set up
+        final String indexName = "index42";
+        final String master = internalCluster().startMasterOnlyNode();
+        String node1 = internalCluster().startNode();
+        createIndex(indexName, Settings.builder()
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+            .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum").build());
+        final int numDocs = indexDocs(indexName, "foo", "bar");
+        final IndexSettings indexSettings = getIndexSettings(indexName, node1);
+        final Set<String> allocationIds = getAllocationIds(indexName);
+        final ShardId shardId = new ShardId(resolveIndex(indexName), 0);
+        final Path indexPath = getIndexPath(node1, shardId);
+        assertThat(allocationIds, hasSize(1));
+        final String historyUUID = historyUUID(node1, indexName);
+        String node2 = internalCluster().startNode();
+        ensureGreen(indexName);
+        internalCluster().assertSameDocIdsOnShards();
+        // initial set up is done
+
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node1));
+
+        // index more docs via node2; this marks the stopped node1's copy as stale
+        int numExtraDocs = indexDocs(indexName, "foo", "bar2");
+        assertHitCount(client(node2).prepareSearch(indexName).setQuery(matchAllQuery()).get(), numDocs + numExtraDocs);
+
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node2));
+
+        // create fake corrupted marker on node1
+        putFakeCorruptionMarker(indexSettings, shardId, indexPath);
+
+        // the master now knows that node1's copy is out of sync
+        node1 = internalCluster().startNode();
+
+        // there is only a _stale_ primary copy
+        checkNoValidShardCopy(indexName, shardId);
+
+        // allocate stale primary
+        client(node1).admin().cluster().prepareReroute()
+            .add(new AllocateStalePrimaryAllocationCommand(indexName, 0, node1, true))
+            .get();
+
+        // allocation fails due to corruption marker
+        assertBusy(() -> {
+            final ClusterState state = client().admin().cluster().prepareState().get().getState();
+            final ShardRouting shardRouting = state.routingTable().index(indexName).shard(shardId.id()).primaryShard();
+            assertThat(shardRouting.state(), equalTo(ShardRoutingState.UNASSIGNED));
+            assertThat(shardRouting.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED));
+        });
+
+        try (Store store = new Store(shardId, indexSettings, new SimpleFSDirectory(indexPath), new DummyShardLock(shardId))) {
+            store.removeCorruptionMarker();
+        }
+
+        // index is red: no shard is allocated (the in-sync allocation id is a fake id that does not match anything)
+        checkHealthStatus(indexName, ClusterHealthStatus.RED);
+        checkNoValidShardCopy(indexName, shardId);
+
+        internalCluster().restartNode(node1, InternalTestCluster.EMPTY_CALLBACK);
+
+        // index is still red due to mismatch of allocation id
+        checkHealthStatus(indexName, ClusterHealthStatus.RED);
+        checkNoValidShardCopy(indexName, shardId);
+
+        // no valid shard copy is there; AllocateStalePrimary has to be invoked again
+        client().admin().cluster().prepareReroute()
+            .add(new AllocateStalePrimaryAllocationCommand(indexName, 0, node1, true))
+            .get();
+
+        ensureYellow(indexName);
+
+        // bring node2 back
+        node2 = internalCluster().startNode();
+        ensureGreen(indexName);
+
+        assertThat(historyUUID(node1, indexName), not(equalTo(historyUUID)));
+        assertThat(historyUUID(node1, indexName), equalTo(historyUUID(node2, indexName)));
+
+        internalCluster().assertSameDocIdsOnShards();
+    }
+
+    public void checkHealthStatus(String indexName, ClusterHealthStatus healthStatus) {
+        final ClusterHealthStatus indexHealthStatus = client().admin().cluster()
+            .health(Requests.clusterHealthRequest(indexName)).actionGet().getStatus();
+        assertThat(indexHealthStatus, is(healthStatus));
+    }
+
+    private int indexDocs(String indexName, Object... source) throws InterruptedException, ExecutionException {
+        // index some docs in several segments
+        int numDocs = 0;
+        for (int k = 0, attempts = randomIntBetween(5, 10); k < attempts; k++) {
+            final int numExtraDocs = between(10, 100);
+            IndexRequestBuilder[] builders = new IndexRequestBuilder[numExtraDocs];
+            for (int i = 0; i < builders.length; i++) {
+                builders[i] = client().prepareIndex(indexName, "type").setSource(source);
+            }
+
+            indexRandom(true, false, true, Arrays.asList(builders));
+            numDocs += numExtraDocs;
+        }
+
+        return numDocs;
+    }
+
+    private Path getIndexPath(String nodeName, ShardId shardId) {
+        final Set<Path> indexDirs = RemoveCorruptedShardDataCommandIT.getDirs(nodeName, shardId, ShardPath.INDEX_FOLDER_NAME);
+        assertThat(indexDirs, hasSize(1));
+        return indexDirs.iterator().next();
+    }
+
+    private Set<String> getAllocationIds(String indexName) {
+        final ClusterState state = client().admin().cluster().prepareState().get().getState();
+        final Set<String> allocationIds = state.metaData().index(indexName).inSyncAllocationIds(0);
+        return allocationIds;
+    }
+
+    private IndexSettings getIndexSettings(String indexName, String nodeName) {
+        final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
+        final IndexService indexService = indicesService.indexService(resolveIndex(indexName));
+        return indexService.getIndexSettings();
+    }
+
+    private String historyUUID(String node, String indexName) {
+        final ShardStats[] shards = client(node).admin().indices().prepareStats(indexName).clear().get().getShards();
+        assertThat(shards.length, greaterThan(0));
+        final Set<String> historyUUIDs = Arrays.stream(shards)
+            .map(shard -> shard.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY))
+            .collect(Collectors.toSet());
+        assertThat(historyUUIDs, hasSize(1));
+        return historyUUIDs.iterator().next();
+    }
+
+    private void putFakeCorruptionMarker(IndexSettings indexSettings, ShardId shardId, Path indexPath) throws IOException {
+        try (Store store = new Store(shardId, indexSettings, new SimpleFSDirectory(indexPath), new DummyShardLock(shardId))) {
+            store.markStoreCorrupted(new IOException("fake ioexception"));
+        }
+    }
+
+    private void checkNoValidShardCopy(String indexName, ShardId shardId) throws Exception {
+        final ClusterAllocationExplanation explanation =
+            client().admin().cluster().prepareAllocationExplain()
+                .setIndex(indexName).setShard(shardId.id()).setPrimary(true)
+                .get().getExplanation();
+
+        final ShardAllocationDecision shardAllocationDecision = explanation.getShardAllocationDecision();
+        assertThat(shardAllocationDecision.isDecisionTaken(), equalTo(true));
+        assertThat(shardAllocationDecision.getAllocateDecision().getAllocationDecision(),
+            equalTo(AllocationDecision.NO_VALID_SHARD_COPY));
+    }
+
+}

+ 9 - 1
server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java

@@ -214,6 +214,13 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
         }
         rerouteBuilder.get();
 
+        ClusterState state = client().admin().cluster().prepareState().get().getState();
+
+        Set<String> expectedAllocationIds = useStaleReplica
+            ? Collections.singleton(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID)
+            : Collections.emptySet();
+        assertEquals(expectedAllocationIds, state.metaData().index(idxName).inSyncAllocationIds(0));
+
         logger.info("--> check that the stale primary shard gets allocated and that documents are available");
         ensureYellow(idxName);
 
@@ -228,7 +235,8 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
         assertHitCount(client().prepareSearch(idxName).setSize(0).setQuery(matchAllQuery()).get(), useStaleReplica ? 1L : 0L);
 
         // allocation id of old primary was cleaned from the in-sync set
-        ClusterState state = client().admin().cluster().prepareState().get().getState();
+        state = client().admin().cluster().prepareState().get().getState();
+
         assertEquals(Collections.singleton(state.routingTable().index(idxName).shard(0).primary.allocationId().getId()),
             state.metaData().index(idxName).inSyncAllocationIds(0));
 

+ 58 - 0
server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java

@@ -23,14 +23,18 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterInfo;
+import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ESAllocationTestCase;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.allocation.command.AbstractAllocateAllocationCommand;
 import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
@@ -58,14 +62,18 @@ import org.elasticsearch.index.shard.ShardNotFoundException;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singleton;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
+import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 
@@ -259,6 +267,56 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
         }
     }
 
+    public void testAllocateStalePrimaryCommand() {
+        AllocationService allocation = createAllocationService(Settings.builder()
+            .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none")
+            .put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none")
+            .build());
+        final String index = "test";
+
+        logger.info("--> building initial routing table");
+        MetaData metaData = MetaData.builder()
+            .put(IndexMetaData.builder(index).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)
+                .putInSyncAllocationIds(0, Collections.singleton("asdf")).putInSyncAllocationIds(1, Collections.singleton("qwertz")))
+            .build();
+        // shard routing is added as "from recovery" instead of "new index creation" so that the in-sync allocation ids
+        // above take effect and the primary can only be force-allocated as a stale copy
+        RoutingTable routingTable = RoutingTable.builder()
+            .addAsRecovery(metaData.index(index))
+            .build();
+        ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .metaData(metaData).routingTable(routingTable).build();
+
+        final String node1 = "node1";
+        final String node2 = "node2";
+        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
+            .add(newNode(node1))
+            .add(newNode(node2))
+        ).build();
+        clusterState = allocation.reroute(clusterState, "reroute");
+
+        // sanity check: the primary and the replica are both still unassigned
+        final List<ShardRouting> shardRoutings = clusterState.getRoutingNodes().shardsWithState(UNASSIGNED);
+        assertThat(shardRoutings, hasSize(2));
+
+        logger.info("--> allocating stale primary with acceptDataLoss flag set to true");
+        clusterState = allocation.reroute(clusterState,
+            new AllocationCommands(new AllocateStalePrimaryAllocationCommand(index, 0, node1, true)), false, false).getClusterState();
+        RoutingNode routingNode1 = clusterState.getRoutingNodes().node(node1);
+        assertThat(routingNode1.size(), equalTo(1));
+        assertThat(routingNode1.shardsWithState(INITIALIZING).size(), equalTo(1));
+        Set<String> inSyncAllocationIds = clusterState.metaData().index(index).inSyncAllocationIds(0);
+        assertThat(inSyncAllocationIds, equalTo(Collections.singleton(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID)));
+
+        clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
+        routingNode1 = clusterState.getRoutingNodes().node(node1);
+        assertThat(routingNode1.size(), equalTo(1));
+        assertThat(routingNode1.shardsWithState(STARTED).size(), equalTo(1));
+        inSyncAllocationIds = clusterState.metaData().index(index).inSyncAllocationIds(0);
+        assertThat(inSyncAllocationIds, hasSize(1));
+        assertThat(inSyncAllocationIds, not(Collections.singleton(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID)));
+    }
+
     public void testCancelCommand() {
         AllocationService allocation = createAllocationService(Settings.builder()
                 .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none")

+ 3 - 21
server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java

@@ -42,12 +42,9 @@ import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
 import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.gateway.GatewayAllocator;
 import org.elasticsearch.test.gateway.TestGatewayAllocator;
 import org.hamcrest.Matchers;
 
-import java.util.List;
-
 import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
 
@@ -73,7 +70,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
         settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance);
         settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceTreshold);
 
-        AllocationService strategy = createAllocationService(settings.build(), new NoopGatewayAllocator());
+        AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator());
 
         ClusterState clusterState = initCluster(strategy);
         assertIndexBalance(clusterState.getRoutingTable(), clusterState.getRoutingNodes(), numberOfNodes, numberOfIndices,
@@ -101,7 +98,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
         settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance);
         settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceTreshold);
 
-        AllocationService strategy = createAllocationService(settings.build(), new NoopGatewayAllocator());
+        AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator());
 
         ClusterState clusterState = initCluster(strategy);
         assertReplicaBalance(logger, clusterState.getRoutingNodes(), numberOfNodes, numberOfIndices,
@@ -366,7 +363,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
                 assertThat(shardRouting.state(), Matchers.equalTo(ShardRoutingState.INITIALIZING));
             }
         }
-        strategy = createAllocationService(settings.build(), new NoopGatewayAllocator());
+        strategy = createAllocationService(settings.build(), new TestGatewayAllocator());
 
         logger.info("use the new allocator and check if it moves shards");
         routingNodes = clusterState.getRoutingNodes();
@@ -402,19 +399,4 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
         }
     }
 
-    private class NoopGatewayAllocator extends GatewayAllocator {
-        @Override
-        public void applyStartedShards(RoutingAllocation allocation, List<ShardRouting> startedShards) {
-            // noop
-        }
-
-        @Override
-        public void applyFailedShards(RoutingAllocation allocation, List<FailedShard> failedShards) {
-            // noop
-        }
-        @Override
-        public void allocateUnassigned(RoutingAllocation allocation) {
-            // noop
-        }
-    }
 }

+ 1 - 1
server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java

@@ -610,7 +610,7 @@ public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase {
         return getDirs(nodeId, shardId, dirSuffix);
     }
 
-    private Set<Path> getDirs(String nodeId, ShardId shardId, String dirSuffix) {
+    public static Set<Path> getDirs(String nodeId, ShardId shardId, String dirSuffix) {
         final NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(nodeId).setFs(true).get();
         final Set<Path> translogDirs = new TreeSet<>();
         final NodeStats nodeStats = nodeStatses.getNodes().get(0);