|
@@ -19,6 +19,7 @@
|
|
|
package org.elasticsearch.repositories.blobstore;
|
|
|
|
|
|
import org.elasticsearch.action.ActionRunnable;
|
|
|
+import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
|
|
|
import org.elasticsearch.action.support.PlainActionFuture;
|
|
|
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
@@ -26,13 +27,16 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
import org.elasticsearch.repositories.RepositoriesService;
|
|
|
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
|
|
|
+import org.elasticsearch.snapshots.SnapshotState;
|
|
|
import org.elasticsearch.test.ESIntegTestCase;
|
|
|
|
|
|
import java.io.ByteArrayInputStream;
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
|
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
|
|
|
+import static org.hamcrest.Matchers.is;
|
|
|
|
|
|
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
|
|
|
public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase {
|
|
@@ -107,4 +111,40 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase
|
|
|
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(60));
|
|
|
return masterNode;
|
|
|
}
|
|
|
+
|
|
|
+ public void testCleanupOldIndexN() throws ExecutionException, InterruptedException {
|
|
|
+ internalCluster().startNodes(Settings.EMPTY);
|
|
|
+
|
|
|
+ final String repoName = "test-repo";
|
|
|
+ logger.info("--> creating repository");
|
|
|
+ assertAcked(client().admin().cluster().preparePutRepository(repoName).setType("fs").setSettings(Settings.builder()
|
|
|
+ .put("location", randomRepoPath())
|
|
|
+ .put("compress", randomBoolean())
|
|
|
+ .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
|
|
|
+
|
|
|
+ logger.info("--> create three snapshots");
|
|
|
+ for (int i = 0; i < 3; ++i) {
|
|
|
+ CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap-" + i)
|
|
|
+ .setWaitForCompletion(true).get();
|
|
|
+ assertThat(createSnapshotResponse.getSnapshotInfo().state(), is(SnapshotState.SUCCESS));
|
|
|
+ }
|
|
|
+
|
|
|
+ final RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
|
|
|
+ final BlobStoreRepository repository = (BlobStoreRepository) service.repository(repoName);
|
|
|
+
|
|
|
+ logger.info("--> write two outdated index-N blobs");
|
|
|
+ for (int i = 0; i < 2; ++i) {
|
|
|
+ final PlainActionFuture<Void> createOldIndexNFuture = PlainActionFuture.newFuture();
|
|
|
+ final int generation = i;
|
|
|
+ repository.threadPool().generic().execute(ActionRunnable.run(createOldIndexNFuture, () -> repository.blobStore()
|
|
|
+ .blobContainer(repository.basePath()).writeBlob(BlobStoreRepository.INDEX_FILE_PREFIX + generation,
|
|
|
+ new ByteArrayInputStream(new byte[1]), 1, true)));
|
|
|
+ createOldIndexNFuture.get();
|
|
|
+ }
|
|
|
+
|
|
|
+ logger.info("--> cleanup repository");
|
|
|
+ client().admin().cluster().prepareCleanupRepository(repoName).get();
|
|
|
+
|
|
|
+ BlobStoreTestUtil.assertConsistency(repository, repository.threadPool().generic());
|
|
|
+ }
|
|
|
}
|