|
|
@@ -644,6 +644,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|
|
InternalEngine engine = createEngine(store, translog);
|
|
|
engine.close();
|
|
|
|
|
|
+ trimUnsafeCommits(engine.config());
|
|
|
engine = new InternalEngine(engine.config());
|
|
|
assertTrue(engine.isRecovering());
|
|
|
engine.recoverFromTranslog();
|
|
|
@@ -659,6 +660,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|
|
engine.index(indexForDoc(doc));
|
|
|
engine.close();
|
|
|
|
|
|
+ trimUnsafeCommits(engine.config());
|
|
|
engine = new InternalEngine(engine.config());
|
|
|
expectThrows(IllegalStateException.class, () -> engine.flush(true, true));
|
|
|
assertTrue(engine.isRecovering());
|
|
|
@@ -690,18 +692,14 @@ public class InternalEngineTests extends EngineTestCase {
|
|
|
} finally {
|
|
|
IOUtils.close(engine);
|
|
|
}
|
|
|
-
|
|
|
- Engine recoveringEngine = null;
|
|
|
- try {
|
|
|
- recoveringEngine = new InternalEngine(engine.config());
|
|
|
+ trimUnsafeCommits(engine.config());
|
|
|
+ try (Engine recoveringEngine = new InternalEngine(engine.config())){
|
|
|
recoveringEngine.recoverFromTranslog();
|
|
|
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
|
|
|
final TotalHitCountCollector collector = new TotalHitCountCollector();
|
|
|
searcher.searcher().search(new MatchAllDocsQuery(), collector);
|
|
|
assertThat(collector.getTotalHits(), equalTo(operations.get(operations.size() - 1) instanceof Engine.Delete ? 0 : 1));
|
|
|
}
|
|
|
- } finally {
|
|
|
- IOUtils.close(recoveringEngine);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -722,6 +720,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|
|
Engine recoveringEngine = null;
|
|
|
try {
|
|
|
final AtomicBoolean committed = new AtomicBoolean();
|
|
|
+ trimUnsafeCommits(initialEngine.config());
|
|
|
recoveringEngine = new InternalEngine(initialEngine.config()) {
|
|
|
|
|
|
@Override
|
|
|
@@ -1151,6 +1150,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|
|
SequenceNumbers.UNASSIGNED_SEQ_NO, shardId);
|
|
|
store.associateIndexWithNewTranslog(translogUUID);
|
|
|
}
|
|
|
+ trimUnsafeCommits(config);
|
|
|
engine = new InternalEngine(config);
|
|
|
engine.recoverFromTranslog();
|
|
|
assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
|
|
|
@@ -2054,9 +2054,8 @@ public class InternalEngineTests extends EngineTestCase {
|
|
|
IOUtils.close(initialEngine);
|
|
|
}
|
|
|
|
|
|
- InternalEngine recoveringEngine = null;
|
|
|
- try {
|
|
|
- recoveringEngine = new InternalEngine(initialEngine.config());
|
|
|
+ trimUnsafeCommits(initialEngine.engineConfig);
|
|
|
+ try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())){
|
|
|
recoveringEngine.recoverFromTranslog();
|
|
|
|
|
|
assertEquals(primarySeqNo, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
|
|
|
@@ -2075,8 +2074,6 @@ public class InternalEngineTests extends EngineTestCase {
|
|
|
assertThat(recoveringEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo(primarySeqNo));
|
|
|
assertThat(recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(primarySeqNo));
|
|
|
assertThat(recoveringEngine.getLocalCheckpointTracker().generateSeqNo(), equalTo(primarySeqNo + 1));
|
|
|
- } finally {
|
|
|
- IOUtils.close(recoveringEngine);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -2389,6 +2386,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|
|
// open and recover tlog
|
|
|
{
|
|
|
for (int i = 0; i < 2; i++) {
|
|
|
+ trimUnsafeCommits(config);
|
|
|
try (InternalEngine engine = new InternalEngine(config)) {
|
|
|
assertTrue(engine.isRecovering());
|
|
|
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
|
|
|
@@ -2413,6 +2411,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|
|
final String translogUUID =
|
|
|
Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId);
|
|
|
store.associateIndexWithNewTranslog(translogUUID);
|
|
|
+ trimUnsafeCommits(config);
|
|
|
try (InternalEngine engine = new InternalEngine(config)) {
|
|
|
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
|
|
|
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
|
|
|
@@ -2426,6 +2425,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|
|
// open and recover tlog with empty tlog
|
|
|
{
|
|
|
for (int i = 0; i < 2; i++) {
|
|
|
+ trimUnsafeCommits(config);
|
|
|
try (InternalEngine engine = new InternalEngine(config)) {
|
|
|
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
|
|
|
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
|
|
|
@@ -2487,6 +2487,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|
|
boolean started = false;
|
|
|
InternalEngine engine = null;
|
|
|
try {
|
|
|
+ trimUnsafeCommits(config(defaultSettings, store, translogPath, NoMergePolicy.INSTANCE, null));
|
|
|
engine = createEngine(store, translogPath);
|
|
|
started = true;
|
|
|
} catch (EngineException | IOException e) {
|
|
|
@@ -2567,6 +2568,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|
|
}
|
|
|
assertVisibleCount(engine, numDocs);
|
|
|
engine.close();
|
|
|
+ trimUnsafeCommits(engine.config());
|
|
|
engine = new InternalEngine(engine.config());
|
|
|
engine.skipTranslogRecovery();
|
|
|
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
|
|
|
@@ -2608,6 +2610,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|
|
parser.mappingUpdate = dynamicUpdate();
|
|
|
|
|
|
engine.close();
|
|
|
+ trimUnsafeCommits(copy(engine.config(), inSyncGlobalCheckpointSupplier));
|
|
|
engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier)); // we need to reuse the engine config unless the parser.mappingModified won't work
|
|
|
engine.recoverFromTranslog();
|
|
|
|
|
|
@@ -3685,6 +3688,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|
|
final BiFunction<Long, Long, LocalCheckpointTracker> supplier = (ms, lcp) -> new LocalCheckpointTracker(
|
|
|
maxSeqNo,
|
|
|
localCheckpoint);
|
|
|
+ trimUnsafeCommits(engine.config());
|
|
|
noOpEngine = new InternalEngine(engine.config(), supplier) {
|
|
|
@Override
|
|
|
protected long doGenerateSeqNoForOperation(Operation operation) {
|
|
|
@@ -3832,6 +3836,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|
|
completedSeqNos.add(seqNo);
|
|
|
}
|
|
|
};
|
|
|
+ trimUnsafeCommits(engine.config());
|
|
|
actualEngine = new InternalEngine(engine.config(), supplier);
|
|
|
final int operations = randomIntBetween(0, 1024);
|
|
|
final Set<Long> expectedCompletedSeqNos = new HashSet<>();
|
|
|
@@ -3902,6 +3907,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|
|
assertEquals(docs - 1, engine.getLocalCheckpointTracker().getCheckpoint());
|
|
|
assertEquals(maxSeqIDOnReplica, replicaEngine.getLocalCheckpointTracker().getMaxSeqNo());
|
|
|
assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpointTracker().getCheckpoint());
|
|
|
+ trimUnsafeCommits(copy(replicaEngine.config(), globalCheckpoint::get));
|
|
|
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
|
|
|
assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().stats().getUncommittedOperations());
|
|
|
recoveringEngine.recoverFromTranslog();
|