Explorar o código

Ensure Node Shutdown Waits for Running Restores to Complete (#76070)

We must wait for ongoing restores to complete before shutting down the repositories
service. Otherwise we may leak file descriptors because tasks for releasing the store
are submitted to the `SNAPSHOT` or some searchable snapshot pools that quietly accept
but never reject/fail tasks after shutdown.

same as #46178 where we had the same bug in recoveries

closes #75686
Armin Braun %!s(int64=4) %!d(string=hai) anos
pai
achega
f62618c5ae

+ 5 - 0
server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

@@ -181,6 +181,11 @@ public class FilterRepository implements Repository {
         in.cloneShardSnapshot(source, target, shardId, shardGeneration, listener);
     }
 
+    @Override
+    public void awaitIdle() {
+        in.awaitIdle();
+    }
+
     @Override
     public Lifecycle.State lifecycleState() {
         return in.lifecycleState();

+ 3 - 0
server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java

@@ -771,5 +771,8 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
         repos.addAll(internalRepositories.values());
         repos.addAll(repositories.values());
         IOUtils.close(repos);
+        for (Repository repo : repos) {
+            repo.awaitIdle();
+        }
     }
 }

+ 9 - 0
server/src/main/java/org/elasticsearch/repositories/Repository.java

@@ -321,6 +321,15 @@ public interface Repository extends LifecycleComponent {
         return userMetadata;
     }
 
+    /**
+     * Block until all in-flight operations for this repository have completed. Must only be called after this instance has been closed
+     * by a call to stop {@link #close()}.
+     * Waiting for ongoing operations should be implemented here instead of in {@link #stop()} or {@link #close()} hooks of this interface
+     * as these are expected to be called on the cluster state applier thread (which must not block) if a repository is removed from the
+     * cluster. This method is intended to be called on node shutdown instead as a means to ensure no repository operations are leaked.
+     */
+    void awaitIdle();
+
     static boolean assertSnapshotMetaThread() {
         final String threadName = Thread.currentThread().getName();
         assert threadName.contains('[' + ThreadPool.Names.SNAPSHOT_META + ']') || threadName.startsWith("TEST-")

+ 53 - 1
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -30,6 +30,7 @@ import org.elasticsearch.action.ResultDeduplicator;
 import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.action.support.ListenableActionFuture;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
@@ -66,6 +67,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -423,6 +425,30 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }
     }
 
+    // listeners to invoke when a restore completes and there are no more restores running
+    @Nullable
+    private List<ActionListener<Void>> emptyListeners;
+
+    // Set of shard ids that this repository is currently restoring
+    private final Set<ShardId> ongoingRestores = new HashSet<>();
+
+    @Override
+    public void awaitIdle() {
+        assert lifecycle.stoppedOrClosed();
+        final PlainActionFuture<Void> future;
+        synchronized (ongoingRestores) {
+            if (ongoingRestores.isEmpty()) {
+                return;
+            }
+            future = new PlainActionFuture<>();
+            if (emptyListeners == null) {
+                emptyListeners = new ArrayList<>();
+            }
+            emptyListeners.add(future);
+        }
+        FutureUtils.get(future);
+    }
+
     @Override
     public void executeConsistentStateUpdate(
         Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
@@ -2885,7 +2911,30 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         );
         final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
         final BlobContainer container = shardContainer(indexId, snapshotShardId);
-        executor.execute(ActionRunnable.wrap(restoreListener, l -> {
+        synchronized (ongoingRestores) {
+            if (store.isClosing()) {
+                restoreListener.onFailure(new AlreadyClosedException("store is closing"));
+                return;
+            }
+            if (lifecycle.started() == false) {
+                restoreListener.onFailure(new AlreadyClosedException("repository [" + metadata.name() + "] closed"));
+                return;
+            }
+            final boolean added = ongoingRestores.add(shardId);
+            assert added : "add restore for [" + shardId + "] that already has an existing restore";
+        }
+        executor.execute(ActionRunnable.wrap(ActionListener.runAfter(restoreListener, () -> {
+            final List<ActionListener<Void>> onEmptyListeners;
+            synchronized (ongoingRestores) {
+                if (ongoingRestores.remove(shardId) && ongoingRestores.isEmpty() && emptyListeners != null) {
+                    onEmptyListeners = emptyListeners;
+                    emptyListeners = null;
+                } else {
+                    return;
+                }
+            }
+            ActionListener.onResponse(onEmptyListeners, null);
+        }), l -> {
             final BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId);
             final SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles(), null);
             new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) {
@@ -2991,6 +3040,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     if (store.isClosing()) {
                         throw new AlreadyClosedException("store is closing");
                     }
+                    if (lifecycle.started() == false) {
+                        throw new AlreadyClosedException("repository [" + metadata.name() + "] closed");
+                    }
                 }
 
             }.restore(snapshotFiles, store, l);

+ 3 - 0
server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java

@@ -317,6 +317,9 @@ public class RepositoriesServiceTests extends ESTestCase {
 
         }
 
+        @Override
+        public void awaitIdle() {}
+
         @Override
         public Lifecycle.State lifecycleState() {
             return null;

+ 4 - 0
test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java

@@ -148,6 +148,10 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
     public void updateState(final ClusterState state) {
     }
 
+    @Override
+    public void awaitIdle() {
+    }
+
     @Override
     public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
                                              Consumer<Exception> onFailure) {

+ 4 - 0
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

@@ -494,6 +494,10 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
         throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
     }
 
+    @Override
+    public void awaitIdle() {
+    }
+
     private void updateMappings(Client leaderClient, Index leaderIndex, long leaderMappingVersion,
                                 Client followerClient, Index followerIndex) {
         final PlainActionFuture<IndexMetadata> indexMetadataFuture = new PlainActionFuture<>();