|
@@ -62,34 +62,31 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
|
|
|
import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings;
|
|
|
|
|
|
/**
|
|
|
- * The {@link BalancedShardsAllocator} re-balances the nodes allocations
|
|
|
- * within an cluster based on a {@link WeightFunction}. The clusters balance is defined by four parameters which can be set
|
|
|
- * in the cluster update API that allows changes in real-time:
|
|
|
- * <ul><li><code>cluster.routing.allocation.balance.shard</code> - The <b>shard balance</b> defines the weight factor
|
|
|
- * for shards allocated on a {@link RoutingNode}</li>
|
|
|
- * <li><code>cluster.routing.allocation.balance.index</code> - The <b>index balance</b> defines a factor to the number
|
|
|
- * of {@link org.elasticsearch.cluster.routing.ShardRouting}s per index allocated on a specific node</li>
|
|
|
- * <li><code>cluster.routing.allocation.balance.threshold</code> - A <b>threshold</b> to set the minimal optimization
|
|
|
- * value of operations that should be performed</li>
|
|
|
+ * The {@link BalancedShardsAllocator} allocates and balances shards on the cluster nodes using {@link WeightFunction}.
|
|
|
+ * The balancing attempts to:
|
|
|
+ * <ul>
|
|
|
+ * <li>even shard count across nodes (weighted by cluster.routing.allocation.balance.shard)</li>
|
|
|
+ * <li>spread shards of the same index across different nodes (weighted by cluster.routing.allocation.balance.index)</li>
|
|
|
+ * <li>even write load of the data streams write indices across nodes (weighted by cluster.routing.allocation.balance.write_load)</li>
|
|
|
+ * <li>even disk usage across nodes (weighted by cluster.routing.allocation.balance.write_load)</li>
|
|
|
* </ul>
|
|
|
- * <p>
|
|
|
- * These parameters are combined in a {@link WeightFunction} that allows calculation of node weights which
|
|
|
- * are used to re-balance shards based on global as well as per-index factors.
|
|
|
+ * The sensitivity of the algorithm is defined by cluster.routing.allocation.balance.threshold.
|
|
|
+ * Allocator takes into account constraints set by {@code AllocationDeciders} when allocating and balancing shards.
|
|
|
*/
|
|
|
public class BalancedShardsAllocator implements ShardsAllocator {
|
|
|
|
|
|
private static final Logger logger = LogManager.getLogger(BalancedShardsAllocator.class);
|
|
|
|
|
|
- public static final Setting<Float> INDEX_BALANCE_FACTOR_SETTING = Setting.floatSetting(
|
|
|
- "cluster.routing.allocation.balance.index",
|
|
|
- 0.55f,
|
|
|
+ public static final Setting<Float> SHARD_BALANCE_FACTOR_SETTING = Setting.floatSetting(
|
|
|
+ "cluster.routing.allocation.balance.shard",
|
|
|
+ 0.45f,
|
|
|
0.0f,
|
|
|
Property.Dynamic,
|
|
|
Property.NodeScope
|
|
|
);
|
|
|
- public static final Setting<Float> SHARD_BALANCE_FACTOR_SETTING = Setting.floatSetting(
|
|
|
- "cluster.routing.allocation.balance.shard",
|
|
|
- 0.45f,
|
|
|
+ public static final Setting<Float> INDEX_BALANCE_FACTOR_SETTING = Setting.floatSetting(
|
|
|
+ "cluster.routing.allocation.balance.index",
|
|
|
+ 0.55f,
|
|
|
0.0f,
|
|
|
Property.Dynamic,
|
|
|
Property.NodeScope
|
|
@@ -138,8 +135,8 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
|
|
|
|
|
@Inject
|
|
|
public BalancedShardsAllocator(ClusterSettings clusterSettings, WriteLoadForecaster writeLoadForecaster) {
|
|
|
- clusterSettings.initializeAndWatch(INDEX_BALANCE_FACTOR_SETTING, value -> this.indexBalanceFactor = value);
|
|
|
clusterSettings.initializeAndWatch(SHARD_BALANCE_FACTOR_SETTING, value -> this.shardBalanceFactor = value);
|
|
|
+ clusterSettings.initializeAndWatch(INDEX_BALANCE_FACTOR_SETTING, value -> this.indexBalanceFactor = value);
|
|
|
clusterSettings.initializeAndWatch(WRITE_LOAD_BALANCE_FACTOR_SETTING, value -> this.writeLoadBalanceFactor = value);
|
|
|
clusterSettings.initializeAndWatch(DISK_USAGE_BALANCE_FACTOR_SETTING, value -> this.diskUsageBalanceFactor = value);
|
|
|
clusterSettings.initializeAndWatch(THRESHOLD_SETTING, value -> this.threshold = ensureValidThreshold(value));
|
|
@@ -179,8 +176,8 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
|
|
return;
|
|
|
}
|
|
|
final WeightFunction weightFunction = new WeightFunction(
|
|
|
- indexBalanceFactor,
|
|
|
shardBalanceFactor,
|
|
|
+ indexBalanceFactor,
|
|
|
writeLoadBalanceFactor,
|
|
|
diskUsageBalanceFactor
|
|
|
);
|
|
@@ -193,8 +190,8 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
|
|
@Override
|
|
|
public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, final RoutingAllocation allocation) {
|
|
|
WeightFunction weightFunction = new WeightFunction(
|
|
|
- indexBalanceFactor,
|
|
|
shardBalanceFactor,
|
|
|
+ indexBalanceFactor,
|
|
|
writeLoadBalanceFactor,
|
|
|
diskUsageBalanceFactor
|
|
|
);
|
|
@@ -293,8 +290,8 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
|
|
private final float theta2;
|
|
|
private final float theta3;
|
|
|
|
|
|
- WeightFunction(float indexBalance, float shardBalance, float writeLoadBalance, float diskUsageBalance) {
|
|
|
- float sum = indexBalance + shardBalance + writeLoadBalance + diskUsageBalance;
|
|
|
+ WeightFunction(float shardBalance, float indexBalance, float writeLoadBalance, float diskUsageBalance) {
|
|
|
+ float sum = shardBalance + indexBalance + writeLoadBalance + diskUsageBalance;
|
|
|
if (sum <= 0.0f) {
|
|
|
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
|
|
|
}
|