@@ -29,6 +29,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotR
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStats;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
+import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -40,6 +41,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.NamedDiff;
 import org.elasticsearch.cluster.SnapshotsInProgress;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@@ -61,6 +63,9 @@ import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.env.Environment;
+import org.elasticsearch.index.seqno.RetentionLeaseActions;
+import org.elasticsearch.index.seqno.RetentionLeases;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.plugins.Plugin;
@@ -94,12 +99,15 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;

+import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsString;
@@ -1227,6 +1235,87 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
         }, 60L, TimeUnit.SECONDS);
     }

+    public void testRetentionLeasesClearedOnRestore() throws Exception {
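+        // register an "fs" repository to snapshot to and restore from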
+        final String repoName = "test-repo-retention-leases";
+        assertAcked(client().admin().cluster().preparePutRepository(repoName)
+            .setType("fs")
+            .setSettings(Settings.builder()
+                .put("location", randomRepoPath())
+                .put("compress", randomBoolean())));
+
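+        // create an index with a random number of primaries, no replicas, and pick one shard to carry the lease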
+        final String indexName = "index-retention-leases";
+        final int shardCount = randomIntBetween(1, 5);
+        assertAcked(client().admin().indices().prepareCreate(indexName)
+            .setSettings(Settings.builder()
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, shardCount)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
+            .get());
+        final ShardId shardId = new ShardId(resolveIndex(indexName), randomIntBetween(0, shardCount - 1));
+
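+        // index a random number of documents and check they are all searchable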
+        final int snapshotDocCount = iterations(10, 1000);
+        logger.debug("--> indexing {} docs into {}", snapshotDocCount, indexName);
+        IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[snapshotDocCount];
+        for (int i = 0; i < snapshotDocCount; i++) {
+            indexRequestBuilders[i] = client().prepareIndex(indexName, "_doc").setSource("field", "value");
+        }
+        indexRandom(true, indexRequestBuilders);
+        assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount);
+
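+        // add a named retention lease to the chosen shard and verify via the shard stats that it is there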
+        final String leaseId = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
+        logger.debug("--> adding retention lease with id {} to {}", leaseId, shardId);
+        client().execute(RetentionLeaseActions.Add.INSTANCE, new RetentionLeaseActions.AddRequest(
+            shardId, leaseId, RETAIN_ALL, "test")).actionGet();
+
+        final ShardStats shardStats = Arrays.stream(client().admin().indices().prepareStats(indexName).get().getShards())
+            .filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get();
+        final RetentionLeases retentionLeases = shardStats.getRetentionLeaseStats().retentionLeases();
+        assertTrue(shardStats + ": " + retentionLeases, retentionLeases.contains(leaseId));
+
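+        // snapshot the index while the retention lease is in place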
+        final String snapshotName = "snapshot-retention-leases";
+        logger.debug("--> create snapshot {}:{}", repoName, snapshotName);
+        CreateSnapshotResponse createResponse = client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
+            .setWaitForCompletion(true).setIndices(indexName).get();
+        assertThat(createResponse.getSnapshotInfo().successfulShards(), equalTo(shardCount));
+        assertThat(createResponse.getSnapshotInfo().failedShards(), equalTo(0));
+
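+        // maybe index some more documents; the restore below must take the shard back to the snapshotted doc count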
+        if (randomBoolean()) {
+            final int extraDocCount = iterations(10, 1000);
+            logger.debug("--> indexing {} extra docs into {}", extraDocCount, indexName);
+            indexRequestBuilders = new IndexRequestBuilder[extraDocCount];
+            for (int i = 0; i < extraDocCount; i++) {
+                indexRequestBuilders[i] = client().prepareIndex(indexName, "_doc").setSource("field", "value");
+            }
+            indexRandom(true, indexRequestBuilders);
+        }
+
+        // Wait for green so the close does not fail in the edge case of coinciding with a shard recovery that hasn't fully synced yet
+        ensureGreen();
+        logger.debug("--> close index {}", indexName);
+        assertAcked(client().admin().indices().prepareClose(indexName));
+
+        logger.debug("--> restore index {} from snapshot", indexName);
+        RestoreSnapshotResponse restoreResponse = client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotName)
+            .setWaitForCompletion(true).get();
+        assertThat(restoreResponse.getRestoreInfo().successfulShards(), equalTo(shardCount));
+        assertThat(restoreResponse.getRestoreInfo().failedShards(), equalTo(0));
+
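+        // exactly the snapshotted documents must be back after the restore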
+        ensureGreen();
+        assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount);
+
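+        // the restored shard must start with fresh retention leases, so the lease added above must be gone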
+        final RetentionLeases restoredRetentionLeases = Arrays.stream(client().admin().indices().prepareStats(indexName).get()
+            .getShards()).filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get()
+            .getRetentionLeaseStats().retentionLeases();
+        assertFalse(restoredRetentionLeases + " should not contain " + leaseId, restoredRetentionLeases.contains(leaseId));
+    }
+
     private long calculateTotalFilesSize(List<Path> files) {
         return files.stream().mapToLong(f -> {
             try {
@@ -1237,7 +1326,6 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
         }).sum();
     }

-
     private List<Path> scanSnapshotFolder(Path repoPath) throws IOException {
         List<Path> files = new ArrayList<>();
         Files.walkFileTree(repoPath, new SimpleFileVisitor<Path>(){