|
@@ -32,6 +32,7 @@ import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
+import java.util.OptionalLong;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
import java.util.function.Supplier;
|
|
|
|
|
@@ -98,7 +99,9 @@ public class SoftDeletesPolicyTests extends ESTestCase {
|
|
|
.min()
|
|
|
.orElse(Long.MAX_VALUE);
|
|
|
long retainedSeqNo =
|
|
|
- Math.min(safeCommitCheckpoint, Math.min(minimumRetainingSequenceNumber, globalCheckpoint.get() - retainedOps)) + 1;
|
|
|
+ Math.min(
|
|
|
+ 1 + safeCommitCheckpoint,
|
|
|
+ Math.min(minimumRetainingSequenceNumber, 1 + globalCheckpoint.get() - retainedOps));
|
|
|
minRetainedSeqNo = Math.max(minRetainedSeqNo, retainedSeqNo);
|
|
|
}
|
|
|
assertThat(retentionQuery.getNumDims(), equalTo(1));
|
|
@@ -113,7 +116,7 @@ public class SoftDeletesPolicyTests extends ESTestCase {
|
|
|
.min()
|
|
|
.orElse(Long.MAX_VALUE);
|
|
|
long retainedSeqNo =
|
|
|
- Math.min(safeCommitCheckpoint, Math.min(minimumRetainingSequenceNumber, globalCheckpoint.get() - retainedOps)) + 1;
|
|
|
+ Math.min(1 + safeCommitCheckpoint, Math.min(minimumRetainingSequenceNumber, 1 + globalCheckpoint.get() - retainedOps));
|
|
|
minRetainedSeqNo = Math.max(minRetainedSeqNo, retainedSeqNo);
|
|
|
assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo));
|
|
|
}
|
|
@@ -141,4 +144,87 @@ public class SoftDeletesPolicyTests extends ESTestCase {
|
|
|
assertThat(policy.getRetentionPolicy().v2().leases(), contains(leases.toArray(new RetentionLease[0])));
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ public void testWhenGlobalCheckpointDictatesThePolicy() {
|
|
|
+ final int retentionOperations = randomIntBetween(0, 1024);
|
|
|
+ final AtomicLong globalCheckpoint = new AtomicLong(randomLongBetween(0, Long.MAX_VALUE - 2));
|
|
|
+ final Collection<RetentionLease> leases = new ArrayList<>();
|
|
|
+ final int numberOfLeases = randomIntBetween(0, 16);
|
|
|
+ for (int i = 0; i < numberOfLeases; i++) {
|
|
|
+ // setup leases where the minimum retained sequence number is more than the policy dictated by the global checkpoint
|
|
|
+ leases.add(new RetentionLease(
|
|
|
+ Integer.toString(i),
|
|
|
+ randomLongBetween(1 + globalCheckpoint.get() - retentionOperations + 1, Long.MAX_VALUE),
|
|
|
+ randomNonNegativeLong(), "test"));
|
|
|
+ }
|
|
|
+ final long primaryTerm = randomNonNegativeLong();
|
|
|
+ final long version = randomNonNegativeLong();
|
|
|
+ final Supplier<RetentionLeases> leasesSupplier =
|
|
|
+ () -> new RetentionLeases(
|
|
|
+ primaryTerm,
|
|
|
+ version,
|
|
|
+ Collections.unmodifiableCollection(new ArrayList<>(leases)));
|
|
|
+ final SoftDeletesPolicy policy =
|
|
|
+ new SoftDeletesPolicy(globalCheckpoint::get, 0, retentionOperations, leasesSupplier);
|
|
|
+ // set the local checkpoint of the safe commit to more than the policy dicated by the global checkpoint
|
|
|
+ final long localCheckpointOfSafeCommit = randomLongBetween(1 + globalCheckpoint.get() - retentionOperations + 1, Long.MAX_VALUE);
|
|
|
+ policy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
|
|
|
+ assertThat(policy.getMinRetainedSeqNo(), equalTo(1 + globalCheckpoint.get() - retentionOperations));
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testWhenLocalCheckpointOfSafeCommitDictatesThePolicy() {
|
|
|
+ final int retentionOperations = randomIntBetween(0, 1024);
|
|
|
+ final long localCheckpointOfSafeCommit = randomLongBetween(-1, Long.MAX_VALUE - retentionOperations - 1);
|
|
|
+ final AtomicLong globalCheckpoint =
|
|
|
+ new AtomicLong(randomLongBetween(Math.max(0, localCheckpointOfSafeCommit + retentionOperations), Long.MAX_VALUE - 1));
|
|
|
+ final Collection<RetentionLease> leases = new ArrayList<>();
|
|
|
+ final int numberOfLeases = randomIntBetween(0, 16);
|
|
|
+ for (int i = 0; i < numberOfLeases; i++) {
|
|
|
+ leases.add(new RetentionLease(
|
|
|
+ Integer.toString(i),
|
|
|
+ randomLongBetween(1 + localCheckpointOfSafeCommit + 1, Long.MAX_VALUE), // leases are for more than the local checkpoint
|
|
|
+ randomNonNegativeLong(), "test"));
|
|
|
+ }
|
|
|
+ final long primaryTerm = randomNonNegativeLong();
|
|
|
+ final long version = randomNonNegativeLong();
|
|
|
+ final Supplier<RetentionLeases> leasesSupplier =
|
|
|
+ () -> new RetentionLeases(
|
|
|
+ primaryTerm,
|
|
|
+ version,
|
|
|
+ Collections.unmodifiableCollection(new ArrayList<>(leases)));
|
|
|
+
|
|
|
+ final SoftDeletesPolicy policy =
|
|
|
+ new SoftDeletesPolicy(globalCheckpoint::get, 0, retentionOperations, leasesSupplier);
|
|
|
+ policy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
|
|
|
+ assertThat(policy.getMinRetainedSeqNo(), equalTo(1 + localCheckpointOfSafeCommit));
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testWhenRetentionLeasesDictateThePolicy() {
|
|
|
+ final int retentionOperations = randomIntBetween(0, 1024);
|
|
|
+ final Collection<RetentionLease> leases = new ArrayList<>();
|
|
|
+ final int numberOfLeases = randomIntBetween(1, 16);
|
|
|
+ for (int i = 0; i < numberOfLeases; i++) {
|
|
|
+ leases.add(new RetentionLease(
|
|
|
+ Integer.toString(i),
|
|
|
+ randomLongBetween(0, Long.MAX_VALUE - retentionOperations - 1),
|
|
|
+ randomNonNegativeLong(), "test"));
|
|
|
+ }
|
|
|
+ final OptionalLong minimumRetainingSequenceNumber = leases.stream().mapToLong(RetentionLease::retainingSequenceNumber).min();
|
|
|
+ assert minimumRetainingSequenceNumber.isPresent() : leases;
|
|
|
+ final long localCheckpointOfSafeCommit = randomLongBetween(minimumRetainingSequenceNumber.getAsLong(), Long.MAX_VALUE - 1);
|
|
|
+ final AtomicLong globalCheckpoint =
|
|
|
+ new AtomicLong(randomLongBetween(minimumRetainingSequenceNumber.getAsLong() + retentionOperations, Long.MAX_VALUE - 1));
|
|
|
+ final long primaryTerm = randomNonNegativeLong();
|
|
|
+ final long version = randomNonNegativeLong();
|
|
|
+ final Supplier<RetentionLeases> leasesSupplier =
|
|
|
+ () -> new RetentionLeases(
|
|
|
+ primaryTerm,
|
|
|
+ version,
|
|
|
+ Collections.unmodifiableCollection(new ArrayList<>(leases)));
|
|
|
+ final SoftDeletesPolicy policy =
|
|
|
+ new SoftDeletesPolicy(globalCheckpoint::get, 0, retentionOperations, leasesSupplier);
|
|
|
+ policy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
|
|
|
+ assertThat(policy.getMinRetainedSeqNo(), equalTo(minimumRetainingSequenceNumber.getAsLong()));
|
|
|
+ }
|
|
|
+
|
|
|
}
|