|
@@ -43,6 +43,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotR
|
|
|
import org.elasticsearch.action.admin.cluster.snapshots.restore.TransportRestoreSnapshotAction;
|
|
|
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
|
|
|
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
|
|
+import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
|
|
import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
|
|
|
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
|
|
|
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
|
@@ -116,6 +117,7 @@ import org.elasticsearch.cluster.service.ClusterApplierService;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.cluster.service.FakeThreadPoolMasterService;
|
|
|
import org.elasticsearch.cluster.service.MasterService;
|
|
|
+import org.elasticsearch.common.CheckedConsumer;
|
|
|
import org.elasticsearch.common.Nullable;
|
|
|
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
|
|
import org.elasticsearch.common.network.NetworkModule;
|
|
@@ -188,7 +190,6 @@ import java.util.Optional;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
-import java.util.function.Consumer;
|
|
|
import java.util.function.Supplier;
|
|
|
import java.util.stream.Collectors;
|
|
|
import java.util.stream.Stream;
|
|
@@ -247,25 +248,15 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|
|
String repoName = "repo";
|
|
|
String snapshotName = "snapshot";
|
|
|
final String index = "test";
|
|
|
-
|
|
|
final int shards = randomIntBetween(1, 10);
|
|
|
final int documents = randomIntBetween(0, 100);
|
|
|
|
|
|
- final StepListener<AcknowledgedResponse> createRepositoryListener = new StepListener<>();
|
|
|
-
|
|
|
final TestClusterNode masterNode =
|
|
|
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
|
|
|
- masterNode.client.admin().cluster().preparePutRepository(repoName)
|
|
|
- .setType(FsRepository.TYPE).setSettings(Settings.builder().put("location", randomAlphaOfLength(10)))
|
|
|
- .execute(createRepositoryListener);
|
|
|
-
|
|
|
- final StepListener<CreateIndexResponse> createIndexResponseStepListener = new StepListener<>();
|
|
|
- createRepositoryListener.whenComplete(acknowledgedResponse -> masterNode.client.admin().indices().create(
|
|
|
- new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL).settings(defaultIndexSettings(shards)),
|
|
|
- createIndexResponseStepListener), SnapshotResiliencyTests::rethrowAssertion);
|
|
|
|
|
|
final StepListener<CreateSnapshotResponse> createSnapshotResponseListener = new StepListener<>();
|
|
|
- createIndexResponseStepListener.whenComplete(createIndexResponse -> {
|
|
|
+
|
|
|
+ continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> {
|
|
|
final Runnable afterIndexing = () -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
|
|
|
.setWaitForCompletion(true).execute(createSnapshotResponseListener);
|
|
|
if (documents == 0) {
|
|
@@ -277,37 +268,35 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|
|
}
|
|
|
final StepListener<BulkResponse> bulkResponseStepListener = new StepListener<>();
|
|
|
masterNode.client.bulk(bulkRequest, bulkResponseStepListener);
|
|
|
- bulkResponseStepListener.whenComplete(bulkResponse -> {
|
|
|
+ continueOrDie(bulkResponseStepListener, bulkResponse -> {
|
|
|
assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
|
|
|
assertEquals(documents, bulkResponse.getItems().length);
|
|
|
afterIndexing.run();
|
|
|
- }, SnapshotResiliencyTests::rethrowAssertion);
|
|
|
+ });
|
|
|
}
|
|
|
- }, SnapshotResiliencyTests::rethrowAssertion);
|
|
|
+ });
|
|
|
|
|
|
final StepListener<AcknowledgedResponse> deleteIndexListener = new StepListener<>();
|
|
|
|
|
|
- createSnapshotResponseListener.whenComplete(
|
|
|
- createSnapshotResponse -> masterNode.client.admin().indices().delete(new DeleteIndexRequest(index), deleteIndexListener),
|
|
|
- SnapshotResiliencyTests::rethrowAssertion);
|
|
|
+ continueOrDie(createSnapshotResponseListener,
|
|
|
+ createSnapshotResponse -> masterNode.client.admin().indices().delete(new DeleteIndexRequest(index), deleteIndexListener));
|
|
|
|
|
|
final StepListener<RestoreSnapshotResponse> restoreSnapshotResponseListener = new StepListener<>();
|
|
|
- deleteIndexListener.whenComplete(ignored -> masterNode.client.admin().cluster().restoreSnapshot(
|
|
|
- new RestoreSnapshotRequest(repoName, snapshotName).waitForCompletion(true), restoreSnapshotResponseListener),
|
|
|
- SnapshotResiliencyTests::rethrowAssertion);
|
|
|
+ continueOrDie(deleteIndexListener, ignored -> masterNode.client.admin().cluster().restoreSnapshot(
|
|
|
+ new RestoreSnapshotRequest(repoName, snapshotName).waitForCompletion(true), restoreSnapshotResponseListener));
|
|
|
|
|
|
final StepListener<SearchResponse> searchResponseListener = new StepListener<>();
|
|
|
- restoreSnapshotResponseListener.whenComplete(restoreSnapshotResponse -> {
|
|
|
- assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
|
|
|
- masterNode.client.search(
|
|
|
- new SearchRequest(index).source(new SearchSourceBuilder().size(0).trackTotalHits(true)), searchResponseListener);
|
|
|
- }, SnapshotResiliencyTests::rethrowAssertion);
|
|
|
+ continueOrDie(restoreSnapshotResponseListener, restoreSnapshotResponse -> {
|
|
|
+ assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
|
|
|
+ masterNode.client.search(
|
|
|
+ new SearchRequest(index).source(new SearchSourceBuilder().size(0).trackTotalHits(true)), searchResponseListener);
|
|
|
+ });
|
|
|
|
|
|
final AtomicBoolean documentCountVerified = new AtomicBoolean();
|
|
|
- searchResponseListener.whenComplete(r -> {
|
|
|
+ continueOrDie(searchResponseListener, r -> {
|
|
|
assertEquals(documents, Objects.requireNonNull(r.getHits().getTotalHits()).value);
|
|
|
documentCountVerified.set(true);
|
|
|
- }, SnapshotResiliencyTests::rethrowAssertion);
|
|
|
+ });
|
|
|
|
|
|
runUntil(documentCountVerified::get, TimeUnit.MINUTES.toMillis(5L));
|
|
|
assertNotNull(createSnapshotResponseListener.result());
|
|
@@ -333,59 +322,45 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|
|
String repoName = "repo";
|
|
|
String snapshotName = "snapshot";
|
|
|
final String index = "test";
|
|
|
-
|
|
|
final int shards = randomIntBetween(1, 10);
|
|
|
|
|
|
TestClusterNode masterNode =
|
|
|
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
|
|
|
- final AtomicBoolean createdSnapshot = new AtomicBoolean();
|
|
|
|
|
|
- final AdminClient masterAdminClient = masterNode.client.admin();
|
|
|
- masterNode.client.admin().cluster().preparePutRepository(repoName)
|
|
|
- .setType(FsRepository.TYPE).setSettings(Settings.builder().put("location", randomAlphaOfLength(10)))
|
|
|
- .execute(
|
|
|
- assertNoFailureListener(
|
|
|
- () -> masterNode.client.admin().indices().create(
|
|
|
- new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL)
|
|
|
- .settings(defaultIndexSettings(shards)),
|
|
|
- assertNoFailureListener(
|
|
|
- () -> {
|
|
|
- for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) {
|
|
|
- scheduleNow(this::disconnectRandomDataNode);
|
|
|
- }
|
|
|
- if (randomBoolean()) {
|
|
|
- scheduleNow(() -> testClusterNodes.clearNetworkDisruptions());
|
|
|
- }
|
|
|
- masterAdminClient.cluster().prepareCreateSnapshot(repoName, snapshotName)
|
|
|
- .execute(assertNoFailureListener(() -> {
|
|
|
- for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) {
|
|
|
- scheduleNow(this::disconnectOrRestartDataNode);
|
|
|
- }
|
|
|
- final boolean disconnectedMaster = randomBoolean();
|
|
|
- if (disconnectedMaster) {
|
|
|
- scheduleNow(this::disconnectOrRestartMasterNode);
|
|
|
- }
|
|
|
- if (disconnectedMaster || randomBoolean()) {
|
|
|
- scheduleSoon(() -> testClusterNodes.clearNetworkDisruptions());
|
|
|
- } else if (randomBoolean()) {
|
|
|
- scheduleNow(() -> testClusterNodes.clearNetworkDisruptions());
|
|
|
- }
|
|
|
- createdSnapshot.set(true);
|
|
|
- }));
|
|
|
- }))));
|
|
|
+ final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
|
|
|
|
|
|
- runUntil(() -> {
|
|
|
- final Optional<TestClusterNode> randomMaster = testClusterNodes.randomMasterNode();
|
|
|
- if (randomMaster.isPresent()) {
|
|
|
- final SnapshotsInProgress snapshotsInProgress = randomMaster.get().clusterService.state().custom(SnapshotsInProgress.TYPE);
|
|
|
- return snapshotsInProgress != null && snapshotsInProgress.entries().isEmpty();
|
|
|
+ continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> {
|
|
|
+ for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) {
|
|
|
+ scheduleNow(this::disconnectRandomDataNode);
|
|
|
}
|
|
|
- return false;
|
|
|
- }, TimeUnit.MINUTES.toMillis(1L));
|
|
|
+ if (randomBoolean()) {
|
|
|
+ scheduleNow(() -> testClusterNodes.clearNetworkDisruptions());
|
|
|
+ }
|
|
|
+ masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName).execute(createSnapshotResponseStepListener);
|
|
|
+ });
|
|
|
+
|
|
|
+ continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> {
|
|
|
+ for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) {
|
|
|
+ scheduleNow(this::disconnectOrRestartDataNode);
|
|
|
+ }
|
|
|
+ final boolean disconnectedMaster = randomBoolean();
|
|
|
+ if (disconnectedMaster) {
|
|
|
+ scheduleNow(this::disconnectOrRestartMasterNode);
|
|
|
+ }
|
|
|
+ if (disconnectedMaster || randomBoolean()) {
|
|
|
+ scheduleSoon(() -> testClusterNodes.clearNetworkDisruptions());
|
|
|
+ } else if (randomBoolean()) {
|
|
|
+ scheduleNow(() -> testClusterNodes.clearNetworkDisruptions());
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ runUntil(() -> testClusterNodes.randomMasterNode().map(master -> {
|
|
|
+ final SnapshotsInProgress snapshotsInProgress = master.clusterService.state().custom(SnapshotsInProgress.TYPE);
|
|
|
+ return snapshotsInProgress != null && snapshotsInProgress.entries().isEmpty();
|
|
|
+ }).orElse(false), TimeUnit.MINUTES.toMillis(1L));
|
|
|
|
|
|
clearDisruptionsAndAwaitSync();
|
|
|
|
|
|
- assertTrue(createdSnapshot.get());
|
|
|
final TestClusterNode randomMaster = testClusterNodes.randomMasterNode()
|
|
|
.orElseThrow(() -> new AssertionError("expected to find at least one active master node"));
|
|
|
SnapshotsInProgress finalSnapshotsInProgress = randomMaster.clusterService.state().custom(SnapshotsInProgress.TYPE);
|
|
@@ -401,32 +376,32 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|
|
String repoName = "repo";
|
|
|
String snapshotName = "snapshot";
|
|
|
final String index = "test";
|
|
|
-
|
|
|
final int shards = randomIntBetween(1, 10);
|
|
|
|
|
|
TestClusterNode masterNode =
|
|
|
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
|
|
|
- final AtomicBoolean createdSnapshot = new AtomicBoolean();
|
|
|
- masterNode.client.admin().cluster().preparePutRepository(repoName)
|
|
|
- .setType(FsRepository.TYPE).setSettings(Settings.builder().put("location", randomAlphaOfLength(10)))
|
|
|
- .execute(
|
|
|
- assertNoFailureListener(
|
|
|
- () -> masterNode.client.admin().indices().create(
|
|
|
- new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL)
|
|
|
- .settings(defaultIndexSettings(shards)),
|
|
|
- assertNoFailureListener(
|
|
|
- () -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
|
|
|
- .execute(assertNoFailureListener(
|
|
|
- () -> masterNode.client.admin().cluster().deleteSnapshot(
|
|
|
- new DeleteSnapshotRequest(repoName, snapshotName),
|
|
|
- assertNoFailureListener(() -> masterNode.client.admin().cluster()
|
|
|
- .prepareCreateSnapshot(repoName, snapshotName).execute(
|
|
|
- assertNoFailureListener(() -> createdSnapshot.set(true))
|
|
|
- )))))))));
|
|
|
+
|
|
|
+ final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
|
|
|
+
|
|
|
+ continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards),
|
|
|
+ createIndexResponse -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
|
|
|
+ .execute(createSnapshotResponseStepListener));
|
|
|
+
|
|
|
+ final StepListener<AcknowledgedResponse> deleteSnapshotStepListener = new StepListener<>();
|
|
|
+
|
|
|
+ continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> masterNode.client.admin().cluster().deleteSnapshot(
|
|
|
+ new DeleteSnapshotRequest(repoName, snapshotName), deleteSnapshotStepListener));
|
|
|
+
|
|
|
+ final StepListener<CreateSnapshotResponse> createAnotherSnapshotResponseStepListener = new StepListener<>();
|
|
|
+
|
|
|
+ continueOrDie(deleteSnapshotStepListener, acknowledgedResponse -> masterNode.client.admin().cluster()
|
|
|
+ .prepareCreateSnapshot(repoName, snapshotName).execute(createAnotherSnapshotResponseStepListener));
|
|
|
+ continueOrDie(createAnotherSnapshotResponseStepListener, createSnapshotResponse ->
|
|
|
+ assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS));
|
|
|
|
|
|
deterministicTaskQueue.runAllRunnableTasks();
|
|
|
|
|
|
- assertTrue(createdSnapshot.get());
|
|
|
+ assertNotNull(createAnotherSnapshotResponseStepListener.result());
|
|
|
SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
|
|
|
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
|
|
|
final Repository repository = masterNode.repositoriesService.repository(repoName);
|
|
@@ -458,69 +433,54 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|
|
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
|
|
|
final AtomicBoolean createdSnapshot = new AtomicBoolean();
|
|
|
final AdminClient masterAdminClient = masterNode.client.admin();
|
|
|
- masterAdminClient.cluster().preparePutRepository(repoName)
|
|
|
- .setType(FsRepository.TYPE).setSettings(Settings.builder().put("location", randomAlphaOfLength(10)))
|
|
|
- .execute(
|
|
|
- assertNoFailureListener(
|
|
|
- () -> masterAdminClient.indices().create(
|
|
|
- new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL)
|
|
|
- .settings(defaultIndexSettings(shards)),
|
|
|
- assertNoFailureListener(
|
|
|
- () -> masterAdminClient.cluster().state(new ClusterStateRequest(), assertNoFailureListener(
|
|
|
- clusterStateResponse -> {
|
|
|
- final ShardRouting shardToRelocate =
|
|
|
- clusterStateResponse.getState().routingTable().allShards(index).get(0);
|
|
|
- final TestClusterNode currentPrimaryNode =
|
|
|
- testClusterNodes.nodeById(shardToRelocate.currentNodeId());
|
|
|
- final TestClusterNode otherNode =
|
|
|
- testClusterNodes.randomDataNodeSafe(currentPrimaryNode.node.getName());
|
|
|
- final Runnable maybeForceAllocate = new Runnable() {
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- masterAdminClient.cluster().state(new ClusterStateRequest(), assertNoFailureListener(
|
|
|
- resp -> {
|
|
|
- final ShardRouting shardRouting = resp.getState().routingTable()
|
|
|
- .shardRoutingTable(shardToRelocate.shardId()).primaryShard();
|
|
|
- if (shardRouting.unassigned()
|
|
|
- && shardRouting.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT) {
|
|
|
- if (masterNodeCount > 1) {
|
|
|
- scheduleNow(() -> testClusterNodes.stopNode(masterNode));
|
|
|
- }
|
|
|
- testClusterNodes.randomDataNodeSafe().client.admin().cluster()
|
|
|
- .prepareCreateSnapshot(repoName, snapshotName)
|
|
|
- .execute(ActionListener.wrap(() -> {
|
|
|
- testClusterNodes.randomDataNodeSafe().client.admin().cluster()
|
|
|
- .deleteSnapshot(
|
|
|
- new DeleteSnapshotRequest(repoName, snapshotName), noopListener());
|
|
|
- createdSnapshot.set(true);
|
|
|
- }));
|
|
|
- scheduleNow(
|
|
|
- () -> testClusterNodes.randomMasterNodeSafe().client.admin().cluster().reroute(
|
|
|
- new ClusterRerouteRequest().add(
|
|
|
- new AllocateEmptyPrimaryAllocationCommand(
|
|
|
- index, shardRouting.shardId().id(), otherNode.node.getName(), true)
|
|
|
- ), noopListener()));
|
|
|
- } else {
|
|
|
- scheduleSoon(this);
|
|
|
- }
|
|
|
- }
|
|
|
- ));
|
|
|
- }
|
|
|
- };
|
|
|
- scheduleNow(() -> testClusterNodes.stopNode(currentPrimaryNode));
|
|
|
- scheduleNow(maybeForceAllocate);
|
|
|
- }
|
|
|
- ))))));
|
|
|
|
|
|
- runUntil(() -> {
|
|
|
- final Optional<TestClusterNode> randomMaster = testClusterNodes.randomMasterNode();
|
|
|
- if (randomMaster.isPresent()) {
|
|
|
- final SnapshotsInProgress snapshotsInProgress =
|
|
|
- randomMaster.get().clusterService.state().custom(SnapshotsInProgress.TYPE);
|
|
|
- return (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) && createdSnapshot.get();
|
|
|
+ final StepListener<ClusterStateResponse> clusterStateResponseStepListener = new StepListener<>();
|
|
|
+
|
|
|
+ continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards),
|
|
|
+ createIndexResponse -> masterAdminClient.cluster().state(new ClusterStateRequest(), clusterStateResponseStepListener));
|
|
|
+
|
|
|
+ continueOrDie(clusterStateResponseStepListener, clusterStateResponse -> {
|
|
|
+ final ShardRouting shardToRelocate = clusterStateResponse.getState().routingTable().allShards(index).get(0);
|
|
|
+ final TestClusterNode currentPrimaryNode = testClusterNodes.nodeById(shardToRelocate.currentNodeId());
|
|
|
+ final TestClusterNode otherNode = testClusterNodes.randomDataNodeSafe(currentPrimaryNode.node.getName());
|
|
|
+ scheduleNow(() -> testClusterNodes.stopNode(currentPrimaryNode));
|
|
|
+ scheduleNow(new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ final StepListener<ClusterStateResponse> updatedClusterStateResponseStepListener = new StepListener<>();
|
|
|
+ masterAdminClient.cluster().state(new ClusterStateRequest(), updatedClusterStateResponseStepListener);
|
|
|
+ continueOrDie(updatedClusterStateResponseStepListener, updatedClusterState -> {
|
|
|
+ final ShardRouting shardRouting =
|
|
|
+ updatedClusterState.getState().routingTable().shardRoutingTable(shardToRelocate.shardId()).primaryShard();
|
|
|
+ if (shardRouting.unassigned() && shardRouting.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT) {
|
|
|
+ if (masterNodeCount > 1) {
|
|
|
+ scheduleNow(() -> testClusterNodes.stopNode(masterNode));
|
|
|
+ }
|
|
|
+ testClusterNodes.randomDataNodeSafe().client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
|
|
|
+ .execute(ActionListener.wrap(() -> {
|
|
|
+ createdSnapshot.set(true);
|
|
|
+ testClusterNodes.randomDataNodeSafe().client.admin().cluster().deleteSnapshot(
|
|
|
+ new DeleteSnapshotRequest(repoName, snapshotName), noopListener());
|
|
|
+ }));
|
|
|
+ scheduleNow(
|
|
|
+ () -> testClusterNodes.randomMasterNodeSafe().client.admin().cluster().reroute(
|
|
|
+ new ClusterRerouteRequest().add(new AllocateEmptyPrimaryAllocationCommand(
|
|
|
+ index, shardRouting.shardId().id(), otherNode.node.getName(), true)), noopListener()));
|
|
|
+ } else {
|
|
|
+ scheduleSoon(this);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ });
|
|
|
+ });
|
|
|
+
|
|
|
+ runUntil(() -> testClusterNodes.randomMasterNode().map(master -> {
|
|
|
+ if (createdSnapshot.get() == false) {
|
|
|
+ return false;
|
|
|
}
|
|
|
- return false;
|
|
|
- }, TimeUnit.MINUTES.toMillis(1L));
|
|
|
+ final SnapshotsInProgress snapshotsInProgress = master.clusterService.state().custom(SnapshotsInProgress.TYPE);
|
|
|
+ return snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty();
|
|
|
+ }).orElse(false), TimeUnit.MINUTES.toMillis(1L));
|
|
|
|
|
|
clearDisruptionsAndAwaitSync();
|
|
|
|
|
@@ -533,6 +493,23 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|
|
assertThat(snapshotIds, either(hasSize(1)).or(hasSize(0)));
|
|
|
}
|
|
|
|
|
|
+ private StepListener<CreateIndexResponse> createRepoAndIndex(TestClusterNode masterNode, String repoName, String index, int shards) {
|
|
|
+ final AdminClient adminClient = masterNode.client.admin();
|
|
|
+
|
|
|
+ final StepListener<AcknowledgedResponse> createRepositoryListener = new StepListener<>();
|
|
|
+
|
|
|
+ adminClient.cluster().preparePutRepository(repoName).setType(FsRepository.TYPE)
|
|
|
+ .setSettings(Settings.builder().put("location", randomAlphaOfLength(10))).execute(createRepositoryListener);
|
|
|
+
|
|
|
+ final StepListener<CreateIndexResponse> createIndexResponseStepListener = new StepListener<>();
|
|
|
+
|
|
|
+ continueOrDie(createRepositoryListener, acknowledgedResponse -> adminClient.indices().create(
|
|
|
+ new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL).settings(defaultIndexSettings(shards)),
|
|
|
+ createIndexResponseStepListener));
|
|
|
+
|
|
|
+ return createIndexResponseStepListener;
|
|
|
+ }
|
|
|
+
|
|
|
private void clearDisruptionsAndAwaitSync() {
|
|
|
testClusterNodes.clearNetworkDisruptions();
|
|
|
runUntil(() -> {
|
|
@@ -620,48 +597,14 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|
|
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0).build();
|
|
|
}
|
|
|
|
|
|
- private static void rethrowAssertion(Exception e) {
|
|
|
- throw new AssertionError(e);
|
|
|
- }
|
|
|
-
|
|
|
- private static <T> ActionListener<T> assertNoFailureListener(Consumer<T> consumer) {
|
|
|
- return new ActionListener<T>() {
|
|
|
- @Override
|
|
|
- public void onResponse(final T t) {
|
|
|
- consumer.accept(t);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onFailure(final Exception e) {
|
|
|
- throw new AssertionError(e);
|
|
|
- }
|
|
|
- };
|
|
|
- }
|
|
|
-
|
|
|
- private static <T> ActionListener<T> assertNoFailureListener(Runnable r) {
|
|
|
- return new ActionListener<T>() {
|
|
|
- @Override
|
|
|
- public void onResponse(final T t) {
|
|
|
- r.run();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onFailure(final Exception e) {
|
|
|
- throw new AssertionError(e);
|
|
|
- }
|
|
|
- };
|
|
|
+ private static <T> void continueOrDie(StepListener<T> listener, CheckedConsumer<T, Exception> onResponse) {
|
|
|
+ listener.whenComplete(onResponse, e -> {
|
|
|
+ throw new AssertionError(e);
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
private static <T> ActionListener<T> noopListener() {
|
|
|
- return new ActionListener<T>() {
|
|
|
- @Override
|
|
|
- public void onResponse(final T t) {
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onFailure(final Exception e) {
|
|
|
- }
|
|
|
- };
|
|
|
+ return ActionListener.wrap(() -> {});
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -688,7 +631,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|
|
// LinkedHashMap so we have deterministic ordering when iterating over the map in tests
|
|
|
private final Map<String, TestClusterNode> nodes = new LinkedHashMap<>();
|
|
|
|
|
|
- private DisconnectedNodes disruptedLinks = new DisconnectedNodes();
|
|
|
+ private final DisconnectedNodes disruptedLinks = new DisconnectedNodes();
|
|
|
|
|
|
TestClusterNodes(int masterNodes, int dataNodes) {
|
|
|
for (int i = 0; i < masterNodes; ++i) {
|
|
@@ -967,6 +910,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|
|
new BatchedRerouteService(clusterService, allocationService::reroute),
|
|
|
threadPool
|
|
|
);
|
|
|
+ @SuppressWarnings("rawtypes")
|
|
|
Map<ActionType, TransportAction> actions = new HashMap<>();
|
|
|
actions.put(GlobalCheckpointSyncAction.TYPE,
|
|
|
new GlobalCheckpointSyncAction(settings, transportService, clusterService, indicesService,
|