|
@@ -47,7 +47,6 @@ import org.elasticsearch.common.Priority;
|
|
|
import org.elasticsearch.common.inject.Inject;
|
|
|
import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
|
|
-import org.elasticsearch.common.settings.Setting;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
import org.elasticsearch.index.shard.ShardId;
|
|
|
import org.elasticsearch.node.NodeClosedException;
|
|
@@ -72,7 +71,6 @@ import java.util.Locale;
|
|
|
import java.util.Objects;
|
|
|
import java.util.Set;
|
|
|
import java.util.function.Predicate;
|
|
|
-import java.util.function.Supplier;
|
|
|
|
|
|
public class ShardStateAction {
|
|
|
|
|
@@ -81,34 +79,10 @@ public class ShardStateAction {
|
|
|
public static final String SHARD_STARTED_ACTION_NAME = "internal:cluster/shard/started";
|
|
|
public static final String SHARD_FAILED_ACTION_NAME = "internal:cluster/shard/failure";
|
|
|
|
|
|
- /**
|
|
|
- * Adjusts the priority of the followup reroute task. NORMAL is right for reasonable clusters, but in a badly configured cluster it may
|
|
|
- * be necessary to raise this higher to recover the older behaviour of rerouting after processing every shard-started task. Deliberately
|
|
|
- * undocumented, since this is a last-resort escape hatch for experts rather than something we want to expose to anyone, and deprecated
|
|
|
- * since we will remove it once we have confirmed from experience that this priority is appropriate in all cases.
|
|
|
- */
|
|
|
- public static final Setting<Priority> FOLLOW_UP_REROUTE_PRIORITY_SETTING
|
|
|
- = new Setting<>("cluster.routing.allocation.shard_state.reroute.priority", Priority.NORMAL.toString(),
|
|
|
- ShardStateAction::parseReroutePriority, Setting.Property.NodeScope, Setting.Property.Dynamic, Setting.Property.Deprecated);
|
|
|
-
|
|
|
- private static Priority parseReroutePriority(String priorityString) {
|
|
|
- final Priority priority = Priority.valueOf(priorityString.toUpperCase(Locale.ROOT));
|
|
|
- switch (priority) {
|
|
|
- case NORMAL:
|
|
|
- case HIGH:
|
|
|
- case URGENT:
|
|
|
- return priority;
|
|
|
- }
|
|
|
- throw new IllegalArgumentException(
|
|
|
- "priority [" + priority + "] not supported for [" + FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey() + "]");
|
|
|
- }
|
|
|
-
|
|
|
private final TransportService transportService;
|
|
|
private final ClusterService clusterService;
|
|
|
private final ThreadPool threadPool;
|
|
|
|
|
|
- private volatile Priority followUpRerouteTaskPriority;
|
|
|
-
|
|
|
// a list of shards that failed during replication
|
|
|
// we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard.
|
|
|
private final TransportRequestDeduplicator<FailedShardEntry> remoteFailedShardsDeduplicator = new TransportRequestDeduplicator<>();
|
|
@@ -120,17 +94,13 @@ public class ShardStateAction {
|
|
|
this.clusterService = clusterService;
|
|
|
this.threadPool = threadPool;
|
|
|
|
|
|
- followUpRerouteTaskPriority = FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(clusterService.getSettings());
|
|
|
- clusterService.getClusterSettings().addSettingsUpdateConsumer(FOLLOW_UP_REROUTE_PRIORITY_SETTING,
|
|
|
- this::setFollowUpRerouteTaskPriority);
|
|
|
-
|
|
|
transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ThreadPool.Names.SAME, StartedShardEntry::new,
|
|
|
new ShardStartedTransportHandler(clusterService,
|
|
|
- new ShardStartedClusterStateTaskExecutor(allocationService, rerouteService, () -> followUpRerouteTaskPriority, logger),
|
|
|
+ new ShardStartedClusterStateTaskExecutor(allocationService, rerouteService, logger),
|
|
|
logger));
|
|
|
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ThreadPool.Names.SAME, FailedShardEntry::new,
|
|
|
new ShardFailedTransportHandler(clusterService,
|
|
|
- new ShardFailedClusterStateTaskExecutor(allocationService, rerouteService, () -> followUpRerouteTaskPriority, logger),
|
|
|
+ new ShardFailedClusterStateTaskExecutor(allocationService, rerouteService, logger),
|
|
|
logger));
|
|
|
}
|
|
|
|
|
@@ -248,10 +218,6 @@ public class ShardStateAction {
|
|
|
}, changePredicate);
|
|
|
}
|
|
|
|
|
|
- private void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) {
|
|
|
- this.followUpRerouteTaskPriority = followUpRerouteTaskPriority;
|
|
|
- }
|
|
|
-
|
|
|
private static class ShardFailedTransportHandler implements TransportRequestHandler<FailedShardEntry> {
|
|
|
private final ClusterService clusterService;
|
|
|
private final ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor;
|
|
@@ -319,14 +285,11 @@ public class ShardStateAction {
|
|
|
private final AllocationService allocationService;
|
|
|
private final RerouteService rerouteService;
|
|
|
private final Logger logger;
|
|
|
- private final Supplier<Priority> prioritySupplier;
|
|
|
|
|
|
- public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService,
|
|
|
- Supplier<Priority> prioritySupplier, Logger logger) {
|
|
|
+ public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService, Logger logger) {
|
|
|
this.allocationService = allocationService;
|
|
|
this.rerouteService = rerouteService;
|
|
|
this.logger = logger;
|
|
|
- this.prioritySupplier = prioritySupplier;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -420,7 +383,7 @@ public class ShardStateAction {
|
|
|
// assign it again, even if that means putting it back on the node on which it previously failed:
|
|
|
final String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shards", numberOfUnassignedShards);
|
|
|
logger.trace("{}, scheduling a reroute", reason);
|
|
|
- rerouteService.reroute(reason, prioritySupplier.get(), ActionListener.wrap(
|
|
|
+ rerouteService.reroute(reason, Priority.NORMAL, ActionListener.wrap(
|
|
|
r -> logger.trace("{}, reroute completed", reason),
|
|
|
e -> logger.debug(new ParameterizedMessage("{}, reroute failed", reason), e)));
|
|
|
}
|
|
@@ -552,14 +515,11 @@ public class ShardStateAction {
|
|
|
private final AllocationService allocationService;
|
|
|
private final Logger logger;
|
|
|
private final RerouteService rerouteService;
|
|
|
- private final Supplier<Priority> prioritySupplier;
|
|
|
|
|
|
- public ShardStartedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService,
|
|
|
- Supplier<Priority> prioritySupplier, Logger logger) {
|
|
|
+ public ShardStartedClusterStateTaskExecutor(AllocationService allocationService, RerouteService rerouteService, Logger logger) {
|
|
|
this.allocationService = allocationService;
|
|
|
this.logger = logger;
|
|
|
this.rerouteService = rerouteService;
|
|
|
- this.prioritySupplier = prioritySupplier;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -637,7 +597,7 @@ public class ShardStateAction {
|
|
|
|
|
|
@Override
|
|
|
public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
|
|
|
- rerouteService.reroute("reroute after starting shards", prioritySupplier.get(), ActionListener.wrap(
|
|
|
+ rerouteService.reroute("reroute after starting shards", Priority.NORMAL, ActionListener.wrap(
|
|
|
r -> logger.trace("reroute after starting shards succeeded"),
|
|
|
e -> logger.debug("reroute after starting shards failed", e)));
|
|
|
}
|