|
|
@@ -18,6 +18,7 @@
|
|
|
*/
|
|
|
package org.elasticsearch.discovery;
|
|
|
|
|
|
+import org.elasticsearch.ExceptionsHelper;
|
|
|
import org.elasticsearch.action.ActionFuture;
|
|
|
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
|
|
|
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
|
|
|
@@ -27,11 +28,14 @@ import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.ClusterStateListener;
|
|
|
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
|
|
|
import org.elasticsearch.cluster.SnapshotsInProgress;
|
|
|
+import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
|
|
|
+import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
|
|
import org.elasticsearch.plugins.Plugin;
|
|
|
import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException;
|
|
|
+import org.elasticsearch.snapshots.SnapshotException;
|
|
|
import org.elasticsearch.snapshots.SnapshotInfo;
|
|
|
import org.elasticsearch.snapshots.SnapshotMissingException;
|
|
|
import org.elasticsearch.snapshots.SnapshotState;
|
|
|
@@ -49,6 +53,9 @@ import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
|
|
+import static org.hamcrest.Matchers.either;
|
|
|
+import static org.hamcrest.Matchers.endsWith;
|
|
|
+import static org.hamcrest.Matchers.is;
|
|
|
|
|
|
/**
|
|
|
* Tests snapshot operations during disruptions.
|
|
|
@@ -156,6 +163,95 @@ public class SnapshotDisruptionIT extends ESIntegTestCase {
|
|
|
assertAllSnapshotsCompleted();
|
|
|
}
|
|
|
|
|
|
+ public void testDisruptionAfterFinalization() throws Exception {
|
|
|
+ final String idxName = "test";
|
|
|
+ final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(3);
|
|
|
+ final String dataNode = internalCluster().startDataOnlyNode();
|
|
|
+ ensureStableCluster(4);
|
|
|
+
|
|
|
+ createRandomIndex(idxName);
|
|
|
+
|
|
|
+ logger.info("--> creating repository");
|
|
|
+ assertAcked(client().admin().cluster().preparePutRepository("test-repo")
|
|
|
+ .setType("fs").setSettings(Settings.builder()
|
|
|
+ .put("location", randomRepoPath())
|
|
|
+ .put("compress", randomBoolean())
|
|
|
+ .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
|
|
|
+
|
|
|
+ final String masterNode1 = internalCluster().getMasterName();
|
|
|
+ Set<String> otherNodes = new HashSet<>(allMasterEligibleNodes);
|
|
|
+ otherNodes.remove(masterNode1);
|
|
|
+ otherNodes.add(dataNode);
|
|
|
+
|
|
|
+ NetworkDisruption networkDisruption =
|
|
|
+ new NetworkDisruption(new NetworkDisruption.TwoPartitions(Collections.singleton(masterNode1), otherNodes),
|
|
|
+ new NetworkDisruption.NetworkUnresponsive());
|
|
|
+ internalCluster().setDisruptionScheme(networkDisruption);
|
|
|
+
|
|
|
+ ClusterService clusterService = internalCluster().clusterService(masterNode1);
|
|
|
+ CountDownLatch disruptionStarted = new CountDownLatch(1);
|
|
|
+ clusterService.addListener(new ClusterStateListener() {
|
|
|
+ @Override
|
|
|
+ public void clusterChanged(ClusterChangedEvent event) {
|
|
|
+ SnapshotsInProgress snapshots = event.state().custom(SnapshotsInProgress.TYPE);
|
|
|
+ if (snapshots != null && snapshots.entries().size() > 0) {
|
|
|
+ final SnapshotsInProgress.Entry snapshotEntry = snapshots.entries().get(0);
|
|
|
+ if (snapshotEntry.state() == SnapshotsInProgress.State.SUCCESS) {
|
|
|
+ final RepositoriesMetaData repoMeta =
|
|
|
+ event.state().metaData().custom(RepositoriesMetaData.TYPE);
|
|
|
+ final RepositoryMetaData metaData = repoMeta.repository("test-repo");
|
|
|
+ if (metaData.generation() == metaData.pendingGeneration()
|
|
|
+ && metaData.generation() > snapshotEntry.repositoryStateId()) {
|
|
|
+ logger.info("--> starting disruption");
|
|
|
+ networkDisruption.startDisrupting();
|
|
|
+ clusterService.removeListener(this);
|
|
|
+ disruptionStarted.countDown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ final String snapshot = "test-snap";
|
|
|
+
|
|
|
+ logger.info("--> starting snapshot");
|
|
|
+ ActionFuture<CreateSnapshotResponse> future = client(masterNode1).admin().cluster()
|
|
|
+ .prepareCreateSnapshot("test-repo", snapshot).setWaitForCompletion(true)
|
|
|
+ .setIndices(idxName).execute();
|
|
|
+
|
|
|
+ logger.info("--> waiting for disruption to start");
|
|
|
+ assertTrue(disruptionStarted.await(1, TimeUnit.MINUTES));
|
|
|
+
|
|
|
+ assertAllSnapshotsCompleted();
|
|
|
+
|
|
|
+ logger.info("--> verify that snapshot was successful or no longer exist");
|
|
|
+ assertBusy(() -> {
|
|
|
+ try {
|
|
|
+ assertSnapshotExists("test-repo", snapshot);
|
|
|
+ } catch (SnapshotMissingException exception) {
|
|
|
+ logger.info("--> done verifying, snapshot doesn't exist");
|
|
|
+ }
|
|
|
+ }, 1, TimeUnit.MINUTES);
|
|
|
+
|
|
|
+ logger.info("--> stopping disrupting");
|
|
|
+ networkDisruption.stopDisrupting();
|
|
|
+ ensureStableCluster(4, masterNode1);
|
|
|
+ logger.info("--> done");
|
|
|
+
|
|
|
+ try {
|
|
|
+ future.get();
|
|
|
+ fail("Should have failed because the node disconnected from the cluster during snapshot finalization");
|
|
|
+ } catch (Exception ex) {
|
|
|
+ final SnapshotException sne = (SnapshotException) ExceptionsHelper.unwrap(ex, SnapshotException.class);
|
|
|
+ assertNotNull(sne);
|
|
|
+ assertThat(
|
|
|
+ sne.getMessage(), either(endsWith(" Failed to remove snapshot from cluster state")).or(endsWith(" no longer master")));
|
|
|
+ assertThat(sne.getSnapshotName(), is(snapshot));
|
|
|
+ }
|
|
|
+
|
|
|
+ assertAllSnapshotsCompleted();
|
|
|
+ }
|
|
|
+
|
|
|
private void assertAllSnapshotsCompleted() throws Exception {
|
|
|
logger.info("--> wait until the snapshot is done");
|
|
|
assertBusy(() -> {
|