|
@@ -25,9 +25,11 @@ import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
|
|
|
import org.elasticsearch.action.index.IndexResponse;
|
|
|
import org.elasticsearch.action.support.ActiveShardCount;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
+import org.elasticsearch.cluster.ClusterStateListener;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|
|
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
|
|
|
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
|
|
|
+import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.util.set.Sets;
|
|
@@ -54,6 +56,7 @@ import java.util.Collections;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Set;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.stream.Collectors;
|
|
@@ -212,14 +215,26 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
|
|
|
rerouteBuilder.add(new AllocateEmptyPrimaryAllocationCommand(idxName, shardId, storeStatus.getNode().getId(), true));
|
|
|
}
|
|
|
}
|
|
|
- rerouteBuilder.get();
|
|
|
-
|
|
|
- ClusterState state = client().admin().cluster().prepareState().get().getState();
|
|
|
|
|
|
- Set<String> expectedAllocationIds = useStaleReplica
|
|
|
+ final Set<String> expectedAllocationIds = useStaleReplica
|
|
|
? Collections.singleton(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID)
|
|
|
: Collections.emptySet();
|
|
|
- assertEquals(expectedAllocationIds, state.metaData().index(idxName).inSyncAllocationIds(0));
|
|
|
+
|
|
|
+ final CountDownLatch clusterStateChangeLatch = new CountDownLatch(1);
|
|
|
+ final ClusterStateListener clusterStateListener = event -> {
|
|
|
+ final Set<String> allocationIds = event.state().metaData().index(idxName).inSyncAllocationIds(0);
|
|
|
+ if (expectedAllocationIds.equals(allocationIds)) {
|
|
|
+ clusterStateChangeLatch.countDown();
|
|
|
+ }
|
|
|
+ logger.info("expected allocation ids: {} actual allocation ids: {}", expectedAllocationIds, allocationIds);
|
|
|
+ };
|
|
|
+ final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, master);
|
|
|
+ clusterService.addListener(clusterStateListener);
|
|
|
+
|
|
|
+ rerouteBuilder.get();
|
|
|
+
|
|
|
+ assertTrue(clusterStateChangeLatch.await(30, TimeUnit.SECONDS));
|
|
|
+ clusterService.removeListener(clusterStateListener);
|
|
|
|
|
|
logger.info("--> check that the stale primary shard gets allocated and that documents are available");
|
|
|
ensureYellow(idxName);
|
|
@@ -235,7 +250,7 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
|
|
|
assertHitCount(client().prepareSearch(idxName).setSize(0).setQuery(matchAllQuery()).get(), useStaleReplica ? 1L : 0L);
|
|
|
|
|
|
// allocation id of old primary was cleaned from the in-sync set
|
|
|
- state = client().admin().cluster().prepareState().get().getState();
|
|
|
+ final ClusterState state = client().admin().cluster().prepareState().get().getState();
|
|
|
|
|
|
assertEquals(Collections.singleton(state.routingTable().index(idxName).shard(0).primary.allocationId().getId()),
|
|
|
state.metaData().index(idxName).inSyncAllocationIds(0));
|