|
@@ -197,6 +197,7 @@ import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
+import java.util.function.BiFunction;
|
|
|
import java.util.function.BooleanSupplier;
|
|
|
import java.util.function.Function;
|
|
|
import java.util.stream.Collectors;
|
|
@@ -2359,17 +2360,28 @@ public abstract class ESIntegTestCase extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
protected void assertSeqNos() throws Exception {
|
|
|
+ final BiFunction<ClusterState, ShardRouting, IndexShard> getInstanceShardInstance = (clusterState, shardRouting) -> {
|
|
|
+ if (shardRouting.assignedToNode() == false) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ final DiscoveryNode assignedNode = clusterState.nodes().get(shardRouting.currentNodeId());
|
|
|
+ if (assignedNode == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ return internalCluster().getInstance(IndicesService.class, assignedNode.getName()).getShardOrNull(shardRouting.shardId());
|
|
|
+ };
|
|
|
assertBusy(() -> {
|
|
|
final ClusterState state = clusterService().state();
|
|
|
for (ObjectObjectCursor<String, IndexRoutingTable> indexRoutingTable : state.routingTable().indicesRouting()) {
|
|
|
for (IntObjectCursor<IndexShardRoutingTable> indexShardRoutingTable : indexRoutingTable.value.shards()) {
|
|
|
ShardRouting primaryShardRouting = indexShardRoutingTable.value.primaryShard();
|
|
|
- if (primaryShardRouting == null || primaryShardRouting.assignedToNode() == false) {
|
|
|
+ if (primaryShardRouting == null) {
|
|
|
continue;
|
|
|
}
|
|
|
- DiscoveryNode primaryNode = state.nodes().get(primaryShardRouting.currentNodeId());
|
|
|
- IndexShard primaryShard = internalCluster().getInstance(IndicesService.class, primaryNode.getName())
|
|
|
- .indexServiceSafe(primaryShardRouting.index()).getShard(primaryShardRouting.id());
|
|
|
+ final IndexShard primaryShard = getInstanceShardInstance.apply(state, primaryShardRouting);
|
|
|
+ if (primaryShard == null) {
|
|
|
+ continue; //just ignore - shard movement
|
|
|
+ }
|
|
|
final SeqNoStats primarySeqNoStats;
|
|
|
final ObjectLongMap<String> syncGlobalCheckpoints;
|
|
|
try {
|
|
@@ -2381,12 +2393,10 @@ public abstract class ESIntegTestCase extends ESTestCase {
|
|
|
assertThat(primaryShardRouting + " should have set the global checkpoint",
|
|
|
primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)));
|
|
|
for (ShardRouting replicaShardRouting : indexShardRoutingTable.value.replicaShards()) {
|
|
|
- if (replicaShardRouting.assignedToNode() == false) {
|
|
|
- continue;
|
|
|
+ final IndexShard replicaShard = getInstanceShardInstance.apply(state, replicaShardRouting);
|
|
|
+ if (replicaShard == null) {
|
|
|
+ continue; //just ignore - shard movement
|
|
|
}
|
|
|
- DiscoveryNode replicaNode = state.nodes().get(replicaShardRouting.currentNodeId());
|
|
|
- IndexShard replicaShard = internalCluster().getInstance(IndicesService.class, replicaNode.getName())
|
|
|
- .indexServiceSafe(replicaShardRouting.index()).getShard(replicaShardRouting.id());
|
|
|
final SeqNoStats seqNoStats;
|
|
|
try {
|
|
|
seqNoStats = replicaShard.seqNoStats();
|