@@ -44,6 +44,7 @@ import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
 import org.elasticsearch.test.ESTestCase;
@@ -72,7 +73,9 @@ import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClu
 import static org.hamcrest.Matchers.aMapWithSize;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -578,7 +581,7 @@ public class DesiredBalanceComputerTests extends ESTestCase {
         var nodes = randomIntBetween(3, 7);
         var nodeIds = new ArrayList<String>(nodes);
         var discoveryNodesBuilder = DiscoveryNodes.builder();
-        var usedDiskSpace = new HashMap<String, Long>();
+        var usedDiskSpace = Maps.<String, Long>newMapWithExpectedSize(nodes);
         for (int node = 0; node < nodes; node++) {
             var nodeId = "node-" + node;
             nodeIds.add(nodeId);
@@ -588,6 +591,7 @@ public class DesiredBalanceComputerTests extends ESTestCase {
 
         var indices = scaledRandomIntBetween(1, 500);
         var totalShards = 0;
+        var totalShardsSize = 0L;
 
         var shardSizes = new HashMap<String, Long>();
         var dataPath = new HashMap<NodeAndShard, String>();
@@ -626,6 +630,7 @@ public class DesiredBalanceComputerTests extends ESTestCase {
 
                 var primaryNodeId = pickAndRemoveRandomValueFrom(remainingNodeIds);
                 shardSizes.put(ClusterInfo.shardIdentifierFromRouting(shardId, true), thisShardSize);
+                totalShardsSize += thisShardSize;
                 if (primaryNodeId != null) {
                     dataPath.put(new NodeAndShard(primaryNodeId, shardId), "/data");
                     usedDiskSpace.compute(primaryNodeId, (k, v) -> v + thisShardSize);
@@ -644,9 +649,10 @@ public class DesiredBalanceComputerTests extends ESTestCase {
                 for (int replica = 0; replica < replicas; replica++) {
                     var replicaNodeId = primaryNodeId == null ? null : pickAndRemoveRandomValueFrom(remainingNodeIds);
                     shardSizes.put(ClusterInfo.shardIdentifierFromRouting(shardId, false), thisShardSize);
+                    totalShardsSize += thisShardSize;
                     if (replicaNodeId != null) {
                         dataPath.put(new NodeAndShard(replicaNodeId, shardId), "/data");
-                        usedDiskSpace.compute(primaryNodeId, (k, v) -> v + thisShardSize);
+                        usedDiskSpace.compute(replicaNodeId, (k, v) -> v + thisShardSize);
                     }
 
                     indexRoutingTableBuilder.addShard(
@@ -675,7 +681,9 @@ public class DesiredBalanceComputerTests extends ESTestCase {
 
         var iteration = new AtomicInteger(0);
 
-        long diskSize = usedDiskSpace.values().stream().max(Long::compare).get() * 125 / 100;
+        long diskSize = Math.max(totalShardsSize / nodes, usedDiskSpace.values().stream().max(Long::compare).get()) * 120 / 100;
+        assertTrue("Should have enough space for all shards", diskSize * nodes > totalShardsSize);
+
         var diskUsage = usedDiskSpace.entrySet()
             .stream()
             .collect(toMap(Map.Entry::getKey, it -> new DiskUsage(it.getKey(), it.getKey(), "/data", diskSize, diskSize - it.getValue())));
@@ -691,32 +699,34 @@ public class DesiredBalanceComputerTests extends ESTestCase {
             new BalancedShardsAllocator(settings)
         ).compute(DesiredBalance.INITIAL, input, queue(), ignored -> iteration.incrementAndGet() < 1000);
 
-        try {
-            assertThat(
-                "Balance should converge, but exited by the iteration limit",
-                desiredBalance.lastConvergedIndex(),
-                equalTo(input.index())
-            );
-            logger.info(
-                "Balance converged after [{}] iterations for [{}] nodes and [{}] total shards",
-                iteration.get(),
-                nodes,
-                totalShards
-            );
-        } catch (AssertionError e) {
-            logger.error(
-                "Failed to converge desired balance for [{}] nodes and [{}] total shards:\n {}",
-                nodes,
-                totalShards,
-                clusterState.getRoutingNodes().toString()
+        var desiredDiskUsage = Maps.<String, Long>newMapWithExpectedSize(nodes);
+        for (var assignment : desiredBalance.assignments().entrySet()) {
+            var shardSize = Math.min(
+                clusterInfo.getShardSize(assignment.getKey(), true),
+                clusterInfo.getShardSize(assignment.getKey(), false)
             );
-            throw e;
+            for (String nodeId : assignment.getValue().nodeIds()) {
+                desiredDiskUsage.compute(nodeId, (key, value) -> (value != null ? value : 0) + shardSize);
+            }
         }
+
+        assertThat(
+            "Balance should converge, but exited by the iteration limit",
+            desiredBalance.lastConvergedIndex(),
+            equalTo(input.index())
+        );
+        logger.info("Balance converged after [{}] iterations", iteration.get());
+
+        assertThat(
+            "All desired disk usages " + desiredDiskUsage + " should be smaller than the actual disk size: " + diskSize,
+            desiredDiskUsage.values(),
+            everyItem(lessThanOrEqualTo(diskSize))
+        );
     }
 
     private static long smallShardSizeDeviation(long originalSize) {
-        var deviation = randomIntBetween(0, 50) - 100L;
-        return originalSize * (1000 + deviation) / 1000;
+        var deviation = randomIntBetween(-5, 5);
+        return originalSize * (100 + deviation) / 100;
     }
 
     private String pickAndRemoveRandomValueFrom(List<String> values) {