Browse Source

Separate acquiring safe commit and last commit (#28271)

Previously we introduced a new parameter to `acquireIndexCommit` to
allow acquire either a safe commit or a last commit. However with the
new parameters, callers can provide a nonsense combination - flush first
but acquire the safe commit. This commit separates acquireIndexCommit
method into two different methods to avoid that problem. Moreover, this
change should also improve the readability.

Relates #28038
Nhat Nguyen 7 năm trước cách đây
mục cha
commit
84fd39f5bb

+ 7 - 3
server/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -868,13 +868,17 @@ public abstract class Engine implements Closeable {
     public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException, IOException;
 
     /**
-     * Snapshots the index and returns a handle to it. If needed will try and "commit" the
+     * Snapshots the most recent index and returns a handle to it. If needed will try and "commit" the
      * lucene index to make sure we have a "fresh" copy of the files to snapshot.
      *
-     * @param safeCommit indicates whether the engine should acquire the most recent safe commit, or the most recent commit.
      * @param flushFirst indicates whether the engine should flush before returning the snapshot
      */
-    public abstract IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException;
+    public abstract IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException;
+
+    /**
+     * Snapshots the most recent safe index commit from the engine.
+     */
+    public abstract IndexCommitRef acquireSafeIndexCommit() throws EngineException;
 
     /**
      * fail engine due to some error. the engine will also be closed.

+ 9 - 3
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -1719,7 +1719,7 @@ public class InternalEngine extends Engine {
     }
 
     @Override
-    public IndexCommitRef acquireIndexCommit(final boolean safeCommit, final boolean flushFirst) throws EngineException {
+    public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws EngineException {
         // we have to flush outside of the readlock otherwise we might have a problem upgrading
         // the to a write lock when we fail the engine in this operation
         if (flushFirst) {
@@ -1727,8 +1727,14 @@ public class InternalEngine extends Engine {
             flush(false, true);
             logger.trace("finish flush for snapshot");
         }
-        final IndexCommit snapshotCommit = combinedDeletionPolicy.acquireIndexCommit(safeCommit);
-        return new Engine.IndexCommitRef(snapshotCommit, () -> combinedDeletionPolicy.releaseCommit(snapshotCommit));
+        final IndexCommit lastCommit = combinedDeletionPolicy.acquireIndexCommit(false);
+        return new Engine.IndexCommitRef(lastCommit, () -> combinedDeletionPolicy.releaseCommit(lastCommit));
+    }
+
+    @Override
+    public IndexCommitRef acquireSafeIndexCommit() throws EngineException {
+        final IndexCommit safeCommit = combinedDeletionPolicy.acquireIndexCommit(true);
+        return new Engine.IndexCommitRef(safeCommit, () -> combinedDeletionPolicy.releaseCommit(safeCommit));
     }
 
     private boolean failOnTragicEvent(AlreadyClosedException ex) {

+ 18 - 6
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -1077,22 +1077,34 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     }
 
     /**
-     * Creates a new {@link IndexCommit} snapshot form the currently running engine. All resources referenced by this
+     * Creates a new {@link IndexCommit} snapshot from the currently running engine. All resources referenced by this
      * commit won't be freed until the commit / snapshot is closed.
      *
-     * @param safeCommit <code>true</code> capture the most recent safe commit point; otherwise the most recent commit point.
      * @param flushFirst <code>true</code> if the index should first be flushed to disk / a low level lucene commit should be executed
      */
-    public Engine.IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException {
-        IndexShardState state = this.state; // one time volatile read
+    public Engine.IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException {
+        final IndexShardState state = this.state; // one time volatile read
         // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
         if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
-            return getEngine().acquireIndexCommit(safeCommit, flushFirst);
+            return getEngine().acquireLastIndexCommit(flushFirst);
         } else {
             throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
         }
     }
 
+    /**
+     * Snapshots the most recent safe index commit from the currently running engine.
+     * All index files referenced by this index commit won't be freed until the commit/snapshot is closed.
+     */
+    public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException {
+        final IndexShardState state = this.state; // one time volatile read
+        // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
+        if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) {
+            return getEngine().acquireSafeIndexCommit();
+        } else {
+            throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed");
+        }
+    }
 
     /**
      * gets a {@link Store.MetadataSnapshot} for the current directory. This method is safe to call in all lifecycle of the index shard,
@@ -1121,7 +1133,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                     return store.getMetadata(null, true);
                 }
             }
-            indexCommit = engine.acquireIndexCommit(false, false);
+            indexCommit = engine.acquireLastIndexCommit(false);
             return store.getMetadata(indexCommit.getIndexCommit());
         } finally {
             store.decRef();

+ 1 - 1
server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java

@@ -48,7 +48,7 @@ final class LocalShardSnapshot implements Closeable {
         store.incRef();
         boolean success = false;
         try {
-            indexCommit = shard.acquireIndexCommit(false, true);
+            indexCommit = shard.acquireLastIndexCommit(true);
             success = true;
         } finally {
             if (success == false) {

+ 2 - 2
server/src/main/java/org/elasticsearch/index/store/Store.java

@@ -238,7 +238,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
      *
      * {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking
      * {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
-     * {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
+     * {@link IndexShard#acquireLastIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
      * @param commit the index commit to read the snapshot from or <code>null</code> if the latest snapshot should be read from the
      *               directory
      * @throws CorruptIndexException      if the lucene index is corrupted. This can be caused by a checksum mismatch or an
@@ -262,7 +262,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
      *
      * {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking
      * {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard
-     * {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
+     * {@link IndexShard#acquireLastIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed
      *
      * @param commit the index commit to read the snapshot from or <code>null</code> if the latest snapshot should be read from the
      *               directory

+ 1 - 1
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

@@ -159,7 +159,7 @@ public class RecoverySourceHandler {
             } else {
                 final Engine.IndexCommitRef phase1Snapshot;
                 try {
-                    phase1Snapshot = shard.acquireIndexCommit(true, false);
+                    phase1Snapshot = shard.acquireSafeIndexCommit();
                 } catch (final Exception e) {
                     throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
                 }

+ 1 - 1
server/src/main/java/org/elasticsearch/repositories/Repository.java

@@ -176,7 +176,7 @@ public interface Repository extends LifecycleComponent {
     /**
      * Creates a snapshot of the shard based on the index commit point.
      * <p>
-     * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireIndexCommit} method.
+     * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireLastIndexCommit} method.
      * Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
      * <p>
      * As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check

+ 1 - 1
server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java

@@ -390,7 +390,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
         final Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository());
         try {
             // we flush first to make sure we get the latest writes snapshotted
-            try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(false, true)) {
+            try (Engine.IndexCommitRef snapshotRef = indexShard.acquireLastIndexCommit(true)) {
                 repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus);
                 if (logger.isDebugEnabled()) {
                     final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();

+ 9 - 4
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -2121,7 +2121,7 @@ public class InternalEngineTests extends EngineTestCase {
             boolean doneIndexing;
             do {
                 doneIndexing = doneLatch.await(sleepTime, TimeUnit.MILLISECONDS);
-                commits.add(engine.acquireIndexCommit(false, true));
+                commits.add(engine.acquireLastIndexCommit(true));
                 if (commits.size() > commitLimit) { // don't keep on piling up too many commits
                     IOUtils.close(commits.remove(randomIntBetween(0, commits.size()-1)));
                     // we increase the wait time to make sure we eventually if things are slow wait for threads to finish.
@@ -4355,7 +4355,12 @@ public class InternalEngineTests extends EngineTestCase {
             }
             final boolean flushFirst = randomBoolean();
             final boolean safeCommit = randomBoolean();
-            Engine.IndexCommitRef commit = engine.acquireIndexCommit(safeCommit, flushFirst);
+            final Engine.IndexCommitRef snapshot;
+            if (safeCommit) {
+                snapshot = engine.acquireSafeIndexCommit();
+            } else {
+                snapshot = engine.acquireLastIndexCommit(flushFirst);
+            }
             int moreDocs = between(1, 20);
             for (int i = 0; i < moreDocs; i++) {
                 index(engine, numDocs + i);
@@ -4363,11 +4368,11 @@ public class InternalEngineTests extends EngineTestCase {
             globalCheckpoint.set(numDocs + moreDocs - 1);
             engine.flush();
             // check that we can still read the commit that we captured
-            try (IndexReader reader = DirectoryReader.open(commit.getIndexCommit())) {
+            try (IndexReader reader = DirectoryReader.open(snapshot.getIndexCommit())) {
                 assertThat(reader.numDocs(), equalTo(flushFirst && (safeCommit == false || inSync) ? numDocs : 0));
             }
             assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(2));
-            commit.close();
+            snapshot.close();
             // check it's clean up
             engine.flush(true, true);
             assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(1));

+ 1 - 1
server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

@@ -397,7 +397,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
         when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
         when(shard.segmentStats(anyBoolean())).thenReturn(mock(SegmentsStats.class));
         when(shard.state()).thenReturn(IndexShardState.RELOCATED);
-        when(shard.acquireIndexCommit(anyBoolean(), anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class));
+        when(shard.acquireSafeIndexCommit()).thenReturn(mock(Engine.IndexCommitRef.class));
         doAnswer(invocation -> {
             ((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(() -> {});
             return null;

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -620,7 +620,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
                                  final Snapshot snapshot,
                                  final Repository repository) throws IOException {
         final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing();
-        try (Engine.IndexCommitRef indexCommitRef = shard.acquireIndexCommit(false, true)) {
+        try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) {
             Index index = shard.shardId().getIndex();
             IndexId indexId = new IndexId(index.getName(), index.getUUID());