Browse Source

Bubble up exception when processing NoOp (#39338)

Today we do not bubble up exceptions when processing NoOps but always
treat them as document-level failures. This incorrect treatment causes
the assert_no_failure being tripped in peer-recovery if IndexWriter was
closed exceptionally before.

Closes #38898
Nhat Nguyen 6 năm trước cách đây
mục cha
commit
abf4c384ae

+ 1 - 1
server/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -400,7 +400,7 @@ public abstract class Engine implements Closeable {
      */
     public abstract DeleteResult delete(Delete delete) throws IOException;
 
-    public abstract NoOpResult noOp(NoOp noOp);
+    public abstract NoOpResult noOp(NoOp noOp) throws IOException;
 
     /**
      * Base class for index and delete operation results

+ 9 - 3
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -1449,13 +1449,19 @@ public class InternalEngine extends Engine {
     }
 
     @Override
-    public NoOpResult noOp(final NoOp noOp) {
-        NoOpResult noOpResult;
+    public NoOpResult noOp(final NoOp noOp) throws IOException {
+        final NoOpResult noOpResult;
         try (ReleasableLock ignored = readLock.acquire()) {
+            ensureOpen();
             markSeqNoAsSeen(noOp.seqNo());
             noOpResult = innerNoOp(noOp);
         } catch (final Exception e) {
-            noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), e);
+            try {
+                maybeFailEngine("noop", e);
+            } catch (Exception inner) {
+                e.addSuppressed(inner);
+            }
+            throw e;
         }
         return noOpResult;
     }

+ 1 - 1
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -807,7 +807,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         return noOp(engine, noOp);
     }
 
-    private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) {
+    private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) throws IOException {
         active.set(true);
         if (logger.isTraceEnabled()) {
             logger.trace("noop (seq# [{}])", noOp.seqNo());

+ 8 - 11
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -3232,7 +3232,7 @@ public class InternalEngineTests extends EngineTestCase {
             final ParsedDocument doc3 = testParsedDocument("3", null, testDocumentWithTextField(), B_1, null);
 
             AtomicReference<ThrowingIndexWriter> throwingIndexWriter = new AtomicReference<>();
-            try (Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE,
+            try (InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE,
                 (directory, iwc) -> {
                   throwingIndexWriter.set(new ThrowingIndexWriter(directory, iwc));
                   return throwingIndexWriter.get();
@@ -3297,16 +3297,13 @@ public class InternalEngineTests extends EngineTestCase {
                     engine.close();
                 }
                 // now the engine is closed check we respond correctly
-                try {
-                    if (randomBoolean()) {
-                        engine.index(indexForDoc(doc1));
-                    } else {
-                        engine.delete(new Engine.Delete("test", "", newUid(doc1), primaryTerm.get()));
-                    }
-                    fail("engine should be closed");
-                } catch (Exception e) {
-                    assertThat(e, instanceOf(AlreadyClosedException.class));
-                }
+                expectThrows(AlreadyClosedException.class, () -> engine.index(indexForDoc(doc1)));
+                expectThrows(AlreadyClosedException.class,
+                    () -> engine.delete(new Engine.Delete("test", "", newUid(doc1), primaryTerm.get())));
+                expectThrows(AlreadyClosedException.class, () -> engine.noOp(
+                    new Engine.NoOp(engine.getLocalCheckpointTracker().generateSeqNo(),
+                        engine.config().getPrimaryTermSupplier().getAsLong(),
+                        randomFrom(Engine.Operation.Origin.values()), randomNonNegativeLong(), "test")));
             }
         }
     }