|
@@ -427,14 +427,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
|
|
|
* @param state new cluster state
|
|
|
*/
|
|
|
private void createIndicesAndUpdateShards(final ClusterState state) {
|
|
|
- DiscoveryNodes nodes = state.nodes();
|
|
|
- RoutingNode localRoutingNode = state.getRoutingNodes().node(nodes.getLocalNodeId());
|
|
|
+ RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
|
|
|
if (localRoutingNode == null) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- RoutingTable routingTable = state.routingTable();
|
|
|
-
|
|
|
// create map of indices to create with shards to fail if index creation fails or create or update shards if an existing index
|
|
|
// service is found
|
|
|
final Map<Index, List<ShardRouting>> indicesToCreate = new HashMap<>();
|
|
@@ -455,7 +452,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
|
|
|
if (indexService == null) {
|
|
|
indicesToCreate.computeIfAbsent(index, k -> new ArrayList<>()).add(shardRouting);
|
|
|
} else {
|
|
|
- createOrUpdateShard(state, nodes, routingTable, shardRouting, indexService);
|
|
|
+ createOrUpdateShard(state, shardRouting, indexService);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -485,24 +482,18 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
|
|
|
}
|
|
|
// we succeeded in creating the index service, so now we can create the missing shards assigned to this node
|
|
|
for (ShardRouting shardRouting : entry.getValue()) {
|
|
|
- createOrUpdateShard(state, nodes, routingTable, shardRouting, indexService);
|
|
|
+ createOrUpdateShard(state, shardRouting, indexService);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void createOrUpdateShard(
|
|
|
- ClusterState state,
|
|
|
- DiscoveryNodes nodes,
|
|
|
- RoutingTable routingTable,
|
|
|
- ShardRouting shardRouting,
|
|
|
- AllocatedIndex<? extends Shard> indexService
|
|
|
- ) {
|
|
|
+ private void createOrUpdateShard(ClusterState state, ShardRouting shardRouting, AllocatedIndex<? extends Shard> indexService) {
|
|
|
Shard shard = indexService.getShardOrNull(shardRouting.shardId().id());
|
|
|
if (shard == null) {
|
|
|
assert shardRouting.initializing() : shardRouting + " should have been removed by failMissingShards";
|
|
|
- createShard(nodes, routingTable, shardRouting, state);
|
|
|
+ createShard(shardRouting, state);
|
|
|
} else {
|
|
|
- updateShard(nodes, shardRouting, shard, routingTable, state);
|
|
|
+ updateShard(shardRouting, shard, state);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -546,19 +537,21 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardRouting shardRouting, ClusterState state) {
|
|
|
+ private void createShard(ShardRouting shardRouting, ClusterState state) {
|
|
|
assert shardRouting.initializing() : "only allow shard creation for initializing shard but was " + shardRouting;
|
|
|
|
|
|
- DiscoveryNode sourceNode = null;
|
|
|
- if (shardRouting.recoverySource().getType() == Type.PEER) {
|
|
|
- sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
|
|
|
- if (sourceNode == null) {
|
|
|
- logger.trace("ignoring initializing shard {} - no source node can be found.", shardRouting.shardId());
|
|
|
- return;
|
|
|
+ try {
|
|
|
+ final DiscoveryNode sourceNode;
|
|
|
+ if (shardRouting.recoverySource().getType() == Type.PEER) {
|
|
|
+ sourceNode = findSourceNodeForPeerRecovery(state.routingTable(), state.nodes(), shardRouting);
|
|
|
+ if (sourceNode == null) {
|
|
|
+ logger.trace("ignoring initializing shard {} - no source node can be found.", shardRouting.shardId());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ sourceNode = null;
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- try {
|
|
|
final long primaryTerm = state.metadata().index(shardRouting.index()).primaryTerm(shardRouting.id());
|
|
|
logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm);
|
|
|
indicesService.createShard(
|
|
@@ -569,7 +562,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
|
|
|
failedShardHandler,
|
|
|
this::updateGlobalCheckpointForShard,
|
|
|
retentionLeaseSyncer,
|
|
|
- nodes.getLocalNode(),
|
|
|
+ state.nodes().getLocalNode(),
|
|
|
sourceNode
|
|
|
);
|
|
|
} catch (Exception e) {
|
|
@@ -577,13 +570,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void updateShard(
|
|
|
- DiscoveryNodes nodes,
|
|
|
- ShardRouting shardRouting,
|
|
|
- Shard shard,
|
|
|
- RoutingTable routingTable,
|
|
|
- ClusterState clusterState
|
|
|
- ) {
|
|
|
+ private void updateShard(ShardRouting shardRouting, Shard shard, ClusterState clusterState) {
|
|
|
final ShardRouting currentRoutingEntry = shard.routingEntry();
|
|
|
assert currentRoutingEntry.isSameAllocation(shardRouting)
|
|
|
: "local shard has a different allocation id but wasn't cleaned by removeShards. "
|
|
@@ -597,7 +584,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
|
|
|
final IndexMetadata indexMetadata = clusterState.metadata().index(shard.shardId().getIndex());
|
|
|
primaryTerm = indexMetadata.primaryTerm(shard.shardId().id());
|
|
|
final Set<String> inSyncIds = indexMetadata.inSyncAllocationIds(shard.shardId().id());
|
|
|
- final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId());
|
|
|
+ final IndexShardRoutingTable indexShardRoutingTable = clusterState.routingTable().shardRoutingTable(shardRouting.shardId());
|
|
|
shard.updateShardState(
|
|
|
shardRouting,
|
|
|
primaryTerm,
|
|
@@ -621,15 +608,15 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
|
|
|
"{} master marked shard as initializing, but shard has state [{}], resending shard started to {}",
|
|
|
shardRouting.shardId(),
|
|
|
state,
|
|
|
- nodes.getMasterNode()
|
|
|
+ clusterState.nodes().getMasterNode()
|
|
|
);
|
|
|
}
|
|
|
- if (nodes.getMasterNode() != null) {
|
|
|
+ if (clusterState.nodes().getMasterNode() != null) {
|
|
|
shardStateAction.shardStarted(
|
|
|
shardRouting,
|
|
|
primaryTerm,
|
|
|
"master "
|
|
|
- + nodes.getMasterNode()
|
|
|
+ + clusterState.nodes().getMasterNode()
|
|
|
+ " marked shard as initializing, but shard state is ["
|
|
|
+ state
|
|
|
+ "], mark shard as started",
|