|
@@ -207,6 +207,7 @@ import static java.util.Collections.emptySet;
|
|
|
import static org.elasticsearch.action.support.ActionTestUtils.assertNoFailureListener;
|
|
|
import static org.elasticsearch.env.Environment.PATH_HOME_SETTING;
|
|
|
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
|
|
|
+import static org.hamcrest.Matchers.contains;
|
|
|
import static org.hamcrest.Matchers.containsInAnyOrder;
|
|
|
import static org.hamcrest.Matchers.either;
|
|
|
import static org.hamcrest.Matchers.empty;
|
|
@@ -517,6 +518,92 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public void testConcurrentSnapshotRestoreAndDeleteOther() {
|
|
|
+ setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
|
|
|
+
|
|
|
+ String repoName = "repo";
|
|
|
+ String snapshotName = "snapshot";
|
|
|
+ final String index = "test";
|
|
|
+ final int shards = randomIntBetween(1, 10);
|
|
|
+
|
|
|
+ TestClusterNodes.TestClusterNode masterNode =
|
|
|
+ testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
|
|
|
+
|
|
|
+ final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
|
|
|
+
|
|
|
+ final int documentsFirstSnapshot = randomIntBetween(0, 100);
|
|
|
+
|
|
|
+ continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> indexNDocuments(
|
|
|
+ documentsFirstSnapshot, index, () -> client().admin().cluster()
|
|
|
+ .prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true).execute(createSnapshotResponseStepListener)));
|
|
|
+
|
|
|
+ final int documentsSecondSnapshot = randomIntBetween(0, 100);
|
|
|
+
|
|
|
+ final StepListener<CreateSnapshotResponse> createOtherSnapshotResponseStepListener = new StepListener<>();
|
|
|
+
|
|
|
+ final String secondSnapshotName = "snapshot-2";
|
|
|
+ continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> indexNDocuments(
|
|
|
+ documentsSecondSnapshot, index, () -> client().admin().cluster().prepareCreateSnapshot(repoName, secondSnapshotName)
|
|
|
+ .setWaitForCompletion(true).execute(createOtherSnapshotResponseStepListener)));
|
|
|
+
|
|
|
+ final StepListener<AcknowledgedResponse> deleteSnapshotStepListener = new StepListener<>();
|
|
|
+ final StepListener<RestoreSnapshotResponse> restoreSnapshotResponseListener = new StepListener<>();
|
|
|
+
|
|
|
+ continueOrDie(createOtherSnapshotResponseStepListener,
|
|
|
+ createSnapshotResponse -> {
|
|
|
+ scheduleNow(
|
|
|
+ () -> client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute(deleteSnapshotStepListener));
|
|
|
+ scheduleNow(() -> client().admin().cluster().restoreSnapshot(
|
|
|
+ new RestoreSnapshotRequest(repoName, secondSnapshotName).waitForCompletion(true)
|
|
|
+ .renamePattern("(.+)").renameReplacement("restored_$1"),
|
|
|
+ restoreSnapshotResponseListener));
|
|
|
+ });
|
|
|
+
|
|
|
+ final StepListener<SearchResponse> searchResponseListener = new StepListener<>();
|
|
|
+ continueOrDie(restoreSnapshotResponseListener, restoreSnapshotResponse -> {
|
|
|
+ assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
|
|
|
+ client().search(new SearchRequest("restored_" + index).source(new SearchSourceBuilder().size(0).trackTotalHits(true)),
|
|
|
+ searchResponseListener);
|
|
|
+ });
|
|
|
+
|
|
|
+ deterministicTaskQueue.runAllRunnableTasks();
|
|
|
+
|
|
|
+ assertEquals(documentsFirstSnapshot + documentsSecondSnapshot,
|
|
|
+ Objects.requireNonNull(searchResponseListener.result().getHits().getTotalHits()).value);
|
|
|
+ assertThat(deleteSnapshotStepListener.result().isAcknowledged(), is(true));
|
|
|
+ assertThat(restoreSnapshotResponseListener.result().getRestoreInfo().failedShards(), is(0));
|
|
|
+
|
|
|
+ final Repository repository = masterNode.repositoriesService.repository(repoName);
|
|
|
+ Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
|
|
|
+ assertThat(snapshotIds, contains(createOtherSnapshotResponseStepListener.result().getSnapshotInfo().snapshotId()));
|
|
|
+
|
|
|
+ for (SnapshotId snapshotId : snapshotIds) {
|
|
|
+ final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
|
|
|
+ assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
|
|
|
+ assertThat(snapshotInfo.indices(), containsInAnyOrder(index));
|
|
|
+ assertEquals(shards, snapshotInfo.successfulShards());
|
|
|
+ assertEquals(0, snapshotInfo.failedShards());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void indexNDocuments(int documents, String index, Runnable afterIndexing) {
|
|
|
+ if (documents == 0) {
|
|
|
+ afterIndexing.run();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ final BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
|
|
+ for (int i = 0; i < documents; ++i) {
|
|
|
+ bulkRequest.add(new IndexRequest(index).source(Collections.singletonMap("foo", "bar" + i)));
|
|
|
+ }
|
|
|
+ final StepListener<BulkResponse> bulkResponseStepListener = new StepListener<>();
|
|
|
+ client().bulk(bulkRequest, bulkResponseStepListener);
|
|
|
+ continueOrDie(bulkResponseStepListener, bulkResponse -> {
|
|
|
+ assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
|
|
|
+ assertEquals(documents, bulkResponse.getItems().length);
|
|
|
+ afterIndexing.run();
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
public void testConcurrentSnapshotDeleteAndDeleteIndex() throws IOException {
|
|
|
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
|
|
|
|