SoftDeletesPolicy.java 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. /*
  2. * Licensed to Elasticsearch under one or more contributor
  3. * license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright
  5. * ownership. Elasticsearch licenses this file to you under
  6. * the Apache License, Version 2.0 (the "License"); you may
  7. * not use this file except in compliance with the License.
  8. * You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing,
  13. * software distributed under the License is distributed on an
  14. * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. * KIND, either express or implied. See the License for the
  16. * specific language governing permissions and limitations
  17. * under the License.
  18. */
  19. package org.elasticsearch.index.engine;
  20. import org.apache.lucene.document.LongPoint;
  21. import org.apache.lucene.search.Query;
  22. import org.elasticsearch.common.collect.Tuple;
  23. import org.elasticsearch.common.lease.Releasable;
  24. import org.elasticsearch.index.mapper.SeqNoFieldMapper;
  25. import org.elasticsearch.index.seqno.RetentionLease;
  26. import org.elasticsearch.index.seqno.RetentionLeases;
  27. import org.elasticsearch.index.seqno.SequenceNumbers;
  28. import org.elasticsearch.index.translog.Translog;
  29. import java.util.Objects;
  30. import java.util.concurrent.atomic.AtomicBoolean;
  31. import java.util.function.LongSupplier;
  32. import java.util.function.Supplier;
  33. /**
  34. * A policy that controls how many soft-deleted documents should be retained for peer-recovery and querying history changes purpose.
  35. */
  36. final class SoftDeletesPolicy {
  37. private final LongSupplier globalCheckpointSupplier;
  38. private long localCheckpointOfSafeCommit;
  39. // This lock count is used to prevent `minRetainedSeqNo` from advancing.
  40. private int retentionLockCount;
  41. // The extra number of operations before the global checkpoint are retained
  42. private long retentionOperations;
  43. // The min seq_no value that is retained - ops after this seq# should exist in the Lucene index.
  44. private long minRetainedSeqNo;
  45. // provides the retention leases used to calculate the minimum sequence number to retain
  46. private final Supplier<RetentionLeases> retentionLeasesSupplier;
  47. SoftDeletesPolicy(
  48. final LongSupplier globalCheckpointSupplier,
  49. final long minRetainedSeqNo,
  50. final long retentionOperations,
  51. final Supplier<RetentionLeases> retentionLeasesSupplier) {
  52. this.globalCheckpointSupplier = globalCheckpointSupplier;
  53. this.retentionOperations = retentionOperations;
  54. this.minRetainedSeqNo = minRetainedSeqNo;
  55. this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
  56. this.localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED;
  57. this.retentionLockCount = 0;
  58. }
  59. /**
  60. * Updates the number of soft-deleted documents prior to the global checkpoint to be retained
  61. * See {@link org.elasticsearch.index.IndexSettings#INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING}
  62. */
  63. synchronized void setRetentionOperations(long retentionOperations) {
  64. this.retentionOperations = retentionOperations;
  65. }
  66. /**
  67. * Sets the local checkpoint of the current safe commit
  68. */
  69. synchronized void setLocalCheckpointOfSafeCommit(long newCheckpoint) {
  70. if (newCheckpoint < this.localCheckpointOfSafeCommit) {
  71. throw new IllegalArgumentException("Local checkpoint can't go backwards; " +
  72. "new checkpoint [" + newCheckpoint + "]," + "current checkpoint [" + localCheckpointOfSafeCommit + "]");
  73. }
  74. this.localCheckpointOfSafeCommit = newCheckpoint;
  75. }
  76. /**
  77. * Acquires a lock on soft-deleted documents to prevent them from cleaning up in merge processes. This is necessary to
  78. * make sure that all operations that are being retained will be retained until the lock is released.
  79. * This is a analogy to the translog's retention lock; see {@link Translog#acquireRetentionLock()}
  80. */
  81. synchronized Releasable acquireRetentionLock() {
  82. assert retentionLockCount >= 0 : "Invalid number of retention locks [" + retentionLockCount + "]";
  83. retentionLockCount++;
  84. final AtomicBoolean released = new AtomicBoolean();
  85. return () -> {
  86. if (released.compareAndSet(false, true)) {
  87. releaseRetentionLock();
  88. }
  89. };
  90. }
  91. private synchronized void releaseRetentionLock() {
  92. assert retentionLockCount > 0 : "Invalid number of retention locks [" + retentionLockCount + "]";
  93. retentionLockCount--;
  94. }
  95. /**
  96. * Returns the min seqno that is retained in the Lucene index.
  97. * Operations whose seq# is least this value should exist in the Lucene index.
  98. */
  99. synchronized long getMinRetainedSeqNo() {
  100. return getRetentionPolicy().v1();
  101. }
  102. public synchronized Tuple<Long, RetentionLeases> getRetentionPolicy() {
  103. /*
  104. * When an engine is flushed, we need to provide it the latest collection of retention leases even when the soft deletes policy is
  105. * locked for peer recovery.
  106. */
  107. final RetentionLeases retentionLeases = retentionLeasesSupplier.get();
  108. // do not advance if the retention lock is held
  109. if (retentionLockCount == 0) {
  110. /*
  111. * This policy retains operations for two purposes: peer-recovery and querying changes history.
  112. * - Peer-recovery is driven by the local checkpoint of the safe commit. In peer-recovery, the primary transfers a safe commit,
  113. * then sends operations after the local checkpoint of that commit. This requires keeping all ops after
  114. * localCheckpointOfSafeCommit.
  115. * - Changes APIs are driven by a combination of the global checkpoint, retention operations, and retention leases. Here we
  116. * prefer using the global checkpoint instead of the maximum sequence number because only operations up to the global
  117. * checkpoint are exposed in the the changes APIs.
  118. */
  119. // calculate the minimum sequence number to retain based on retention leases
  120. final long minimumRetainingSequenceNumber = retentionLeases
  121. .leases()
  122. .stream()
  123. .mapToLong(RetentionLease::retainingSequenceNumber)
  124. .min()
  125. .orElse(Long.MAX_VALUE);
  126. /*
  127. * The minimum sequence number to retain is the minimum of the minimum based on retention leases, and the number of operations
  128. * below the global checkpoint to retain (index.soft_deletes.retention.operations). The additional increments on the global
  129. * checkpoint and the local checkpoint of the safe commit are due to the fact that we want to retain all operations above
  130. * those checkpoints.
  131. */
  132. final long minSeqNoForQueryingChanges =
  133. Math.min(1 + globalCheckpointSupplier.getAsLong() - retentionOperations, minimumRetainingSequenceNumber);
  134. final long minSeqNoToRetain = Math.min(minSeqNoForQueryingChanges, 1 + localCheckpointOfSafeCommit);
  135. /*
  136. * We take the maximum as minSeqNoToRetain can go backward as the retention operations value can be changed in settings, or from
  137. * the addition of leases with a retaining sequence number lower than previous retaining sequence numbers.
  138. */
  139. minRetainedSeqNo = Math.max(minRetainedSeqNo, minSeqNoToRetain);
  140. }
  141. return Tuple.tuple(minRetainedSeqNo, retentionLeases);
  142. }
  143. /**
  144. * Returns a soft-deletes retention query that will be used in {@link org.apache.lucene.index.SoftDeletesRetentionMergePolicy}
  145. * Documents including tombstones are soft-deleted and matched this query will be retained and won't cleaned up by merges.
  146. */
  147. Query getRetentionQuery() {
  148. return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getMinRetainedSeqNo(), Long.MAX_VALUE);
  149. }
  150. }