|
@@ -19,43 +19,30 @@
|
|
|
|
|
|
package org.elasticsearch.index.shard;
|
|
|
|
|
|
-import org.apache.lucene.index.SegmentInfos;
|
|
|
-import org.elasticsearch.Version;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
-import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
|
|
-import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
import org.elasticsearch.cluster.routing.RecoverySource;
|
|
|
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
|
|
|
-import org.elasticsearch.cluster.routing.ShardRoutingState;
|
|
|
import org.elasticsearch.common.collect.Tuple;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
import org.elasticsearch.index.IndexSettings;
|
|
|
-import org.elasticsearch.index.engine.Engine;
|
|
|
-import org.elasticsearch.index.engine.InternalEngine;
|
|
|
import org.elasticsearch.index.engine.InternalEngineFactory;
|
|
|
import org.elasticsearch.index.seqno.RetentionLease;
|
|
|
import org.elasticsearch.index.seqno.RetentionLeaseStats;
|
|
|
import org.elasticsearch.index.seqno.RetentionLeases;
|
|
|
import org.elasticsearch.index.seqno.SequenceNumbers;
|
|
|
-import org.elasticsearch.indices.recovery.RecoveryState;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
-import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
-import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
-import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
|
|
|
import static org.hamcrest.Matchers.contains;
|
|
|
-import static org.hamcrest.Matchers.containsInAnyOrder;
|
|
|
import static org.hamcrest.Matchers.empty;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
import static org.hamcrest.Matchers.hasItem;
|
|
@@ -221,7 +208,7 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void testCommit() throws IOException {
|
|
|
+ public void testPersistence() throws IOException {
|
|
|
final Settings settings = Settings.builder()
|
|
|
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
|
|
|
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), Long.MAX_VALUE, TimeUnit.NANOSECONDS)
|
|
@@ -242,19 +229,17 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
|
|
|
|
|
|
currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(Long.MAX_VALUE));
|
|
|
|
|
|
- // force a commit
|
|
|
- indexShard.flush(new FlushRequest().force(true));
|
|
|
+ // force the retention leases to persist
|
|
|
+ indexShard.persistRetentionLeases();
|
|
|
|
|
|
- // the committed retention leases should equal our current retention leases
|
|
|
- final SegmentInfos segmentCommitInfos = indexShard.store().readLastCommittedSegmentsInfo();
|
|
|
- assertTrue(segmentCommitInfos.getUserData().containsKey(Engine.RETENTION_LEASES));
|
|
|
+ // the written retention leases should equal our current retention leases
|
|
|
final RetentionLeases retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get();
|
|
|
- final RetentionLeases committedRetentionLeases = IndexShard.getRetentionLeases(segmentCommitInfos);
|
|
|
+ final RetentionLeases writtenRetentionLeases = indexShard.loadRetentionLeases();
|
|
|
if (retentionLeases.leases().isEmpty()) {
|
|
|
- assertThat(committedRetentionLeases.version(), equalTo(0L));
|
|
|
- assertThat(committedRetentionLeases.leases(), empty());
|
|
|
+ assertThat(writtenRetentionLeases.version(), equalTo(0L));
|
|
|
+ assertThat(writtenRetentionLeases.leases(), empty());
|
|
|
} else {
|
|
|
- assertThat(committedRetentionLeases.version(), equalTo((long) length));
|
|
|
+ assertThat(writtenRetentionLeases.version(), equalTo((long) length));
|
|
|
assertThat(retentionLeases.leases(), contains(retentionLeases.leases().toArray(new RetentionLease[0])));
|
|
|
}
|
|
|
|
|
@@ -304,76 +289,6 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void testRecoverFromStoreReserveRetentionLeases() throws Exception {
|
|
|
- final AtomicBoolean throwDuringRecoverFromTranslog = new AtomicBoolean();
|
|
|
- final IndexShard shard = newStartedShard(false, Settings.builder().put("index.soft_deletes.enabled", true).build(),
|
|
|
- config -> new InternalEngine(config) {
|
|
|
- @Override
|
|
|
- public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner,
|
|
|
- long recoverUpToSeqNo) throws IOException {
|
|
|
- if (throwDuringRecoverFromTranslog.get()) {
|
|
|
- throw new RuntimeException("crashed before recover from translog is completed");
|
|
|
- }
|
|
|
- return super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
|
|
|
- }
|
|
|
- });
|
|
|
- final List<RetentionLease> leases = new ArrayList<>();
|
|
|
- long version = randomLongBetween(0, 100);
|
|
|
- long primaryTerm = randomLongBetween(1, 100);
|
|
|
- final int iterations = randomIntBetween(1, 10);
|
|
|
- for (int i = 0; i < iterations; i++) {
|
|
|
- if (randomBoolean()) {
|
|
|
- indexDoc(shard, "_doc", Integer.toString(i));
|
|
|
- } else {
|
|
|
- leases.add(new RetentionLease(Integer.toString(i), randomNonNegativeLong(),
|
|
|
- randomLongBetween(Integer.MAX_VALUE, Long.MAX_VALUE), "test"));
|
|
|
- }
|
|
|
- if (randomBoolean()) {
|
|
|
- if (randomBoolean()) {
|
|
|
- version += randomLongBetween(1, 100);
|
|
|
- primaryTerm += randomLongBetween(0, 100);
|
|
|
- shard.updateRetentionLeasesOnReplica(new RetentionLeases(primaryTerm, version, leases));
|
|
|
- shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
|
|
|
- }
|
|
|
- }
|
|
|
- if (randomBoolean()) {
|
|
|
- shard.updateGlobalCheckpointOnReplica(randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint()), "test");
|
|
|
- flushShard(shard);
|
|
|
- }
|
|
|
- }
|
|
|
- version += randomLongBetween(1, 100);
|
|
|
- primaryTerm += randomLongBetween(0, 100);
|
|
|
- shard.updateRetentionLeasesOnReplica(new RetentionLeases(primaryTerm, version, leases));
|
|
|
- shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
|
|
|
- closeShard(shard, false);
|
|
|
-
|
|
|
- final IndexShard failedShard = reinitShard(shard, newShardRouting(shard.routingEntry().shardId(),
|
|
|
- shard.routingEntry().currentNodeId(), true, ShardRoutingState.INITIALIZING,
|
|
|
- RecoverySource.ExistingStoreRecoverySource.INSTANCE));
|
|
|
- final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
|
|
|
- Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
|
|
|
- failedShard.markAsRecovering("store", new RecoveryState(failedShard.routingEntry(), localNode, null));
|
|
|
- throwDuringRecoverFromTranslog.set(true);
|
|
|
- expectThrows(IndexShardRecoveryException.class, failedShard::recoverFromStore);
|
|
|
- closeShards(failedShard);
|
|
|
-
|
|
|
- final IndexShard newShard = reinitShard(shard, newShardRouting(shard.routingEntry().shardId(),
|
|
|
- shard.routingEntry().currentNodeId(), true, ShardRoutingState.INITIALIZING,
|
|
|
- RecoverySource.ExistingStoreRecoverySource.INSTANCE));
|
|
|
- newShard.markAsRecovering("store", new RecoveryState(failedShard.routingEntry(), localNode, null));
|
|
|
- throwDuringRecoverFromTranslog.set(false);
|
|
|
- assertTrue(newShard.recoverFromStore());
|
|
|
- final RetentionLeases retentionLeases = newShard.getRetentionLeases();
|
|
|
- assertThat(retentionLeases.version(), equalTo(version));
|
|
|
- assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm));
|
|
|
- if (leases.isEmpty()) {
|
|
|
- assertThat(retentionLeases.leases(), empty());
|
|
|
- } else {
|
|
|
- assertThat(retentionLeases.leases(), containsInAnyOrder(leases.toArray(new RetentionLease[0])));
|
|
|
- }
|
|
|
- closeShards(newShard);
|
|
|
- }
|
|
|
-
|
|
|
private void assertRetentionLeases(
|
|
|
final IndexShard indexShard,
|
|
|
final int size,
|