@@ -11,7 +11,6 @@ package org.elasticsearch.cluster.routing.allocation.decider;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
-import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.ClusterInfoServiceUtils;
 import org.elasticsearch.cluster.DiskUsageIntegTestCase;
@@ -39,13 +38,16 @@ import org.hamcrest.Matcher;
 import org.hamcrest.TypeSafeMatcher;
 
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashSet;
-import java.util.Locale;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-import static java.util.stream.Collectors.toMap;
 import static java.util.stream.Collectors.toSet;
+import static org.elasticsearch.cluster.routing.RoutingNodesHelper.numberOfShardsWithState;
+import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
 import static org.elasticsearch.index.store.Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.empty;
@@ -74,26 +76,25 @@ public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
         final String dataNodeName = internalCluster().startDataOnlyNode();
         ensureStableCluster(3);
 
-        final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster().getCurrentMasterNodeInstance(
-            ClusterInfoService.class
-        );
-        internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
-            .addListener(event -> ClusterInfoServiceUtils.refresh(clusterInfoService));
+        final InternalClusterInfoService clusterInfoService = getInternalClusterInfoService();
+        internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
+            ClusterInfoServiceUtils.refresh(clusterInfoService);
+        });
 
         final String dataNode0Id = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
 
-        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        final String indexName = randomIdentifier();
         createIndex(indexName, indexSettings(6, 0).put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms").build());
-        var smallestShard = createReasonableSizedShards(indexName);
+        var shardSizes = createReasonableSizedShards(indexName);
 
         // reduce disk size of node 0 so that no shards fit below the high watermark, forcing all shards onto the other data node
         // (subtract the translog size since the disk threshold decider ignores this and may therefore move the shard back again)
-        getTestFileStore(dataNodeName).setTotalSpace(smallestShard.size + WATERMARK_BYTES - 1L);
+        getTestFileStore(dataNodeName).setTotalSpace(shardSizes.getSmallestShardSize() + WATERMARK_BYTES - 1L);
         assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, empty());
 
         // increase disk size of node 0 to allow just enough room for one shard, and check that it's rebalanced back
-        getTestFileStore(dataNodeName).setTotalSpace(smallestShard.size + WATERMARK_BYTES);
-        assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, new ContainsExactlyOneOf<>(smallestShard.shardIds));
+        getTestFileStore(dataNodeName).setTotalSpace(shardSizes.getSmallestShardSize() + WATERMARK_BYTES);
+        assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, new ContainsExactlyOneOf<>(shardSizes.getSmallestShardIds()));
     }
 
     public void testRestoreSnapshotAllocationDoesNotExceedWatermark() throws Exception {
@@ -108,17 +109,21 @@ public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
                 .setSettings(Settings.builder().put("location", randomRepoPath()).put("compress", randomBoolean()))
         );
 
-        final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster().getCurrentMasterNodeInstance(
-            ClusterInfoService.class
-        );
-        internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
-            .addListener(event -> ClusterInfoServiceUtils.refresh(clusterInfoService));
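+        // refresh the cluster info with every cluster state update, and assert that no shards relocate while allowRelocations is false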
+        final AtomicBoolean allowRelocations = new AtomicBoolean(true);
+        final InternalClusterInfoService clusterInfoService = getInternalClusterInfoService();
+        internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
+            ClusterInfoServiceUtils.refresh(clusterInfoService);
+            if (allowRelocations.get() == false) {
+                assertThat(numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING), equalTo(0));
+            }
+        });
 
         final String dataNode0Id = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
 
-        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        final String indexName = randomIdentifier();
         createIndex(indexName, indexSettings(6, 0).put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms").build());
-        var smallestShard = createReasonableSizedShards(indexName);
+        var shardSizes = createReasonableSizedShards(indexName);
 
         final CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot("repo", "snap")
             .setWaitForCompletion(true)
@@ -128,15 +133,13 @@ public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
         assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
 
         assertAcked(indicesAdmin().prepareDelete(indexName).get());
+        updateClusterSettings(Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), Rebalance.NONE.toString()));
+        allowRelocations.set(false);
 
         // reduce disk size of node 0 so that no shards fit below the low watermark, forcing shards to be assigned to the other data node
-        getTestFileStore(dataNodeName).setTotalSpace(smallestShard.size + WATERMARK_BYTES - 1L);
+        getTestFileStore(dataNodeName).setTotalSpace(shardSizes.getSmallestShardSize() + WATERMARK_BYTES - 1L);
         refreshDiskUsage();
 
-        updateClusterSettings(
-            Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), Rebalance.NONE.toString())
-        );
-
         final RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("repo", "snap")
             .setWaitForCompletion(true)
             .get();
@@ -144,13 +147,72 @@ public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
         assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
         assertThat(restoreInfo.failedShards(), is(0));
 
-        assertBusy(() -> assertThat(getShardIds(dataNode0Id, indexName), empty()));
+        assertThat(getShardIds(dataNode0Id, indexName), empty());
 
-        updateClusterSettings(Settings.builder().putNull(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey()));
+        allowRelocations.set(true);
+        updateClusterSettings(Settings.builder().putNull(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey()));
 
         // increase disk size of node 0 to allow just enough room for one shard, and check that it's rebalanced back
-        getTestFileStore(dataNodeName).setTotalSpace(smallestShard.size + WATERMARK_BYTES);
-        assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, new ContainsExactlyOneOf<>(smallestShard.shardIds));
+        getTestFileStore(dataNodeName).setTotalSpace(shardSizes.getSmallestShardSize() + WATERMARK_BYTES);
+        assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, new ContainsExactlyOneOf<>(shardSizes.getSmallestShardIds()));
+    }
+
+    public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        internalCluster().startDataOnlyNode();
+        final String dataNodeName = internalCluster().startDataOnlyNode();
+        ensureStableCluster(3);
+
+        assertAcked(
+            clusterAdmin().preparePutRepository("repo")
+                .setType(FsRepository.TYPE)
+                .setSettings(Settings.builder().put("location", randomRepoPath()).put("compress", randomBoolean()))
+        );
+
+        final AtomicBoolean allowRelocations = new AtomicBoolean(true);
+        final InternalClusterInfoService clusterInfoService = getInternalClusterInfoService();
+        internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
+            ClusterInfoServiceUtils.refresh(clusterInfoService);
+            if (allowRelocations.get() == false) {
+                assertThat(numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING), equalTo(0));
+            }
+        });
+
+        final String dataNode0Id = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
+
+        final String indexName = randomIdentifier();
+        createIndex(indexName, indexSettings(6, 0).put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms").build());
+        var shardSizes = createReasonableSizedShards(indexName);
+
+        final CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot("repo", "snap")
+            .setWaitForCompletion(true)
+            .get();
+        final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
+        assertThat(snapshotInfo.successfulShards(), is(snapshotInfo.totalShards()));
+        assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
+
+        assertAcked(indicesAdmin().prepareDelete(indexName).get());
+        updateClusterSettings(Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), Rebalance.NONE.toString()));
+        allowRelocations.set(false);
+
+        // reduce disk size of node 0 so that only 1 of 2 smallest shards can be allocated
+        var usableSpace = shardSizes.sizes().get(1).size();
+        getTestFileStore(dataNodeName).setTotalSpace(usableSpace + WATERMARK_BYTES + 1L);
+        refreshDiskUsage();
+
+        final RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("repo", "snap")
+            .setWaitForCompletion(true)
+            .get();
+        final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
+        assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
+        assertThat(restoreInfo.failedShards(), is(0));
+
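+        // node 0 should end up with exactly one of the shards that fit within the usable space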
+        assertBusyWithDiskUsageRefresh(
+            dataNode0Id,
+            indexName,
+            new ContainsExactlyOneOf<>(shardSizes.getShardIdsWithSizeSmallerOrEqual(usableSpace))
+        );
     }
 
     private Set<ShardId> getShardIds(final String nodeId, final String indexName) {
@@ -178,13 +240,9 @@ public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
     /**
-     * Index documents until all the shards are at least WATERMARK_BYTES in size, and return the one with the smallest size
+     * Index documents until all the shards are at least WATERMARK_BYTES in size, and return the sizes of all shards, smallest first
      */
-    private SmallestShards createReasonableSizedShards(final String indexName) throws InterruptedException {
+    private ShardSizes createReasonableSizedShards(final String indexName) throws InterruptedException {
         while (true) {
-            final IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[scaledRandomIntBetween(100, 10000)];
-            for (int i = 0; i < indexRequestBuilders.length; i++) {
-                indexRequestBuilders[i] = prepareIndex(indexName).setSource("field", randomAlphaOfLength(10));
-            }
-            indexRandom(true, indexRequestBuilders);
+            indexRandom(true, indexName, scaledRandomIntBetween(100, 10000));
             forceMerge();
             refresh();
 
@@ -201,23 +259,36 @@ public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
                 .orElseThrow(() -> new AssertionError("no shards"));
 
             if (smallestShardSize > WATERMARK_BYTES) {
-                var smallestShardIds = Arrays.stream(shardStates)
-                    .filter(it -> it.getStats().getStore().sizeInBytes() == smallestShardSize)
-                    .map(it -> removeIndexUUID(it.getShardRouting().shardId()))
-                    .collect(toSet());
-
-                logger.info(
-                    "Created shards with sizes {}",
-                    Arrays.stream(shardStates)
-                        .collect(toMap(it -> it.getShardRouting().shardId(), it -> it.getStats().getStore().sizeInBytes()))
-                );
-
-                return new SmallestShards(smallestShardSize, smallestShardIds);
+                var shardSizes = Arrays.stream(shardStates)
+                    .map(it -> new ShardSize(removeIndexUUID(it.getShardRouting().shardId()), it.getStats().getStore().sizeInBytes()))
+                    .sorted(Comparator.comparing(ShardSize::size))
+                    .toList();
+                logger.info("Created shards with sizes {}", shardSizes);
+                return new ShardSizes(shardSizes);
             }
         }
     }
 
-    private record SmallestShards(long size, Set<ShardId> shardIds) {}
+    private record ShardSizes(List<ShardSize> sizes) {
+
+        public long getSmallestShardSize() {
+            return sizes.get(0).size();
+        }
+
+        public Set<ShardId> getShardIdsWithSizeSmallerOrEqual(long size) {
+            return sizes.stream().filter(entry -> entry.size <= size).map(ShardSize::shardId).collect(toSet());
+        }
+
+        public Set<ShardId> getSmallestShardIds() {
+            return getShardIdsWithSizeSmallerOrEqual(getSmallestShardSize());
+        }
+
+        public Set<ShardId> getAllShardIds() {
+            return sizes.stream().map(ShardSize::shardId).collect(toSet());
+        }
+    }
+
+    private record ShardSize(ShardId shardId, long size) {}
 
     private static ShardId removeIndexUUID(ShardId shardId) {
         return ShardId.fromString(shardId.toString());
@@ -246,16 +317,20 @@ public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
         );
     }
 
-    private void assertBusyWithDiskUsageRefresh(String nodeName, String indexName, Matcher<? super Set<ShardId>> matcher) throws Exception {
+    private void assertBusyWithDiskUsageRefresh(String nodeId, String indexName, Matcher<? super Set<ShardId>> matcher) throws Exception {
         assertBusy(() -> {
             // refresh the master's ClusterInfoService before checking the assigned shards because DiskThresholdMonitor might still
             // be processing a previous ClusterInfo update and will skip the new one (see DiskThresholdMonitor#onNewInfo(ClusterInfo)
             // and its internal checkInProgress flag)
             refreshDiskUsage();
 
-            final Set<ShardId> shardRoutings = getShardIds(nodeName, indexName);
+            final Set<ShardId> shardRoutings = getShardIds(nodeId, indexName);
             assertThat("Mismatching shard routings: " + shardRoutings, shardRoutings, matcher);
-        }, 30L, TimeUnit.SECONDS);
+        }, 5L, TimeUnit.SECONDS);
+    }
+
+    private InternalClusterInfoService getInternalClusterInfoService() {
+        return (InternalClusterInfoService) internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class);
     }
 
     private static final class ContainsExactlyOneOf<T> extends TypeSafeMatcher<Set<T>> {