
Fix deadlock in ThreadPoolMergeScheduler when a failing merge closes the IndexWriter (#134656) (#135173)

This change fixes a bug that causes a deadlock in the thread pool merge scheduler when a merge fails due to a tragic event.

The deadlock occurs because Lucene aborts running merges when failing with a tragic event and then waits for them to complete. But those "running" merges might in fact still be sitting in Elasticsearch's thread pool merge scheduler task queue, or in the backlogged merge task queue because the per-shard concurrent merge limit has been reached, or they might simply be waiting for enough disk space to run. In any of these cases, the failing merge thread waits indefinitely.

The fix proposed in this change uses the merge thread that is failing due to a tragic event to abort all other enqueued and backlogged merge tasks of the same shard before proceeding with closing the IndexWriter. This way Lucene won't have to wait for any running merges, as they will all have been aborted upfront.
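
In essence, the failing merge thread drains and aborts every merge task it still owns before Lucene's IndexWriter#abortMerges starts waiting on "running" merges. Below is a minimal sketch of that idea, using illustrative names (FailingMergeHandler, MergeTask, queuedOrBackloggedTasks) rather than the actual ThreadPoolMergeScheduler API:

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Hedged sketch of the abort-before-close idea; MergeTask and queuedOrBackloggedTasks
    // are illustrative names, not the actual Elasticsearch API.
    final class FailingMergeHandler {

        interface MergeTask {
            void abort();
        }

        private final Queue<MergeTask> queuedOrBackloggedTasks = new ConcurrentLinkedQueue<>();

        void onTragicEvent(Throwable tragedy) {
            // The failing merge thread itself drains every merge task it still owns and
            // aborts it, so Lucene no longer counts those merges as "running".
            MergeTask task;
            while ((task = queuedOrBackloggedTasks.poll()) != null) {
                task.abort();
            }
            // Only after this is it safe for IndexWriter#abortMerges (invoked while closing
            // on the tragic event) to wait for running merges: none are stuck in a queue.
        }
    }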

Relates ES-12664
Tanguy Leroux 2 weeks ago
parent
commit
c2d65bfcba

+ 6 - 0
docs/changelog/134656.yaml

@@ -0,0 +1,6 @@
+pr: 134656
+summary: Fix deadlock in `ThreadPoolMergeScheduler` when a failing merge closes the
+  `IndexWriter`
+area: Engine
+type: bug
+issues: []

+ 425 - 0
server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithFailureIT.java

@@ -0,0 +1,425 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.apache.lucene.codecs.DocValuesProducer;
+import org.apache.lucene.index.CodecReader;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FilterCodecReader;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.MergeTrigger;
+import org.apache.lucene.index.NumericDocValues;
+import org.apache.lucene.index.OneMergeWrappingMergePolicy;
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.ProjectId;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.CollectionUtils;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.env.ShardLockObtainFailedException;
+import org.elasticsearch.index.IndexModule;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.MergeSchedulerConfig;
+import org.elasticsearch.index.codec.FilterDocValuesProducer;
+import org.elasticsearch.index.merge.OnGoingMerge;
+import org.elasticsearch.index.shard.IndexEventListener;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.plugins.EnginePlugin;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.PluginsService;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import static org.elasticsearch.core.Strings.format;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.notNullValue;
+
+public class MergeWithFailureIT extends ESIntegTestCase {
+
+    private static final String FAILING_MERGE_ON_PURPOSE = "Failing merge on purpose";
+
+    public static class TestPlugin extends Plugin implements EnginePlugin {
+
+        // Merges queued in the thread pool. Lucene considers those as "running" and blocks waiting on them to complete in case
+        // of a merge failure.
+        private final Set<MergePolicy.OneMerge> pendingMerges = ConcurrentCollections.newConcurrentSet();
+
+        // Number of running merges in the thread pool
+        private final AtomicInteger runningMergesCount = new AtomicInteger();
+
+        // Latch to unblock merges
+        private final CountDownLatch runMerges = new CountDownLatch(1);
+
+        // Reference to the ThreadPoolMergeExecutorService
+        private final AtomicReference<ThreadPoolMergeExecutorService> threadPoolMergeExecutorServiceReference = new AtomicReference<>();
+
+        // This future is completed once the shard that is expected to fail has its store closed
+        private final PlainActionFuture<Void> shardStoreClosedListener = new PlainActionFuture<>();
+
+        private final boolean isDataNode;
+
+        public TestPlugin(Settings settings) {
+            this.isDataNode = DiscoveryNode.hasDataRole(settings);
+        }
+
+        @Override
+        public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
+            if (isDataNode == false) {
+                return Optional.of(InternalEngine::new);
+            }
+            return Optional.of(
+                config -> new TestEngine(
+                    EngineTestCase.copy(
+                        config,
+                        new OneMergeWrappingMergePolicy(config.getMergePolicy(), toWrap -> new MergePolicy.OneMerge(toWrap) {
+                            @Override
+                            public CodecReader wrapForMerge(CodecReader reader) {
+                                return new FilterCodecReader(reader) {
+                                    final AtomicBoolean failOnce = new AtomicBoolean(false);
+
+                                    @Override
+                                    public CacheHelper getCoreCacheHelper() {
+                                        return in.getCoreCacheHelper();
+                                    }
+
+                                    @Override
+                                    public CacheHelper getReaderCacheHelper() {
+                                        return in.getReaderCacheHelper();
+                                    }
+
+                                    @Override
+                                    public DocValuesProducer getDocValuesReader() {
+                                        return new FilterDocValuesProducer(super.getDocValuesReader()) {
+                                            @Override
+                                            public NumericDocValues getNumeric(FieldInfo field) throws IOException {
+                                                safeAwait(runMerges, TimeValue.ONE_MINUTE);
+                                                if (failOnce.compareAndSet(false, true)) {
+                                                    throw new IOException(FAILING_MERGE_ON_PURPOSE);
+                                                }
+                                                return super.getNumeric(field);
+                                            }
+                                        };
+                                    }
+                                };
+                            }
+                        })
+                    )
+                )
+            );
+        }
+
+        private class TestEngine extends InternalEngine {
+
+            TestEngine(EngineConfig engineConfig) {
+                super(engineConfig);
+            }
+
+            @Override
+            protected ElasticsearchMergeScheduler createMergeScheduler(
+                ShardId shardId,
+                IndexSettings indexSettings,
+                ThreadPoolMergeExecutorService executor,
+                MergeMetrics metrics
+            ) {
+                threadPoolMergeExecutorServiceReference.set(Objects.requireNonNull(executor));
+                return new ThreadPoolMergeScheduler(shardId, indexSettings, executor, merge -> 0L, metrics) {
+
+                    @Override
+                    public void merge(MergeSource mergeSource, MergeTrigger trigger) {
+                        var wrapped = wrapMergeSource(mergeSource);
+                        super.merge(wrapped, trigger);
+                    }
+
+                    private MergeSource wrapMergeSource(MergeSource delegate) {
+                        // Wraps the merge source to know which merges were pulled from Lucene by the IndexWriter
+                        return new MergeSource() {
+                            @Override
+                            public MergePolicy.OneMerge getNextMerge() {
+                                var merge = delegate.getNextMerge();
+                                if (merge != null) {
+                                    if (pendingMerges.add(merge) == false) {
+                                        throw new AssertionError("Merge already pending " + merge);
+                                    }
+                                }
+                                return merge;
+                            }
+
+                            @Override
+                            public void onMergeFinished(MergePolicy.OneMerge merge) {
+                                delegate.onMergeFinished(merge);
+                            }
+
+                            @Override
+                            public boolean hasPendingMerges() {
+                                return delegate.hasPendingMerges();
+                            }
+
+                            @Override
+                            public void merge(MergePolicy.OneMerge merge) throws IOException {
+                                runningMergesCount.incrementAndGet();
+                                if (pendingMerges.remove(merge) == false) {
+                                    throw new AssertionError("Pending merge not found " + merge);
+                                }
+                                delegate.merge(merge);
+                            }
+                        };
+                    }
+
+                    @Override
+                    protected void handleMergeException(final Throwable exc) {
+                        mergeException(exc);
+                    }
+                };
+            }
+        }
+
+        @Override
+        public void onIndexModule(IndexModule indexModule) {
+            if (isDataNode) {
+                indexModule.addIndexEventListener(new IndexEventListener() {
+                    @Override
+                    public void onStoreClosed(ShardId shardId) {
+                        shardStoreClosedListener.onResponse(null);
+                    }
+                });
+            }
+        }
+
+        public void registerMergeEventListener(MergeEventListener listener) {
+            var threadPoolMergeExecutorService = Objects.requireNonNull(threadPoolMergeExecutorServiceReference.get());
+            threadPoolMergeExecutorService.registerMergeEventListener(listener);
+        }
+    }
+
+    @Override
+    protected boolean addMockInternalEngine() {
+        return false;
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return CollectionUtils.appendToCopy(super.nodePlugins(), TestPlugin.class);
+    }
+
+    public void testFailedMergeDeadlock() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        final int maxMergeThreads = randomIntBetween(1, 3);
+        final int indexMaxThreadCount = randomBoolean() ? randomIntBetween(1, 10) : Integer.MAX_VALUE;
+
+        final var dataNode = internalCluster().startDataOnlyNode(
+            Settings.builder()
+                .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true)
+                .put("thread_pool." + ThreadPool.Names.MERGE + ".max", maxMergeThreads)
+                .build()
+        );
+
+        final var plugin = getTestPlugin(dataNode);
+        assertThat(plugin, notNullValue());
+
+        final var indexName = randomIdentifier();
+        createIndex(
+            indexName,
+            indexSettings(1, 0).put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_PREFIX + ".name", dataNode)
+                .put(MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.getKey(), 1)
+                // when indexMaxThreadCount is small, merge tasks might be backlogged
+                .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), indexMaxThreadCount)
+                // no merge throttling
+                .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), Integer.MAX_VALUE)
+                .build()
+        );
+
+        final var mergesListener = new AssertingMergeEventListener();
+        plugin.registerMergeEventListener(mergesListener);
+
+        // Kick off enough merges to block the thread pool
+        var maxRunningThreads = Math.min(maxMergeThreads, indexMaxThreadCount);
+        indexDocsInManySegmentsUntil(indexName, () -> plugin.runningMergesCount.get() == maxRunningThreads);
+        assertThat(plugin.runningMergesCount.get(), equalTo(maxRunningThreads));
+
+        // Now pull more merges so they are queued in the merge thread pool, but Lucene thinks they are running
+        final int pendingMerges = plugin.pendingMerges.size() + randomIntBetween(1, 5);
+        indexDocsInManySegmentsUntil(indexName, () -> plugin.pendingMerges.size() > pendingMerges);
+
+        var mergeThreadPool = asInstanceOf(
+            ThreadPoolExecutor.class,
+            internalCluster().clusterService(dataNode).threadPool().executor(ThreadPool.Names.MERGE)
+        );
+        assertThat(mergeThreadPool.getActiveCount(), greaterThanOrEqualTo(maxRunningThreads));
+
+        // Index more docs in the hope of getting some merges backlogged
+        if (indexMaxThreadCount != Integer.MAX_VALUE) {
+            final int backloggedMerges = plugin.pendingMerges.size() + randomIntBetween(1, 5);
+            indexDocsInManySegmentsUntil(indexName, () -> plugin.pendingMerges.size() > backloggedMerges);
+        }
+
+        // Sometimes close the shard concurrently with the tragic failure
+        Thread closingThread = null;
+        if (rarely()) {
+            closingThread = new Thread(() -> {
+                safeAwait(plugin.runMerges, TimeValue.ONE_MINUTE);
+                client().admin().indices().prepareClose(indexName).get();
+            });
+            closingThread.start();
+        }
+
+        // unblock merges, one merge will fail the IndexWriter
+        plugin.runMerges.countDown();
+
+        // Deadlock sample:
+        //
+        // "elasticsearch[node_s5][merge][T#1]@16690" tid=0x8e nid=NA waiting
+        // java.lang.Thread.State: WAITING
+        // at java.lang.Object.wait0(Object.java:-1)
+        // at java.lang.Object.wait(Object.java:389)
+        // at org.apache.lucene.index.IndexWriter.doWait(IndexWriter.java:5531)
+        // at org.apache.lucene.index.IndexWriter.abortMerges(IndexWriter.java:2733)
+        // at org.apache.lucene.index.IndexWriter.rollbackInternalNoCommit(IndexWriter.java:2488)
+        // at org.apache.lucene.index.IndexWriter.rollbackInternal(IndexWriter.java:2457)
+        // - locked <0x429a> (a java.lang.Object)
+        // at org.apache.lucene.index.IndexWriter.maybeCloseOnTragicEvent(IndexWriter.java:5765)
+        // at org.apache.lucene.index.IndexWriter.tragicEvent(IndexWriter.java:5755)
+        // at org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:4780)
+        // at org.apache.lucene.index.IndexWriter$IndexWriterMergeSource.merge(IndexWriter.java:6567)
+        // at org.elasticsearch.index.engine.MergeWithFailureIT$TestPlugin$TestEngine$1$1.merge(MergeWithFailureIT.java:178)
+        // at org.elasticsearch.index.engine.ThreadPoolMergeScheduler.doMerge(ThreadPoolMergeScheduler.java:347)
+        // at org.elasticsearch.index.engine.ThreadPoolMergeScheduler$MergeTask.run(ThreadPoolMergeScheduler.java:459)
+        // at org.elasticsearch.index.engine.ThreadPoolMergeExecutorService.runMergeTask(ThreadPoolMergeExecutorService.java:364)
+
+        ensureRed(indexName);
+
+        // verify that the shard store is effectively closed
+        assertTrue(plugin.shardStoreClosedListener.isDone());
+
+        if (closingThread != null) {
+            closingThread.join();
+        }
+
+        final var shardId = new ShardId(resolveIndex(indexName), 0);
+        var nodeEnvironment = internalCluster().getInstance(NodeEnvironment.class, dataNode);
+        try {
+            var shardLock = nodeEnvironment.shardLock(shardId, getTestName(), 10_000L);
+            shardLock.close();
+        } catch (ShardLockObtainFailedException ex) {
+            throw new AssertionError("Shard " + shardId + " is still locked after 10 seconds", ex);
+        }
+
+        // check the state of the shard
+        var routingTable = internalCluster().clusterService(dataNode).state().routingTable(ProjectId.DEFAULT);
+        var indexRoutingTable = routingTable.index(shardId.getIndex());
+        var primary = asInstanceOf(IndexShardRoutingTable.class, indexRoutingTable.shard(shardId.id())).primaryShard();
+        assertThat(primary.state(), equalTo(ShardRoutingState.UNASSIGNED));
+        assertThat(primary.unassignedInfo(), notNullValue());
+        assertThat(primary.unassignedInfo().reason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED));
+        var failure = ExceptionsHelper.unwrap(primary.unassignedInfo().failure(), IOException.class);
+        assertThat(failure, notNullValue());
+        assertThat(failure.getMessage(), containsString(FAILING_MERGE_ON_PURPOSE));
+
+        // verify the number of queued, completed and aborted merges
+        mergesListener.verify();
+
+        assertAcked(indicesAdmin().prepareDelete(indexName));
+    }
+
+    private void indexDocsInManySegmentsUntil(String indexName, Supplier<Boolean> stopCondition) {
+        indexDocsInManySegmentsUntil(indexName, stopCondition, TimeValue.THIRTY_SECONDS);
+    }
+
+    private void indexDocsInManySegmentsUntil(String indexName, Supplier<Boolean> stopCondition, TimeValue timeout) {
+        long millisWaited = 0L;
+        do {
+            if (millisWaited >= timeout.millis()) {
+                logger.warn(format("timed out after waiting for [%d]", millisWaited));
+                return;
+            }
+            var client = client();
+            for (int request = 0; request < 10; request++) {
+                var bulkRequest = client.prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+                for (int doc = 0; doc < 10; doc++) {
+                    bulkRequest.add(client.prepareIndex(indexName).setCreate(true).setSource("value", randomIntBetween(0, 1024)));
+                }
+                bulkRequest.get();
+            }
+            // Sleep a bit to wait for merges to kick in
+            long sleepInMillis = randomLongBetween(50L, 200L);
+            safeSleep(sleepInMillis);
+            millisWaited += sleepInMillis;
+        } while (stopCondition.get() == false);
+    }
+
+    private static TestPlugin getTestPlugin(String dataNode) {
+        return internalCluster().getInstance(PluginsService.class, dataNode).filterPlugins(TestPlugin.class).findFirst().get();
+    }
+
+    private static void ensureRed(String indexName) throws Exception {
+        assertBusy(() -> {
+            var healthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT, indexName)
+                .setWaitForStatus(ClusterHealthStatus.RED)
+                .setWaitForEvents(Priority.LANGUID)
+                .get();
+            assertThat(healthResponse.getStatus(), equalTo(ClusterHealthStatus.RED));
+        });
+    }
+
+    private static class AssertingMergeEventListener implements MergeEventListener {
+
+        private final AtomicInteger mergesQueued = new AtomicInteger();
+        private final AtomicInteger mergesCompleted = new AtomicInteger();
+        private final AtomicInteger mergesAborted = new AtomicInteger();
+
+        @Override
+        public void onMergeQueued(OnGoingMerge merge, long estimateMergeMemoryBytes) {
+            mergesQueued.incrementAndGet();
+        }
+
+        @Override
+        public void onMergeCompleted(OnGoingMerge merge) {
+            mergesCompleted.incrementAndGet();
+        }
+
+        @Override
+        public void onMergeAborted(OnGoingMerge merge) {
+            mergesAborted.incrementAndGet();
+        }
+
+        private void verify() {
+            int queued = mergesQueued.get();
+            int completed = mergesCompleted.get();
+            int aborted = mergesAborted.get();
+            var error = format("Queued merges mismatch (queued=%d, completed=%d, aborted=%d)", queued, completed, aborted);
+            assertThat(error, queued, equalTo(completed + aborted));
+        }
+    }
+}

+ 42 - 14
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -2728,11 +2728,7 @@ public class InternalEngine extends Engine {
 
     // protected for testing
     protected IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
-        if (Assertions.ENABLED) {
-            return new AssertingIndexWriter(directory, iwc);
-        } else {
-            return new IndexWriter(directory, iwc);
-        }
+        return new ElasticsearchIndexWriter(directory, iwc, logger);
     }
 
     // with tests.verbose, lucene sets this up: plumb to align with filesystem stream
@@ -2876,8 +2872,10 @@ public class InternalEngine extends Engine {
         return indexWriter.getConfig();
     }
 
-    private void maybeFlushAfterMerge(OnGoingMerge merge) {
-        if (indexWriter.hasPendingMerges() == false && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
+    protected void maybeFlushAfterMerge(OnGoingMerge merge) {
+        if (indexWriter.getTragicException() == null
+            && indexWriter.hasPendingMerges() == false
+            && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
             // NEVER do this on a merge thread since we acquire some locks blocking here and if we concurrently rollback the
             // writer
             // we deadlock on engine#close for instance.
@@ -3333,19 +3331,49 @@ public class InternalEngine extends Engine {
         return commitData;
     }
 
-    private static class AssertingIndexWriter extends IndexWriter {
-        AssertingIndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
-            super(d, conf);
+    private static class ElasticsearchIndexWriter extends IndexWriter {
+
+        private final Logger logger;
+
+        ElasticsearchIndexWriter(Directory directory, IndexWriterConfig indexWriterConfig, Logger logger) throws IOException {
+            super(directory, indexWriterConfig);
+            this.logger = logger;
+        }
+
+        @Override
+        public void onTragicEvent(Throwable tragedy, String location) {
+            assert tragedy != null;
+            try {
+                if (getConfig().getMergeScheduler() instanceof ThreadPoolMergeScheduler mergeScheduler) {
+                    try {
+                        // Must be executed before calling IndexWriter#onTragicEvent
+                        mergeScheduler.onTragicEvent(tragedy);
+                    } catch (Exception e) {
+                        logger.warn("Exception thrown when notifying the merge scheduler of a tragic event", e);
+                        if (tragedy != e) {
+                            tragedy.addSuppressed(e);
+                        }
+                    }
+                }
+            } finally {
+                super.onTragicEvent(tragedy, location);
+            }
         }
 
         @Override
-        public long deleteDocuments(Term... terms) {
-            throw new AssertionError("must not hard delete documents");
+        public long deleteDocuments(Term... terms) throws IOException {
+            if (Assertions.ENABLED) {
+                throw new AssertionError("must not hard delete documents");
+            }
+            return super.deleteDocuments(terms);
         }
 
         @Override
-        public long tryDeleteDocument(IndexReader readerIn, int docID) {
-            throw new AssertionError("tryDeleteDocument is not supported. See Lucene#DirectoryReaderWithAllLiveDocs");
+        public long tryDeleteDocument(IndexReader readerIn, int docID) throws IOException {
+            if (Assertions.ENABLED) {
+                throw new AssertionError("tryDeleteDocument is not supported. See Lucene#DirectoryReaderWithAllLiveDocs");
+            }
+            return super.tryDeleteDocument(readerIn, docID);
         }
     }
 

+ 43 - 2
server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java

@@ -31,7 +31,9 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -48,6 +50,7 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.function.LongUnaryOperator;
+import java.util.function.Predicate;
 import java.util.function.ToLongFunction;
 
 import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_MAX_HEADROOM_SETTING;
@@ -372,7 +375,7 @@ public class ThreadPoolMergeExecutorService implements Closeable {
         }
     }
 
-    private void abortMergeTask(MergeTask mergeTask) {
+    void abortMergeTask(MergeTask mergeTask) {
         assert mergeTask.hasStartedRunning() == false;
         assert runningMergeTasks.contains(mergeTask) == false;
         try {
@@ -385,6 +388,25 @@ public class ThreadPoolMergeExecutorService implements Closeable {
         }
     }
 
+    private void abortMergeTasks(Collection<MergeTask> mergeTasks) {
+        if (mergeTasks != null && mergeTasks.isEmpty() == false) {
+            for (var mergeTask : mergeTasks) {
+                abortMergeTask(mergeTask);
+            }
+        }
+    }
+
+    /**
+     * Removes all {@link MergeTask} that match the predicate and aborts them.
+     * @param predicate             the predicate to filter merge tasks to be aborted
+     */
+    void abortQueuedMergeTasks(Predicate<MergeTask> predicate) {
+        final var queuedMergesToAbort = new HashSet<MergeTask>();
+        if (queuedMergeTasks.drainMatchingElementsTo(predicate, queuedMergesToAbort) > 0) {
+            abortMergeTasks(queuedMergesToAbort);
+        }
+    }
+
     /**
      * Start monitoring the available disk space, and update the available budget for running merge tasks
      * Note: this doesn't work correctly for nodes with multiple data paths, as it only considers the data path with the MOST
@@ -675,6 +697,25 @@ public class ThreadPoolMergeExecutorService implements Closeable {
             }
         }
 
+        int drainMatchingElementsTo(Predicate<E> predicate, Collection<? super E> c) {
+            int removed = 0;
+            final ReentrantLock lock = this.lock;
+            lock.lock();
+            try {
+                for (Iterator<Tuple<E, Long>> iterator = enqueuedByBudget.iterator(); iterator.hasNext();) {
+                    E item = iterator.next().v1();
+                    if (predicate.test(item)) {
+                        iterator.remove();
+                        c.add(item);
+                        removed++;
+                    }
+                }
+                return removed;
+            } finally {
+                lock.unlock();
+            }
+        }
+
         /**
          * Updates the available budged given the passed-in argument, from which it deducts the budget hold up by taken elements
          * that are still in use. The elements budget is also updated by re-applying the budget function.
@@ -704,7 +745,7 @@ public class ThreadPoolMergeExecutorService implements Closeable {
 
         void postBudgetUpdate() {
             assert lock.isHeldByCurrentThread();
-        };
+        }
 
         private void updateBudgetOfEnqueuedElementsAndReorderQueue() {
             assert this.lock.isHeldByCurrentThread();

+ 157 - 18
server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

@@ -23,6 +23,7 @@ import org.apache.lucene.store.RateLimitedIndexOutput;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.MergeSchedulerConfig;
@@ -76,9 +77,18 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
     // how many {@link MergeTask}s have kicked off (this is used to name them).
     private final AtomicLong submittedMergeTaskCount = new AtomicLong();
     private final AtomicLong doneMergeTaskCount = new AtomicLong();
+    private final MergeMemoryEstimateProvider mergeMemoryEstimateProvider;
+
+    // Merge pulled from Lucene that is not yet submitted to the merge thread pool tasks queue
+    record PendingMerge(MergeSource source, MergePolicy.OneMerge merge, MergeTrigger trigger) {}
+
     private final CountDownLatch closedWithNoRunningMerges = new CountDownLatch(1);
     private volatile boolean closed = false;
-    private final MergeMemoryEstimateProvider mergeMemoryEstimateProvider;
+
+    // Tragic event that causes the IndexWriter and ThreadPoolMergeScheduler to be closed
+    private record TragicEvent(Throwable throwable, CountDownLatch latch) {}
+
+    private volatile TragicEvent tragedy = null;
 
     /**
      * Creates a thread-pool-based merge scheduler that runs merges in a thread pool.
@@ -135,21 +145,26 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
 
     @Override
     public void merge(MergeSource mergeSource, MergeTrigger trigger) {
-        if (closed) {
+        if (closed || tragedy != null) {
             // avoid pulling from the merge source when closing
             return;
         }
-        MergePolicy.OneMerge merge = null;
+        PendingMerge pendingMerge = null;
         try {
-            merge = mergeSource.getNextMerge();
+            // From this point on Lucene considers the OneMerge as "running",
+            // but it's not yet in the thread pool executor tasks queue!
+            var merge = mergeSource.getNextMerge();
+            if (merge != null) {
+                pendingMerge = new PendingMerge(mergeSource, merge, trigger);
+            }
         } catch (IllegalStateException e) {
             if (verbose()) {
                 message("merge task poll failed, likely that index writer is failed");
             }
             // ignore exception, we expect the IW failure to be logged elsewhere
         }
-        if (merge != null) {
-            submitNewMergeTask(mergeSource, merge, trigger);
+        if (pendingMerge != null) {
+            submitNewMergeTask(pendingMerge);
         }
     }
 
@@ -226,14 +241,27 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
         throw new MergePolicy.MergeException(t);
     }
 
-    // package-private for tests
-    boolean submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) {
+    private void submitNewMergeTask(PendingMerge pendingMerge) {
+        boolean queued = false;
         try {
-            MergeTask mergeTask = newMergeTask(mergeSource, merge, mergeTrigger);
-            mergeMetrics.incrementQueuedMergeBytes(mergeTask.getOnGoingMerge(), mergeTask.getMergeMemoryEstimateBytes());
-            mergeQueued(mergeTask.onGoingMerge);
-            return threadPoolMergeExecutorService.submitMergeTask(mergeTask);
+            // note that estimating the size of the merge might open a searcher
+            final var mergeTask = newMergeTask(pendingMerge.source(), pendingMerge.merge(), pendingMerge.trigger());
+            if (tragedy == null) {
+                mergeMetrics.incrementQueuedMergeBytes(mergeTask.getOnGoingMerge(), mergeTask.getMergeMemoryEstimateBytes());
+                mergeQueued(mergeTask.onGoingMerge);
+
+                queued = threadPoolMergeExecutorService.submitMergeTask(mergeTask); // may abort the merge immediately
+                // TODO Enable the following assertions once unit tests are fixed to not use Mockito
+                // assert queued || pendingMerge.merge().isAborted();
+            } else {
+                // merge scheduler is failing due to a tragic event
+                mergeTask.abort();
+            }
         } finally {
+            if (queued && tragedy != null) {
+                // ensure that if `onTragicEvent` races with this, we still abort what we just submitted.
+                abortQueuedMergesAfterTragedy(null);
+            }
             checkMergeTaskThrottling();
         }
     }
@@ -244,12 +272,15 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
         boolean isAutoThrottle = mergeTrigger != MergeTrigger.CLOSING && merge.getStoreMergeInfo().mergeMaxNumSegments() == -1;
         // IO throttling cannot be toggled for existing merge tasks, only new merge tasks pick up the updated IO throttling setting
         long estimateMergeMemoryBytes = mergeMemoryEstimateProvider.estimateMergeMemoryBytes(merge);
+        // used for reference equality in case the task must be aborted after a tragic event
+        var owner = this;
         return new MergeTask(
             mergeSource,
             merge,
             isAutoThrottle && isAutoThrottle(),
             "Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId,
-            estimateMergeMemoryBytes
+            estimateMergeMemoryBytes,
+            owner
         );
     }
 
@@ -284,7 +315,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
     // synchronized so that {@code #closed}, {@code #runningMergeTasks} and {@code #backloggedMergeTasks} are modified atomically
     synchronized Schedule schedule(MergeTask mergeTask) {
         assert mergeTask.hasStartedRunning() == false;
-        if (closed) {
+        if (closed || tragedy != null) {
             // do not run or backlog tasks when closing the merge scheduler, instead abort them
             return Schedule.ABORT;
         } else if (shouldSkipMerge()) {
@@ -297,7 +328,6 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
             assert added : "starting merge task [" + mergeTask + "] registered as already running";
             return Schedule.RUN;
         } else {
-            assert mergeTask.hasStartedRunning() == false;
             backloggedMergeTasks.add(mergeTask);
             return Schedule.BACKLOG;
         }
@@ -320,10 +350,112 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
         checkMergeTaskThrottling();
     }
 
-    private synchronized void maybeSignalAllMergesDoneAfterClose() {
-        if (closed && runningMergeTasks.isEmpty()) {
+    private void maybeSignalAllMergesDoneAfterClose() {
+        assert Thread.holdsLock(this);
+        if ((closed || tragedy != null) && runningMergeTasks.isEmpty()) {
+            closedWithNoRunningMerges.countDown();
+        }
+    }
+
+    public void onTragicEvent(Throwable tragedy) {
+        assert tragedy != null;
+        assert tragedy instanceof MergePolicy.MergeAbortedException == false;
+
+        TragicEvent tragicEvent;
+        boolean shouldAbort = false;
+        // Sets the tragic event if not already set
+        synchronized (this) {
+            tragicEvent = this.tragedy;
+            if (tragicEvent == null) {
+                tragicEvent = new TragicEvent(tragedy, new CountDownLatch(1));
+                this.tragedy = tragicEvent;
+                shouldAbort = true;
+            }
+        }
+        if (shouldAbort) {
+            abortQueuedMergesAfterTragedy(tragedy);
             closedWithNoRunningMerges.countDown();
+            tragicEvent.latch().countDown();
+            return;
         }
+        try {
+            // the merge scheduler is being failed by another thread, wait for non-executed merges to be aborted
+            tragicEvent.latch().await();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            tragedy.addSuppressed(e);
+        }
+    }
+
+    private void abortQueuedMergesAfterTragedy(@Nullable Throwable throwable) {
+        assert this.tragedy != null;
+        try {
+            // Merges that have been pulled from Lucene using MergeSource#getNextMerge before the tragic exception was set require special
+            // handling, because Lucene considers them as "running" and will wait for those to complete in IndexWriter#abortMerges when
+            // failing the IndexWriter with IndexWriter#maybeCloseOnTragicEvent. If at some point those merges are executed by a different
+            // thread (than the current thread we're in here) then it is OK, the merges will be aborted or failed almost immediately and
+            // there will be no running merges to wait for.
+            //
+            // But the thread pool executor offers no guarantee that those merges will be executed by another thread because:
+            // - the thread pool may have only 1 core thread,
+            // - or all other threads may be busy failing merges for different shards too, and can also be blocked waiting for their own
+            // queued merges to complete,
+            // - or there is not enough budget to execute the merge(s).
+            //
+            // In order to avoid waiting indefinitely in IndexWriter#abortMerges for merges that won't be executed, the current thread is
+            // used to abort all remaining non-executed merges:
+            // - the merge tasks in backloggedMergeTasks that are waiting to be re-enqueued,
+            // - the merge tasks in the thread pool executor task queue that are waiting to be executed.
+            //
+            // Note that only merges pulled from the current merge scheduler instance are aborted. These abortions are all executed in a
+            // synchronized block to ensure that no other concurrent merge thread can also fail due to a tragic event and set the
+            // IndexWriter#tragedy before we abort merges here. This is important because if the IndexWriter#tragedy is set, any upcoming
+            // merge execution/abortion would re-enter this method in order to fail the IndexWriter again (and ultimately also deadlock in
+            // IndexWriter#maybeCloseOnTragicEvent).
+            synchronized (this) {
+                // Abort backlogged merges
+                abortBackloggedMergeTasks();
+                // Abort all queued tasks that have been created by this merge scheduler
+                threadPoolMergeExecutorService.abortQueuedMergeTasks(mergeTask -> mergeTask.owner == this);
+            }
+        } catch (Exception e) {
+            logger.warn("exception when aborting non-running merge tasks", e);
+            if (throwable != null) {
+                throwable.addSuppressed(e);
+            }
+        }
+    }
+
+    private int abortBackloggedMergeTasks() throws Exception {
+        assert tragedy != null;
+        assert Thread.holdsLock(this);
+
+        int count = 0;
+        int maxExceptions = 10;
+        Exception firstException = null;
+        MergeTask backlogged;
+        while ((backlogged = backloggedMergeTasks.poll()) != null) {
+            try {
+                abortMergeTask(backlogged);
+                count += 1;
+            } catch (Exception e) {
+                assert false : e;
+                if (firstException != null && maxExceptions-- >= 0) {
+                    firstException.addSuppressed(e);
+                } else {
+                    firstException = e;
+                }
+            }
+        }
+        if (firstException != null) {
+            throw firstException;
+        }
+        return count;
+    }
+
+    private void abortMergeTask(MergeTask mergeTask) {
+        // abort immediately using the thread pool executor to handle throttling
+        threadPoolMergeExecutorService.abortMergeTask(mergeTask);
     }
 
     private synchronized void enqueueBackloggedTasks() {
@@ -348,6 +480,10 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
         } catch (Throwable t) {
             // OK to ignore MergeAbortedException. This is what Lucene's ConcurrentMergeScheduler does.
             if (t instanceof MergePolicy.MergeAbortedException == false) {
+                // A merge thread that threw a tragic exception that closed the IndexWriter causes other merge threads to be aborted, but
+                // it is not itself aborted: instead the current merge just completes and the thrown exception is set in the package
+                // private OneMerge#error field. Here we mark such a merge as aborted too so that it is not considered successful later.
+                oneMerge.setAborted();
                 handleMergeException(t);
             }
         }
@@ -390,13 +526,15 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
         private final MergeRateLimiter rateLimiter;
         private final boolean supportsIOThrottling;
         private final long mergeMemoryEstimateBytes;
+        private final Object owner;
 
         MergeTask(
             MergeSource mergeSource,
             MergePolicy.OneMerge merge,
             boolean supportsIOThrottling,
             String name,
-            long mergeMemoryEstimateBytes
+            long mergeMemoryEstimateBytes,
+            Object owner
         ) {
             this.name = name;
             this.mergeStartTimeNS = new AtomicLong();
@@ -405,6 +543,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
             this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress());
             this.supportsIOThrottling = supportsIOThrottling;
             this.mergeMemoryEstimateBytes = mergeMemoryEstimateBytes;
+            this.owner = owner;
         }
 
         Schedule schedule() {

+ 5 - 1
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -1351,7 +1351,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         if (engine == null) {
             return 0;
         }
-        return engine.getWritingBytes();
+        try {
+            return engine.getWritingBytes();
+        } catch (AlreadyClosedException ex) {
+            return 0L;
+        }
     }
 
     public RefreshStats refreshStats() {

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

@@ -355,7 +355,7 @@ public abstract class EngineTestCase extends ESTestCase {
         );
     }
 
-    public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) {
+    public static EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) {
         return new EngineConfig(
             config.getShardId(),
             config.getThreadPool(),