|
@@ -19,15 +19,12 @@
|
|
|
|
|
|
package org.elasticsearch.cluster.routing.allocation.decider;
|
|
|
|
|
|
-import org.apache.lucene.mockfile.FilterFileStore;
|
|
|
-import org.apache.lucene.mockfile.FilterFileSystemProvider;
|
|
|
-import org.apache.lucene.mockfile.FilterPath;
|
|
|
-import org.apache.lucene.util.Constants;
|
|
|
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
|
|
|
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
|
|
|
import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
|
|
import org.elasticsearch.action.index.IndexRequestBuilder;
|
|
|
import org.elasticsearch.cluster.ClusterInfoService;
|
|
|
+import org.elasticsearch.cluster.DiskUsageIntegTestCase;
|
|
|
import org.elasticsearch.cluster.InternalClusterInfoService;
|
|
|
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
|
|
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
|
@@ -37,95 +34,40 @@ import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
|
|
|
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.Rebalance;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.Priority;
|
|
|
-import org.elasticsearch.common.io.PathUtils;
|
|
|
-import org.elasticsearch.common.io.PathUtilsForTesting;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
|
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
|
|
-import org.elasticsearch.core.internal.io.IOUtils;
|
|
|
-import org.elasticsearch.env.Environment;
|
|
|
import org.elasticsearch.env.NodeEnvironment;
|
|
|
-import org.elasticsearch.monitor.fs.FsService;
|
|
|
-import org.elasticsearch.plugins.Plugin;
|
|
|
import org.elasticsearch.repositories.fs.FsRepository;
|
|
|
import org.elasticsearch.snapshots.RestoreInfo;
|
|
|
import org.elasticsearch.snapshots.SnapshotInfo;
|
|
|
import org.elasticsearch.snapshots.SnapshotState;
|
|
|
import org.elasticsearch.test.ESIntegTestCase;
|
|
|
-import org.elasticsearch.test.InternalSettingsPlugin;
|
|
|
import org.hamcrest.Matcher;
|
|
|
-import org.junit.After;
|
|
|
-import org.junit.Before;
|
|
|
-
|
|
|
-import java.io.FileNotFoundException;
|
|
|
-import java.io.IOException;
|
|
|
-import java.nio.file.DirectoryStream;
|
|
|
-import java.nio.file.FileStore;
|
|
|
-import java.nio.file.FileSystem;
|
|
|
-import java.nio.file.Files;
|
|
|
-import java.nio.file.NoSuchFileException;
|
|
|
-import java.nio.file.NotDirectoryException;
|
|
|
-import java.nio.file.Path;
|
|
|
+
|
|
|
import java.util.Arrays;
|
|
|
-import java.util.Collection;
|
|
|
import java.util.HashSet;
|
|
|
-import java.util.List;
|
|
|
import java.util.Locale;
|
|
|
-import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.stream.Collectors;
|
|
|
import java.util.stream.StreamSupport;
|
|
|
|
|
|
-import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
|
|
|
import static org.elasticsearch.index.store.Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING;
|
|
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
|
|
-import static org.hamcrest.Matchers.anyOf;
|
|
|
import static org.hamcrest.Matchers.empty;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
-import static org.hamcrest.Matchers.greaterThan;
|
|
|
import static org.hamcrest.Matchers.hasSize;
|
|
|
import static org.hamcrest.Matchers.is;
|
|
|
|
|
|
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
|
|
|
-public class DiskThresholdDeciderIT extends ESIntegTestCase {
|
|
|
-
|
|
|
- private static TestFileSystemProvider fileSystemProvider;
|
|
|
-
|
|
|
- private FileSystem defaultFileSystem;
|
|
|
-
|
|
|
- @Before
|
|
|
- public void installFilesystemProvider() {
|
|
|
- assertNull(defaultFileSystem);
|
|
|
- defaultFileSystem = PathUtils.getDefaultFileSystem();
|
|
|
- assertNull(fileSystemProvider);
|
|
|
- fileSystemProvider = new TestFileSystemProvider(defaultFileSystem, createTempDir());
|
|
|
- PathUtilsForTesting.installMock(fileSystemProvider.getFileSystem(null));
|
|
|
- }
|
|
|
-
|
|
|
- @After
|
|
|
- public void removeFilesystemProvider() {
|
|
|
- fileSystemProvider = null;
|
|
|
- assertNotNull(defaultFileSystem);
|
|
|
- PathUtilsForTesting.installMock(defaultFileSystem); // set the default filesystem back
|
|
|
- defaultFileSystem = null;
|
|
|
- }
|
|
|
+public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
|
|
|
|
|
|
private static final long WATERMARK_BYTES = new ByteSizeValue(10, ByteSizeUnit.KB).getBytes();
|
|
|
|
|
|
@Override
|
|
|
protected Settings nodeSettings(int nodeOrdinal) {
|
|
|
- final Path dataPath = fileSystemProvider.getRootDir().resolve("node-" + nodeOrdinal);
|
|
|
- try {
|
|
|
- Files.createDirectories(dataPath);
|
|
|
- } catch (IOException e) {
|
|
|
- throw new AssertionError("unexpected", e);
|
|
|
- }
|
|
|
- fileSystemProvider.addTrackedPath(dataPath);
|
|
|
return Settings.builder()
|
|
|
.put(super.nodeSettings(nodeOrdinal))
|
|
|
- .put(Environment.PATH_DATA_SETTING.getKey(), dataPath)
|
|
|
- .put(FsService.ALWAYS_REFRESH_SETTING.getKey(), true)
|
|
|
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), WATERMARK_BYTES + "b")
|
|
|
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), WATERMARK_BYTES + "b")
|
|
|
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "0b")
|
|
@@ -133,11 +75,6 @@ public class DiskThresholdDeciderIT extends ESIntegTestCase {
|
|
|
.build();
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- protected Collection<Class<? extends Plugin>> nodePlugins() {
|
|
|
- return List.of(InternalSettingsPlugin.class);
|
|
|
- }
|
|
|
-
|
|
|
public void testHighWatermarkNotExceeded() throws Exception {
|
|
|
internalCluster().startMasterOnlyNode();
|
|
|
internalCluster().startDataOnlyNode();
|
|
@@ -149,7 +86,6 @@ public class DiskThresholdDeciderIT extends ESIntegTestCase {
|
|
|
internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());
|
|
|
|
|
|
final String dataNode0Id = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
|
|
|
- final Path dataNode0Path = internalCluster().getInstance(Environment.class, dataNodeName).dataFiles()[0];
|
|
|
|
|
|
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
|
|
|
createIndex(indexName, Settings.builder()
|
|
@@ -161,11 +97,11 @@ public class DiskThresholdDeciderIT extends ESIntegTestCase {
|
|
|
|
|
|
// reduce disk size of node 0 so that no shards fit below the high watermark, forcing all shards onto the other data node
|
|
|
// (subtract the translog size since the disk threshold decider ignores this and may therefore move the shard back again)
|
|
|
- fileSystemProvider.getTestFileStore(dataNode0Path).setTotalSpace(minShardSize + WATERMARK_BYTES - 1L);
|
|
|
+ getTestFileStore(dataNodeName).setTotalSpace(minShardSize + WATERMARK_BYTES - 1L);
|
|
|
assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, empty());
|
|
|
|
|
|
// increase disk size of node 0 to allow just enough room for one shard, and check that it's rebalanced back
|
|
|
- fileSystemProvider.getTestFileStore(dataNode0Path).setTotalSpace(minShardSize + WATERMARK_BYTES + 1L);
|
|
|
+ getTestFileStore(dataNodeName).setTotalSpace(minShardSize + WATERMARK_BYTES + 1L);
|
|
|
assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, hasSize(1));
|
|
|
}
|
|
|
|
|
@@ -186,7 +122,6 @@ public class DiskThresholdDeciderIT extends ESIntegTestCase {
|
|
|
internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> clusterInfoService.refresh());
|
|
|
|
|
|
final String dataNode0Id = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
|
|
|
- final Path dataNode0Path = internalCluster().getInstance(Environment.class, dataNodeName).dataFiles()[0];
|
|
|
|
|
|
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
|
|
|
createIndex(indexName, Settings.builder()
|
|
@@ -205,7 +140,7 @@ public class DiskThresholdDeciderIT extends ESIntegTestCase {
|
|
|
assertAcked(client().admin().indices().prepareDelete(indexName).get());
|
|
|
|
|
|
// reduce disk size of node 0 so that no shards fit below the low watermark, forcing shards to be assigned to the other data node
|
|
|
- fileSystemProvider.getTestFileStore(dataNode0Path).setTotalSpace(minShardSize + WATERMARK_BYTES - 1L);
|
|
|
+ getTestFileStore(dataNodeName).setTotalSpace(minShardSize + WATERMARK_BYTES - 1L);
|
|
|
refreshDiskUsage();
|
|
|
|
|
|
assertAcked(client().admin().cluster().prepareUpdateSettings()
|
|
@@ -229,7 +164,7 @@ public class DiskThresholdDeciderIT extends ESIntegTestCase {
|
|
|
.get());
|
|
|
|
|
|
// increase disk size of node 0 to allow just enough room for one shard, and check that it's rebalanced back
|
|
|
- fileSystemProvider.getTestFileStore(dataNode0Path).setTotalSpace(minShardSize + WATERMARK_BYTES + 1L);
|
|
|
+ getTestFileStore(dataNodeName).setTotalSpace(minShardSize + WATERMARK_BYTES + 1L);
|
|
|
assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, hasSize(1));
|
|
|
}
|
|
|
|
|
@@ -307,132 +242,4 @@ public class DiskThresholdDeciderIT extends ESIntegTestCase {
|
|
|
assertThat("Mismatching shard routings: " + shardRoutings, shardRoutings, matcher);
|
|
|
}, 30L, TimeUnit.SECONDS);
|
|
|
}
|
|
|
-
|
|
|
- private static class TestFileStore extends FilterFileStore {
|
|
|
-
|
|
|
- private final Path path;
|
|
|
-
|
|
|
- private volatile long totalSpace = -1;
|
|
|
-
|
|
|
- TestFileStore(FileStore delegate, String scheme, Path path) {
|
|
|
- super(delegate, scheme);
|
|
|
- this.path = path;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public String name() {
|
|
|
- return "fake"; // Lucene's is-spinning-disk check expects the device name here
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public long getTotalSpace() throws IOException {
|
|
|
- final long totalSpace = this.totalSpace;
|
|
|
- if (totalSpace == -1) {
|
|
|
- return super.getTotalSpace();
|
|
|
- } else {
|
|
|
- return totalSpace;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void setTotalSpace(long totalSpace) {
|
|
|
- assertThat(totalSpace, anyOf(is(-1L), greaterThan(0L)));
|
|
|
- this.totalSpace = totalSpace;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public long getUsableSpace() throws IOException {
|
|
|
- final long totalSpace = this.totalSpace;
|
|
|
- if (totalSpace == -1) {
|
|
|
- return super.getUsableSpace();
|
|
|
- } else {
|
|
|
- return Math.max(0L, totalSpace - getTotalFileSize(path));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public long getUnallocatedSpace() throws IOException {
|
|
|
- final long totalSpace = this.totalSpace;
|
|
|
- if (totalSpace == -1) {
|
|
|
- return super.getUnallocatedSpace();
|
|
|
- } else {
|
|
|
- return Math.max(0L, totalSpace - getTotalFileSize(path));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private static long getTotalFileSize(Path path) throws IOException {
|
|
|
- if (Files.isRegularFile(path)) {
|
|
|
- try {
|
|
|
- return Files.size(path);
|
|
|
- } catch (NoSuchFileException | FileNotFoundException e) {
|
|
|
- // probably removed
|
|
|
- return 0L;
|
|
|
- }
|
|
|
- } else if (path.getFileName().toString().equals("_state") || path.getFileName().toString().equals("translog")) {
|
|
|
- // ignore metadata and translog, since the disk threshold decider only cares about the store size
|
|
|
- return 0L;
|
|
|
- } else {
|
|
|
- try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(path)) {
|
|
|
- long total = 0L;
|
|
|
- for (Path subpath : directoryStream) {
|
|
|
- total += getTotalFileSize(subpath);
|
|
|
- }
|
|
|
- return total;
|
|
|
- } catch (NotDirectoryException | NoSuchFileException | FileNotFoundException e) {
|
|
|
- // probably removed
|
|
|
- return 0L;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private static class TestFileSystemProvider extends FilterFileSystemProvider {
|
|
|
- private final Map<Path, TestFileStore> trackedPaths = newConcurrentMap();
|
|
|
- private final Path rootDir;
|
|
|
-
|
|
|
- TestFileSystemProvider(FileSystem delegateInstance, Path rootDir) {
|
|
|
- super("diskthreshold://", delegateInstance);
|
|
|
- this.rootDir = new FilterPath(rootDir, fileSystem);
|
|
|
- }
|
|
|
-
|
|
|
- Path getRootDir() {
|
|
|
- return rootDir;
|
|
|
- }
|
|
|
-
|
|
|
- void addTrackedPath(Path path) {
|
|
|
- assertTrue(path + " starts with " + rootDir, path.startsWith(rootDir));
|
|
|
- final FileStore fileStore;
|
|
|
- try {
|
|
|
- fileStore = super.getFileStore(path);
|
|
|
- } catch (IOException e) {
|
|
|
- throw new AssertionError("unexpected", e);
|
|
|
- }
|
|
|
- assertNull(trackedPaths.put(path, new TestFileStore(fileStore, getScheme(), path)));
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public FileStore getFileStore(Path path) {
|
|
|
- return getTestFileStore(path);
|
|
|
- }
|
|
|
-
|
|
|
- TestFileStore getTestFileStore(Path path) {
|
|
|
- final TestFileStore fileStore = trackedPaths.get(path);
|
|
|
- if (fileStore != null) {
|
|
|
- return fileStore;
|
|
|
- }
|
|
|
-
|
|
|
- // On Linux, and only Linux, Lucene obtains a filestore for the index in order to determine whether it's on a spinning disk or
|
|
|
- // not so it can configure the merge scheduler accordingly
|
|
|
- assertTrue(path + " not tracked and not on Linux", Constants.LINUX);
|
|
|
- final Set<Path> containingPaths = trackedPaths.keySet().stream().filter(path::startsWith).collect(Collectors.toSet());
|
|
|
- assertThat(path + " not contained in a unique tracked path", containingPaths, hasSize(1));
|
|
|
- return trackedPaths.get(containingPaths.iterator().next());
|
|
|
- }
|
|
|
-
|
|
|
- void clearTrackedPaths() throws IOException {
|
|
|
- for (Path path : trackedPaths.keySet()) {
|
|
|
- IOUtils.rm(path);
|
|
|
- }
|
|
|
- trackedPaths.clear();
|
|
|
- }
|
|
|
- }
|
|
|
}
|