Browse Source

Fix Threading in Snapshot Restore (#68390)

Same as #68023 but even less likely (couldn't really find a quick way
to write a test for it for that reason).
Fix is the same, fork off to the generic pool for listener handling.
Also, this allows removing the forking in the transport action since we don't do any long
runnning work on the calling thread any longer in the restore method.
Armin Braun 4 years ago
parent
commit
78dc03cbab

+ 1 - 3
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java

@@ -32,10 +32,8 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeAction<Re
     public TransportRestoreSnapshotAction(TransportService transportService, ClusterService clusterService,
                                           ThreadPool threadPool, RestoreService restoreService, ActionFilters actionFilters,
                                           IndexNameExpressionResolver indexNameExpressionResolver) {
-        // Using the generic instead of the snapshot threadpool here as the snapshot threadpool might be blocked on long running tasks
-        // which would block the request from getting an error response because of the ongoing task
         super(RestoreSnapshotAction.NAME, transportService, clusterService, threadPool, actionFilters,
-              RestoreSnapshotRequest::new, indexNameExpressionResolver, RestoreSnapshotResponse::new, ThreadPool.Names.GENERIC);
+              RestoreSnapshotRequest::new, indexNameExpressionResolver, RestoreSnapshotResponse::new, ThreadPool.Names.SAME);
         this.restoreService = restoreService;
     }
 

+ 1 - 2
server/src/main/java/org/elasticsearch/node/Node.java

@@ -597,8 +597,7 @@ public class Node implements Closeable {
             SnapshotShardsService snapshotShardsService = new SnapshotShardsService(settings, clusterService, repositoryService,
                 transportService, indicesService);
             RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(),
-                metadataCreateIndexService, indexMetadataVerifier, clusterService.getClusterSettings(), shardLimitValidator);
-
+                    metadataCreateIndexService, indexMetadataVerifier, shardLimitValidator);
             final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,
                 clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, rerouteService);
             clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);

+ 9 - 7
server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

@@ -16,9 +16,9 @@ import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
 import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateApplier;
@@ -66,6 +66,7 @@ import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -152,8 +153,7 @@ public class RestoreService implements ClusterStateApplier {
 
     public RestoreService(ClusterService clusterService, RepositoriesService repositoriesService,
                           AllocationService allocationService, MetadataCreateIndexService createIndexService,
-                          IndexMetadataVerifier indexMetadataVerifier, ClusterSettings clusterSettings,
-                          ShardLimitValidator shardLimitValidator) {
+                          IndexMetadataVerifier indexMetadataVerifier, ShardLimitValidator shardLimitValidator) {
         this.clusterService = clusterService;
         this.repositoriesService = repositoriesService;
         this.allocationService = allocationService;
@@ -191,9 +191,7 @@ public class RestoreService implements ClusterStateApplier {
             // Read snapshot info and metadata from the repository
             final String repositoryName = request.repository();
             Repository repository = repositoriesService.repository(repositoryName);
-            final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
-            repository.getRepositoryData(repositoryDataListener);
-            repositoryDataListener.whenComplete(repositoryData -> {
+            ActionListener<RepositoryData> repoDataListener = ActionListener.wrap(repositoryData -> {
                 final String snapshotName = request.snapshot();
                 final Optional<SnapshotId> matchingSnapshotId = repositoryData.getSnapshotIds().stream()
                     .filter(s -> snapshotName.equals(s.getName())).findFirst();
@@ -499,7 +497,7 @@ public class RestoreService implements ClusterStateApplier {
                     }
 
                     private void validateExistingIndex(IndexMetadata currentIndexMetadata, IndexMetadata snapshotIndexMetadata,
-                        String renamedIndex, boolean partial) {
+                                                       String renamedIndex, boolean partial) {
                         // Index exist - checking that it's closed
                         if (currentIndexMetadata.getState() != IndexMetadata.State.CLOSE) {
                             // TODO: Enable restore for open indices
@@ -594,6 +592,10 @@ public class RestoreService implements ClusterStateApplier {
                     }
                 });
             }, listener::onFailure);
+            // fork handling the above listener to the generic pool since it loads various pieces of metadata from the repository over a
+            // longer period of time
+            repository.getRepositoryData(new ThreadedActionListener<>(logger, clusterService.getClusterApplierService().threadPool(),
+                    ThreadPool.Names.GENERIC, repoDataListener, false));
         } catch (Exception e) {
             logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot",
                 request.repository() + ":" + request.snapshot()), e);

+ 0 - 1
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -1567,7 +1567,6 @@ public class SnapshotResiliencyTests extends ESTestCase {
                         mapperRegistry,
                         indexScopedSettings,
                         null),
-                    clusterSettings,
                         shardLimitValidator
                 );
                 actions.put(PutMappingAction.INSTANCE,