Преглед изворни кода

Add commits listener for InternalEngine and CombinedDeletionPolicy (#92017)

This committ introduces a listener-based mechanism to the InternalEngine 
and CombinedDeletionPolicy that allows to listen to newly created or 
deleted index commits.

The listener can be configured in the IndexModule. It allows to listen to 
all commits creation (or deletion) as soon as an index shard is created 
(thus it also captures the Lucene commits executed when the shard is 
bootstrapped).

When a listener is defined, the CombinedDeletionPolicy automatically 
acquires the new IndexCommit. This is important to ensure that the 
files of the commit won't be deleted by Lucene while a listener is 
working on the commit. For that reason the acquired commit must 
be released as soon as possible since it retains files on disk.

By default, no listener is defined.
Tanguy Leroux пре 2 година
родитељ
комит
2e34bb61d7
19 измењених фајлова са 515 додато и 56 уклоњено
  1. 5 0
      docs/changelog/92017.yaml
  2. 2 1
      server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java
  3. 2 1
      server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java
  4. 8 1
      server/src/main/java/org/elasticsearch/index/IndexModule.java
  5. 6 2
      server/src/main/java/org/elasticsearch/index/IndexService.java
  6. 44 4
      server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
  7. 22 0
      server/src/main/java/org/elasticsearch/index/engine/Engine.java
  8. 11 1
      server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
  9. 41 28
      server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
  10. 6 2
      server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
  11. 104 0
      server/src/test/java/org/elasticsearch/index/IndexModuleTests.java
  12. 116 1
      server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java
  13. 122 2
      server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
  14. 2 1
      server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
  15. 2 1
      server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
  16. 2 1
      server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java
  17. 16 8
      test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
  18. 2 1
      test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
  19. 2 1
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java

+ 5 - 0
docs/changelog/92017.yaml

@@ -0,0 +1,5 @@
+pr: 92017
+summary: Add commits listener for `InternalEngine` and `CombinedDeletionPolicy`
+area: Engine
+type: enhancement
+issues: []

+ 2 - 1
server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

@@ -660,7 +660,8 @@ public class IndexShardIT extends ESSingleNodeTestCase {
             RetentionLeaseSyncer.EMPTY,
             cbs,
             IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
-            System::nanoTime
+            System::nanoTime,
+            null
         );
     }
 

+ 2 - 1
server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java

@@ -78,7 +78,8 @@ public class IndexingMemoryControllerIT extends ESSingleNodeTestCase {
                 config.getPrimaryTermSupplier(),
                 config.getSnapshotCommitSupplier(),
                 config.getLeafSorter(),
-                config.getRelativeTimeInNanosSupplier()
+                config.getRelativeTimeInNanosSupplier(),
+                config.getIndexCommitListener()
             );
         }
 

+ 8 - 1
server/src/main/java/org/elasticsearch/index/IndexModule.java

@@ -165,6 +165,7 @@ public final class IndexModule {
     private final AtomicBoolean frozen = new AtomicBoolean(false);
     private final BooleanSupplier allowExpensiveQueries;
     private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
+    private final SetOnce<Engine.IndexCommitListener> indexCommitListener = new SetOnce<>();
 
     /**
      * Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
@@ -370,6 +371,11 @@ public final class IndexModule {
         this.indexDirectoryWrapper.set(Objects.requireNonNull(wrapper));
     }
 
+    public void setIndexCommitListener(Engine.IndexCommitListener listener) {
+        ensureNotFrozen();
+        this.indexCommitListener.set(Objects.requireNonNull(listener));
+    }
+
     IndexEventListener freeze() { // pkg private for testing
         if (this.frozen.compareAndSet(false, true)) {
             return new CompositeIndexEventListener(indexSettings, indexEventListeners);
@@ -517,7 +523,8 @@ public final class IndexModule {
                 valuesSourceRegistry,
                 recoveryStateFactory,
                 indexFoldersDeletionListener,
-                snapshotCommitSupplier
+                snapshotCommitSupplier,
+                indexCommitListener.get()
             );
             success = true;
             return indexService;

+ 6 - 2
server/src/main/java/org/elasticsearch/index/IndexService.java

@@ -112,6 +112,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
     private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
     private final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier;
     private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
+    private final Engine.IndexCommitListener indexCommitListener;
     private final IndexCache indexCache;
     private final MapperService mapperService;
     private final XContentParserConfiguration parserConfiguration;
@@ -175,7 +176,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
         ValuesSourceRegistry valuesSourceRegistry,
         IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
         IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener,
-        IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier
+        IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
+        Engine.IndexCommitListener indexCommitListener
     ) {
         super(indexSettings);
         this.allowExpensiveQueries = allowExpensiveQueries;
@@ -242,6 +244,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
         this.readerWrapper = wrapperFactory.apply(this);
         this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
         this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners);
+        this.indexCommitListener = indexCommitListener;
         try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
             // kick off async ops for the first shard in this index
             this.refreshTask = new AsyncRefreshTask(this);
@@ -516,7 +519,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
                 retentionLeaseSyncer,
                 circuitBreakerService,
                 snapshotCommitSupplier,
-                System::nanoTime
+                System::nanoTime,
+                indexCommitListener
             );
             eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
             eventListener.afterIndexShardCreated(indexShard);

+ 44 - 4
server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java

@@ -13,12 +13,14 @@ import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexDeletionPolicy;
 import org.apache.lucene.index.SegmentInfos;
 import org.elasticsearch.common.lucene.FilterIndexCommit;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogDeletionPolicy;
 
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
@@ -38,6 +40,17 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
     private final SoftDeletesPolicy softDeletesPolicy;
     private final LongSupplier globalCheckpointSupplier;
     private final Map<IndexCommit, Integer> snapshottedCommits; // Number of snapshots held against each commit point.
+
+    interface CommitsListener {
+
+        void onNewAcquiredCommit(IndexCommit commit);
+
+        void onDeletedCommit(IndexCommit commit);
+    }
+
+    @Nullable
+    private final CommitsListener commitsListener;
+
     private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
     private volatile long maxSeqNoOfNextSafeCommit;
     private volatile IndexCommit lastCommit; // the most recent commit point
@@ -47,12 +60,14 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
         Logger logger,
         TranslogDeletionPolicy translogDeletionPolicy,
         SoftDeletesPolicy softDeletesPolicy,
-        LongSupplier globalCheckpointSupplier
+        LongSupplier globalCheckpointSupplier,
+        @Nullable CommitsListener commitsListener
     ) {
         this.logger = logger;
         this.translogDeletionPolicy = translogDeletionPolicy;
         this.softDeletesPolicy = softDeletesPolicy;
         this.globalCheckpointSupplier = globalCheckpointSupplier;
+        this.commitsListener = commitsListener;
         this.snapshottedCommits = new HashMap<>();
     }
 
@@ -86,11 +101,14 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
             logger.info("failed to get the total docs from the safe commit; use the total docs from the previous safe commit", ex);
             totalDocsOfSafeCommit = safeCommitInfo.docCount;
         }
+        IndexCommit newCommit = null;
+        List<IndexCommit> deletedCommits = null;
         synchronized (this) {
             this.safeCommitInfo = new SafeCommitInfo(
                 Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)),
                 totalDocsOfSafeCommit
             );
+            final IndexCommit previousLastCommit = this.lastCommit;
             this.lastCommit = commits.get(commits.size() - 1);
             this.safeCommit = safeCommit;
             updateRetentionPolicy();
@@ -99,13 +117,31 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
             } else {
                 this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
             }
+            if (commitsListener != null && previousLastCommit != this.lastCommit) {
+                newCommit = acquireIndexCommit(false);
+            }
             for (int i = 0; i < keptPosition; i++) {
-                if (snapshottedCommits.containsKey(commits.get(i)) == false) {
-                    deleteCommit(commits.get(i));
+                final IndexCommit commit = commits.get(i);
+                if (snapshottedCommits.containsKey(commit) == false) {
+                    deleteCommit(commit);
+                    if (deletedCommits == null) {
+                        deletedCommits = new ArrayList<>();
+                    }
+                    deletedCommits.add(commit);
                 }
             }
         }
         assert assertSafeCommitUnchanged(safeCommit);
+        if (commitsListener != null) {
+            if (newCommit != null) {
+                commitsListener.onNewAcquiredCommit(newCommit);
+            }
+            if (deletedCommits != null) {
+                for (IndexCommit deletedCommit : deletedCommits) {
+                    commitsListener.onDeletedCommit(deletedCommit);
+                }
+            }
+        }
     }
 
     private boolean assertSafeCommitUnchanged(IndexCommit safeCommit) {
@@ -154,7 +190,11 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
         assert lastCommit != null : "Last commit is not initialized yet";
         final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit;
         snapshottedCommits.merge(snapshotting, 1, Integer::sum); // increase refCount
-        return new SnapshotIndexCommit(snapshotting);
+        return wrapCommit(snapshotting);
+    }
+
+    protected IndexCommit wrapCommit(IndexCommit indexCommit) {
+        return new SnapshotIndexCommit(indexCommit);
     }
 
     /**

+ 22 - 0
server/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -214,6 +214,28 @@ public abstract class Engine implements Closeable {
         }
     }
 
+    public interface IndexCommitListener {
+
+        /**
+         * This method is invoked each time a new Lucene commit is created through this engine. There is no guarantee that a listener will
+         * be notified of the commits in order, ie newer commits may appear before older ones. The {@link IndexCommitRef} prevents the
+         * {@link IndexCommitRef} files to be deleted from disk until the reference is closed. As such, the listener must close the
+         * reference as soon as it is done with it.
+         *
+         * @param indexCommitRef a reference on the newly created index commit
+         */
+        void onNewCommit(ShardId shardId, Engine.IndexCommitRef indexCommitRef);
+
+        /**
+         * This method is invoked after the policy deleted the given {@link IndexCommit}. A listener is never notified of a deleted commit
+         * until the corresponding {@link Engine.IndexCommitRef} received through {@link #onNewCommit(ShardId, IndexCommitRef)} has been
+         * closed; closing which in turn can call this method directly.
+         *
+         * @param deletedCommit the deleted {@link IndexCommit}
+         */
+        void onIndexCommitDelete(ShardId shardId, IndexCommit deletedCommit);
+    }
+
     /**
      * A throttling class that can be activated, causing the
      * {@code acquireThrottle} method to block on a lock when throttling

+ 11 - 1
server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

@@ -125,6 +125,9 @@ public final class EngineConfig {
 
     private final LongSupplier relativeTimeInNanosSupplier;
 
+    @Nullable
+    private final Engine.IndexCommitListener indexCommitListener;
+
     /**
      * Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
      */
@@ -152,7 +155,8 @@ public final class EngineConfig {
         LongSupplier primaryTermSupplier,
         IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
         Comparator<LeafReader> leafSorter,
-        LongSupplier relativeTimeInNanosSupplier
+        LongSupplier relativeTimeInNanosSupplier,
+        Engine.IndexCommitListener indexCommitListener
     ) {
         this.shardId = shardId;
         this.indexSettings = indexSettings;
@@ -193,6 +197,7 @@ public final class EngineConfig {
         this.snapshotCommitSupplier = snapshotCommitSupplier;
         this.leafSorter = leafSorter;
         this.relativeTimeInNanosSupplier = relativeTimeInNanosSupplier;
+        this.indexCommitListener = indexCommitListener;
     }
 
     /**
@@ -395,4 +400,9 @@ public final class EngineConfig {
     public LongSupplier getRelativeTimeInNanosSupplier() {
         return relativeTimeInNanosSupplier;
     }
+
+    @Nullable
+    public Engine.IndexCommitListener getIndexCommitListener() {
+        return indexCommitListener;
+    }
 }

+ 41 - 28
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -105,6 +105,7 @@ import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.LongConsumer;
 import java.util.function.LongSupplier;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -226,7 +227,8 @@ public class InternalEngine extends Engine {
                     logger,
                     translogDeletionPolicy,
                     softDeletesPolicy,
-                    translog::getLastSyncedGlobalCheckpoint
+                    translog::getLastSyncedGlobalCheckpoint,
+                    newCommitsListener()
                 );
                 this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
                 writer = createWriter();
@@ -322,6 +324,27 @@ public class InternalEngine extends Engine {
         );
     }
 
+    @Nullable
+    private CombinedDeletionPolicy.CommitsListener newCommitsListener() {
+        final Engine.IndexCommitListener listener = engineConfig.getIndexCommitListener();
+        if (listener != null) {
+            return new CombinedDeletionPolicy.CommitsListener() {
+                @Override
+                public void onNewAcquiredCommit(final IndexCommit commit) {
+                    final IndexCommitRef indexCommitRef = acquireIndexCommitRef(() -> commit);
+                    assert indexCommitRef.getIndexCommit() == commit;
+                    listener.onNewCommit(shardId, indexCommitRef);
+                }
+
+                @Override
+                public void onDeletedCommit(IndexCommit commit) {
+                    listener.onIndexCommitDelete(shardId, commit);
+                }
+            };
+        }
+        return null;
+    }
+
     @Override
     public CompletionStats completionStats(String... fieldNamePatterns) {
         return completionStatsCache.get(fieldNamePatterns);
@@ -2158,22 +2181,14 @@ public class InternalEngine extends Engine {
         }
     }
 
-    @Override
-    public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws EngineException {
-        // we have to flush outside of the readlock otherwise we might have a problem upgrading
-        // the to a write lock when we fail the engine in this operation
-        if (flushFirst) {
-            logger.trace("start flush for snapshot");
-            flush(false, true);
-            logger.trace("finish flush for snapshot");
-        }
+    private IndexCommitRef acquireIndexCommitRef(final Supplier<IndexCommit> indexCommitSupplier) {
         store.incRef();
         boolean success = false;
         try {
-            final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false);
+            final IndexCommit indexCommit = indexCommitSupplier.get();
             final IndexCommitRef commitRef = new IndexCommitRef(
-                lastCommit,
-                () -> IOUtils.close(() -> releaseIndexCommit(lastCommit), store::decRef)
+                indexCommit,
+                () -> IOUtils.close(() -> releaseIndexCommit(indexCommit), store::decRef)
             );
             success = true;
             return commitRef;
@@ -2185,22 +2200,20 @@ public class InternalEngine extends Engine {
     }
 
     @Override
-    public IndexCommitRef acquireSafeIndexCommit() throws EngineException {
-        store.incRef();
-        boolean success = false;
-        try {
-            final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true);
-            final IndexCommitRef commitRef = new IndexCommitRef(
-                safeCommit,
-                () -> IOUtils.close(() -> releaseIndexCommit(safeCommit), store::decRef)
-            );
-            success = true;
-            return commitRef;
-        } finally {
-            if (success == false) {
-                store.decRef();
-            }
+    public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws EngineException {
+        // we have to flush outside of the readlock otherwise we might have a problem upgrading
+        // the to a write lock when we fail the engine in this operation
+        if (flushFirst) {
+            logger.trace("start flush for snapshot");
+            flush(false, true);
+            logger.trace("finish flush for snapshot");
         }
+        return acquireIndexCommitRef(() -> combinedDeletionPolicy.acquireIndexCommit(false));
+    }
+
+    @Override
+    public IndexCommitRef acquireSafeIndexCommit() throws EngineException {
+        return acquireIndexCommitRef(() -> combinedDeletionPolicy.acquireIndexCommit(true));
     }
 
     private void releaseIndexCommit(IndexCommit snapshot) throws IOException {

+ 6 - 2
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -222,6 +222,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     private final PendingReplicationActions pendingReplicationActions;
     private final ReplicationTracker replicationTracker;
     private final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier;
+    private final Engine.IndexCommitListener indexCommitListener;
 
     protected volatile ShardRouting shardRouting;
     protected volatile IndexShardState state;
@@ -311,7 +312,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         final RetentionLeaseSyncer retentionLeaseSyncer,
         final CircuitBreakerService circuitBreakerService,
         final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
-        final LongSupplier relativeTimeInNanosSupplier
+        final LongSupplier relativeTimeInNanosSupplier,
+        final Engine.IndexCommitListener indexCommitListener
     ) throws IOException {
         super(shardRouting.shardId(), indexSettings);
         assert shardRouting.initializing();
@@ -390,6 +392,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         this.refreshPendingLocationListener = new RefreshPendingLocationListener();
         this.isDataStreamIndex = mapperService == null ? false : mapperService.mappingLookup().isDataStreamTimestampFieldEnabled();
         this.relativeTimeInNanosSupplier = relativeTimeInNanosSupplier;
+        this.indexCommitListener = indexCommitListener;
     }
 
     public ThreadPool getThreadPool() {
@@ -3270,7 +3273,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             this::getOperationPrimaryTerm,
             snapshotCommitSupplier,
             isTimeseriesIndex ? TIMESERIES_LEAF_READERS_SORTER : null,
-            relativeTimeInNanosSupplier
+            relativeTimeInNanosSupplier,
+            indexCommitListener
         );
     }
 

+ 104 - 0
server/src/test/java/org/elasticsearch/index/IndexModuleTests.java

@@ -11,6 +11,7 @@ import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.standard.StandardTokenizer;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.FieldInvertState;
+import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.CollectionStatistics;
 import org.apache.lucene.search.QueryCachingPolicy;
@@ -23,13 +24,17 @@ import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.tests.index.AssertingDirectoryReader;
 import org.apache.lucene.util.SetOnce.AlreadySetException;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
@@ -51,12 +56,15 @@ import org.elasticsearch.index.cache.query.IndexQueryCache;
 import org.elasticsearch.index.cache.query.QueryCache;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineTestCase;
+import org.elasticsearch.index.engine.InternalEngine;
 import org.elasticsearch.index.engine.InternalEngineFactory;
 import org.elasticsearch.index.fielddata.IndexFieldDataCache;
 import org.elasticsearch.index.mapper.MapperRegistry;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.Uid;
+import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
 import org.elasticsearch.index.shard.IndexEventListener;
+import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexingOperationListener;
 import org.elasticsearch.index.shard.SearchOperationListener;
 import org.elasticsearch.index.shard.ShardId;
@@ -84,24 +92,31 @@ import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.hamcrest.Matchers;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
 import static org.elasticsearch.index.IndexService.IndexCreationContext.CREATE_INDEX;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasToString;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.Mockito.mock;
 
 public class IndexModuleTests extends ESTestCase {
@@ -415,6 +430,7 @@ public class IndexModuleTests extends ESTestCase {
         assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.setReaderWrapper(null)).getMessage());
         assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.forceQueryCacheProvider(null)).getMessage());
         assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.setDirectoryWrapper(null)).getMessage());
+        assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.setIndexCommitListener(null)).getMessage());
     }
 
     public void testSetupUnknownSimilarity() {
@@ -610,6 +626,94 @@ public class IndexModuleTests extends ESTestCase {
         indexService.close("closing", false);
     }
 
+    public void testIndexCommitListenerIsBound() throws IOException, ExecutionException, InterruptedException {
+        IndexModule module = new IndexModule(
+            indexSettings,
+            emptyAnalysisRegistry,
+            InternalEngine::new,
+            Collections.emptyMap(),
+            () -> true,
+            indexNameExpressionResolver,
+            Collections.emptyMap()
+        );
+
+        final AtomicReference<Engine.IndexCommitRef> lastAcquiredCommit = new AtomicReference<>();
+        final AtomicReference<IndexCommit> lastDeletedCommit = new AtomicReference<>();
+
+        module.setIndexCommitListener(new Engine.IndexCommitListener() {
+            @Override
+            public void onNewCommit(ShardId shardId, Engine.IndexCommitRef indexCommitRef) {
+                lastAcquiredCommit.set(indexCommitRef);
+            }
+
+            @Override
+            public void onIndexCommitDelete(ShardId shardId, IndexCommit deletedCommit) {
+                lastDeletedCommit.set(deletedCommit);
+            }
+        });
+
+        final List<Closeable> closeables = new ArrayList<>();
+        try {
+            ShardId shardId = new ShardId("index", UUIDs.randomBase64UUID(random()), 0);
+            ShardRouting shardRouting = ShardRouting.newUnassigned(
+                shardId,
+                true,
+                RecoverySource.EmptyStoreRecoverySource.INSTANCE,
+                new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)
+            ).initialize("_node_id", null, -1);
+
+            IndexService indexService = newIndexService(module);
+            closeables.add(() -> indexService.close("close index service at end of test", false));
+
+            IndexShard indexShard = indexService.createShard(shardRouting, s -> {}, RetentionLeaseSyncer.EMPTY);
+            closeables.add(() -> indexShard.close("close shard at end of test", true));
+            indexShard.markAsRecovering(
+                "test",
+                new RecoveryState(
+                    shardRouting,
+                    new DiscoveryNode(
+                        "_node_id",
+                        "_node_id",
+                        buildNewFakeTransportAddress(),
+                        Collections.emptyMap(),
+                        DiscoveryNodeRole.roles(),
+                        Version.CURRENT
+                    ),
+                    null
+                )
+            );
+
+            final PlainActionFuture<Boolean> recoveryFuture = PlainActionFuture.newFuture();
+            indexShard.recoverFromStore(recoveryFuture);
+            recoveryFuture.get();
+
+            Engine.IndexCommitRef lastCommitRef = lastAcquiredCommit.get();
+            assertThat(lastCommitRef, notNullValue());
+            IndexCommit lastCommit = lastCommitRef.getIndexCommit();
+            assertThat(lastCommit.getGeneration(), equalTo(2L));
+            IndexCommit lastDeleted = lastDeletedCommit.get();
+            assertThat(lastDeleted, nullValue());
+
+            lastCommitRef.close();
+
+            indexShard.flush(new FlushRequest("index").force(true));
+
+            lastDeleted = lastDeletedCommit.get();
+            assertThat(lastDeleted.getGeneration(), equalTo(lastCommit.getGeneration()));
+            assertThat(lastDeleted.getSegmentsFileName(), equalTo(lastCommit.getSegmentsFileName()));
+            assertThat(lastDeleted.isDeleted(), equalTo(true));
+
+            lastCommitRef = lastAcquiredCommit.get();
+            assertThat(lastCommitRef, notNullValue());
+            lastCommit = lastCommitRef.getIndexCommit();
+            assertThat(lastCommit.getGeneration(), equalTo(3L));
+
+            lastCommitRef.close();
+        } finally {
+            IOUtils.close(closeables);
+        }
+    }
+
     private ShardRouting createInitializedShardRouting() {
         ShardRouting shard = ShardRouting.newUnassigned(
             new ShardId("test", "_na_", 0),

+ 116 - 1
server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java

@@ -10,6 +10,7 @@ package org.elasticsearch.index.engine;
 
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.store.Directory;
+import org.elasticsearch.common.lucene.FilterIndexCommit;
 import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.translog.Translog;
@@ -26,7 +27,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -266,12 +270,123 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
         }
     }
 
+    public void testCommitsListener() throws Exception {
+        final List<IndexCommit> acquiredCommits = new ArrayList<>();
+        final List<IndexCommit> deletedCommits = new ArrayList<>();
+        final CombinedDeletionPolicy.CommitsListener commitsListener = new CombinedDeletionPolicy.CommitsListener() {
+            @Override
+            public void onNewAcquiredCommit(IndexCommit commit) {
+                assertThat(commit, instanceOf(FilterIndexCommit.class));
+                assertThat(acquiredCommits.add(((FilterIndexCommit) commit).getIndexCommit()), equalTo(true));
+            }
+
+            @Override
+            public void onDeletedCommit(IndexCommit commit) {
+                assertThat(acquiredCommits.remove(commit), equalTo(true));
+                assertThat(deletedCommits.add(commit), equalTo(true));
+                assertThat(commit.isDeleted(), equalTo(true));
+            }
+        };
+
+        final AtomicLong globalCheckpoint = new AtomicLong(0L);
+        final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
+        final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(
+            globalCheckpoint::get,
+            NO_OPS_PERFORMED,
+            0L,
+            () -> RetentionLeases.EMPTY
+        );
+        final CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(
+            logger,
+            translogDeletionPolicy,
+            softDeletesPolicy,
+            globalCheckpoint::get,
+            commitsListener
+        ) {
+            @Override
+            protected int getDocCountOfCommit(IndexCommit indexCommit) {
+                return 10;
+            }
+
+            @Override
+            synchronized boolean releaseCommit(IndexCommit indexCommit) {
+                return super.releaseCommit(wrapCommit(indexCommit));
+            }
+        };
+
+        final UUID translogUUID = UUID.randomUUID();
+        final IndexCommit commit0 = mockIndexCommit(NO_OPS_PERFORMED, NO_OPS_PERFORMED, translogUUID);
+        combinedDeletionPolicy.onInit(List.of(commit0));
+
+        assertThat(acquiredCommits, contains(commit0));
+        assertThat(deletedCommits, hasSize(0));
+
+        final IndexCommit commit1 = mockIndexCommit(10L, 10L, translogUUID);
+        combinedDeletionPolicy.onCommit(List.of(commit0, commit1));
+
+        assertThat(acquiredCommits, contains(commit0, commit1));
+        assertThat(deletedCommits, hasSize(0));
+
+        globalCheckpoint.set(10L);
+        final IndexCommit commit2 = mockIndexCommit(20L, 20L, translogUUID);
+        combinedDeletionPolicy.onCommit(List.of(commit0, commit1, commit2));
+
+        assertThat(acquiredCommits, contains(commit0, commit1, commit2));
+        assertThat(deletedCommits, hasSize(0));
+
+        boolean maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit0);
+        assertThat(maybeCleanUpCommits, equalTo(true));
+
+        globalCheckpoint.set(20L);
+        final IndexCommit commit3 = mockIndexCommit(30L, 30L, translogUUID);
+        combinedDeletionPolicy.onCommit(List.of(commit0, commit1, commit2, commit3));
+
+        assertThat(acquiredCommits, contains(commit1, commit2, commit3));
+        assertThat(deletedCommits, contains(commit0));
+
+        maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit2);
+        assertThat("No commits to clean up (commit #2 is the safe commit)", maybeCleanUpCommits, equalTo(false));
+
+        globalCheckpoint.set(30L);
+        final IndexCommit commit4 = mockIndexCommit(40L, 40L, translogUUID);
+        combinedDeletionPolicy.onCommit(List.of(commit1, commit2, commit3, commit4));
+
+        assertThat(acquiredCommits, contains(commit1, commit3, commit4));
+        assertThat(deletedCommits, contains(commit0, commit2));
+
+        maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit3);
+        assertThat("No commits to clean up (commit #3 is the safe commit)", maybeCleanUpCommits, equalTo(false));
+
+        maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit4);
+        assertThat("No commits to clean up (commit #4 is the last commit)", maybeCleanUpCommits, equalTo(false));
+
+        maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit1);
+        assertThat(maybeCleanUpCommits, equalTo(true));
+
+        final boolean globalCheckpointCatchUp = randomBoolean();
+        globalCheckpoint.set(globalCheckpointCatchUp ? 50L : 40L);
+
+        final IndexCommit commit5 = mockIndexCommit(50L, 50L, translogUUID);
+        combinedDeletionPolicy.onCommit(List.of(commit1, commit3, commit4, commit5));
+
+        if (globalCheckpointCatchUp) {
+            assertThat(acquiredCommits, contains(commit5));
+            assertThat(deletedCommits, contains(commit0, commit2, commit1, commit3, commit4));
+        } else {
+            assertThat(acquiredCommits, contains(commit4, commit5));
+            assertThat(deletedCommits, contains(commit0, commit2, commit1, commit3));
+        }
+
+        maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit5);
+        assertThat("No commits to clean up (commit #5 is the last commit)", maybeCleanUpCommits, equalTo(false));
+    }
+
     private CombinedDeletionPolicy newCombinedDeletionPolicy(
         TranslogDeletionPolicy translogPolicy,
         SoftDeletesPolicy softDeletesPolicy,
         AtomicLong globalCheckpoint
     ) {
-        return new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get) {
+        return new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get, null) {
             @Override
             protected int getDocCountOfCommit(IndexCommit indexCommit) throws IOException {
                 if (randomBoolean()) {

+ 122 - 2
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -193,6 +193,7 @@ import static org.hamcrest.Matchers.containsInRelativeOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.emptyArray;
+import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -3588,7 +3589,8 @@ public class InternalEngineTests extends EngineTestCase {
             primaryTerm::get,
             IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
             null,
-            config.getRelativeTimeInNanosSupplier()
+            config.getRelativeTimeInNanosSupplier(),
+            null
         );
         expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig));
 
@@ -7257,7 +7259,8 @@ public class InternalEngineTests extends EngineTestCase {
                 config.getPrimaryTermSupplier(),
                 config.getSnapshotCommitSupplier(),
                 config.getLeafSorter(),
-                config.getRelativeTimeInNanosSupplier()
+                config.getRelativeTimeInNanosSupplier(),
+                config.getIndexCommitListener()
             );
             try (InternalEngine engine = createEngine(configWithWarmer)) {
                 assertThat(warmedUpReaders, empty());
@@ -7471,4 +7474,121 @@ public class InternalEngineTests extends EngineTestCase {
             assertThat(userDataAfterTrimUnsafeCommits.get(ES_VERSION), is(equalTo(Version.CURRENT.toString())));
         }
     }
+
+    public void testIndexCommitsListener() throws Exception {
+        final Map<IndexCommit, Engine.IndexCommitRef> acquiredCommits = new HashMap<>();
+        final List<IndexCommit> deletedCommits = new ArrayList<>();
+
+        final Engine.IndexCommitListener indexCommitListener = new Engine.IndexCommitListener() {
+            @Override
+            public void onNewCommit(ShardId shardId, Engine.IndexCommitRef indexCommitRef) {
+                assertThat(acquiredCommits.put(indexCommitRef.getIndexCommit(), indexCommitRef), nullValue());
+                assertThat(shardId, equalTo(InternalEngineTests.this.shardId));
+            }
+
+            @Override
+            public void onIndexCommitDelete(ShardId shardId, IndexCommit deletedCommit) {
+                assertThat(acquiredCommits.remove(deletedCommit), notNullValue());
+                assertThat(deletedCommits.add(deletedCommit), equalTo(true));
+                assertThat(deletedCommit.isDeleted(), equalTo(true));
+                assertThat(shardId, equalTo(InternalEngineTests.this.shardId));
+            }
+        };
+
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        try (
+            Store store = createStore();
+            InternalEngine engine = createEngine(
+                config(
+                    defaultSettings,
+                    store,
+                    createTempDir(),
+                    NoMergePolicy.INSTANCE,
+                    null,
+                    null,
+                    null,
+                    globalCheckpoint::get,
+                    () -> RetentionLeases.EMPTY,
+                    new NoneCircuitBreakerService(),
+                    indexCommitListener
+                )
+            )
+        ) {
+            assertCommitGenerations(acquiredCommits, List.of(2L));
+            assertCommitGenerations(deletedCommits, List.of());
+
+            engine.index(indexForDoc(createParsedDoc("a", EngineTestCase.randomIdFieldType(), null)));
+            engine.flush();
+
+            assertCommitGenerations(acquiredCommits, List.of(2L, 3L));
+            assertCommitGenerations(deletedCommits, List.of());
+
+            globalCheckpoint.set(0L);
+            engine.index(indexForDoc(createParsedDoc("b", EngineTestCase.randomIdFieldType(), null)));
+            engine.flush();
+
+            assertCommitGenerations(acquiredCommits, List.of(2L, 3L, 4L));
+            assertCommitGenerations(deletedCommits, List.of());
+
+            releaseCommitRef(acquiredCommits, 2L);
+
+            globalCheckpoint.set(1L);
+            engine.index(indexForDoc(createParsedDoc("c", EngineTestCase.randomIdFieldType(), null)));
+            engine.flush();
+
+            assertCommitGenerations(acquiredCommits, List.of(3L, 4L, 5L));
+            assertCommitGenerations(deletedCommits, List.of(2L));
+
+            releaseCommitRef(acquiredCommits, 4L);
+
+            globalCheckpoint.set(2L);
+            engine.index(indexForDoc(createParsedDoc("d", EngineTestCase.randomIdFieldType(), null)));
+            engine.flush();
+
+            assertCommitGenerations(acquiredCommits, List.of(3L, 5L, 6L));
+            assertCommitGenerations(deletedCommits, List.of(2L, 4L));
+
+            releaseCommitRef(acquiredCommits, 5L);
+            releaseCommitRef(acquiredCommits, 6L);
+            releaseCommitRef(acquiredCommits, 3L);
+
+            final boolean globalCheckpointCatchUp = randomBoolean();
+            globalCheckpoint.set(globalCheckpointCatchUp ? 4L : 3L);
+
+            engine.index(indexForDoc(createParsedDoc("e", EngineTestCase.randomIdFieldType(), null)));
+            engine.flush();
+
+            if (globalCheckpointCatchUp) {
+                assertCommitGenerations(acquiredCommits, List.of(7L));
+                assertCommitGenerations(deletedCommits, List.of(2L, 3L, 4L, 5L, 6L));
+            } else {
+                assertCommitGenerations(acquiredCommits, List.of(6L, 7L));
+                assertCommitGenerations(deletedCommits, List.of(2L, 3L, 4L, 5L));
+            }
+
+            releaseCommitRef(acquiredCommits, 7L);
+        }
+    }
+
+    private static void assertCommitGenerations(Map<IndexCommit, Engine.IndexCommitRef> commits, List<Long> expectedGenerations) {
+        assertCommitGenerations(commits.values().stream().map(Engine.IndexCommitRef::getIndexCommit).toList(), expectedGenerations);
+    }
+
+    private static void assertCommitGenerations(List<IndexCommit> commits, List<Long> expectedGenerations) {
+        assertThat(
+            commits.stream().map(IndexCommit::getGeneration).sorted().toList(),
+            expectedGenerations.isEmpty() ? emptyIterable() : equalTo(expectedGenerations)
+        );
+    }
+
+    private static void releaseCommitRef(Map<IndexCommit, Engine.IndexCommitRef> commits, long generation) {
+        var releasable = commits.keySet().stream().filter(c -> c.getGeneration() == generation).findFirst();
+        assertThat(releasable.isPresent(), is(true));
+        Engine.IndexCommitRef indexCommitRef = commits.get(releasable.get());
+        try {
+            indexCommitRef.close();
+        } catch (IOException e) {
+            throw new AssertionError("Failed to release IndexCommitRef for commit generation " + generation, e);
+        }
+    }
 }

+ 2 - 1
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -4519,7 +4519,8 @@ public class IndexShardTests extends IndexShardTestCase {
                 config.getPrimaryTermSupplier(),
                 IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
                 config.getLeafSorter(),
-                config.getRelativeTimeInNanosSupplier()
+                config.getRelativeTimeInNanosSupplier(),
+                config.getIndexCommitListener()
             );
             return new InternalEngine(configWithWarmer);
         });

+ 2 - 1
server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java

@@ -152,7 +152,8 @@ public class RefreshListenersTests extends ESTestCase {
             () -> primaryTerm,
             IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
             null,
-            System::nanoTime
+            System::nanoTime,
+            null
         );
         engine = new InternalEngine(config);
         engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE);

+ 2 - 1
server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java

@@ -401,7 +401,8 @@ public class IndexingMemoryControllerTests extends IndexShardTestCase {
             config.getPrimaryTermSupplier(),
             config.getSnapshotCommitSupplier(),
             config.getLeafSorter(),
-            config.getRelativeTimeInNanosSupplier()
+            config.getRelativeTimeInNanosSupplier(),
+            config.getIndexCommitListener()
         );
     }
 

+ 16 - 8
test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

@@ -274,7 +274,8 @@ public abstract class EngineTestCase extends ESTestCase {
             config.getPrimaryTermSupplier(),
             config.getSnapshotCommitSupplier(),
             config.getLeafSorter(),
-            config.getRelativeTimeInNanosSupplier()
+            config.getRelativeTimeInNanosSupplier(),
+            config.getIndexCommitListener()
         );
     }
 
@@ -303,7 +304,8 @@ public abstract class EngineTestCase extends ESTestCase {
             config.getPrimaryTermSupplier(),
             config.getSnapshotCommitSupplier(),
             config.getLeafSorter(),
-            config.getRelativeTimeInNanosSupplier()
+            config.getRelativeTimeInNanosSupplier(),
+            config.getIndexCommitListener()
         );
     }
 
@@ -332,7 +334,8 @@ public abstract class EngineTestCase extends ESTestCase {
             config.getPrimaryTermSupplier(),
             config.getSnapshotCommitSupplier(),
             config.getLeafSorter(),
-            config.getRelativeTimeInNanosSupplier()
+            config.getRelativeTimeInNanosSupplier(),
+            config.getIndexCommitListener()
         );
     }
 
@@ -754,7 +757,8 @@ public abstract class EngineTestCase extends ESTestCase {
             indexSort,
             globalCheckpointSupplier,
             retentionLeasesSupplier,
-            new NoneCircuitBreakerService()
+            new NoneCircuitBreakerService(),
+            null
         );
     }
 
@@ -779,7 +783,8 @@ public abstract class EngineTestCase extends ESTestCase {
             indexSort,
             maybeGlobalCheckpointSupplier,
             maybeGlobalCheckpointSupplier == null ? null : () -> RetentionLeases.EMPTY,
-            breakerService
+            breakerService,
+            null
         );
     }
 
@@ -793,7 +798,8 @@ public abstract class EngineTestCase extends ESTestCase {
         final Sort indexSort,
         final @Nullable LongSupplier maybeGlobalCheckpointSupplier,
         final @Nullable Supplier<RetentionLeases> maybeRetentionLeasesSupplier,
-        final CircuitBreakerService breakerService
+        final CircuitBreakerService breakerService,
+        final @Nullable Engine.IndexCommitListener indexCommitListener
     ) {
         final IndexWriterConfig iwc = newIndexWriterConfig();
         final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
@@ -851,7 +857,8 @@ public abstract class EngineTestCase extends ESTestCase {
             primaryTerm,
             IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
             null,
-            System::nanoTime
+            System::nanoTime,
+            indexCommitListener
         );
     }
 
@@ -888,7 +895,8 @@ public abstract class EngineTestCase extends ESTestCase {
             config.getPrimaryTermSupplier(),
             config.getSnapshotCommitSupplier(),
             config.getLeafSorter(),
-            config.getRelativeTimeInNanosSupplier()
+            config.getRelativeTimeInNanosSupplier(),
+            config.getIndexCommitListener()
         );
     }
 

+ 2 - 1
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -503,7 +503,8 @@ public abstract class IndexShardTestCase extends ESTestCase {
                 retentionLeaseSyncer,
                 breakerService,
                 IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
-                relativeTimeSupplier
+                relativeTimeSupplier,
+                null
             );
             indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
             success = true;

+ 2 - 1
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java

@@ -287,7 +287,8 @@ public class FollowingEngineTests extends ESTestCase {
             () -> primaryTerm.get(),
             IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
             null,
-            System::nanoTime
+            System::nanoTime,
+            null
         );
     }