Browse Source

Pull blocking up from recoverFromTranslogInternal (#96607)

Working towards removing this blocking from the recovery process, but
there's lots of callers to adjust so it makes sense to break it into
steps.
David Turner 2 years ago
parent
commit
300c011c3e

+ 43 - 1
server/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -38,6 +38,7 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
 import org.elasticsearch.common.metrics.CounterMetric;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
+import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
 import org.elasticsearch.core.CheckedRunnable;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
@@ -77,6 +78,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
@@ -1917,7 +1919,47 @@ public abstract class Engine implements Closeable {
      * @param translogRecoveryRunner the translog recovery runner
      * @param recoverUpToSeqNo       the upper bound, inclusive, of sequence number to be recovered
      */
-    public abstract Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException;
+    // TODO move this blocking implementation into tests (adding a timeout) and make all the production usages fully async
+    public final void recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
+        final var future = new PlainActionFuture<Void>();
+        recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo, future);
+        try {
+            future.get();
+        } catch (ExecutionException e) {
+            // This is a (temporary) adapter between the older synchronous (blocking) code and the newer (async) API. Callers expect
+            // exceptions to be thrown directly, but Future#get adds an ExecutionException wrapper which we must remove to preserve the
+            // expected exception semantics.
+            if (e.getCause() instanceof IOException ioException) {
+                throw ioException;
+            } else if (e.getCause() instanceof RuntimeException runtimeException) {
+                throw runtimeException;
+            } else {
+                // the old code was "throws IOException" so we shouldn't see any other exception types here
+                logger.error("checked non-IOException unexpectedly thrown", e);
+                assert false : e;
+                throw new UncategorizedExecutionException("recoverFromTranslog", e);
+            }
+        } catch (InterruptedException e) {
+            // We don't really use interrupts in this area so this is somewhat unexpected (unless perhaps we're shutting down), just treat
+            // it like any other exception.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive).
+     * This operation will close the engine if the recovery fails.
+     *
+     * @param translogRecoveryRunner the translog recovery runner
+     * @param recoverUpToSeqNo       the upper bound, inclusive, of sequence number to be recovered
+     * @param listener               listener notified on completion of the recovery, whether successful or otherwise
+     */
+    public abstract void recoverFromTranslog(
+        TranslogRecoveryRunner translogRecoveryRunner,
+        long recoverUpToSeqNo,
+        ActionListener<Void> listener
+    );
 
     /**
      * Do not replay translog operations, but make the engine be ready.

+ 56 - 44
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -43,6 +43,7 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.common.lucene.LoggerInfoStream;
 import org.elasticsearch.common.lucene.Lucene;
@@ -520,25 +521,24 @@ public class InternalEngine extends Engine {
     }
 
     @Override
-    public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
-        try (ReleasableLock lock = readLock.acquire()) {
-            ensureOpen();
-            if (pendingTranslogRecovery.get() == false) {
-                throw new IllegalStateException("Engine has already been recovered");
-            }
-            try {
-                recoverFromTranslogInternal(translogRecoveryRunner, recoverUpToSeqNo);
-            } catch (Exception e) {
-                try {
-                    pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
-                    failEngine("failed to recover from translog", e);
-                } catch (Exception inner) {
-                    e.addSuppressed(inner);
+    public void recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo, ActionListener<Void> listener) {
+        ActionListener.run(listener, l -> {
+            try (ReleasableLock lock = readLock.acquire()) {
+                ensureOpen();
+                if (pendingTranslogRecovery.get() == false) {
+                    throw new IllegalStateException("Engine has already been recovered");
                 }
-                throw e;
+                recoverFromTranslogInternal(translogRecoveryRunner, recoverUpToSeqNo, l.delegateResponse((ll, e) -> {
+                    try {
+                        pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
+                        failEngine("failed to recover from translog", e);
+                    } catch (Exception inner) {
+                        e.addSuppressed(inner);
+                    }
+                    ll.onFailure(e);
+                }));
             }
-        }
-        return this;
+        });
     }
 
     @Override
@@ -547,33 +547,45 @@ public class InternalEngine extends Engine {
         pendingTranslogRecovery.set(false); // we are good - now we can commit
     }
 
-    private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
-        final int opsRecovered;
-        final long localCheckpoint = getProcessedLocalCheckpoint();
-        if (localCheckpoint < recoverUpToSeqNo) {
-            try (Translog.Snapshot snapshot = newTranslogSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) {
-                opsRecovered = translogRecoveryRunner.run(this, snapshot);
-            } catch (Exception e) {
-                throw new EngineException(shardId, "failed to recover from translog", e);
-            }
-        } else {
-            opsRecovered = 0;
-        }
-        // flush if we recovered something or if we have references to older translogs
-        // note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
-        assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
-        pendingTranslogRecovery.set(false); // we are good - now we can commit
-        logger.trace(
-            () -> format(
-                "flushing post recovery from translog: ops recovered [%s], current translog generation [%s]",
-                opsRecovered,
-                translog.currentFileGeneration()
-            )
-        );
-        PlainActionFuture<FlushResult> future = PlainActionFuture.newFuture();
-        flush(false, true, future);
-        future.actionGet();
-        translog.trimUnreferencedReaders();
+    private void recoverFromTranslogInternal(
+        TranslogRecoveryRunner translogRecoveryRunner,
+        long recoverUpToSeqNo,
+        ActionListener<Void> listener
+    ) {
+        ActionListener.run(listener, l -> {
+            final int opsRecovered;
+            final long localCheckpoint = getProcessedLocalCheckpoint();
+            if (localCheckpoint < recoverUpToSeqNo) {
+                try (Translog.Snapshot snapshot = newTranslogSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) {
+                    opsRecovered = translogRecoveryRunner.run(this, snapshot);
+                } catch (Exception e) {
+                    throw new EngineException(shardId, "failed to recover from translog", e);
+                }
+            } else {
+                opsRecovered = 0;
+            }
+            // flush if we recovered something or if we have references to older translogs
+            // note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
+            assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
+            pendingTranslogRecovery.set(false); // we are good - now we can commit
+            logger.trace(
+                () -> format(
+                    "flushing post recovery from translog: ops recovered [%s], current translog generation [%s]",
+                    opsRecovered,
+                    translog.currentFileGeneration()
+                )
+            );
+
+            // flush might do something async and complete the listener on a different thread, from which we must fork back to a generic
+            // thread to continue with recovery, but if it doesn't do anything async then there's no need to fork, hence why we use a
+            // SubscribableListener here
+            final var flushListener = new SubscribableListener<FlushResult>();
+            flush(false, true, flushListener);
+            flushListener.addListener(l.delegateFailureAndWrap((ll, r) -> {
+                translog.trimUnreferencedReaders();
+                ll.onResponse(null);
+            }), engineConfig.getThreadPool().generic(), null);
+        });
     }
 
     protected Translog.Snapshot newTranslogSnapshot(long fromSeqNo, long toSeqNo) throws IOException {

+ 15 - 9
server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

@@ -518,16 +518,22 @@ public class ReadOnlyEngine extends Engine {
     }
 
     @Override
-    public Engine recoverFromTranslog(final TranslogRecoveryRunner translogRecoveryRunner, final long recoverUpToSeqNo) {
-        try (ReleasableLock lock = readLock.acquire()) {
-            ensureOpen();
-            try {
-                translogRecoveryRunner.run(this, Translog.Snapshot.EMPTY);
-            } catch (final Exception e) {
-                throw new EngineException(shardId, "failed to recover from empty translog snapshot", e);
+    public void recoverFromTranslog(
+        final TranslogRecoveryRunner translogRecoveryRunner,
+        final long recoverUpToSeqNo,
+        ActionListener<Void> listener
+    ) {
+        ActionListener.run(listener, l -> {
+            try (ReleasableLock lock = readLock.acquire()) {
+                ensureOpen();
+                try {
+                    translogRecoveryRunner.run(this, Translog.Snapshot.EMPTY);
+                } catch (final Exception e) {
+                    throw new EngineException(shardId, "failed to recover from empty translog snapshot", e);
+                }
             }
-        }
-        return this;
+            l.onResponse(null);
+        });
     }
 
     @Override

+ 12 - 7
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -4182,15 +4182,18 @@ public class IndexShardTests extends IndexShardTestCase {
         CountDownLatch closeDoneLatch = new CountDownLatch(1);
         IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) {
             @Override
-            public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo)
-                throws IOException {
+            public void recoverFromTranslog(
+                TranslogRecoveryRunner translogRecoveryRunner,
+                long recoverUpToSeqNo,
+                ActionListener<Void> listener
+            ) {
                 readyToCloseLatch.countDown();
                 try {
                     closeDoneLatch.await();
                 } catch (InterruptedException e) {
                     throw new AssertionError(e);
                 }
-                return super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
+                super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo, listener);
             }
         });
 
@@ -4241,16 +4244,18 @@ public class IndexShardTests extends IndexShardTestCase {
         CountDownLatch snapshotDoneLatch = new CountDownLatch(1);
         IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) {
             @Override
-            public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo)
-                throws IOException {
-                InternalEngine internalEngine = super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
+            public void recoverFromTranslog(
+                TranslogRecoveryRunner translogRecoveryRunner,
+                long recoverUpToSeqNo,
+                ActionListener<Void> listener
+            ) {
+                super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo, listener);
                 readyToSnapshotLatch.countDown();
                 try {
                     snapshotDoneLatch.await();
                 } catch (InterruptedException e) {
                     throw new AssertionError(e);
                 }
-                return internalEngine;
             }
         });