Răsfoiți Sursa

Add primary term supplier to Engine.IndexCommitListener (#92101)

This change allows IndexCommitListener to have access to the 
index shard's primary term when they are invoked.

Relates #92017
Tanguy Leroux 2 ani în urmă
părinte
comite
2d8c93a9a6

+ 5 - 0
docs/changelog/92101.yaml

@@ -0,0 +1,5 @@
+pr: 92101
+summary: Add primary term supplier to Engine.IndexCommitListener
+area: Engine
+type: enhancement
+issues: []

+ 6 - 3
server/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -222,15 +222,18 @@ public abstract class Engine implements Closeable {
          * {@link IndexCommitRef} files to be deleted from disk until the reference is closed. As such, the listener must close the
          * reference as soon as it is done with it.
          *
+         * @param shardId the {@link ShardId} of shard
+         * @param primaryTerm the shard's primary term value
          * @param indexCommitRef a reference on the newly created index commit
          */
-        void onNewCommit(ShardId shardId, Engine.IndexCommitRef indexCommitRef);
+        void onNewCommit(ShardId shardId, long primaryTerm, Engine.IndexCommitRef indexCommitRef);
 
         /**
          * This method is invoked after the policy deleted the given {@link IndexCommit}. A listener is never notified of a deleted commit
-         * until the corresponding {@link Engine.IndexCommitRef} received through {@link #onNewCommit(ShardId, IndexCommitRef)} has been
-         * closed; closing which in turn can call this method directly.
+         * until the corresponding {@link Engine.IndexCommitRef} received through
+         * {@link #onNewCommit(ShardId, long, IndexCommitRef)} has been closed; closing which in turn can call this method directly.
          *
+         * @param shardId the {@link ShardId} of shard
          * @param deletedCommit the deleted {@link IndexCommit}
          */
         void onIndexCommitDelete(ShardId shardId, IndexCommit deletedCommit);

+ 2 - 1
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -328,12 +328,13 @@ public class InternalEngine extends Engine {
     private CombinedDeletionPolicy.CommitsListener newCommitsListener() {
         final Engine.IndexCommitListener listener = engineConfig.getIndexCommitListener();
         if (listener != null) {
+            var primaryTerm = config().getPrimaryTermSupplier().getAsLong();
             return new CombinedDeletionPolicy.CommitsListener() {
                 @Override
                 public void onNewAcquiredCommit(final IndexCommit commit) {
                     final IndexCommitRef indexCommitRef = acquireIndexCommitRef(() -> commit);
                     assert indexCommitRef.getIndexCommit() == commit;
-                    listener.onNewCommit(shardId, indexCommitRef);
+                    listener.onNewCommit(shardId, primaryTerm, indexCommitRef);
                 }
 
                 @Override

+ 5 - 1
server/src/test/java/org/elasticsearch/index/IndexModuleTests.java

@@ -104,6 +104,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Collections.emptyMap;
@@ -637,12 +638,14 @@ public class IndexModuleTests extends ESTestCase {
             Collections.emptyMap()
         );
 
+        final AtomicLong lastAcquiredPrimaryTerm = new AtomicLong();
         final AtomicReference<Engine.IndexCommitRef> lastAcquiredCommit = new AtomicReference<>();
         final AtomicReference<IndexCommit> lastDeletedCommit = new AtomicReference<>();
 
         module.setIndexCommitListener(new Engine.IndexCommitListener() {
             @Override
-            public void onNewCommit(ShardId shardId, Engine.IndexCommitRef indexCommitRef) {
+            public void onNewCommit(ShardId shardId, long primaryTerm, Engine.IndexCommitRef indexCommitRef) {
+                lastAcquiredPrimaryTerm.set(primaryTerm);
                 lastAcquiredCommit.set(indexCommitRef);
             }
 
@@ -687,6 +690,7 @@ public class IndexModuleTests extends ESTestCase {
             indexShard.recoverFromStore(recoveryFuture);
             recoveryFuture.get();
 
+            assertThat(lastAcquiredPrimaryTerm.get(), equalTo(indexShard.getOperationPrimaryTerm()));
             Engine.IndexCommitRef lastCommitRef = lastAcquiredCommit.get();
             assertThat(lastCommitRef, notNullValue());
             IndexCommit lastCommit = lastCommitRef.getIndexCommit();

+ 7 - 1
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -7478,12 +7478,15 @@ public class InternalEngineTests extends EngineTestCase {
     public void testIndexCommitsListener() throws Exception {
         final Map<IndexCommit, Engine.IndexCommitRef> acquiredCommits = new HashMap<>();
         final List<IndexCommit> deletedCommits = new ArrayList<>();
+        final List<Long> acquiredPrimaryTerms = new ArrayList<>();
 
         final Engine.IndexCommitListener indexCommitListener = new Engine.IndexCommitListener() {
             @Override
-            public void onNewCommit(ShardId shardId, Engine.IndexCommitRef indexCommitRef) {
+            public void onNewCommit(ShardId shardId, long primaryTerm, Engine.IndexCommitRef indexCommitRef) {
                 assertThat(acquiredCommits.put(indexCommitRef.getIndexCommit(), indexCommitRef), nullValue());
                 assertThat(shardId, equalTo(InternalEngineTests.this.shardId));
+                assertThat(primaryTerm, greaterThanOrEqualTo(0L));
+                acquiredPrimaryTerms.add(primaryTerm);
             }
 
             @Override
@@ -7567,6 +7570,9 @@ public class InternalEngineTests extends EngineTestCase {
             }
 
             releaseCommitRef(acquiredCommits, 7L);
+
+            final long primaryTerm = engine.config().getPrimaryTermSupplier().getAsLong();
+            assertThat(acquiredPrimaryTerms.stream().allMatch(value -> value == primaryTerm), is(true));
         }
     }