|
@@ -55,8 +55,6 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
|
|
|
* for shards allocated on a {@link RoutingNode}</li>
|
|
|
* <li><code>cluster.routing.allocation.balance.index</code> - The <b>index balance</b> defines a factor to the number
|
|
|
* of {@link org.elasticsearch.cluster.routing.ShardRouting}s per index allocated on a specific node</li>
|
|
|
- * <li><code>cluster.routing.allocation.balance.primary</code> - the <b>primary balance</b> defines a weight factor for
|
|
|
- * the number of primaries of a specific index allocated on a node</li>
|
|
|
* <li><code>cluster.routing.allocation.balance.threshold</code> - A <b>threshold</b> to set the minimal optimization
|
|
|
* value of operations that should be performed</li>
|
|
|
* </ul>
|
|
@@ -69,37 +67,25 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
public static final String SETTING_THRESHOLD = "cluster.routing.allocation.balance.threshold";
|
|
|
public static final String SETTING_INDEX_BALANCE_FACTOR = "cluster.routing.allocation.balance.index";
|
|
|
public static final String SETTING_SHARD_BALANCE_FACTOR = "cluster.routing.allocation.balance.shard";
|
|
|
- public static final String SETTING_PRIMARY_BALANCE_FACTOR = "cluster.routing.allocation.balance.primary";
|
|
|
|
|
|
private static final float DEFAULT_INDEX_BALANCE_FACTOR = 0.55f;
|
|
|
private static final float DEFAULT_SHARD_BALANCE_FACTOR = 0.45f;
|
|
|
- /**
|
|
|
- * The primary balance factor was introduces as a tie-breaker to make the initial allocation
|
|
|
- * more deterministic. Yet other mechanism have been added ensure that the algorithm is more deterministic such that this
|
|
|
- * setting is not needed anymore. Additionally, this setting was abused to balance shards based on their primary flag which can lead
|
|
|
- * to unexpected behavior when allocating or balancing the shards.
|
|
|
- *
|
|
|
- * @deprecated the threshold primary balance factor is deprecated and should not be used.
|
|
|
- */
|
|
|
- @Deprecated
|
|
|
- private static final float DEFAULT_PRIMARY_BALANCE_FACTOR = 0.0f;
|
|
|
|
|
|
class ApplySettings implements NodeSettingsService.Listener {
|
|
|
@Override
|
|
|
public void onRefreshSettings(Settings settings) {
|
|
|
final float indexBalance = settings.getAsFloat(SETTING_INDEX_BALANCE_FACTOR, weightFunction.indexBalance);
|
|
|
final float shardBalance = settings.getAsFloat(SETTING_SHARD_BALANCE_FACTOR, weightFunction.shardBalance);
|
|
|
- final float primaryBalance = settings.getAsFloat(SETTING_PRIMARY_BALANCE_FACTOR, weightFunction.primaryBalance);
|
|
|
float threshold = settings.getAsFloat(SETTING_THRESHOLD, BalancedShardsAllocator.this.threshold);
|
|
|
if (threshold <= 0.0f) {
|
|
|
throw new ElasticsearchIllegalArgumentException("threshold must be greater than 0.0f but was: " + threshold);
|
|
|
}
|
|
|
BalancedShardsAllocator.this.threshold = threshold;
|
|
|
- BalancedShardsAllocator.this.weightFunction = new WeightFunction(indexBalance, shardBalance, primaryBalance);
|
|
|
+ BalancedShardsAllocator.this.weightFunction = new WeightFunction(indexBalance, shardBalance);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private volatile WeightFunction weightFunction = new WeightFunction(DEFAULT_INDEX_BALANCE_FACTOR, DEFAULT_SHARD_BALANCE_FACTOR, DEFAULT_PRIMARY_BALANCE_FACTOR);
|
|
|
+ private volatile WeightFunction weightFunction = new WeightFunction(DEFAULT_INDEX_BALANCE_FACTOR, DEFAULT_SHARD_BALANCE_FACTOR);
|
|
|
|
|
|
private volatile float threshold = 1.0f;
|
|
|
|
|
@@ -153,13 +139,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
return weightFunction.indexBalance;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Returns the primary related weight factor.
|
|
|
- */
|
|
|
- public float getPrimaryBalance() {
|
|
|
- return weightFunction.primaryBalance;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Returns the shard related weight factor.
|
|
|
*/
|
|
@@ -174,11 +153,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
* <ul>
|
|
|
* <li><code>index balance</code> - balance property over shards per index</li>
|
|
|
* <li><code>shard balance</code> - balance property over shards per cluster</li>
|
|
|
- * <li><code>primary balance</code> - balance property over primaries per cluster</li>
|
|
|
* </ul>
|
|
|
* <p>
|
|
|
* Each of these properties are expressed as factor such that the properties factor defines the relative importance of the property for the
|
|
|
- * weight function. For example if the weight function should calculate the weights only based on a global (shard) balance the index and primary balance
|
|
|
+ * weight function. For example if the weight function should calculate the weights only based on a global (shard) balance the index balance
|
|
|
* can be set to <tt>0.0</tt> and will in turn have no effect on the distribution.
|
|
|
* </p>
|
|
|
* The weight per index is calculated based on the following formula:
|
|
@@ -189,35 +167,31 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
* <li>
|
|
|
* <code>weight<sub>node</sub>(node, index) = shardBalance * (node.numShards() - avgShardsPerNode)</code>
|
|
|
* </li>
|
|
|
- * <li>
|
|
|
- * <code>weight<sub>primary</sub>(node, index) = primaryBalance * (node.numPrimaries() - avgPrimariesPerNode)</code>
|
|
|
- * </li>
|
|
|
* </ul>
|
|
|
- * <code>weight(node, index) = weight<sub>index</sub>(node, index) + weight<sub>node</sub>(node, index) + weight<sub>primary</sub>(node, index)</code>
|
|
|
+ * <code>weight(node, index) = weight<sub>index</sub>(node, index) + weight<sub>node</sub>(node, index)</code>
|
|
|
*/
|
|
|
public static class WeightFunction {
|
|
|
|
|
|
private final float indexBalance;
|
|
|
private final float shardBalance;
|
|
|
- private final float primaryBalance;
|
|
|
private final float[] theta;
|
|
|
|
|
|
- public WeightFunction(float indexBalance, float shardBalance, float primaryBalance) {
|
|
|
- float sum = indexBalance + shardBalance + primaryBalance;
|
|
|
+
|
|
|
+ public WeightFunction(float indexBalance, float shardBalance) {
|
|
|
+ float sum = indexBalance + shardBalance;
|
|
|
if (sum <= 0.0f) {
|
|
|
throw new ElasticsearchIllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
|
|
|
}
|
|
|
- theta = new float[]{shardBalance / sum, indexBalance / sum, primaryBalance / sum};
|
|
|
+ theta = new float[]{shardBalance / sum, indexBalance / sum};
|
|
|
this.indexBalance = indexBalance;
|
|
|
this.shardBalance = shardBalance;
|
|
|
- this.primaryBalance = primaryBalance;
|
|
|
}
|
|
|
|
|
|
public float weight(Operation operation, Balancer balancer, ModelNode node, String index) {
|
|
|
- final float weightShard = node.numShards() - balancer.avgShardsPerNode();
|
|
|
- final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
|
|
|
- final float weightPrimary = node.numPrimaries() - balancer.avgPrimariesPerNode();
|
|
|
- return theta[0] * weightShard + theta[1] * weightIndex + theta[2] * weightPrimary;
|
|
|
+ final float weightShard = (node.numShards() - balancer.avgShardsPerNode());
|
|
|
+ final float weightIndex = (node.numShards(index) - balancer.avgShardsPerNode(index));
|
|
|
+ assert theta != null;
|
|
|
+ return theta[0] * weightShard + theta[1] * weightIndex;
|
|
|
}
|
|
|
|
|
|
}
|
|
@@ -245,7 +219,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
* A {@link Balancer}
|
|
|
*/
|
|
|
public static class Balancer {
|
|
|
-
|
|
|
private final ESLogger logger;
|
|
|
private final Map<String, ModelNode> nodes = new HashMap<>();
|
|
|
private final HashSet<String> indices = new HashSet<>();
|
|
@@ -304,12 +277,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
return ((float) metaData.numberOfShards()) / nodes.size();
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Returns the average of primaries per node for the given index
|
|
|
- */
|
|
|
- public float avgPrimariesPerNode(String index) {
|
|
|
- return ((float) metaData.index(index).numberOfShards()) / nodes.size();
|
|
|
- }
|
|
|
|
|
|
/**
|
|
|
* Returns a new {@link NodeSorter} that sorts the nodes based on their
|
|
@@ -380,7 +347,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
final ModelNode maxNode = modelNodes[highIdx];
|
|
|
advance_range:
|
|
|
if (maxNode.numShards(index) > 0) {
|
|
|
- float delta = absDelta(weights[lowIdx], weights[highIdx]);
|
|
|
+ final float delta = absDelta(weights[lowIdx], weights[highIdx]);
|
|
|
if (lessThan(delta, threshold)) {
|
|
|
if (lowIdx > 0 && highIdx-1 > 0 // is there a chance for a higher delta?
|
|
|
&& (absDelta(weights[0], weights[highIdx-1]) > threshold) // check if we need to break at all
|
|
@@ -845,7 +812,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
private final Map<String, ModelIndex> indices = new HashMap<>();
|
|
|
/* cached stats - invalidated on add/remove and lazily calculated */
|
|
|
private int numShards = -1;
|
|
|
- private int numPrimaries = -1;
|
|
|
|
|
|
public ModelNode(String id) {
|
|
|
this.id = id;
|
|
@@ -875,22 +841,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
return index == null ? 0 : index.numShards();
|
|
|
}
|
|
|
|
|
|
- public int numPrimaries(String idx) {
|
|
|
- ModelIndex index = indices.get(idx);
|
|
|
- return index == null ? 0 : index.numPrimaries();
|
|
|
- }
|
|
|
-
|
|
|
- public int numPrimaries() {
|
|
|
- if (numPrimaries == -1) {
|
|
|
- int sum = 0;
|
|
|
- for (ModelIndex index : indices.values()) {
|
|
|
- sum += index.numPrimaries();
|
|
|
- }
|
|
|
- numPrimaries = sum;
|
|
|
- }
|
|
|
- return numPrimaries;
|
|
|
- }
|
|
|
-
|
|
|
public Collection<MutableShardRouting> shards() {
|
|
|
Collection<MutableShardRouting> result = new ArrayList<>();
|
|
|
for (ModelIndex index : indices.values()) {
|
|
@@ -908,7 +858,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
}
|
|
|
|
|
|
public void addShard(MutableShardRouting shard, Decision decision) {
|
|
|
- numPrimaries = numShards = -1;
|
|
|
+ numShards = -1;
|
|
|
ModelIndex index = indices.get(shard.index());
|
|
|
if (index == null) {
|
|
|
index = new ModelIndex(shard.index());
|
|
@@ -918,7 +868,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|
|
}
|
|
|
|
|
|
public Decision removeShard(MutableShardRouting shard) {
|
|
|
- numPrimaries = numShards = -1;
|
|
|
+ numShards = -1;
|
|
|
ModelIndex index = indices.get(shard.index());
|
|
|
Decision removed = null;
|
|
|
if (index != null) {
|