Bläddra i källkod

Diff the list of filenames that are added by each new commit (#92238)

When a CommitsListener is defined the CombinedDeletionPolicy 
also computes the set of new files added by the last commit 
before passing it to the listener. The list is computed outside the 
lock by diffing the list of file names between the previous last 
commit and the new last commit.
Tanguy Leroux 2 år sedan
förälder
incheckning
f2c20c87f4

+ 5 - 0
docs/changelog/92238.yaml

@@ -0,0 +1,5 @@
+pr: 92238
+summary: Diff the list of filenames that are added by each new commit
+area: Engine
+type: enhancement
+issues: []

+ 13 - 3
server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java

@@ -22,10 +22,13 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
 
 /**
  * An {@link IndexDeletionPolicy} that coordinates between Lucene's commits and the retention of translog generation files,
@@ -43,7 +46,7 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
 
     interface CommitsListener {
 
-        void onNewAcquiredCommit(IndexCommit commit);
+        void onNewAcquiredCommit(IndexCommit commit, Set<String> additionalFiles);
 
         void onDeletedCommit(IndexCommit commit);
     }
@@ -102,13 +105,14 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
             totalDocsOfSafeCommit = safeCommitInfo.docCount;
         }
         IndexCommit newCommit = null;
+        IndexCommit previousLastCommit = null;
         List<IndexCommit> deletedCommits = null;
         synchronized (this) {
             this.safeCommitInfo = new SafeCommitInfo(
                 Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)),
                 totalDocsOfSafeCommit
             );
-            final IndexCommit previousLastCommit = this.lastCommit;
+            previousLastCommit = this.lastCommit;
             this.lastCommit = commits.get(commits.size() - 1);
             this.safeCommit = safeCommit;
             updateRetentionPolicy();
@@ -134,7 +138,8 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
         assert assertSafeCommitUnchanged(safeCommit);
         if (commitsListener != null) {
             if (newCommit != null) {
-                commitsListener.onNewAcquiredCommit(newCommit);
+                final Set<String> additionalFiles = listOfNewFileNames(previousLastCommit, newCommit);
+                commitsListener.onNewAcquiredCommit(newCommit, additionalFiles);
             }
             if (deletedCommits != null) {
                 for (IndexCommit deletedCommit : deletedCommits) {
@@ -266,6 +271,11 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
         return 0;
     }
 
+    private Set<String> listOfNewFileNames(IndexCommit previous, IndexCommit current) throws IOException {
+        final Set<String> previousFiles = previous != null ? new HashSet<>(previous.getFileNames()) : Set.of();
+        return current.getFileNames().stream().filter(f -> previousFiles.contains(f) == false).collect(Collectors.toUnmodifiableSet());
+    }
+
     /**
      * Checks whether the deletion policy is holding on to snapshotted commits
      */

+ 6 - 5
server/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -222,16 +222,17 @@ public abstract class Engine implements Closeable {
          * {@link IndexCommitRef} files to be deleted from disk until the reference is closed. As such, the listener must close the
          * reference as soon as it is done with it.
          *
-         * @param shardId the {@link ShardId} of shard
-         * @param primaryTerm the shard's primary term value
-         * @param indexCommitRef a reference on the newly created index commit
+         * @param shardId         the {@link ShardId} of shard
+         * @param primaryTerm     the shard's primary term value
+         * @param indexCommitRef  a reference on the newly created index commit
+         * @param additionalFiles the set of filenames that are added by the new commit
          */
-        void onNewCommit(ShardId shardId, long primaryTerm, Engine.IndexCommitRef indexCommitRef);
+        void onNewCommit(ShardId shardId, long primaryTerm, IndexCommitRef indexCommitRef, Set<String> additionalFiles);
 
         /**
          * This method is invoked after the policy deleted the given {@link IndexCommit}. A listener is never notified of a deleted commit
          * until the corresponding {@link Engine.IndexCommitRef} received through
-         * {@link #onNewCommit(ShardId, long, IndexCommitRef)} has been closed; closing which in turn can call this method directly.
+         * {@link #onNewCommit(ShardId, long, IndexCommitRef, Set)} has been closed; closing which in turn can call this method directly.
          *
          * @param shardId the {@link ShardId} of shard
          * @param deletedCommit the deleted {@link IndexCommit}

+ 2 - 2
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -331,10 +331,10 @@ public class InternalEngine extends Engine {
             var primaryTerm = config().getPrimaryTermSupplier().getAsLong();
             return new CombinedDeletionPolicy.CommitsListener() {
                 @Override
-                public void onNewAcquiredCommit(final IndexCommit commit) {
+                public void onNewAcquiredCommit(final IndexCommit commit, final Set<String> additionalFiles) {
                     final IndexCommitRef indexCommitRef = acquireIndexCommitRef(() -> commit);
                     assert indexCommitRef.getIndexCommit() == commit;
-                    listener.onNewCommit(shardId, primaryTerm, indexCommitRef);
+                    listener.onNewCommit(shardId, primaryTerm, indexCommitRef, additionalFiles);
                 }
 
                 @Override

+ 1 - 1
server/src/test/java/org/elasticsearch/index/IndexModuleTests.java

@@ -644,7 +644,7 @@ public class IndexModuleTests extends ESTestCase {
 
         module.setIndexCommitListener(new Engine.IndexCommitListener() {
             @Override
-            public void onNewCommit(ShardId shardId, long primaryTerm, Engine.IndexCommitRef indexCommitRef) {
+            public void onNewCommit(ShardId shardId, long primaryTerm, Engine.IndexCommitRef indexCommitRef, Set<String> additionalFiles) {
                 lastAcquiredPrimaryTerm.set(primaryTerm);
                 lastAcquiredCommit.set(indexCommitRef);
             }

+ 163 - 7
server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java

@@ -20,17 +20,22 @@ import org.elasticsearch.test.ESTestCase;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -273,11 +278,15 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
     public void testCommitsListener() throws Exception {
         final List<IndexCommit> acquiredCommits = new ArrayList<>();
         final List<IndexCommit> deletedCommits = new ArrayList<>();
+        final Set<String> newCommitFiles = new HashSet<>();
         final CombinedDeletionPolicy.CommitsListener commitsListener = new CombinedDeletionPolicy.CommitsListener() {
             @Override
-            public void onNewAcquiredCommit(IndexCommit commit) {
+            public void onNewAcquiredCommit(IndexCommit commit, Set<String> additionalFiles) {
                 assertThat(commit, instanceOf(FilterIndexCommit.class));
                 assertThat(acquiredCommits.add(((FilterIndexCommit) commit).getIndexCommit()), equalTo(true));
+                assertThat(additionalFiles, not(empty()));
+                newCommitFiles.clear();
+                newCommitFiles.addAll(additionalFiles);
             }
 
             @Override
@@ -315,44 +324,148 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
         };
 
         final UUID translogUUID = UUID.randomUUID();
-        final IndexCommit commit0 = mockIndexCommit(NO_OPS_PERFORMED, NO_OPS_PERFORMED, translogUUID);
+        final IndexCommit commit0 = mockIndexCommit(NO_OPS_PERFORMED, NO_OPS_PERFORMED, translogUUID, Set.of("segments_0"));
         combinedDeletionPolicy.onInit(List.of(commit0));
 
         assertThat(acquiredCommits, contains(commit0));
         assertThat(deletedCommits, hasSize(0));
+        assertThat(newCommitFiles, contains("segments_0"));
 
-        final IndexCommit commit1 = mockIndexCommit(10L, 10L, translogUUID);
+        final IndexCommit commit1 = mockIndexCommit(10L, 10L, translogUUID, Set.of("_0.cfe", "_0.si", "_0.cfs", "segments_1"));
         combinedDeletionPolicy.onCommit(List.of(commit0, commit1));
 
         assertThat(acquiredCommits, contains(commit0, commit1));
         assertThat(deletedCommits, hasSize(0));
+        assertThat(newCommitFiles, containsInAnyOrder(equalTo("_0.cfe"), equalTo("_0.si"), equalTo("_0.cfs"), equalTo("segments_1")));
 
         globalCheckpoint.set(10L);
-        final IndexCommit commit2 = mockIndexCommit(20L, 20L, translogUUID);
+        final IndexCommit commit2 = mockIndexCommit(
+            20L,
+            20L,
+            translogUUID,
+            Set.of("_1.cfs", "_0.cfe", "_0.si", "_1.cfe", "_1.si", "_0.cfs", "segments_2")
+        );
         combinedDeletionPolicy.onCommit(List.of(commit0, commit1, commit2));
 
         assertThat(acquiredCommits, contains(commit0, commit1, commit2));
         assertThat(deletedCommits, hasSize(0));
+        assertThat(newCommitFiles, containsInAnyOrder(equalTo("_1.cfe"), equalTo("_1.si"), equalTo("_1.cfs"), equalTo("segments_2")));
 
         boolean maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit0);
         assertThat(maybeCleanUpCommits, equalTo(true));
 
         globalCheckpoint.set(20L);
-        final IndexCommit commit3 = mockIndexCommit(30L, 30L, translogUUID);
+        final IndexCommit commit3 = mockIndexCommit(
+            30L,
+            30L,
+            translogUUID,
+            Set.of(
+                "_3.fdx",
+                "_3_Lucene90_0.tip",
+                "_3_Lucene90_0.dvm",
+                "_3.si",
+                "_3_0.tmd",
+                "_3_0.tim",
+                "_3_ES85BloomFilter_0.bfi",
+                "_3_0.pos",
+                "_3_ES85BloomFilter_0.bfm",
+                "_3_0.tip",
+                "_3_Lucene90_0.doc",
+                "_3.nvd",
+                "_3.nvm",
+                "_3.fnm",
+                "_3_0.doc",
+                "segments_3",
+                "_3.kdd",
+                "_3_Lucene90_0.tmd",
+                "_3.fdm",
+                "_3.kdi",
+                "_3_Lucene90_0.dvd",
+                "_3_Lucene90_0.pos",
+                "_3.kdm",
+                "_3.fdt",
+                "_3_Lucene90_0.tim"
+            )
+        );
         combinedDeletionPolicy.onCommit(List.of(commit0, commit1, commit2, commit3));
 
         assertThat(acquiredCommits, contains(commit1, commit2, commit3));
         assertThat(deletedCommits, contains(commit0));
+        assertThat(
+            newCommitFiles,
+            containsInAnyOrder(
+                equalTo("_3.fdx"),
+                equalTo("_3_Lucene90_0.tip"),
+                equalTo("_3_Lucene90_0.dvm"),
+                equalTo("_3.si"),
+                equalTo("_3_0.tmd"),
+                equalTo("_3_0.tim"),
+                equalTo("_3_ES85BloomFilter_0.bfi"),
+                equalTo("_3_0.pos"),
+                equalTo("_3_ES85BloomFilter_0.bfm"),
+                equalTo("_3_0.tip"),
+                equalTo("_3_Lucene90_0.doc"),
+                equalTo("_3.nvd"),
+                equalTo("_3.nvm"),
+                equalTo("_3.fnm"),
+                equalTo("_3_0.doc"),
+                equalTo("segments_3"),
+                equalTo("_3.kdd"),
+                equalTo("_3_Lucene90_0.tmd"),
+                equalTo("_3.fdm"),
+                equalTo("_3.kdi"),
+                equalTo("_3_Lucene90_0.dvd"),
+                equalTo("_3_Lucene90_0.pos"),
+                equalTo("_3.kdm"),
+                equalTo("_3.fdt"),
+                equalTo("_3_Lucene90_0.tim")
+            )
+        );
 
         maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit2);
         assertThat("No commits to clean up (commit #2 is the safe commit)", maybeCleanUpCommits, equalTo(false));
 
         globalCheckpoint.set(30L);
-        final IndexCommit commit4 = mockIndexCommit(40L, 40L, translogUUID);
+        final IndexCommit commit4 = mockIndexCommit(
+            40L,
+            40L,
+            translogUUID,
+            Set.of(
+                "_3.fdx",
+                "_3_Lucene90_0.tip",
+                "_3_Lucene90_0.dvm",
+                "_3.si",
+                "_3_0.tmd",
+                "_3_0.tim",
+                "_3_ES85BloomFilter_0.bfi",
+                "_3_0.pos",
+                "_3_ES85BloomFilter_0.bfm",
+                "_3_0.tip",
+                "_3_Lucene90_0.doc",
+                "_3.nvd",
+                "_4.cfe",
+                "_3.nvm",
+                "_3.fnm",
+                "_3_0.doc",
+                "segments_4",
+                "_3.kdd",
+                "_3_Lucene90_0.tmd",
+                "_3.fdm",
+                "_3.kdi",
+                "_4.cfs",
+                "_3_Lucene90_0.dvd",
+                "_3_Lucene90_0.pos",
+                "_3.kdm",
+                "_4.si",
+                "_3.fdt",
+                "_3_Lucene90_0.tim"
+            )
+        );
         combinedDeletionPolicy.onCommit(List.of(commit1, commit2, commit3, commit4));
 
         assertThat(acquiredCommits, contains(commit1, commit3, commit4));
         assertThat(deletedCommits, contains(commit0, commit2));
+        assertThat(newCommitFiles, containsInAnyOrder(equalTo("_4.cfe"), equalTo("_4.si"), equalTo("_4.cfs"), equalTo("segments_4")));
 
         maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit3);
         assertThat("No commits to clean up (commit #3 is the safe commit)", maybeCleanUpCommits, equalTo(false));
@@ -366,7 +479,44 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
         final boolean globalCheckpointCatchUp = randomBoolean();
         globalCheckpoint.set(globalCheckpointCatchUp ? 50L : 40L);
 
-        final IndexCommit commit5 = mockIndexCommit(50L, 50L, translogUUID);
+        final IndexCommit commit5 = mockIndexCommit(
+            50L,
+            50L,
+            translogUUID,
+            Set.of(
+                "_3.fdx",
+                "_3_Lucene90_0.tip",
+                "_3_Lucene90_0.dvm",
+                "_3.si",
+                "_3_0.tmd",
+                "_3_0.tim",
+                "_3_ES85BloomFilter_0.bfi",
+                "_3_0.pos",
+                "_3_ES85BloomFilter_0.bfm",
+                "_3_0.tip",
+                "_5.si",
+                "_3_Lucene90_0.doc",
+                "segments_5",
+                "_3.nvd",
+                "_5.cfs",
+                "_4.cfe",
+                "_3.nvm",
+                "_3.fnm",
+                "_3_0.doc",
+                "_3.kdd",
+                "_3_Lucene90_0.tmd",
+                "_3.fdm",
+                "_3.kdi",
+                "_5.cfe",
+                "_4.cfs",
+                "_3_Lucene90_0.dvd",
+                "_3_Lucene90_0.pos",
+                "_3.kdm",
+                "_4.si",
+                "_3.fdt",
+                "_3_Lucene90_0.tim"
+            )
+        );
         combinedDeletionPolicy.onCommit(List.of(commit1, commit3, commit4, commit5));
 
         if (globalCheckpointCatchUp) {
@@ -376,6 +526,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
             assertThat(acquiredCommits, contains(commit4, commit5));
             assertThat(deletedCommits, contains(commit0, commit2, commit1, commit3));
         }
+        assertThat(newCommitFiles, containsInAnyOrder(equalTo("_5.cfe"), equalTo("_5.si"), equalTo("_5.cfs"), equalTo("segments_5")));
 
         maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit5);
         assertThat("No commits to clean up (commit #5 is the last commit)", maybeCleanUpCommits, equalTo(false));
@@ -399,6 +550,10 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
     }
 
     IndexCommit mockIndexCommit(long localCheckpoint, long maxSeqNo, UUID translogUUID) throws IOException {
+        return mockIndexCommit(localCheckpoint, maxSeqNo, translogUUID, Set.of());
+    }
+
+    IndexCommit mockIndexCommit(long localCheckpoint, long maxSeqNo, UUID translogUUID, Set<String> fileNames) throws IOException {
         final Map<String, String> userData = new HashMap<>();
         userData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
         userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
@@ -407,6 +562,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
         final Directory directory = mock(Directory.class);
         when(commit.getUserData()).thenReturn(userData);
         when(commit.getDirectory()).thenReturn(directory);
+        when(commit.getFileNames()).thenReturn(fileNames);
         resetDeletion(commit);
         return commit;
     }

+ 1 - 1
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -7482,7 +7482,7 @@ public class InternalEngineTests extends EngineTestCase {
 
         final Engine.IndexCommitListener indexCommitListener = new Engine.IndexCommitListener() {
             @Override
-            public void onNewCommit(ShardId shardId, long primaryTerm, Engine.IndexCommitRef indexCommitRef) {
+            public void onNewCommit(ShardId shardId, long primaryTerm, Engine.IndexCommitRef indexCommitRef, Set<String> additionalFiles) {
                 assertThat(acquiredCommits.put(indexCommitRef.getIndexCommit(), indexCommitRef), nullValue());
                 assertThat(shardId, equalTo(InternalEngineTests.this.shardId));
                 assertThat(primaryTerm, greaterThanOrEqualTo(0L));