Ver Fonte

Hide AlreadyClosedException on IndexCommit release (#57986)

Today `InternalEngine#releaseIndexCommit` fails with an
`AlreadyClosedException` if the engine is closed before the index commit is
released. This can happen if, for example, a node leaves and rejoins the
cluster and acquires an index commit for replica shard allocation concurrently
with shutting the shard down.

There's no need to fail the operation like this: if the engine is shut down
then we will clean up the unreferenced files when it's restarted (or if it's
allocated elsewhere) so we can suppress an `AlreadyClosedException` in this
case. This commit does so.

Fixes #57797
David Turner há 5 anos atrás
pai
commit
4263de790a

+ 7 - 4
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -1909,10 +1909,13 @@ public class InternalEngine extends Engine {
     private void releaseIndexCommit(IndexCommit snapshot) throws IOException {
         // Revisit the deletion policy if we can clean up the snapshotting commit.
         if (combinedDeletionPolicy.releaseCommit(snapshot)) {
-            ensureOpen();
-            // Here we don't have to trim translog because snapshotting an index commit
-            // does not lock translog or prevents unreferenced files from trimming.
-            indexWriter.deleteUnusedFiles();
+            try {
+                // Here we don't have to trim translog because snapshotting an index commit
+                // does not lock translog or prevents unreferenced files from trimming.
+                indexWriter.deleteUnusedFiles();
+            } catch (AlreadyClosedException ignored) {
+                // That's ok, we'll clean up unused files the next time it's opened.
+            }
         }
     }
 

+ 13 - 5
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -4623,6 +4623,8 @@ public class InternalEngineTests extends EngineTestCase {
         IOUtils.close(engine, store);
         store = createStore();
         final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        final Engine.IndexCommitRef snapshot;
+        final boolean closeSnapshotBeforeEngine = randomBoolean();
         try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
             int numDocs = between(1, 20);
             for (int i = 0; i < numDocs; i++) {
@@ -4633,7 +4635,6 @@ public class InternalEngineTests extends EngineTestCase {
             }
             final boolean flushFirst = randomBoolean();
             final boolean safeCommit = randomBoolean();
-            final Engine.IndexCommitRef snapshot;
             if (safeCommit) {
                 snapshot = engine.acquireSafeIndexCommit();
             } else {
@@ -4650,10 +4651,17 @@ public class InternalEngineTests extends EngineTestCase {
                 assertThat(reader.numDocs(), equalTo(flushFirst && safeCommit == false ? numDocs : 0));
             }
             assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(2));
-            snapshot.close();
-            // check it's clean up
-            engine.flush(true, true);
-            assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(1));
+
+            if (closeSnapshotBeforeEngine) {
+                snapshot.close();
+                // check it's clean up
+                engine.flush(true, true);
+                assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(1));
+            }
+        }
+
+        if (closeSnapshotBeforeEngine == false) {
+            snapshot.close(); // shouldn't throw AlreadyClosedException
         }
     }