@@ -10,16 +10,22 @@
 package org.elasticsearch.index.engine;
 
 import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
+import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
 import org.elasticsearch.action.admin.indices.segments.ShardSegments;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.support.broadcast.BroadcastResponse;
 import org.elasticsearch.cluster.DiskUsageIntegTestCase;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
+import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
+import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.test.ESIntegTestCase;
@@ -28,16 +34,20 @@ import org.junit.BeforeClass;
 
 import java.util.List;
 import java.util.Locale;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.iterableWithSize;
 import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.not;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class MergeWithLowDiskSpaceIT extends DiskUsageIntegTestCase {
+    private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES);
     protected static long MERGE_DISK_HIGH_WATERMARK_BYTES;
 
     @BeforeClass
@@ -235,6 +245,106 @@ public class MergeWithLowDiskSpaceIT extends DiskUsageIntegTestCase {
         assertAcked(indicesAdmin().prepareDelete(indexName).get());
     }
 
+    public void testRelocationWhileForceMerging() throws Exception {
+        final String node1 = internalCluster().startNode();
+        ensureStableCluster(1);
+        setTotalSpace(node1, Long.MAX_VALUE);
+        String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        createIndex(
+            indexName,
+            Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()
+        );
+        // get current disk space usage (for all indices on the node)
+        IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get();
+        long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes();
+        // restrict the total disk space such that the next merge does not have sufficient disk space
+        long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L);
+        setTotalSpace(node1, insufficientTotalDiskSpace);
+        // node stats' FS stats should report that there is insufficient disk space available
+        assertBusy(() -> {
+            NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get();
+            assertThat(nodesStatsResponse.getNodes().size(), equalTo(1));
+            NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
+            assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace));
+            assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES));
+        });
+        int indexingRounds = randomIntBetween(5, 10);
+        while (indexingRounds-- > 0) {
+            indexRandom(
+                true,
+                true,
+                true,
+                false,
+                IntStream.range(1, randomIntBetween(5, 10))
+                    .mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50)))
+                    .toList()
+            );
+        }
+        // the max segments argument makes it a blocking call
+        ActionFuture<BroadcastResponse> forceMergeBeforeRelocationFuture = indicesAdmin().prepareForceMerge(indexName)
+            .setMaxNumSegments(1)
+            .execute();
+        ThreadPoolMergeExecutorService threadPoolMergeExecutorService = internalCluster().getInstance(IndicesService.class, node1)
+            .getThreadPoolMergeExecutorService();
+        assertBusy(() -> {
+            // merge executor says merging is blocked due to insufficient disk space while there is a single merge task enqueued
+            assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), equalTo(1));
+            assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
+            // indices stats also says that no merge is currently running (blocked merges are NOT considered as "running")
+            IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(indexName).setMerge(true).get();
+            long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent();
+            assertThat(currentMergeCount, equalTo(0L));
+        });
+        // the force merge call is still blocked
+        assertFalse(forceMergeBeforeRelocationFuture.isCancelled());
+        assertFalse(forceMergeBeforeRelocationFuture.isDone());
+        // merge executor still confirms merging is blocked due to insufficient disk space
+        assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
+        IndicesSegmentResponse indicesSegmentResponseBeforeRelocation = indicesAdmin().prepareSegments(indexName).get();
+        // the index should have more than 1 segment at this stage
+        assertThat(
+            indicesSegmentResponseBeforeRelocation.getIndices().get(indexName).iterator().next().shards()[0].getSegments(),
+            iterableWithSize(greaterThan(1))
+        );
+        // start another node
+        final String node2 = internalCluster().startNode();
+        ensureStableCluster(2);
+        setTotalSpace(node2, Long.MAX_VALUE);
+        // relocate the shard from node1 to node2
+        ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand(indexName, 0, node1, node2));
+        ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT)
+            .setWaitForEvents(Priority.LANGUID)
+            .setWaitForNoRelocatingShards(true)
+            .setTimeout(ACCEPTABLE_RELOCATION_TIME)
+            .get();
+        assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
+        // the force merge call is now unblocked
+        assertBusy(() -> {
+            assertTrue(forceMergeBeforeRelocationFuture.isDone());
+            assertFalse(forceMergeBeforeRelocationFuture.isCancelled());
+        });
+        // there is some merging going on in the {@code PostRecoveryMerger} after recovery, but that does not guarantee a single segment,
+        // so let's trigger a force merge to 1 segment again (this one should succeed promptly)
+        indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get();
+        IndicesSegmentResponse indicesSegmentResponseAfterRelocation = indicesAdmin().prepareSegments(indexName).get();
+        // assert there's only one segment now
+        assertThat(
+            indicesSegmentResponseAfterRelocation.getIndices().get(indexName).iterator().next().shards()[0].getSegments(),
+            iterableWithSize(1)
+        );
+        // also assert that the shard was indeed moved to a different node
+        assertThat(
+            indicesSegmentResponseAfterRelocation.getIndices().get(indexName).iterator().next().shards()[0].getShardRouting()
+                .currentNodeId(),
+            not(
+                equalTo(
+                    indicesSegmentResponseBeforeRelocation.getIndices().get(indexName).iterator().next().shards()[0].getShardRouting()
+                        .currentNodeId()
+                )
+            )
+        );
+    }
+
     public void setTotalSpace(String dataNodeName, long totalSpace) {
         getTestFileStore(dataNodeName).setTotalSpace(totalSpace);
         refreshClusterInfo();