|
@@ -524,7 +524,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testTotalTests() {
|
|
|
- final TranslogStats total = new TranslogStats(0, 0, 0, 0, 1);
|
|
|
+ final TranslogStats total =
|
|
|
+ new TranslogStats(0, 0, 0, 0, 1);
|
|
|
final int n = randomIntBetween(0, 16);
|
|
|
final List<TranslogStats> statsList = new ArrayList<>(n);
|
|
|
for (int i = 0; i < n; i++) {
|
|
@@ -552,21 +553,27 @@ public class TranslogTests extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testNegativeNumberOfOperations() {
|
|
|
- IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(-1, 1, 1, 1, 1));
|
|
|
+ IllegalArgumentException e =
|
|
|
+ expectThrows(IllegalArgumentException.class,
|
|
|
+ () -> new TranslogStats(-1, 1, 1, 1, 1));
|
|
|
assertThat(e, hasToString(containsString("numberOfOperations must be >= 0")));
|
|
|
- e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(1, 1, -1, 1, 1));
|
|
|
+ e = expectThrows(IllegalArgumentException.class,
|
|
|
+ () -> new TranslogStats(1, 1, -1, 1, 1));
|
|
|
assertThat(e, hasToString(containsString("uncommittedOperations must be >= 0")));
|
|
|
}
|
|
|
|
|
|
public void testNegativeSizeInBytes() {
|
|
|
- IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(1, -1, 1, 1, 1));
|
|
|
+ IllegalArgumentException e =
|
|
|
+ expectThrows(IllegalArgumentException.class, () -> new TranslogStats(1, -1, 1, 1, 1));
|
|
|
assertThat(e, hasToString(containsString("translogSizeInBytes must be >= 0")));
|
|
|
- e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(1, 1, 1, -1, 1));
|
|
|
+ e = expectThrows(IllegalArgumentException.class,
|
|
|
+ () -> new TranslogStats(1, 1, 1, -1, 1));
|
|
|
assertThat(e, hasToString(containsString("uncommittedSizeInBytes must be >= 0")));
|
|
|
}
|
|
|
|
|
|
public void testOldestEntryInSeconds() {
|
|
|
- IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(1, 1, 1, 1, -1));
|
|
|
+ IllegalArgumentException e =
|
|
|
+ expectThrows(IllegalArgumentException.class, () -> new TranslogStats(1, 1, 1, 1, -1));
|
|
|
assertThat(e, hasToString(containsString("earliestLastModifiedAge must be >= 0")));
|
|
|
}
|
|
|
|
|
@@ -687,7 +694,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
List<Long> batch = LongStream.rangeClosed(0, between(0, 100)).boxed().collect(Collectors.toList());
|
|
|
Randomness.shuffle(batch);
|
|
|
for (long seqNo : batch) {
|
|
|
- Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), seqNo, primaryTerm.get(), new byte[]{1});
|
|
|
+ Translog.Index op =
|
|
|
+ new Translog.Index("doc", randomAlphaOfLength(10), seqNo, primaryTerm.get(), new byte[]{1});
|
|
|
translog.add(op);
|
|
|
}
|
|
|
translog.rollGeneration();
|
|
@@ -767,7 +775,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
final CountDownLatch downLatch = new CountDownLatch(1);
|
|
|
for (int i = 0; i < threadCount; i++) {
|
|
|
final int threadId = i;
|
|
|
- threads[i] = new TranslogThread(translog, downLatch, opsPerThread, threadId, writtenOperations, seqNoGenerator, threadExceptions);
|
|
|
+ threads[i] =
|
|
|
+ new TranslogThread(translog, downLatch, opsPerThread, threadId, writtenOperations, seqNoGenerator, threadExceptions);
|
|
|
threads[i].setDaemon(true);
|
|
|
threads[i].start();
|
|
|
}
|
|
@@ -832,7 +841,9 @@ public class TranslogTests extends ESTestCase {
|
|
|
int translogOperations = randomIntBetween(10, 100);
|
|
|
for (int op = 0; op < translogOperations; op++) {
|
|
|
String ascii = randomAlphaOfLengthBetween(1, 50);
|
|
|
- locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), ascii.getBytes("UTF-8"))));
|
|
|
+ locations.add(
|
|
|
+ translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), ascii.getBytes("UTF-8")))
|
|
|
+ );
|
|
|
}
|
|
|
translog.close();
|
|
|
|
|
@@ -859,7 +870,9 @@ public class TranslogTests extends ESTestCase {
|
|
|
int translogOperations = randomIntBetween(10, 100);
|
|
|
for (int op = 0; op < translogOperations; op++) {
|
|
|
String ascii = randomAlphaOfLengthBetween(1, 50);
|
|
|
- locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), ascii.getBytes("UTF-8"))));
|
|
|
+ locations.add(translog.add(
|
|
|
+ new Translog.Index("test", "" + op, op, primaryTerm.get(), ascii.getBytes("UTF-8")))
|
|
|
+ );
|
|
|
}
|
|
|
translog.sync();
|
|
|
|
|
@@ -1118,13 +1131,16 @@ public class TranslogTests extends ESTestCase {
|
|
|
for (int op = 0; op < translogOperations; op++) {
|
|
|
int seqNo = ++count;
|
|
|
final Translog.Location location =
|
|
|
- translog.add(new Translog.Index("test", "" + op, seqNo, primaryTerm.get(), Integer.toString(seqNo).getBytes(Charset.forName("UTF-8"))));
|
|
|
+ translog.add(new Translog.Index("test", "" + op, seqNo, primaryTerm.get(),
|
|
|
+ Integer.toString(seqNo).getBytes(Charset.forName("UTF-8"))));
|
|
|
if (randomBoolean()) {
|
|
|
assertTrue("at least one operation pending", translog.syncNeeded());
|
|
|
assertTrue("this operation has not been synced", translog.ensureSynced(location));
|
|
|
- assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); // we are the last location so everything should be synced
|
|
|
+ // we are the last location so everything should be synced
|
|
|
+ assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded());
|
|
|
seqNo = ++count;
|
|
|
- translog.add(new Translog.Index("test", "" + op, seqNo, primaryTerm.get(), Integer.toString(seqNo).getBytes(Charset.forName("UTF-8"))));
|
|
|
+ translog.add(new Translog.Index("test", "" + op, seqNo, primaryTerm.get(),
|
|
|
+ Integer.toString(seqNo).getBytes(Charset.forName("UTF-8"))));
|
|
|
assertTrue("one pending operation", translog.syncNeeded());
|
|
|
assertFalse("this op has been synced before", translog.ensureSynced(location)); // not syncing now
|
|
|
assertTrue("we only synced a previous operation yet", translog.syncNeeded());
|
|
@@ -1153,17 +1169,20 @@ public class TranslogTests extends ESTestCase {
|
|
|
rollAndCommit(translog); // do this first so that there is at least one pending tlog entry
|
|
|
}
|
|
|
final Translog.Location location =
|
|
|
- translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(++count).getBytes(Charset.forName("UTF-8"))));
|
|
|
+ translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(),
|
|
|
+ Integer.toString(++count).getBytes(Charset.forName("UTF-8"))));
|
|
|
locations.add(location);
|
|
|
}
|
|
|
Collections.shuffle(locations, random());
|
|
|
if (randomBoolean()) {
|
|
|
assertTrue("at least one operation pending", translog.syncNeeded());
|
|
|
assertTrue("this operation has not been synced", translog.ensureSynced(locations.stream()));
|
|
|
- assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); // we are the last location so everything should be synced
|
|
|
+ // we are the last location so everything should be synced
|
|
|
+ assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded());
|
|
|
} else if (rarely()) {
|
|
|
rollAndCommit(translog);
|
|
|
- assertFalse("location is from a previous translog - already synced", translog.ensureSynced(locations.stream())); // not syncing now
|
|
|
+ // not syncing now
|
|
|
+ assertFalse("location is from a previous translog - already synced", translog.ensureSynced(locations.stream()));
|
|
|
assertFalse("no sync needed since no operations in current translog", translog.syncNeeded());
|
|
|
} else {
|
|
|
translog.sync();
|
|
@@ -1181,7 +1200,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
int count = 0;
|
|
|
for (int op = 0; op < translogOperations; op++) {
|
|
|
locations.add(
|
|
|
- translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(++count).getBytes(Charset.forName("UTF-8")))));
|
|
|
+ translog.add(new Translog.Index("test", "" + op, op,
|
|
|
+ primaryTerm.get(), Integer.toString(++count).getBytes(Charset.forName("UTF-8")))));
|
|
|
if (rarely() && translogOperations > op + 1) {
|
|
|
rollAndCommit(translog);
|
|
|
}
|
|
@@ -1218,7 +1238,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
int lastSynced = -1;
|
|
|
long lastSyncedGlobalCheckpoint = globalCheckpoint.get();
|
|
|
for (int op = 0; op < translogOperations; op++) {
|
|
|
- locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
|
|
|
+ locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(),
|
|
|
+ Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
|
|
|
if (randomBoolean()) {
|
|
|
globalCheckpoint.set(globalCheckpoint.get() + randomIntBetween(1, 16));
|
|
|
}
|
|
@@ -1233,7 +1254,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8"))));
|
|
|
|
|
|
final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME));
|
|
|
- try (TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(translog.currentFileGeneration())), checkpoint)) {
|
|
|
+ try (TranslogReader reader =
|
|
|
+ translog.openReader(translog.location().resolve(Translog.getFilename(translog.currentFileGeneration())), checkpoint)) {
|
|
|
assertEquals(lastSynced + 1, reader.totalOperations());
|
|
|
TranslogSnapshot snapshot = reader.newSnapshot();
|
|
|
|
|
@@ -1276,7 +1298,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
}
|
|
|
writer.sync();
|
|
|
|
|
|
- final BaseTranslogReader reader = randomBoolean() ? writer : translog.openReader(writer.path(), Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME)));
|
|
|
+ final BaseTranslogReader reader = randomBoolean() ? writer :
|
|
|
+ translog.openReader(writer.path(), Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME)));
|
|
|
for (int i = 0; i < numOps; i++) {
|
|
|
ByteBuffer buffer = ByteBuffer.allocate(4);
|
|
|
reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * i);
|
|
@@ -1354,7 +1377,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
int minUncommittedOp = -1;
|
|
|
final boolean commitOften = randomBoolean();
|
|
|
for (int op = 0; op < translogOperations; op++) {
|
|
|
- locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
|
|
|
+ locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(),
|
|
|
+ Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
|
|
|
final boolean commit = commitOften ? frequently() : rarely();
|
|
|
if (commit && op < translogOperations - 1) {
|
|
|
rollAndCommit(translog);
|
|
@@ -1375,8 +1399,10 @@ public class TranslogTests extends ESTestCase {
|
|
|
assertNull(snapshot.next());
|
|
|
}
|
|
|
} else {
|
|
|
- translog = new Translog(config, translogGeneration.translogUUID, translog.getDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get);
|
|
|
- assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, translog.currentFileGeneration());
|
|
|
+ translog = new Translog(config, translogGeneration.translogUUID, translog.getDeletionPolicy(),
|
|
|
+ () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get);
|
|
|
+ assertEquals("lastCommitted must be 1 less than current",
|
|
|
+ translogGeneration.translogFileGeneration + 1, translog.currentFileGeneration());
|
|
|
assertFalse(translog.syncNeeded());
|
|
|
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGeneration, Long.MAX_VALUE)) {
|
|
|
for (int i = minUncommittedOp; i < translogOperations; i++) {
|
|
@@ -1397,7 +1423,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
Translog.TranslogGeneration translogGeneration = null;
|
|
|
final boolean sync = randomBoolean();
|
|
|
for (int op = 0; op < translogOperations; op++) {
|
|
|
- locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
|
|
|
+ locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(),
|
|
|
+ Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
|
|
|
if (op == prepareOp) {
|
|
|
translogGeneration = translog.getGeneration();
|
|
|
translog.rollGeneration();
|
|
@@ -1414,9 +1441,11 @@ public class TranslogTests extends ESTestCase {
|
|
|
TranslogConfig config = translog.getConfig();
|
|
|
final String translogUUID = translog.getTranslogUUID();
|
|
|
final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
|
|
|
- try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
|
|
|
+ try (Translog translog = new Translog(config, translogUUID, deletionPolicy,
|
|
|
+ () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
|
|
|
assertNotNull(translogGeneration);
|
|
|
- assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
|
|
|
+ assertEquals("lastCommitted must be 2 less than current - we never finished the commit",
|
|
|
+ translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
|
|
|
assertFalse(translog.syncNeeded());
|
|
|
try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) {
|
|
|
int upTo = sync ? translogOperations : prepareOp;
|
|
@@ -1428,7 +1457,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
}
|
|
|
}
|
|
|
if (randomBoolean()) { // recover twice
|
|
|
- try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
|
|
|
+ try (Translog translog = new Translog(config, translogUUID, deletionPolicy,
|
|
|
+ () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
|
|
|
assertNotNull(translogGeneration);
|
|
|
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice",
|
|
|
translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
|
|
@@ -1438,7 +1468,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
for (int i = 0; i < upTo; i++) {
|
|
|
Translog.Operation next = snapshot.next();
|
|
|
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
|
|
|
- assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString()));
|
|
|
+ assertEquals("payload mismatch, synced: " + sync, i,
|
|
|
+ Integer.parseInt(next.getSource().source.utf8ToString()));
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1453,7 +1484,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
Translog.TranslogGeneration translogGeneration = null;
|
|
|
final boolean sync = randomBoolean();
|
|
|
for (int op = 0; op < translogOperations; op++) {
|
|
|
- locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
|
|
|
+ locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(),
|
|
|
+ Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
|
|
|
if (op == prepareOp) {
|
|
|
translogGeneration = translog.getGeneration();
|
|
|
translog.rollGeneration();
|
|
@@ -1474,9 +1506,11 @@ public class TranslogTests extends ESTestCase {
|
|
|
|
|
|
final String translogUUID = translog.getTranslogUUID();
|
|
|
final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
|
|
|
- try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
|
|
|
+ try (Translog translog = new Translog(config, translogUUID, deletionPolicy,
|
|
|
+ () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
|
|
|
assertNotNull(translogGeneration);
|
|
|
- assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
|
|
|
+ assertEquals("lastCommitted must be 2 less than current - we never finished the commit",
|
|
|
+ translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
|
|
|
assertFalse(translog.syncNeeded());
|
|
|
try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) {
|
|
|
int upTo = sync ? translogOperations : prepareOp;
|
|
@@ -1489,7 +1523,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
if (randomBoolean()) { // recover twice
|
|
|
- try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
|
|
|
+ try (Translog translog = new Translog(config, translogUUID, deletionPolicy,
|
|
|
+ () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
|
|
|
assertNotNull(translogGeneration);
|
|
|
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice",
|
|
|
translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
|
|
@@ -1499,7 +1534,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
for (int i = 0; i < upTo; i++) {
|
|
|
Translog.Operation next = snapshot.next();
|
|
|
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
|
|
|
- assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString()));
|
|
|
+ assertEquals("payload mismatch, synced: " + sync, i,
|
|
|
+ Integer.parseInt(next.getSource().source.utf8ToString()));
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1513,7 +1549,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
Translog.TranslogGeneration translogGeneration = null;
|
|
|
final boolean sync = randomBoolean();
|
|
|
for (int op = 0; op < translogOperations; op++) {
|
|
|
- locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
|
|
|
+ locations.add(translog.add(new Translog.Index("test", "" + op, op,
|
|
|
+ primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
|
|
|
if (op == prepareOp) {
|
|
|
translogGeneration = translog.getGeneration();
|
|
|
translog.rollGeneration();
|
|
@@ -1528,21 +1565,28 @@ public class TranslogTests extends ESTestCase {
|
|
|
TranslogConfig config = translog.getConfig();
|
|
|
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
|
|
|
Checkpoint read = Checkpoint.read(ckp);
|
|
|
- Checkpoint corrupted = Checkpoint.emptyTranslogCheckpoint(0, 0, SequenceNumbers.NO_OPS_PERFORMED, 0);
|
|
|
- Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
|
|
|
+ Checkpoint corrupted = Checkpoint.emptyTranslogCheckpoint(0, 0,
|
|
|
+ SequenceNumbers.NO_OPS_PERFORMED, 0);
|
|
|
+ Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)),
|
|
|
+ corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
|
|
|
final String translogUUID = translog.getTranslogUUID();
|
|
|
final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
|
|
|
- try (Translog ignored = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
|
|
|
+ try (Translog ignored = new Translog(config, translogUUID, deletionPolicy,
|
|
|
+ () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
|
|
|
fail("corrupted");
|
|
|
} catch (IllegalStateException ex) {
|
|
|
assertEquals("Checkpoint file translog-3.ckp already exists but has corrupted content expected: Checkpoint{offset=3025, " +
|
|
|
- "numOps=55, generation=3, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-1, minTranslogGeneration=1, trimmedAboveSeqNo=-2} but got: Checkpoint{offset=0, numOps=0, " +
|
|
|
- "generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-1, minTranslogGeneration=0, trimmedAboveSeqNo=-2}", ex.getMessage());
|
|
|
- }
|
|
|
- Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
|
|
|
- try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
|
|
|
+ "numOps=55, generation=3, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-1, minTranslogGeneration=1, trimmedAboveSeqNo=-2} " +
|
|
|
+ "but got: Checkpoint{offset=0, numOps=0, generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-1, " +
|
|
|
+ "minTranslogGeneration=0, trimmedAboveSeqNo=-2}", ex.getMessage());
|
|
|
+ }
|
|
|
+ Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)),
|
|
|
+ read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
|
|
|
+ try (Translog translog = new Translog(config, translogUUID, deletionPolicy,
|
|
|
+ () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
|
|
|
assertNotNull(translogGeneration);
|
|
|
- assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
|
|
|
+ assertEquals("lastCommitted must be 2 less than current - we never finished the commit",
|
|
|
+ translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
|
|
|
assertFalse(translog.syncNeeded());
|
|
|
try (Translog.Snapshot snapshot = new SortedSnapshot(translog.newSnapshot())) {
|
|
|
int upTo = sync ? translogOperations : prepareOp;
|
|
@@ -1560,7 +1604,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
List<Translog.Operation> ops = new ArrayList<>();
|
|
|
int translogOperations = randomIntBetween(10, 100);
|
|
|
for (int op = 0; op < translogOperations; op++) {
|
|
|
- Translog.Index test = new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")));
|
|
|
+ Translog.Index test = new Translog.Index("test", "" + op, op, primaryTerm.get(),
|
|
|
+ Integer.toString(op).getBytes(Charset.forName("UTF-8")));
|
|
|
ops.add(test);
|
|
|
}
|
|
|
Translog.writeOperations(out, ops);
|
|
@@ -1687,7 +1732,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
TranslogConfig config = getTranslogConfig(tempDir);
|
|
|
List<FileChannel> fileChannels = new ArrayList<>();
|
|
|
final Translog failableTLog =
|
|
|
- getFailableTranslog(fail, config, randomBoolean(), false, null, createTranslogDeletionPolicy(), fileChannels);
|
|
|
+ getFailableTranslog(fail, config, randomBoolean(),
|
|
|
+ false, null, createTranslogDeletionPolicy(), fileChannels);
|
|
|
|
|
|
IOException expectedException = null;
|
|
|
int translogOperations = 0;
|
|
@@ -1761,8 +1807,10 @@ public class TranslogTests extends ESTestCase {
|
|
|
int translogOperations = randomIntBetween(10, 100);
|
|
|
try (Translog translog2 = create(createTempDir())) {
|
|
|
for (int op = 0; op < translogOperations; op++) {
|
|
|
- locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
|
|
|
- locations2.add(translog2.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
|
|
|
+ locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(),
|
|
|
+ Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
|
|
|
+ locations2.add(translog2.add(new Translog.Index("test", "" + op, op, primaryTerm.get(),
|
|
|
+ Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
|
|
|
}
|
|
|
int iters = randomIntBetween(10, 100);
|
|
|
for (int i = 0; i < iters; i++) {
|
|
@@ -1788,7 +1836,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
int translogOperations = randomIntBetween(1, 10);
|
|
|
int firstUncommitted = 0;
|
|
|
for (int op = 0; op < translogOperations; op++) {
|
|
|
- locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
|
|
|
+ locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(),
|
|
|
+ Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
|
|
|
if (randomBoolean()) {
|
|
|
rollAndCommit(translog);
|
|
|
firstUncommitted = op + 1;
|
|
@@ -1820,10 +1869,12 @@ public class TranslogTests extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testFailOnClosedWrite() throws IOException {
|
|
|
- translog.add(new Translog.Index("test", "1", 0, primaryTerm.get(), Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
|
|
|
+ translog.add(new Translog.Index("test", "1", 0, primaryTerm.get(),
|
|
|
+ Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
|
|
|
translog.close();
|
|
|
try {
|
|
|
- translog.add(new Translog.Index("test", "1", 0, primaryTerm.get(), Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
|
|
|
+ translog.add(new Translog.Index("test", "1", 0, primaryTerm.get(),
|
|
|
+ Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
|
|
|
fail("closed");
|
|
|
} catch (AlreadyClosedException ex) {
|
|
|
// all is well
|
|
@@ -1843,7 +1894,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
final AtomicLong seqNoGenerator = new AtomicLong();
|
|
|
for (int i = 0; i < threadCount; i++) {
|
|
|
final int threadId = i;
|
|
|
- threads[i] = new TranslogThread(translog, downLatch, opsPerThread, threadId, writtenOperations, seqNoGenerator, threadExceptions);
|
|
|
+ threads[i] = new TranslogThread(translog, downLatch, opsPerThread, threadId,
|
|
|
+ writtenOperations, seqNoGenerator, threadExceptions);
|
|
|
threads[i].setDaemon(true);
|
|
|
threads[i].start();
|
|
|
}
|
|
@@ -1941,7 +1993,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
while (failed == false) {
|
|
|
try {
|
|
|
locations.add(translog.add(
|
|
|
- new Translog.Index("test", "" + opsSynced, opsSynced, primaryTerm.get(), Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
|
|
|
+ new Translog.Index("test", "" + opsSynced, opsSynced, primaryTerm.get(),
|
|
|
+ Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
|
|
|
translog.sync();
|
|
|
opsSynced++;
|
|
|
} catch (MockDirectoryWrapper.FakeIOException ex) {
|
|
@@ -1962,7 +2015,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
if (randomBoolean()) {
|
|
|
try {
|
|
|
locations.add(translog.add(
|
|
|
- new Translog.Index("test", "" + opsSynced, opsSynced, primaryTerm.get(), Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
|
|
|
+ new Translog.Index("test", "" + opsSynced, opsSynced, primaryTerm.get(),
|
|
|
+ Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
|
|
|
fail("we are already closed");
|
|
|
} catch (AlreadyClosedException ex) {
|
|
|
assertNotNull(ex.getCause());
|
|
@@ -1996,14 +2050,17 @@ public class TranslogTests extends ESTestCase {
|
|
|
translog.close(); // we are closed
|
|
|
final String translogUUID = translog.getTranslogUUID();
|
|
|
final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
|
|
|
- try (Translog tlog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
|
|
|
- assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration());
|
|
|
+ try (Translog tlog = new Translog(config, translogUUID, deletionPolicy,
|
|
|
+ () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
|
|
|
+ assertEquals("lastCommitted must be 1 less than current",
|
|
|
+ translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration());
|
|
|
assertFalse(tlog.syncNeeded());
|
|
|
|
|
|
try (Translog.Snapshot snapshot = tlog.newSnapshot()) {
|
|
|
assertEquals(opsSynced, snapshot.totalOperations());
|
|
|
for (int i = 0; i < opsSynced; i++) {
|
|
|
- assertEquals("expected operation" + i + " to be in the previous translog but wasn't", tlog.currentFileGeneration() - 1,
|
|
|
+ assertEquals("expected operation" + i + " to be in the previous translog but wasn't",
|
|
|
+ tlog.currentFileGeneration() - 1,
|
|
|
locations.get(i).generation);
|
|
|
Translog.Operation next = snapshot.next();
|
|
|
assertNotNull("operation " + i + " must be non-null", next);
|
|
@@ -2019,11 +2076,13 @@ public class TranslogTests extends ESTestCase {
|
|
|
LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer borders regularly
|
|
|
for (int opsAdded = 0; opsAdded < numOps; opsAdded++) {
|
|
|
locations.add(translog.add(
|
|
|
- new Translog.Index("test", "" + opsAdded, opsAdded, primaryTerm.get(), lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))));
|
|
|
+ new Translog.Index("test", "" + opsAdded, opsAdded, primaryTerm.get(),
|
|
|
+ lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))));
|
|
|
try (Translog.Snapshot snapshot = this.translog.newSnapshot()) {
|
|
|
assertEquals(opsAdded + 1, snapshot.totalOperations());
|
|
|
for (int i = 0; i < opsAdded; i++) {
|
|
|
- assertEquals("expected operation" + i + " to be in the current translog but wasn't", translog.currentFileGeneration(),
|
|
|
+ assertEquals("expected operation" + i + " to be in the current translog but wasn't",
|
|
|
+ translog.currentFileGeneration(),
|
|
|
locations.get(i).generation);
|
|
|
Translog.Operation next = snapshot.next();
|
|
|
assertNotNull("operation " + i + " must be non-null", next);
|
|
@@ -2036,13 +2095,16 @@ public class TranslogTests extends ESTestCase {
|
|
|
Path tempDir = createTempDir();
|
|
|
final FailSwitch fail = new FailSwitch();
|
|
|
TranslogConfig config = getTranslogConfig(tempDir);
|
|
|
- Translog translog = getFailableTranslog(fail, config, false, true, null, createTranslogDeletionPolicy());
|
|
|
+ Translog translog = getFailableTranslog(fail, config, false, true, null,
|
|
|
+ createTranslogDeletionPolicy());
|
|
|
LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly
|
|
|
- translog.add(new Translog.Index("test", "1", 0, primaryTerm.get(), lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
|
|
|
+ translog.add(new Translog.Index("test", "1", 0, primaryTerm.get(),
|
|
|
+ lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
|
|
|
fail.failAlways();
|
|
|
try {
|
|
|
Translog.Location location = translog.add(
|
|
|
- new Translog.Index("test", "2", 1, primaryTerm.get(), lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
|
|
|
+ new Translog.Index("test", "2", 1, primaryTerm.get(),
|
|
|
+ lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
|
|
|
if (randomBoolean()) {
|
|
|
translog.ensureSynced(location);
|
|
|
} else {
|
|
@@ -2076,7 +2138,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
List<LocationOperation> writtenOperations = Collections.synchronizedList(new ArrayList<>());
|
|
|
for (int i = 0; i < threadCount; i++) {
|
|
|
final int threadId = i;
|
|
|
- threads[i] = new TranslogThread(translog, downLatch, 200, threadId, writtenOperations, seqNoGenerator, threadExceptions) {
|
|
|
+ threads[i] = new TranslogThread(translog, downLatch, 200, threadId,
|
|
|
+ writtenOperations, seqNoGenerator, threadExceptions) {
|
|
|
@Override
|
|
|
protected Translog.Location add(Translog.Operation op) throws IOException {
|
|
|
Translog.Location add = super.add(op);
|
|
@@ -2132,7 +2195,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
}
|
|
|
}
|
|
|
try (Translog tlog =
|
|
|
- new Translog(config, translogUUID, createTranslogDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get);
|
|
|
+ new Translog(config, translogUUID, createTranslogDeletionPolicy(),
|
|
|
+ () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get);
|
|
|
Translog.Snapshot snapshot = tlog.newSnapshot()) {
|
|
|
if (writtenOperations.size() != snapshot.totalOperations()) {
|
|
|
for (int i = 0; i < threadCount; i++) {
|
|
@@ -2143,7 +2207,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
}
|
|
|
assertEquals(writtenOperations.size(), snapshot.totalOperations());
|
|
|
for (int i = 0; i < writtenOperations.size(); i++) {
|
|
|
- assertEquals("expected operation" + i + " to be in the previous translog but wasn't", tlog.currentFileGeneration() - 1, writtenOperations.get(i).location.generation);
|
|
|
+ assertEquals("expected operation" + i + " to be in the previous translog but wasn't",
|
|
|
+ tlog.currentFileGeneration() - 1, writtenOperations.get(i).location.generation);
|
|
|
Translog.Operation next = snapshot.next();
|
|
|
assertNotNull("operation " + i + " must be non-null", next);
|
|
|
assertEquals(next, writtenOperations.get(i).operation);
|
|
@@ -2159,7 +2224,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
public void testRecoveryFromAFutureGenerationCleansUp() throws IOException {
|
|
|
int translogOperations = randomIntBetween(10, 100);
|
|
|
for (int op = 0; op < translogOperations / 2; op++) {
|
|
|
- translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8"))));
|
|
|
+ translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(),
|
|
|
+ Integer.toString(op).getBytes(Charset.forName("UTF-8"))));
|
|
|
if (rarely()) {
|
|
|
translog.rollGeneration();
|
|
|
}
|
|
@@ -2167,7 +2233,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
translog.rollGeneration();
|
|
|
long comittedGeneration = randomLongBetween(2, translog.currentFileGeneration());
|
|
|
for (int op = translogOperations / 2; op < translogOperations; op++) {
|
|
|
- translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8"))));
|
|
|
+ translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(),
|
|
|
+ Integer.toString(op).getBytes(Charset.forName("UTF-8"))));
|
|
|
if (rarely()) {
|
|
|
translog.rollGeneration();
|
|
|
}
|
|
@@ -2178,7 +2245,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1);
|
|
|
deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE));
|
|
|
deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration);
|
|
|
- translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get);
|
|
|
+ translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy,
|
|
|
+ () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get);
|
|
|
assertThat(translog.getMinFileGeneration(), equalTo(1L));
|
|
|
// no trimming done yet, just recovered
|
|
|
for (long gen = 1; gen < translog.currentFileGeneration(); gen++) {
|
|
@@ -2209,7 +2277,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
translogUUID = translog.getTranslogUUID();
|
|
|
int translogOperations = randomIntBetween(10, 100);
|
|
|
for (int op = 0; op < translogOperations / 2; op++) {
|
|
|
- translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8"))));
|
|
|
+ translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(),
|
|
|
+ Integer.toString(op).getBytes(Charset.forName("UTF-8"))));
|
|
|
if (rarely()) {
|
|
|
translog.rollGeneration();
|
|
|
}
|
|
@@ -2217,7 +2286,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
translog.rollGeneration();
|
|
|
comittedGeneration = randomLongBetween(2, translog.currentFileGeneration());
|
|
|
for (int op = translogOperations / 2; op < translogOperations; op++) {
|
|
|
- translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8"))));
|
|
|
+ translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(),
|
|
|
+ Integer.toString(op).getBytes(Charset.forName("UTF-8"))));
|
|
|
if (rarely()) {
|
|
|
translog.rollGeneration();
|
|
|
}
|
|
@@ -2234,7 +2304,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1);
|
|
|
deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE));
|
|
|
deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration);
|
|
|
- try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
|
|
|
+ try (Translog translog = new Translog(config, translogUUID, deletionPolicy,
|
|
|
+ () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
|
|
|
// we don't know when things broke exactly
|
|
|
assertThat(translog.getMinFileGeneration(), greaterThanOrEqualTo(1L));
|
|
|
assertThat(translog.getMinFileGeneration(), lessThanOrEqualTo(comittedGeneration));
|
|
@@ -2300,8 +2371,10 @@ public class TranslogTests extends ESTestCase {
|
|
|
}
|
|
|
boolean success = false;
|
|
|
try {
|
|
|
- final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp"); // don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation
|
|
|
- ThrowingFileChannel throwingFileChannel = new ThrowingFileChannel(fail, isCkpFile ? false : partialWrites, throwUnknownException, channel);
|
|
|
+ // don't do partial writes for checkpoints we rely on the fact that the bytes are written as an atomic operation
|
|
|
+ final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp");
|
|
|
+ ThrowingFileChannel throwingFileChannel =
|
|
|
+ new ThrowingFileChannel(fail, isCkpFile ? false : partialWrites, throwUnknownException, channel);
|
|
|
success = true;
|
|
|
return throwingFileChannel;
|
|
|
} finally {
|
|
@@ -2337,7 +2410,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
private final boolean partialWrite;
|
|
|
private final boolean throwUnknownException;
|
|
|
|
|
|
- public ThrowingFileChannel(FailSwitch fail, boolean partialWrite, boolean throwUnknownException, FileChannel delegate) throws MockDirectoryWrapper.FakeIOException {
|
|
|
+ public ThrowingFileChannel(FailSwitch fail, boolean partialWrite,
|
|
|
+ boolean throwUnknownException, FileChannel delegate) throws MockDirectoryWrapper.FakeIOException {
|
|
|
super(delegate);
|
|
|
this.fail = fail;
|
|
|
this.partialWrite = partialWrite;
|
|
@@ -2426,7 +2500,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
translog.add(new Translog.Index("test", "boom", 0, primaryTerm.get(), "boom".getBytes(Charset.forName("UTF-8"))));
|
|
|
translog.close();
|
|
|
try {
|
|
|
- new Translog(config, translog.getTranslogUUID(), createTranslogDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get) {
|
|
|
+ new Translog(config, translog.getTranslogUUID(), createTranslogDeletionPolicy(),
|
|
|
+ () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get) {
|
|
|
@Override
|
|
|
protected TranslogWriter createWriter(long fileGeneration, long initialMinTranslogGen, long initialGlobalCheckpoint)
|
|
|
throws IOException {
|
|
@@ -2441,7 +2516,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testRecoverWithUnbackedNextGen() throws IOException {
|
|
|
- translog.add(new Translog.Index("test", "" + 0, 0, primaryTerm.get(), Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
|
|
|
+ translog.add(new Translog.Index("test", "" + 0, 0, primaryTerm.get(),
|
|
|
+ Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
|
|
|
translog.close();
|
|
|
TranslogConfig config = translog.getConfig();
|
|
|
|
|
@@ -2457,7 +2533,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
assertNotNull("operation 1 must be non-null", op);
|
|
|
assertEquals("payload mismatch for operation 1", 1, Integer.parseInt(op.getSource().source.utf8ToString()));
|
|
|
|
|
|
- tlog.add(new Translog.Index("test", "" + 1, 1, primaryTerm.get(), Integer.toString(2).getBytes(Charset.forName("UTF-8"))));
|
|
|
+ tlog.add(new Translog.Index("test", "" + 1, 1, primaryTerm.get(),
|
|
|
+ Integer.toString(2).getBytes(Charset.forName("UTF-8"))));
|
|
|
}
|
|
|
|
|
|
try (Translog tlog = openTranslog(config, translog.getTranslogUUID());
|
|
@@ -2466,16 +2543,19 @@ public class TranslogTests extends ESTestCase {
|
|
|
|
|
|
Translog.Operation secondOp = snapshot.next();
|
|
|
assertNotNull("operation 2 must be non-null", secondOp);
|
|
|
- assertEquals("payload mismatch for operation 2", Integer.parseInt(secondOp.getSource().source.utf8ToString()), 2);
|
|
|
+ assertEquals("payload mismatch for operation 2",
|
|
|
+ Integer.parseInt(secondOp.getSource().source.utf8ToString()), 2);
|
|
|
|
|
|
Translog.Operation firstOp = snapshot.next();
|
|
|
assertNotNull("operation 1 must be non-null", firstOp);
|
|
|
- assertEquals("payload mismatch for operation 1", Integer.parseInt(firstOp.getSource().source.utf8ToString()), 1);
|
|
|
+ assertEquals("payload mismatch for operation 1",
|
|
|
+ Integer.parseInt(firstOp.getSource().source.utf8ToString()), 1);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void testRecoverWithUnbackedNextGenInIllegalState() throws IOException {
|
|
|
- translog.add(new Translog.Index("test", "" + 0, 0, primaryTerm.get(), Integer.toString(0).getBytes(Charset.forName("UTF-8"))));
|
|
|
+ translog.add(new Translog.Index("test", "" + 0, 0, primaryTerm.get(),
|
|
|
+ Integer.toString(0).getBytes(Charset.forName("UTF-8"))));
|
|
|
translog.close();
|
|
|
TranslogConfig config = translog.getConfig();
|
|
|
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
|
|
@@ -2483,13 +2563,15 @@ public class TranslogTests extends ESTestCase {
|
|
|
// don't copy the new file
|
|
|
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
|
|
|
|
|
|
- TranslogException ex = expectThrows(TranslogException.class, () -> new Translog(config, translog.getTranslogUUID(), translog.getDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get));
|
|
|
+ TranslogException ex = expectThrows(TranslogException.class, () -> new Translog(config, translog.getTranslogUUID(),
|
|
|
+ translog.getDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get));
|
|
|
assertEquals(ex.getMessage(), "failed to create new translog file");
|
|
|
assertEquals(ex.getCause().getClass(), FileAlreadyExistsException.class);
|
|
|
}
|
|
|
|
|
|
public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException {
|
|
|
- translog.add(new Translog.Index("test", "" + 0, 0, primaryTerm.get(), Integer.toString(0).getBytes(Charset.forName("UTF-8"))));
|
|
|
+ translog.add(new Translog.Index("test", "" + 0, 0, primaryTerm.get(),
|
|
|
+ Integer.toString(0).getBytes(Charset.forName("UTF-8"))));
|
|
|
translog.close();
|
|
|
TranslogConfig config = translog.getConfig();
|
|
|
final String translogUUID = translog.getTranslogUUID();
|
|
@@ -2501,7 +2583,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
|
|
|
// we add N+1 and N+2 to ensure we only delete the N+1 file and never jump ahead and wipe without the right condition
|
|
|
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 2) + ".tlog"));
|
|
|
- try (Translog tlog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
|
|
|
+ try (Translog tlog = new Translog(config, translogUUID, deletionPolicy,
|
|
|
+ () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
|
|
|
assertFalse(tlog.syncNeeded());
|
|
|
try (Translog.Snapshot snapshot = tlog.newSnapshot()) {
|
|
|
for (int i = 0; i < 1; i++) {
|
|
@@ -2510,7 +2593,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.utf8ToString()));
|
|
|
}
|
|
|
}
|
|
|
- tlog.add(new Translog.Index("test", "" + 1, 1, primaryTerm.get(), Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
|
|
|
+ tlog.add(new Translog.Index("test", "" + 1, 1, primaryTerm.get(),
|
|
|
+ Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
|
|
|
}
|
|
|
|
|
|
TranslogException ex = expectThrows(TranslogException.class,
|
|
@@ -2541,12 +2625,14 @@ public class TranslogTests extends ESTestCase {
|
|
|
String generationUUID = null;
|
|
|
try {
|
|
|
boolean committing = false;
|
|
|
- final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, createTranslogDeletionPolicy());
|
|
|
+ final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false,
|
|
|
+ generationUUID, createTranslogDeletionPolicy());
|
|
|
try {
|
|
|
LineFileDocs lineFileDocs = new LineFileDocs(random()); //writes pretty big docs so we cross buffer boarders regularly
|
|
|
for (int opsAdded = 0; opsAdded < numOps; opsAdded++) {
|
|
|
String doc = lineFileDocs.nextDoc().toString();
|
|
|
- failableTLog.add(new Translog.Index("test", "" + opsAdded, opsAdded, primaryTerm.get(), doc.getBytes(Charset.forName("UTF-8"))));
|
|
|
+ failableTLog.add(new Translog.Index("test", "" + opsAdded, opsAdded, primaryTerm.get(),
|
|
|
+ doc.getBytes(Charset.forName("UTF-8"))));
|
|
|
unsynced.add(doc);
|
|
|
if (randomBoolean()) {
|
|
|
failableTLog.sync();
|
|
@@ -2554,7 +2640,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
unsynced.clear();
|
|
|
}
|
|
|
if (randomFloat() < 0.1) {
|
|
|
- failableTLog.sync(); // we have to sync here first otherwise we don't know if the sync succeeded if the commit fails
|
|
|
+ // we have to sync here first otherwise we don't know if the sync succeeded if the commit fails
|
|
|
+ failableTLog.sync();
|
|
|
syncedDocs.addAll(unsynced);
|
|
|
unsynced.clear();
|
|
|
failableTLog.rollGeneration();
|
|
@@ -2620,9 +2707,11 @@ public class TranslogTests extends ESTestCase {
|
|
|
deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery);
|
|
|
if (generationUUID == null) {
|
|
|
// we never managed to successfully create a translog, make it
|
|
|
- generationUUID = Translog.createEmptyTranslog(config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
|
|
|
+ generationUUID = Translog.createEmptyTranslog(config.getTranslogPath(),
|
|
|
+ SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
|
|
|
}
|
|
|
- try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get);
|
|
|
+ try (Translog translog = new Translog(config, generationUUID, deletionPolicy,
|
|
|
+ () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get);
|
|
|
Translog.Snapshot snapshot = translog.newSnapshotFromGen(
|
|
|
new Translog.TranslogGeneration(generationUUID, minGenForRecovery), Long.MAX_VALUE)) {
|
|
|
assertEquals(syncedDocs.size(), snapshot.totalOperations());
|
|
@@ -2655,7 +2744,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
public void testCheckpointOnDiskFull() throws IOException {
|
|
|
final Checkpoint checkpoint = randomCheckpoint();
|
|
|
Path tempDir = createTempDir();
|
|
|
- Checkpoint.write(FileChannel::open, tempDir.resolve("foo.cpk"), checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
|
|
|
+ Checkpoint.write(FileChannel::open, tempDir.resolve("foo.cpk"), checkpoint,
|
|
|
+ StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
|
|
|
final Checkpoint checkpoint2 = randomCheckpoint();
|
|
|
try {
|
|
|
Checkpoint.write((p, o) -> {
|
|
@@ -2719,7 +2809,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
document.add(seqID.seqNo);
|
|
|
document.add(seqID.seqNoDocValue);
|
|
|
document.add(seqID.primaryTerm);
|
|
|
- ParsedDocument doc = new ParsedDocument(versionField, seqID, "1", "type", null, Arrays.asList(document), B_1, XContentType.JSON,
|
|
|
+ ParsedDocument doc = new ParsedDocument(versionField, seqID, "1", "type", null,
|
|
|
+ Arrays.asList(document), B_1, XContentType.JSON,
|
|
|
null);
|
|
|
|
|
|
Engine.Index eIndex = new Engine.Index(newUid(doc), doc, randomSeqNum, randomPrimaryTerm,
|
|
@@ -2948,7 +3039,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
for (int gen = 0; gen < generations; gen++) {
|
|
|
final int operations = randomIntBetween(1, 100);
|
|
|
for (int i = 0; i < operations; i++) {
|
|
|
- Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), seqNo.getAndIncrement(), primaryTerm.get(), new byte[]{1});
|
|
|
+ Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10),
|
|
|
+ seqNo.getAndIncrement(), primaryTerm.get(), new byte[]{1});
|
|
|
translog.add(op);
|
|
|
views.peek().add(op);
|
|
|
}
|
|
@@ -2973,7 +3065,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
List<Long> batch = LongStream.rangeClosed(0, between(0, 500)).boxed().collect(Collectors.toList());
|
|
|
Randomness.shuffle(batch);
|
|
|
for (Long seqNo : batch) {
|
|
|
- Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), seqNo, primaryTerm.get(), new byte[]{1});
|
|
|
+ Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10),
|
|
|
+ seqNo, primaryTerm.get(), new byte[]{1});
|
|
|
translog.add(op);
|
|
|
latestOperations.put(op.seqNo(), op);
|
|
|
}
|
|
@@ -3006,7 +3099,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
public void testTranslogCloseInvariant() throws IOException {
|
|
|
assumeTrue("test only works with assertions enabled", Assertions.ENABLED);
|
|
|
class MisbehavingTranslog extends Translog {
|
|
|
- MisbehavingTranslog(TranslogConfig config, String translogUUID, TranslogDeletionPolicy deletionPolicy, LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier) throws IOException {
|
|
|
+ MisbehavingTranslog(TranslogConfig config, String translogUUID, TranslogDeletionPolicy deletionPolicy,
|
|
|
+ LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier) throws IOException {
|
|
|
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier);
|
|
|
}
|
|
|
|
|
@@ -3035,7 +3129,8 @@ public class TranslogTests extends ESTestCase {
|
|
|
final TranslogConfig translogConfig = getTranslogConfig(path);
|
|
|
final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings());
|
|
|
final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
|
|
|
- MisbehavingTranslog misbehavingTranslog = new MisbehavingTranslog(translogConfig, translogUUID, deletionPolicy, () -> globalCheckpoint.get(), primaryTerm::get);
|
|
|
+ MisbehavingTranslog misbehavingTranslog = new MisbehavingTranslog(translogConfig, translogUUID, deletionPolicy,
|
|
|
+ () -> globalCheckpoint.get(), primaryTerm::get);
|
|
|
|
|
|
expectThrows(AssertionError.class, () -> misbehavingTranslog.callCloseDirectly());
|
|
|
expectThrows(AssertionError.class, () -> misbehavingTranslog.callCloseUsingIOUtils());
|