瀏覽代碼

Introduce async closing of `IndexShard` (#108096)

Closing an `IndexShard` may be expensive if the `IndexWriter` takes a
long time to roll back and shut down. This commit adds the facility for
forking the expensive work onto a different thread, although in
production it still runs this work directly.

Relates ES-8334
David Turner 1 年之前
父節點
當前提交
c1f409cc90
共有 19 個文件被更改,包括 505 次插入109 次删除
  1. 8 1
      server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
  2. 8 1
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesService.java
  3. 8 1
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java
  4. 38 0
      server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java
  5. 78 0
      server/src/main/java/org/elasticsearch/index/CloseUtils.java
  6. 83 27
      server/src/main/java/org/elasticsearch/index/IndexService.java
  7. 2 0
      server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
  8. 25 10
      server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
  9. 62 31
      server/src/main/java/org/elasticsearch/indices/IndicesService.java
  10. 98 23
      server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
  11. 2 0
      server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java
  12. 3 1
      server/src/test/java/org/elasticsearch/index/IndexServiceTests.java
  13. 32 0
      server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
  14. 4 2
      server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java
  15. 3 1
      server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java
  16. 20 8
      server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java
  17. 10 2
      server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java
  18. 4 0
      test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
  19. 17 1
      test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

+ 8 - 1
server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java

@@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.broadcast.BroadcastResponse;
 import org.elasticsearch.client.internal.Client;
@@ -45,6 +46,7 @@ import org.elasticsearch.common.Numbers;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
@@ -1745,7 +1747,12 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         assertNotNull("should be at least one node with a primary shard", nodeWithPrimary);
         IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeWithPrimary);
         IndexService indexService = indicesService.indexService(resolveIndex(index));
-        indexService.removeShard(0, "simulate node removal");
+        indexService.removeShard(
+            0,
+            "simulate node removal",
+            EsExecutors.DIRECT_EXECUTOR_SERVICE,
+            ActionTestUtils.assertNoFailureListener(v -> {})
+        );
 
         logger.info("--> unblocking blocked node [{}]", blockedNode);
         unblockNode(repo, blockedNode);

+ 8 - 1
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesService.java

@@ -26,6 +26,7 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
+import org.elasticsearch.index.CloseUtils;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexService;
@@ -197,7 +198,13 @@ public class MetadataIndexAliasesService {
             return currentState;
         } finally {
             for (Index index : indicesToClose) {
-                indicesService.removeIndex(index, NO_LONGER_ASSIGNED, "created for alias processing");
+                indicesService.removeIndex(
+                    index,
+                    NO_LONGER_ASSIGNED,
+                    "created for alias processing",
+                    CloseUtils.NO_SHARDS_CREATED_EXECUTOR,
+                    ActionListener.noop()
+                );
             }
         }
     }

+ 8 - 1
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java

@@ -38,6 +38,7 @@ import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
+import org.elasticsearch.index.CloseUtils;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettingProvider;
@@ -1727,7 +1728,13 @@ public class MetadataIndexTemplateService {
 
         } finally {
             if (createdIndex != null) {
-                indicesService.removeIndex(createdIndex, NO_LONGER_ASSIGNED, " created for parsing template mapping");
+                indicesService.removeIndex(
+                    createdIndex,
+                    NO_LONGER_ASSIGNED,
+                    " created for parsing template mapping",
+                    CloseUtils.NO_SHARDS_CREATED_EXECUTOR,
+                    ActionListener.noop()
+                );
             }
         }
     }

+ 38 - 0
server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

@@ -30,6 +30,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.TimeValue;
@@ -427,6 +428,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
                 logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), source);
             }
             try {
+                setIsApplyingClusterState();
                 applyChanges(previousClusterState, newClusterState, source, stopWatch);
                 TimeValue executionTime = getTimeSince(startTimeMillis);
                 logger.debug(
@@ -462,6 +464,8 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
                 // continue we will retry with the same cluster state but that might not help.
                 assert applicationMayFail();
                 clusterApplyListener.onFailure(e);
+            } finally {
+                clearIsApplyingClusterState();
             }
         }
     }
@@ -663,4 +667,38 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
     public int getTimeoutClusterStateListenersSize() {
         return timeoutClusterStateListeners.size();
     }
+
+    /**
+     * Used in tests to ensure we don't do overly expensive operations such as closing a shard on the applier thread
+     */
+    @Nullable // if assertions are disabled
+    private static final ThreadLocal<Boolean> isApplyingClusterState;
+
+    static {
+        isApplyingClusterState = Assertions.ENABLED ? new ThreadLocal<>() : null;
+    }
+
+    public static boolean assertIsApplyingClusterState() {
+        assert isApplyingClusterState == null || isApplyingClusterState.get() != null
+            : "operation not permitted unless applying cluster state on thread " + Thread.currentThread().getName();
+        return true;
+    }
+
+    public static boolean assertNotApplyingClusterState() {
+        assert isApplyingClusterState == null || isApplyingClusterState.get() == null
+            : "operation not permitted while applying cluster state on thread " + Thread.currentThread().getName();
+        return true;
+    }
+
+    public static void setIsApplyingClusterState() {
+        if (isApplyingClusterState != null) {
+            isApplyingClusterState.set(Boolean.TRUE);
+        }
+    }
+
+    public static void clearIsApplyingClusterState() {
+        if (isApplyingClusterState != null) {
+            isApplyingClusterState.remove();
+        }
+    }
 }

+ 78 - 0
server/src/main/java/org/elasticsearch/index/CloseUtils.java

@@ -0,0 +1,78 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.index;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.core.CheckedConsumer;
+
+import java.io.IOException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Utilities to help with closing shards and indices
+ */
+public class CloseUtils {
+
+    private CloseUtils() {/* no instances */}
+
+    /**
+     * Sentinel result value to record success
+     */
+    private static final Exception SUCCEEDED = new Exception() {
+        @Override
+        public synchronized Throwable fillInStackTrace() {
+            return this;
+        }
+    };
+
+    /**
+     * Execute a naturally-async action (e.g. to close a shard) but using the current thread so that it completes synchronously, re-throwing
+     * any exception that might be passed to its listener.
+     */
+    public static void executeDirectly(CheckedConsumer<ActionListener<Void>, IOException> action) throws IOException {
+        // it's possible to do this with a PlainActionFuture too but extracting the exact Exception is a bit of a pain because of
+        // exception-mangling and/or interrupt handling - see #108125
+        final var closeExceptionRef = new AtomicReference<Exception>();
+        ActionListener.run(ActionListener.assertOnce(new ActionListener<>() {
+            @Override
+            public void onResponse(Void unused) {
+                closeExceptionRef.set(SUCCEEDED);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                closeExceptionRef.set(e);
+            }
+        }), action);
+        final var closeException = closeExceptionRef.get();
+        if (closeException == SUCCEEDED) {
+            return;
+        }
+        if (closeException instanceof RuntimeException runtimeException) {
+            throw runtimeException;
+        }
+        if (closeException instanceof IOException ioException) {
+            throw ioException;
+        }
+        assert false : closeException;
+        if (closeException != null) {
+            throw new RuntimeException("unexpected exception on shard close", closeException);
+        } // else listener not completed, definitely a bug, but throwing something won't help anyone here
+    }
+
+    /**
+     * Utility shard-close executor for the cases where we close an {@link IndexService} without having created any shards, so we can assert
+     * that it's never used.
+     */
+    public static final Executor NO_SHARDS_CREATED_EXECUTOR = r -> {
+        assert false : r;
+        r.run(); // just in case we're wrong, in production we need to actually run the task
+    };
+}

+ 83 - 27
server/src/main/java/org/elasticsearch/index/IndexService.java

@@ -16,6 +16,7 @@ import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.Accountable;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.RefCountingRunnable;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -355,19 +356,10 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
         return indexSortSupplier;
     }
 
-    public synchronized void close(final String reason, boolean delete) throws IOException {
+    public synchronized void close(final String reason, boolean delete, Executor shardCloseExecutor, ActionListener<Void> closeListener) {
         if (closed.compareAndSet(false, true)) {
             deleted.compareAndSet(false, delete);
-            try {
-                final Set<Integer> shardIds = shardIds();
-                for (final int shardId : shardIds) {
-                    try {
-                        removeShard(shardId, reason);
-                    } catch (Exception e) {
-                        logger.warn("failed to close shard", e);
-                    }
-                }
-            } finally {
+            try (var refs = new RefCountingRunnable(() -> ActionListener.run(closeListener, l -> {
                 IOUtils.close(
                     bitsetFilterCache,
                     indexCache,
@@ -379,7 +371,18 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
                     globalCheckpointTask,
                     retentionLeaseSyncTask
                 );
+                l.onResponse(null);
+            }))) {
+                final Set<Integer> shardIds = shardIds();
+                for (final int shardId : shardIds) {
+                    ActionListener.run(refs.acquireListener().delegateResponse((l, e) -> {
+                        logger.warn("failed to close shard", e);
+                        l.onResponse(null);
+                    }), l -> removeShard(shardId, reason, shardCloseExecutor, l));
+                }
             }
+        } else {
+            closeListener.onResponse(null);
         }
     }
 
@@ -553,24 +556,57 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
                 if (lock != null) {
                     IOUtils.closeWhileHandlingException(lock);
                 }
-                closeShard("initialization failed", shardId, indexShard, store, eventListener);
+                final var finalStore = store;
+                final var finalIndexShard = indexShard;
+                CloseUtils.executeDirectly(
+                    l -> closeShard(
+                        "initialization failed",
+                        shardId,
+                        finalIndexShard,
+                        finalStore,
+                        eventListener,
+                        EsExecutors.DIRECT_EXECUTOR_SERVICE /* closing a shard that failed to start up should be fast enough */,
+                        l
+                    )
+                );
             }
         }
     }
 
     @Override
-    public synchronized void removeShard(int shardId, String reason) {
+    public synchronized void removeShard(int shardId, String reason, Executor closeExecutor, ActionListener<Void> closeListener) {
         final IndexShard indexShard = shards.get(shardId);
         if (indexShard == null) {
+            closeListener.onResponse(null);
             return;
         }
         logger.debug("[{}] closing... (reason: [{}])", shardId, reason);
+        final var wrappedListener = logger.isDebugEnabled()
+            ? ActionListener.runBefore(closeListener, () -> logger.debug("[{}] closed (reason: [{}])", shardId, reason))
+            : closeListener;
+
         shards = Maps.copyMapWithRemovedEntry(shards, shardId);
-        closeShard(reason, indexShard.shardId(), indexShard, indexShard.store(), indexShard.getIndexEventListener());
-        logger.debug("[{}] closed (reason: [{}])", shardId, reason);
+        closeShard(
+            reason,
+            indexShard.shardId(),
+            indexShard,
+            indexShard.store(),
+            indexShard.getIndexEventListener(),
+            closeExecutor,
+            wrappedListener
+        );
+        logger.debug("[{}] removed (reason: [{}])", shardId, reason);
     }
 
-    private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store store, IndexEventListener listener) {
+    private void closeShard(
+        String reason,
+        ShardId sId,
+        IndexShard indexShard,
+        Store store,
+        IndexEventListener listener,
+        Executor closeExecutor,
+        ActionListener<Void> closeListener
+    ) {
         final int shardId = sId.id();
         final Settings indexSettings = this.getIndexSettings().getSettings();
         if (store != null) {
@@ -582,18 +618,38 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
             } finally {
                 // this logic is tricky, we want to close the engine so we rollback the changes done to it
                 // and close the shard so no operations are allowed to it
-                if (indexShard != null) {
-                    try {
-                        // only flush if we are closed (closed index or shutdown) and if we are not deleted
-                        final boolean flushEngine = deleted.get() == false && closed.get();
-                        indexShard.close(reason, flushEngine);
-                    } catch (Exception e) {
-                        logger.debug(() -> "[" + shardId + "] failed to close index shard", e);
-                        // ignore
-                    }
+                if (indexShard == null) {
+                    closeListener.onResponse(null);
+                } else {
+                    // only flush if we are closed (closed index or shutdown) and if we are not deleted
+                    final boolean flushEngine = deleted.get() == false && closed.get();
+                    // if the store is still open, want to keep it open until afterIndexShardClosed
+                    assert store == null || store.hasReferences() : "store exists but already closed";
+                    final var hasStoreRef = store != null && store.tryIncRef(); // being cautious
+                    ActionListener.run(new ActionListener<Void>() {
+                        @Override
+                        public void onResponse(Void unused) {
+                            try {
+                                // call this before we close the store, so we can release resources for it
+                                listener.afterIndexShardClosed(sId, indexShard, indexSettings);
+                            } finally {
+                                try {
+                                    if (hasStoreRef) {
+                                        store.decRef();
+                                    }
+                                } finally {
+                                    closeListener.onResponse(null);
+                                }
+                            }
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            logger.debug(() -> "[" + shardId + "] failed to close index shard", e);
+                            onResponse(null); // otherwise ignore the exception
+                        }
+                    }, l -> indexShard.close(reason, flushEngine, closeExecutor, l));
                 }
-                // call this before we close the store, so we can release resources for it
-                listener.afterIndexShardClosed(sId, indexShard, indexSettings);
             }
         } finally {
             try {

+ 2 - 0
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -44,6 +44,7 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.service.ClusterApplierService;
 import org.elasticsearch.common.lucene.LoggerInfoStream;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
@@ -2629,6 +2630,7 @@ public class InternalEngine extends Engine {
                 // no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed
                 logger.trace("rollback indexWriter");
                 try {
+                    assert ClusterApplierService.assertNotApplyingClusterState();
                     indexWriter.rollback();
                 } catch (AlreadyClosedException ex) {
                     failOnTragicEvent(ex);

+ 25 - 10
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -1718,7 +1718,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
 
     }
 
-    public void close(String reason, boolean flushEngine) throws IOException {
+    public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener<Void> closeListener) throws IOException {
         synchronized (engineMutex) {
             try {
                 synchronized (mutex) {
@@ -1727,16 +1727,31 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                 checkAndCallWaitForEngineOrClosedShardListeners();
             } finally {
                 final Engine engine = this.currentEngineReference.getAndSet(null);
-                try {
-                    if (engine != null && flushEngine) {
-                        engine.flushAndClose();
+                closeExecutor.execute(ActionRunnable.run(closeListener, new CheckedRunnable<>() {
+                    @Override
+                    public void run() throws Exception {
+                        try {
+                            if (engine != null && flushEngine) {
+                                engine.flushAndClose();
+                            }
+                        } finally {
+                            // playing safe here and close the engine even if the above succeeds - close can be called multiple times
+                            // Also closing refreshListeners to prevent us from accumulating any more listeners
+                            IOUtils.close(
+                                engine,
+                                globalCheckpointListeners,
+                                refreshListeners,
+                                pendingReplicationActions,
+                                indexShardOperationPermits
+                            );
+                        }
                     }
-                } finally {
-                    // playing safe here and close the engine even if the above succeeds - close can be called multiple times
-                    // Also closing refreshListeners to prevent us from accumulating any more listeners
-                    IOUtils.close(engine, globalCheckpointListeners, refreshListeners, pendingReplicationActions);
-                    indexShardOperationPermits.close();
-                }
+
+                    @Override
+                    public String toString() {
+                        return "IndexShard#close[" + shardId + "]";
+                    }
+                }));
             }
         }
     }

+ 62 - 31
server/src/main/java/org/elasticsearch/indices/IndicesService.java

@@ -81,6 +81,7 @@ import org.elasticsearch.features.FeatureService;
 import org.elasticsearch.features.NodeFeature;
 import org.elasticsearch.gateway.MetaStateService;
 import org.elasticsearch.gateway.MetadataStateFormat;
+import org.elasticsearch.index.CloseUtils;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.IndexModule;
@@ -163,6 +164,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -396,13 +398,18 @@ public class IndicesService extends AbstractLifecycleComponent
         final Set<Index> indices = this.indices.values().stream().map(s -> s.index()).collect(Collectors.toSet());
         final CountDownLatch latch = new CountDownLatch(indices.size());
         for (final Index index : indices) {
-            indicesStopExecutor.execute(() -> {
-                try {
-                    removeIndex(index, IndexRemovalReason.SHUTDOWN, "shutdown");
-                } finally {
-                    latch.countDown();
-                }
-            });
+            indicesStopExecutor.execute(
+                () -> ActionListener.run(
+                    ActionListener.assertOnce(ActionListener.<Void>releasing(latch::countDown)),
+                    l -> removeIndex(
+                        index,
+                        IndexRemovalReason.SHUTDOWN,
+                        "shutdown",
+                        EsExecutors.DIRECT_EXECUTOR_SERVICE /* node shutdown can be blocking */,
+                        l
+                    )
+                )
+            );
         }
         try {
             if (latch.await(shardsClosedTimeout.seconds(), TimeUnit.SECONDS) == false) {
@@ -668,7 +675,7 @@ public class IndicesService extends AbstractLifecycleComponent
             return indexService;
         } finally {
             if (success == false) {
-                indexService.close("plugins_failed", true);
+                CloseUtils.executeDirectly(l -> indexService.close("plugins_failed", true, CloseUtils.NO_SHARDS_CREATED_EXECUTOR, l));
             }
         }
     }
@@ -705,7 +712,11 @@ public class IndicesService extends AbstractLifecycleComponent
             finalListeners,
             indexingMemoryController
         );
-        try (Closeable dummy = () -> indexService.close("temp", false)) {
+        try (
+            Closeable ignored = () -> CloseUtils.executeDirectly(
+                l -> indexService.close("temp", false, CloseUtils.NO_SHARDS_CREATED_EXECUTOR, l)
+            )
+        ) {
             return indexServiceConsumer.apply(indexService);
         }
     }
@@ -846,7 +857,11 @@ public class IndicesService extends AbstractLifecycleComponent
                 indicesFieldDataCache,
                 emptyList()
             );
-            closeables.add(() -> service.close("metadata verification", false));
+            closeables.add(
+                () -> CloseUtils.executeDirectly(
+                    l -> service.close("metadata verification", false, CloseUtils.NO_SHARDS_CREATED_EXECUTOR, l)
+                )
+            );
             service.mapperService().merge(metadata, MapperService.MergeReason.MAPPING_RECOVERY);
             if (metadata.equals(metadataUpdate) == false) {
                 service.updateMetadata(metadata, metadataUpdate);
@@ -896,36 +911,52 @@ public class IndicesService extends AbstractLifecycleComponent
     }
 
     @Override
-    public void removeIndex(final Index index, final IndexRemovalReason reason, final String extraInfo) {
+    public void removeIndex(
+        final Index index,
+        final IndexRemovalReason reason,
+        final String extraInfo,
+        Executor shardCloseExecutor,
+        ActionListener<Void> shardsClosedListener
+    ) {
         final String indexName = index.getName();
-        try {
+        ActionListener.run(ActionListener.assertOnce(shardsClosedListener.delegateResponse((l, e) -> {
+            logger.warn(() -> format("failed to remove index %s ([%s][%s])", index, reason, extraInfo), e);
+            l.onResponse(null);
+        })), l -> {
             final IndexService indexService;
             final IndexEventListener listener;
             synchronized (this) {
-                if (hasIndex(index) == false) {
-                    return;
+                if (hasIndex(index)) {
+                    logger.debug("[{}] closing ... (reason [{}])", indexName, reason);
+                    indexService = indices.get(index.getUUID());
+                    assert indexService != null : "IndexService is null for index: " + index;
+                    indices = Maps.copyMapWithRemovedEntry(indices, index.getUUID());
+                    listener = indexService.getIndexEventListener();
+                } else {
+                    indexService = null;
+                    listener = null;
                 }
+            }
 
-                logger.debug("[{}] closing ... (reason [{}])", indexName, reason);
-                indexService = indices.get(index.getUUID());
-                assert indexService != null : "IndexService is null for index: " + index;
-                indices = Maps.copyMapWithRemovedEntry(indices, index.getUUID());
-                listener = indexService.getIndexEventListener();
+            assert (indexService == null) == (listener == null) : indexService + " vs " + listener;
+
+            if (indexService == null) {
+                l.onResponse(null);
+                return;
             }
 
             listener.beforeIndexRemoved(indexService, reason);
             logger.debug("{} closing index service (reason [{}][{}])", index, reason, extraInfo);
-            indexService.close(extraInfo, reason == IndexRemovalReason.DELETED);
-            logger.debug("{} closed... (reason [{}][{}])", index, reason, extraInfo);
-            final IndexSettings indexSettings = indexService.getIndexSettings();
-            listener.afterIndexRemoved(indexService.index(), indexSettings, reason);
-            if (reason == IndexRemovalReason.DELETED) {
-                // now we are done - try to wipe data on disk if possible
-                deleteIndexStore(extraInfo, indexService.index(), indexSettings);
-            }
-        } catch (Exception e) {
-            logger.warn(() -> format("failed to remove index %s ([%s][%s])", index, reason, extraInfo), e);
-        }
+            indexService.close(extraInfo, reason == IndexRemovalReason.DELETED, shardCloseExecutor, ActionListener.runBefore(l, () -> {
+                logger.debug("{} closed... (reason [{}][{}])", index, reason, extraInfo);
+                final IndexSettings indexSettings = indexService.getIndexSettings();
+                listener.afterIndexRemoved(indexService.index(), indexSettings, reason);
+                if (reason == IndexRemovalReason.DELETED) {
+                    // now we are done - try to wipe data on disk if possible
+                    deleteIndexStore(extraInfo, indexService.index(), indexSettings);
+                }
+            }));
+        });
     }
 
     public IndicesFieldDataCache getIndicesFieldDataCache() {
@@ -974,7 +1005,7 @@ public class IndicesService extends AbstractLifecycleComponent
 
     /**
      * Deletes an index that is not assigned to this node. This method cleans up all disk folders relating to the index
-     * but does not deal with in-memory structures. For those call {@link #removeIndex(Index, IndexRemovalReason, String)}
+     * but does not deal with in-memory structures. For those call {@link #removeIndex}
      */
     @Override
     public void deleteUnassignedIndex(String reason, IndexMetadata oldIndexMetadata, ClusterState clusterState) {

+ 98 - 23
server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

@@ -17,6 +17,7 @@ import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.RefCountingListener;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
@@ -81,6 +82,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
@@ -128,6 +130,17 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
     private final TimeValue shardLockRetryInterval;
     private final TimeValue shardLockRetryTimeout;
 
+    private final Executor shardCloseExecutor = r -> {
+        // ES-8334 TODO move this work onto a background thread
+        try {
+            assert ClusterApplierService.assertIsApplyingClusterState();
+            ClusterApplierService.clearIsApplyingClusterState();
+            r.run();
+        } finally {
+            ClusterApplierService.setIsApplyingClusterState();
+        }
+    };
+
     @Inject
     public IndicesClusterStateService(
         final Settings settings,
@@ -234,7 +247,9 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
                 indicesService.removeIndex(
                     indexService.getIndexSettings().getIndex(),
                     NO_LONGER_ASSIGNED,
-                    "cleaning index (disabled block persistence)"
+                    "cleaning index (disabled block persistence)",
+                    shardCloseExecutor,
+                    ActionListener.noop()
                 );
             }
             return;
@@ -319,7 +334,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
             final IndexSettings indexSettings;
             if (indexService != null) {
                 indexSettings = indexService.getIndexSettings();
-                indicesService.removeIndex(index, DELETED, "index no longer part of the metadata");
+                indicesService.removeIndex(
+                    index,
+                    DELETED,
+                    "index no longer part of the metadata",
+                    shardCloseExecutor,
+                    ActionListener.noop()
+                );
             } else if (previousState.metadata().hasIndex(index)) {
                 // The deleted index was part of the previous cluster state, but not loaded on the local node
                 final IndexMetadata metadata = previousState.metadata().index(index);
@@ -364,6 +385,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
                             logger.warn("[{}] failed to lock all shards for index - interrupted", index);
                         }
                     }
+
+                    @Override
+                    public String toString() {
+                        return "processPendingDeletes[" + index + "]";
+                    }
                 });
             }
         }
@@ -406,7 +432,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
 
             if (reason != null) {
                 logger.debug("{} removing index ({})", index, reason);
-                indicesService.removeIndex(index, reason, "removing index (" + reason + ")");
+                indicesService.removeIndex(index, reason, "removing index (" + reason + ")", shardCloseExecutor, ActionListener.noop());
             } else {
                 // remove shards based on routing nodes (no deletion of data)
                 for (Shard shard : indexService) {
@@ -417,7 +443,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
                         // we can just remove the shard without cleaning it locally, since we will clean it in IndicesStore
                         // once all shards are allocated
                         logger.debug("{} removing shard (not allocated)", shardId);
-                        indexService.removeShard(shardId.id(), "removing shard (not allocated)");
+                        indexService.removeShard(shardId.id(), "removing shard (not allocated)", shardCloseExecutor, ActionListener.noop());
                     } else if (newShardRouting.isSameAllocation(currentRoutingEntry) == false) {
                         logger.debug(
                             "{} removing shard (stale allocation id, stale {}, new {})",
@@ -425,20 +451,20 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
                             currentRoutingEntry,
                             newShardRouting
                         );
-                        indexService.removeShard(shardId.id(), "removing shard (stale copy)");
+                        indexService.removeShard(shardId.id(), "removing shard (stale copy)", shardCloseExecutor, ActionListener.noop());
                     } else if (newShardRouting.initializing() && currentRoutingEntry.active()) {
                         // this can happen if the node was isolated/gc-ed, rejoins the cluster and a new shard with the same allocation id
                         // is assigned to it. Batch cluster state processing or if shard fetching completes before the node gets a new
                         // cluster state may result in a new shard being initialized while having the same allocation id as the currently
                         // started shard.
                         logger.debug("{} removing shard (not active, current {}, new {})", shardId, currentRoutingEntry, newShardRouting);
-                        indexService.removeShard(shardId.id(), "removing shard (stale copy)");
+                        indexService.removeShard(shardId.id(), "removing shard (stale copy)", shardCloseExecutor, ActionListener.noop());
                     } else if (newShardRouting.primary() && currentRoutingEntry.primary() == false && newShardRouting.initializing()) {
                         assert currentRoutingEntry.initializing() : currentRoutingEntry; // see above if clause
                         // this can happen when cluster state batching batches activation of the shard, closing an index, reopening it
                         // and assigning an initializing primary to this node
                         logger.debug("{} removing shard (not active, current {}, new {})", shardId, currentRoutingEntry, newShardRouting);
-                        indexService.removeShard(shardId.id(), "removing shard (stale copy)");
+                        indexService.removeShard(shardId.id(), "removing shard (stale copy)", shardCloseExecutor, ActionListener.noop());
                     }
                 }
             }
@@ -498,7 +524,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
                     failShardReason = "failed to create index";
                 } else {
                     failShardReason = "failed to update mapping for index";
-                    indicesService.removeIndex(index, FAILURE, "removing index (mapping update failed)");
+                    indicesService.removeIndex(
+                        index,
+                        FAILURE,
+                        "removing index (mapping update failed)",
+                        shardCloseExecutor,
+                        ActionListener.noop()
+                    );
                 }
                 for (ShardRouting shardRouting : entry.getValue()) {
                     sendFailShard(shardRouting, failShardReason, e, state);
@@ -546,7 +578,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
                     reason = "mapping update failed";
                     indexService.updateMapping(currentIndexMetadata, newIndexMetadata);
                 } catch (Exception e) {
-                    indicesService.removeIndex(index, FAILURE, "removing index (" + reason + ")");
+                    indicesService.removeIndex(
+                        index,
+                        FAILURE,
+                        "removing index (" + reason + ")",
+                        shardCloseExecutor,
+                        ActionListener.noop()
+                    );
 
                     // fail shards that would be created or updated by createOrUpdateShards
                     RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
@@ -599,7 +637,15 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
 
                     @Override
                     public void onFailure(Exception e) {
-                        failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);
+                        failAndRemoveShard(
+                            shardRouting,
+                            true,
+                            "failed to create shard",
+                            e,
+                            state,
+                            shardCloseExecutor,
+                            ActionListener.noop()
+                        );
                     }
                 }, () -> {
                     assert ThreadPool.assertCurrentThreadPool(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME);
@@ -609,7 +655,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
         } catch (Exception e) {
             assert pendingShardCreations.get(shardId) == null
                 || pendingShardCreations.get(shardId).clusterStateUUID().equals(state.stateUUID()) == false;
-            failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);
+            failAndRemoveShard(shardRouting, true, "failed to create shard", e, state, shardCloseExecutor, ActionListener.noop());
         }
     }
 
@@ -766,7 +812,15 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
                 indexShardRoutingTable
             );
         } catch (Exception e) {
-            failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e, clusterState);
+            failAndRemoveShard(
+                shardRouting,
+                true,
+                "failed updating shard routing entry",
+                e,
+                clusterState,
+                shardCloseExecutor,
+                ActionListener.noop()
+            );
             return;
         }
 
@@ -872,7 +926,15 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
 
     // package-private for testing
     synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolean sendShardFailure, Exception failure) {
-        failAndRemoveShard(shardRouting, sendShardFailure, "failed recovery", failure, clusterService.state());
+        failAndRemoveShard(
+            shardRouting,
+            sendShardFailure,
+            "failed recovery",
+            failure,
+            clusterService.state(),
+            EsExecutors.DIRECT_EXECUTOR_SERVICE,
+            ActionListener.noop()
+        );
     }
 
     private void failAndRemoveShard(
@@ -880,14 +942,16 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
         boolean sendShardFailure,
         String message,
         @Nullable Exception failure,
-        ClusterState state
+        ClusterState state,
+        Executor shardCloseExecutor,
+        ActionListener<Void> shardCloseListener
     ) {
-        try {
+        try (var listeners = new RefCountingListener(shardCloseListener)) {
             AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardRouting.shardId().getIndex());
             if (indexService != null) {
                 Shard shard = indexService.getShardOrNull(shardRouting.shardId().id());
                 if (shard != null && shard.routingEntry().isSameAllocation(shardRouting)) {
-                    indexService.removeShard(shardRouting.shardId().id(), message);
+                    indexService.removeShard(shardRouting.shardId().id(), message, shardCloseExecutor, listeners.acquire());
                 }
             }
         } catch (ShardNotFoundException e) {
@@ -939,7 +1003,9 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
                         true,
                         "shard failure, reason [" + shardFailure.reason() + "]",
                         shardFailure.cause(),
-                        clusterService.state()
+                        clusterService.state(),
+                        EsExecutors.DIRECT_EXECUTOR_SERVICE /* NB holding mutex while closing shard, ES-8334 TODO revisit this? */,
+                        ActionListener.noop()
                     );
                 }
             });
@@ -1030,7 +1096,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
         /**
          * Removes shard with given id.
          */
-        void removeShard(int shardId, String message);
+        void removeShard(int shardId, String message, Executor closeExecutor, ActionListener<Void> closeListener);
     }
 
     public interface AllocatedIndices<T extends Shard, U extends AllocatedIndex<T>> extends Iterable<U> {
@@ -1059,18 +1125,27 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
 
         /**
          * Deletes an index that is not assigned to this node. This method cleans up all disk folders relating to the index
-         * but does not deal with in-memory structures. For those call {@link #removeIndex(Index, IndexRemovalReason, String)}
+         * but does not deal with in-memory structures. For those call {@link #removeIndex}
          */
         void deleteUnassignedIndex(String reason, IndexMetadata metadata, ClusterState clusterState);
 
         /**
          * Removes the given index from this service and releases all associated resources. Persistent parts of the index
          * like the shards files, state and transaction logs are kept around in the case of a disaster recovery.
-         * @param index the index to remove
-         * @param reason the reason to remove the index
-         * @param extraInfo extra information that will be used for logging and reporting
+         *
+         * @param index                the index to remove
+         * @param reason               the reason to remove the index
+         * @param extraInfo            extra information that will be used for logging and reporting
+         * @param shardCloseExecutor   executor to use to close individual shards
+         * @param shardsClosedListener listener which is completed when all shards have been closed
          */
-        void removeIndex(Index index, IndexRemovalReason reason, String extraInfo);
+        void removeIndex(
+            Index index,
+            IndexRemovalReason reason,
+            String extraInfo,
+            Executor shardCloseExecutor,
+            ActionListener<Void> shardsClosedListener
+        );
 
         /**
          * Returns an IndexService for the specified index if exists otherwise returns <code>null</code>.

+ 2 - 0
server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java

@@ -268,6 +268,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
         );
         MapperService mapperService = mock(MapperService.class);
         when(shard.mapperService()).thenReturn(mapperService);
+        addMockCloseImplementation(shard);
 
         // merged mapping source needs to be different from previous one for the master node to be invoked
         DocumentMapper mergedDoc = mock(DocumentMapper.class);
@@ -991,6 +992,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             when(shard.getFailedIndexResult(any(EsRejectedExecutionException.class), anyLong(), anyString())).thenCallRealMethod();
             MapperService mapperService = mock(MapperService.class);
             when(shard.mapperService()).thenReturn(mapperService);
+            addMockCloseImplementation(shard);
 
             // merged mapping source needs to be different from previous one for the master node to be invoked
             DocumentMapper mergedDoc = mock(DocumentMapper.class);

+ 3 - 1
server/src/test/java/org/elasticsearch/index/IndexServiceTests.java

@@ -461,6 +461,8 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
     }
 
     public static void closeIndexService(IndexService indexService) throws IOException {
-        indexService.close("IndexServiceTests#closeIndexService", false);
+        CloseUtils.executeDirectly(
+            l -> indexService.close("IndexServiceTests#closeIndexService", false, EsExecutors.DIRECT_EXECUTOR_SERVICE, l)
+        );
     }
 }

+ 32 - 0
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -64,6 +64,7 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
+import org.elasticsearch.common.util.concurrent.RunOnce;
 import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.CheckedFunction;
 import org.elasticsearch.core.Releasable;
@@ -150,6 +151,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
@@ -294,6 +296,36 @@ public class IndexShardTests extends IndexShardTestCase {
         );
     }
 
+    public void testAsyncCloseShard() throws Exception {
+        final var shard = newStartedShard();
+        final var store = shard.store();
+        final var storeCloser = new RunOnce(store::close);
+        final var engine = Objects.requireNonNull(shard.getEngineOrNull());
+
+        final var closeFuture = new PlainActionFuture<Void>();
+        final var closeTasks = new ArrayList<Runnable>();
+        shard.close(getTestName(), randomBoolean(), closeTasks::add, closeFuture);
+
+        if (randomBoolean()) {
+            storeCloser.run();
+        }
+
+        assertFalse(closeFuture.isDone());
+        assertThat(closeTasks, hasSize(1));
+        assertEquals(IndexShardState.CLOSED, shard.state());
+        assertNull(shard.getEngineOrNull());
+        EngineTestCase.ensureOpen(engine); // does not throw ACE
+
+        if (randomBoolean()) {
+            storeCloser.run();
+        }
+        assertTrue(store.hasReferences());
+
+        closeTasks.forEach(Runnable::run);
+        storeCloser.run();
+        assertFalse(store.hasReferences());
+    }
+
     ShardStateMetadata getShardStateMetadata(IndexShard shard) {
         ShardRouting shardRouting = shard.routingEntry();
         if (shardRouting == null) {

+ 4 - 2
server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java

@@ -7,6 +7,7 @@
  */
 package org.elasticsearch.indices;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
@@ -15,6 +16,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingHelper;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettings;
@@ -105,7 +107,7 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas
             }
 
         };
-        indicesService.removeIndex(idx, DELETED, "simon says");
+        indicesService.removeIndex(idx, DELETED, "simon says", EsExecutors.DIRECT_EXECUTOR_SERVICE, ActionListener.noop());
         try {
             IndexService index = indicesService.createIndex(metadata, Arrays.asList(countingListener), false);
             assertEquals(3, counter.get());
@@ -126,7 +128,7 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas
             IndexShardTestCase.updateRoutingEntry(shard, newRouting);
             assertEquals(6, counter.get());
         } finally {
-            indicesService.removeIndex(idx, DELETED, "simon says");
+            indicesService.removeIndex(idx, DELETED, "simon says", EsExecutors.DIRECT_EXECUTOR_SERVICE, ActionListener.noop());
         }
         assertEquals(10, counter.get());
     }

+ 3 - 1
server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java

@@ -13,6 +13,7 @@ import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
 import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
+import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
@@ -26,6 +27,7 @@ import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.ShardLockObtainFailedException;
@@ -270,7 +272,7 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
             indicesService.canDeleteShardContent(shardId, test.getIndexSettings()),
             ShardDeletionCheckResult.STILL_ALLOCATED
         );
-        test.removeShard(0, "boom");
+        test.removeShard(0, "boom", EsExecutors.DIRECT_EXECUTOR_SERVICE, ActionTestUtils.assertNoFailureListener(v -> {}));
         assertEquals(
             "shard is removed",
             indicesService.canDeleteShardContent(shardId, test.getIndexSettings()),

+ 20 - 8
server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java

@@ -47,6 +47,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
@@ -214,12 +215,19 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
         }
 
         @Override
-        public synchronized void removeIndex(Index index, IndexRemovalReason reason, String extraInfo) {
+        public synchronized void removeIndex(
+            Index index,
+            IndexRemovalReason reason,
+            String extraInfo,
+            Executor shardCloseExecutor,
+            ActionListener<Void> shardsClosedListener
+        ) {
             if (hasIndex(index)) {
                 Map<String, MockIndexService> newIndices = new HashMap<>(indices);
                 newIndices.remove(index.getUUID());
                 indices = unmodifiableMap(newIndices);
             }
+            shardsClosedListener.onResponse(null);
         }
 
         @Override
@@ -304,14 +312,18 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
         }
 
         @Override
-        public synchronized void removeShard(int shardId, String reason) {
-            if (shards.containsKey(shardId) == false) {
-                return;
+        public synchronized void removeShard(int shardId, String reason, Executor closeExecutor, ActionListener<Void> closeListener) {
+            try {
+                if (shards.containsKey(shardId) == false) {
+                    return;
+                }
+                HashMap<Integer, MockIndexShard> newShards = new HashMap<>(shards);
+                MockIndexShard indexShard = newShards.remove(shardId);
+                assert indexShard != null;
+                shards = unmodifiableMap(newShards);
+            } finally {
+                closeListener.onResponse(null);
             }
-            HashMap<Integer, MockIndexShard> newShards = new HashMap<>(shards);
-            MockIndexShard indexShard = newShards.remove(shardId);
-            assert indexShard != null;
-            shards = unmodifiableMap(newShards);
         }
 
         @Override

+ 10 - 2
server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java

@@ -9,6 +9,7 @@
 package org.elasticsearch.indices.cluster;
 
 import org.elasticsearch.TransportVersion;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
 import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@@ -61,6 +62,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
@@ -570,8 +572,14 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
         private Set<Index> deletedIndices = Collections.emptySet();
 
         @Override
-        public synchronized void removeIndex(Index index, IndexRemovalReason reason, String extraInfo) {
-            super.removeIndex(index, reason, extraInfo);
+        public synchronized void removeIndex(
+            Index index,
+            IndexRemovalReason reason,
+            String extraInfo,
+            Executor shardCloseExecutor,
+            ActionListener<Void> shardsClosedListener
+        ) {
+            super.removeIndex(index, reason, extraInfo, shardCloseExecutor, shardsClosedListener);
             if (reason == IndexRemovalReason.DELETED) {
                 Set<Index> newSet = Sets.newHashSet(deletedIndices);
                 newSet.add(index);

+ 4 - 0
test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

@@ -1645,4 +1645,8 @@ public abstract class EngineTestCase extends ESTestCase {
             fail(e);
         }
     }
+
+    public static void ensureOpen(Engine engine) {
+        engine.ensureOpen();
+    }
 }

+ 17 - 1
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -9,6 +9,7 @@ package org.elasticsearch.index.shard;
 
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.store.Directory;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
@@ -30,11 +31,13 @@ import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.CheckedFunction;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.index.CloseUtils;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexModule;
 import org.elasticsearch.index.IndexSettings;
@@ -104,6 +107,9 @@ import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBui
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
 
 /**
  * A base class for unit tests that need to create and shutdown {@link IndexShard} instances easily,
@@ -141,6 +147,14 @@ public abstract class IndexShardTestCase extends ESTestCase {
     protected Executor writeExecutor;
     protected long primaryTerm;
 
+    public static void addMockCloseImplementation(IndexShard shard) throws IOException {
+        doAnswer(invocation -> {
+            final ActionListener<Void> listener = invocation.getArgument(3);
+            listener.onResponse(null);
+            return null;
+        }).when(shard).close(any(), anyBoolean(), any(), any());
+    }
+
     @Override
     public void setUp() throws Exception {
         super.setUp();
@@ -697,7 +711,9 @@ public abstract class IndexShardTestCase extends ESTestCase {
      * Close an {@link IndexShard}, optionally flushing first, without performing the consistency checks that {@link #closeShard} performs.
      */
     public static void closeShardNoCheck(IndexShard indexShard, boolean flushEngine) throws IOException {
-        indexShard.close("IndexShardTestCase#closeShardNoCheck", flushEngine);
+        CloseUtils.executeDirectly(
+            l -> indexShard.close("IndexShardTestCase#closeShardNoCheck", flushEngine, EsExecutors.DIRECT_EXECUTOR_SERVICE, l)
+        );
     }
 
     /**