|
@@ -83,6 +83,7 @@ import java.util.Map;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+import java.util.function.Consumer;
|
|
|
import java.util.function.Supplier;
|
|
|
|
|
|
/**
|
|
@@ -874,23 +875,48 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|
|
// to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then.
|
|
|
// If the index gets deleted after primary operation, we skip replication
|
|
|
final ClusterState state = clusterService.state();
|
|
|
- final IndexRoutingTable index = state.getRoutingTable().index(shardId.getIndex());
|
|
|
- final IndexShardRoutingTable shardRoutingTable = (index != null) ? index.shard(shardId.id()) : null;
|
|
|
+ final IndexShardRoutingTable shardRoutingTable = state.getRoutingTable().shardRoutingTableOrNull(shardId);
|
|
|
final IndexMetaData indexMetaData = state.getMetaData().index(shardId.getIndex());
|
|
|
- this.shards = (shardRoutingTable != null) ? shardRoutingTable.shards() : Collections.emptyList();
|
|
|
- this.executeOnReplica = (indexMetaData == null) || shouldExecuteReplication(indexMetaData.getSettings());
|
|
|
- this.nodes = state.getNodes();
|
|
|
+ List<ShardRouting> shards = shards(shardRoutingTable);
|
|
|
+ boolean executeOnReplica = (indexMetaData == null) || shouldExecuteReplication(indexMetaData.getSettings());
|
|
|
+ DiscoveryNodes nodes = state.getNodes();
|
|
|
|
|
|
if (shards.isEmpty()) {
|
|
|
logger.debug("replication phase for request [{}] on [{}] is skipped due to index deletion after primary operation", replicaRequest, shardId);
|
|
|
}
|
|
|
|
|
|
// we calculate number of target nodes to send replication operations, including nodes with relocating shards
|
|
|
+ AtomicInteger numberOfPendingShardInstances = new AtomicInteger();
|
|
|
+ this.totalShards = countTotalAndPending(shards, executeOnReplica, nodes, numberOfPendingShardInstances);
|
|
|
+ this.pending = numberOfPendingShardInstances;
|
|
|
+ this.shards = shards;
|
|
|
+ this.executeOnReplica = executeOnReplica;
|
|
|
+ this.nodes = nodes;
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace("replication phase started. pending [{}], action [{}], request [{}], cluster state version used [{}]", pending.get(),
|
|
|
+ transportReplicaAction, replicaRequest, state.version());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private int countTotalAndPending(List<ShardRouting> shards, boolean executeOnReplica, DiscoveryNodes nodes, AtomicInteger pending) { // counts replication targets into `pending` and returns the total number of shard copies (primary + ignored + pending)
|
|
|
+ assert pending.get() == 0; // the counter must start at zero: it is incremented below and then read back into the returned total
|
|
|
+ int numberOfIgnoredShardInstances = performOnShards(shards, executeOnReplica, nodes, shard -> pending.incrementAndGet(), shard -> pending.incrementAndGet()); // both callbacks only increment `pending`, so this call just counts
|
|
|
+ // one for the local primary copy
|
|
|
+ return 1 + numberOfIgnoredShardInstances + pending.get();
|
|
|
+ }
|
|
|
+
|
|
|
+ private int performOnShards(List<ShardRouting> shards, boolean executeOnReplica, DiscoveryNodes nodes, Consumer<ShardRouting> onLocalShard, Consumer<ShardRouting> onRelocatingShard) {
|
|
|
int numberOfIgnoredShardInstances = 0;
|
|
|
- int numberOfPendingShardInstances = 0;
|
|
|
for (ShardRouting shard : shards) {
|
|
|
- // the following logic to select the shards to replicate to is mirrored and explained in the doRun method below
|
|
|
if (shard.primary() == false && executeOnReplica == false) {
|
|
|
+ // If the replicas use shadow replicas, there is no reason to
|
|
|
+ // perform the action on the replica, so skip it and
|
|
|
+ // immediately return
|
|
|
+
|
|
|
+ // this delays mapping updates on replicas because they have
|
|
|
+ // to wait until they get the new mapping through the cluster
|
|
|
+ // state, which is why we recommend pre-defined mappings for
|
|
|
+ // indices using shadow replicas
|
|
|
numberOfIgnoredShardInstances++;
|
|
|
continue;
|
|
|
}
|
|
@@ -898,20 +924,26 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|
|
numberOfIgnoredShardInstances++;
|
|
|
continue;
|
|
|
}
|
|
|
+ // we index on a replica that is initializing as well since we might not have got the event
|
|
|
+ // yet that it was started. We will get an IllegalShardState exception if it's not started
|
|
|
+ // and that's fine, we will ignore it
|
|
|
+
|
|
|
+ // we never execute replication operation locally as primary operation has already completed locally
|
|
|
+ // hence, we ignore any local shard for replication
|
|
|
if (nodes.localNodeId().equals(shard.currentNodeId()) == false) {
|
|
|
- numberOfPendingShardInstances++;
|
|
|
+ onLocalShard.accept(shard);
|
|
|
}
|
|
|
+ // send operation to relocating shard
|
|
|
+ // local shard can be a relocation target of a primary that is in relocated state
|
|
|
if (shard.relocating() && nodes.localNodeId().equals(shard.relocatingNodeId()) == false) {
|
|
|
- numberOfPendingShardInstances++;
|
|
|
+ onRelocatingShard.accept(shard);
|
|
|
}
|
|
|
}
|
|
|
- // one for the local primary copy
|
|
|
- this.totalShards = 1 + numberOfPendingShardInstances + numberOfIgnoredShardInstances;
|
|
|
- this.pending = new AtomicInteger(numberOfPendingShardInstances);
|
|
|
- if (logger.isTraceEnabled()) {
|
|
|
- logger.trace("replication phase started. pending [{}], action [{}], request [{}], cluster state version used [{}]", pending.get(),
|
|
|
- transportReplicaAction, replicaRequest, state.version());
|
|
|
- }
|
|
|
+ return numberOfIgnoredShardInstances;
|
|
|
+ }
|
|
|
+
|
|
|
+ private List<ShardRouting> shards(IndexShardRoutingTable shardRoutingTable) { // null-safe accessor: returns the table's shard copies, or an empty list when the routing table is null
|
|
|
+ return (shardRoutingTable != null) ? shardRoutingTable.shards() : Collections.emptyList();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -951,36 +983,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|
|
doFinish();
|
|
|
return;
|
|
|
}
|
|
|
- for (ShardRouting shard : shards) {
|
|
|
- if (shard.primary() == false && executeOnReplica == false) {
|
|
|
- // If the replicas use shadow replicas, there is no reason to
|
|
|
- // perform the action on the replica, so skip it and
|
|
|
- // immediately return
|
|
|
-
|
|
|
- // this delays mapping updates on replicas because they have
|
|
|
- // to wait until they get the new mapping through the cluster
|
|
|
- // state, which is why we recommend pre-defined mappings for
|
|
|
- // indices using shadow replicas
|
|
|
- continue;
|
|
|
- }
|
|
|
- if (shard.unassigned()) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- // we index on a replica that is initializing as well since we might not have got the event
|
|
|
- // yet that it was started. We will get an exception IllegalShardState exception if its not started
|
|
|
- // and that's fine, we will ignore it
|
|
|
-
|
|
|
- // we never execute replication operation locally as primary operation has already completed locally
|
|
|
- // hence, we ignore any local shard for replication
|
|
|
- if (nodes.localNodeId().equals(shard.currentNodeId()) == false) {
|
|
|
- performOnReplica(shard);
|
|
|
- }
|
|
|
- // send operation to relocating shard
|
|
|
- // local shard can be a relocation target of a primary that is in relocated state
|
|
|
- if (shard.relocating() && nodes.localNodeId().equals(shard.relocatingNodeId()) == false) {
|
|
|
- performOnReplica(shard.buildTargetRelocatingShard());
|
|
|
- }
|
|
|
- }
|
|
|
+ performOnShards(shards, executeOnReplica, nodes, shard -> performOnReplica(shard), shard -> performOnReplica(shard.buildTargetRelocatingShard()));
|
|
|
}
|
|
|
|
|
|
/**
|