|
|
@@ -6,11 +6,15 @@
|
|
|
|
|
|
package org.elasticsearch.xpack.slm;
|
|
|
|
|
|
+import org.elasticsearch.action.ActionFuture;
|
|
|
+import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
|
|
|
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
|
|
|
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
|
|
|
+import org.elasticsearch.action.index.IndexRequestBuilder;
|
|
|
import org.elasticsearch.action.search.SearchResponse;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.SnapshotsInProgress;
|
|
|
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
|
|
import org.elasticsearch.common.Strings;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
@@ -18,7 +22,9 @@ import org.elasticsearch.index.query.QueryBuilders;
|
|
|
import org.elasticsearch.plugins.Plugin;
|
|
|
import org.elasticsearch.repositories.RepositoriesService;
|
|
|
import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException;
|
|
|
+import org.elasticsearch.snapshots.SnapshotInfo;
|
|
|
import org.elasticsearch.snapshots.SnapshotMissingException;
|
|
|
+import org.elasticsearch.snapshots.SnapshotState;
|
|
|
import org.elasticsearch.snapshots.mockstore.MockRepository;
|
|
|
import org.elasticsearch.test.ESIntegTestCase;
|
|
|
import org.elasticsearch.test.junit.annotations.TestLogging;
|
|
|
@@ -38,8 +44,13 @@ import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
+import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
|
|
|
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
|
|
import static org.hamcrest.Matchers.anyOf;
|
|
|
+import static org.hamcrest.Matchers.empty;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
import static org.hamcrest.Matchers.greaterThan;
|
|
|
|
|
|
@@ -231,6 +242,137 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public void testBasicFailureRetention() throws Exception {
|
|
|
+ final String indexName = "test-idx";
|
|
|
+ final String policyId = "test-policy";
|
|
|
+ // Setup
|
|
|
+ logger.info("--> starting two master nodes and two data nodes");
|
|
|
+ internalCluster().startMasterOnlyNodes(2);
|
|
|
+ internalCluster().startDataOnlyNodes(2);
|
|
|
+
|
|
|
+ createAndPopulateIndex(indexName);
|
|
|
+
|
|
|
+ // Create a snapshot repo
|
|
|
+ initializeRepo(REPO);
|
|
|
+
|
|
|
+ createSnapshotPolicy(policyId, "snap", "1 2 3 4 5 ?", REPO, indexName, true,
|
|
|
+ new SnapshotRetentionConfiguration(null, 1, 2));
|
|
|
+
|
|
|
+ // Create a failed snapshot
|
|
|
+ AtomicReference<String> failedSnapshotName = new AtomicReference<>();
|
|
|
+ {
|
|
|
+ logger.info("--> stopping random data node, which should cause shards to go missing");
|
|
|
+ internalCluster().stopRandomDataNode();
|
|
|
+ assertBusy(() ->
|
|
|
+ assertEquals(ClusterHealthStatus.RED, client().admin().cluster().prepareHealth().get().getStatus()),
|
|
|
+ 30, TimeUnit.SECONDS);
|
|
|
+
|
|
|
+ final String masterNode = blockMasterFromFinalizingSnapshotOnIndexFile(REPO);
|
|
|
+
|
|
|
+ logger.info("--> start snapshot");
|
|
|
+ ActionFuture<ExecuteSnapshotLifecycleAction.Response> snapshotFuture = client()
|
|
|
+ .execute(ExecuteSnapshotLifecycleAction.INSTANCE, new ExecuteSnapshotLifecycleAction.Request(policyId));
|
|
|
+
|
|
|
+ logger.info("--> waiting for block to kick in on " + masterNode);
|
|
|
+ waitForBlock(masterNode, REPO, TimeValue.timeValueSeconds(60));
|
|
|
+
|
|
|
+ logger.info("--> stopping master node");
|
|
|
+ internalCluster().stopCurrentMasterNode();
|
|
|
+
|
|
|
+ logger.info("--> wait until the snapshot is done");
|
|
|
+ failedSnapshotName.set(snapshotFuture.get().getSnapshotName());
|
|
|
+ assertNotNull(failedSnapshotName.get());
|
|
|
+
|
|
|
+ logger.info("--> verify that snapshot [{}] failed", failedSnapshotName.get());
|
|
|
+ assertBusy(() -> {
|
|
|
+ GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
|
|
|
+ .prepareGetSnapshots(REPO).setSnapshots(failedSnapshotName.get()).get();
|
|
|
+ SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots(REPO).get(0);
|
|
|
+ assertEquals(SnapshotState.FAILED, snapshotInfo.state());
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ // Run retention - we'll check the results later to make sure it's had time to run.
|
|
|
+ {
|
|
|
+ logger.info("--> executing SLM retention");
|
|
|
+ assertAcked(client().execute(ExecuteSnapshotRetentionAction.INSTANCE, new ExecuteSnapshotRetentionAction.Request()).get());
|
|
|
+ }
|
|
|
+
|
|
|
+ // Take a successful snapshot
|
|
|
+ AtomicReference<String> successfulSnapshotName = new AtomicReference<>();
|
|
|
+ {
|
|
|
+ logger.info("--> deleting old index [{}], as it is now missing shards", indexName);
|
|
|
+ assertAcked(client().admin().indices().prepareDelete(indexName).get());
|
|
|
+ createAndPopulateIndex(indexName);
|
|
|
+
|
|
|
+ logger.info("--> unblocking snapshots");
|
|
|
+ unblockRepo(REPO);
|
|
|
+ unblockAllDataNodes(REPO);
|
|
|
+
|
|
|
+ logger.info("--> taking new snapshot");
|
|
|
+
|
|
|
+ ActionFuture<ExecuteSnapshotLifecycleAction.Response> snapshotResponse = client()
|
|
|
+ .execute(ExecuteSnapshotLifecycleAction.INSTANCE, new ExecuteSnapshotLifecycleAction.Request(policyId));
|
|
|
+ logger.info("--> waiting for snapshot to complete");
|
|
|
+ successfulSnapshotName.set(snapshotResponse.get().getSnapshotName());
|
|
|
+ assertNotNull(successfulSnapshotName.get());
|
|
|
+ Thread.sleep(TimeValue.timeValueSeconds(10).millis());
|
|
|
+ logger.info("--> verify that snapshot [{}] succeeded", successfulSnapshotName.get());
|
|
|
+ assertBusy(() -> {
|
|
|
+ GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
|
|
|
+ .prepareGetSnapshots(REPO).setSnapshots(successfulSnapshotName.get()).get();
|
|
|
+ SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots(REPO).get(0);
|
|
|
+ assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check that the failed snapshot from before still exists, now that retention has run
|
|
|
+ {
|
|
|
+ logger.info("--> verify that snapshot [{}] still exists", failedSnapshotName.get());
|
|
|
+ GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
|
|
|
+ .prepareGetSnapshots(REPO).setSnapshots(failedSnapshotName.get()).get();
|
|
|
+ SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots(REPO).get(0);
|
|
|
+ assertEquals(SnapshotState.FAILED, snapshotInfo.state());
|
|
|
+ }
|
|
|
+
|
|
|
+ // Run retention again and make sure the failure was deleted
|
|
|
+ {
|
|
|
+ logger.info("--> executing SLM retention");
|
|
|
+ assertAcked(client().execute(ExecuteSnapshotRetentionAction.INSTANCE, new ExecuteSnapshotRetentionAction.Request()).get());
|
|
|
+ logger.info("--> waiting for failed snapshot [{}] to be deleted", failedSnapshotName.get());
|
|
|
+ assertBusy(() -> {
|
|
|
+ try {
|
|
|
+ GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
|
|
|
+ .prepareGetSnapshots(REPO).setSnapshots(failedSnapshotName.get()).get();
|
|
|
+ assertThat(snapshotsStatusResponse.getSnapshots(REPO), empty());
|
|
|
+ } catch (SnapshotMissingException e) {
|
|
|
+ // This is what we want to happen
|
|
|
+ }
|
|
|
+ logger.info("--> failed snapshot [{}] has been deleted, checking successful snapshot [{}] still exists",
|
|
|
+ failedSnapshotName.get(), successfulSnapshotName.get());
|
|
|
+ GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
|
|
|
+ .prepareGetSnapshots(REPO).setSnapshots(successfulSnapshotName.get()).get();
|
|
|
+ SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots(REPO).get(0);
|
|
|
+ assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void createAndPopulateIndex(String indexName) throws InterruptedException {
|
|
|
+ logger.info("--> creating and populating index [{}]", indexName);
|
|
|
+ assertAcked(prepareCreate(indexName, 0, Settings.builder()
|
|
|
+ .put("number_of_shards", 6).put("number_of_replicas", 0)));
|
|
|
+ ensureGreen();
|
|
|
+
|
|
|
+ final int numdocs = randomIntBetween(50, 100);
|
|
|
+ IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs];
|
|
|
+ for (int i = 0; i < builders.length; i++) {
|
|
|
+ builders[i] = client().prepareIndex(indexName, SINGLE_MAPPING_NAME, Integer.toString(i)).setSource("field1", "bar " + i);
|
|
|
+ }
|
|
|
+ indexRandom(true, builders);
|
|
|
+ flushAndRefresh();
|
|
|
+ }
|
|
|
+
|
|
|
private void initializeRepo(String repoName) {
|
|
|
client().admin().cluster().preparePutRepository(repoName)
|
|
|
.setType("mock")
|
|
|
@@ -314,4 +456,17 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
|
|
|
((MockRepository)repositoriesService.repository(repository)).unblock();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ public void waitForBlock(String node, String repository, TimeValue timeout) throws InterruptedException {
|
|
|
+ long start = System.currentTimeMillis();
|
|
|
+ RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, node);
|
|
|
+ MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository);
|
|
|
+ while (System.currentTimeMillis() - start < timeout.millis()) {
|
|
|
+ if (mockRepository.blocked()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Thread.sleep(100);
|
|
|
+ }
|
|
|
+ fail("Timeout waiting for node [" + node + "] to be blocked");
|
|
|
+ }
|
|
|
}
|