瀏覽代碼

Use Consistent Cluster State in GetSnapshotsAction (#56352)

We shouldn't needlessly create a transport request here when all
`TransportGetRepositoriesAction` does, is to filter the cluster state.
Aside from just being needlessly complex, this can also introduce a strange race
if the master gets disconnected/fails-over from the cluster during the request and then
starts to wait for the new master in the transport master node action and then continues
processing the request with an outdated instance of `SnapshotsInProgress`.
(especially strange when just loading the `_current` snapshot and running into a
concurrent fail-over)
=> extracted the logic for getting the repo list to simplify things and avoid strange races.
Armin Braun 5 年之前
父節點
當前提交
99a8f4d8c4

+ 18 - 8
server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java

@@ -76,18 +76,29 @@ public class TransportGetRepositoriesAction extends TransportMasterNodeReadActio
     @Override
     protected void masterOperation(Task task, final GetRepositoriesRequest request, ClusterState state,
                                    final ActionListener<GetRepositoriesResponse> listener) {
+        listener.onResponse(new GetRepositoriesResponse(new RepositoriesMetadata(getRepositories(state, request.repositories()))));
+    }
+
+    /**
+     * Get repository metadata for given repository names from given cluster state.
+     *
+     * @param state     Cluster state
+     * @param repoNames Repository names or patterns to get metadata for
+     * @return list of repository metadata
+     */
+    public static List<RepositoryMetadata> getRepositories(ClusterState state, String[] repoNames) {
         Metadata metadata = state.metadata();
         RepositoriesMetadata repositories = metadata.custom(RepositoriesMetadata.TYPE);
-        if (request.repositories().length == 0 || (request.repositories().length == 1 && "_all".equals(request.repositories()[0]))) {
+        if (repoNames.length == 0 || (repoNames.length == 1 && "_all".equals(repoNames[0]))) {
             if (repositories != null) {
-                listener.onResponse(new GetRepositoriesResponse(repositories));
+                return repositories.repositories();
             } else {
-                listener.onResponse(new GetRepositoriesResponse(new RepositoriesMetadata(Collections.emptyList())));
+                return Collections.emptyList();
             }
         } else {
             if (repositories != null) {
                 Set<String> repositoriesToGet = new LinkedHashSet<>(); // to keep insertion order
-                for (String repositoryOrPattern : request.repositories()) {
+                for (String repositoryOrPattern : repoNames) {
                     if (Regex.isSimpleMatchPattern(repositoryOrPattern) == false) {
                         repositoriesToGet.add(repositoryOrPattern);
                     } else {
@@ -102,14 +113,13 @@ public class TransportGetRepositoriesAction extends TransportMasterNodeReadActio
                 for (String repository : repositoriesToGet) {
                     RepositoryMetadata repositoryMetadata = repositories.repository(repository);
                     if (repositoryMetadata == null) {
-                        listener.onFailure(new RepositoryMissingException(repository));
-                        return;
+                        throw new RepositoryMissingException(repository);
                     }
                     repositoryListBuilder.add(repositoryMetadata);
                 }
-                listener.onResponse(new GetRepositoriesResponse(new RepositoriesMetadata(repositoryListBuilder)));
+                return repositoryListBuilder;
             } else {
-                listener.onFailure(new RepositoryMissingException(request.repositories()[0]));
+                throw new RepositoryMissingException(repoNames[0]);
             }
         }
     }

+ 3 - 13
server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java

@@ -25,12 +25,9 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.util.CollectionUtil;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.StepListener;
-import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesAction;
-import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
-import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
+import org.elasticsearch.action.admin.cluster.repositories.get.TransportGetRepositoriesAction;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
@@ -56,7 +53,6 @@ import org.elasticsearch.snapshots.SnapshotMissingException;
 import org.elasticsearch.snapshots.SnapshotsService;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
@@ -107,14 +103,8 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
     @Override
     protected void masterOperation(Task task, final GetSnapshotsRequest request, final ClusterState state,
                                    final ActionListener<GetSnapshotsResponse> listener) {
-        final String[] repositories = request.repositories();
-        final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
-        transportService.sendChildRequest(transportService.getLocalNode(), GetRepositoriesAction.NAME,
-            new GetRepositoriesRequest(repositories), task, TransportRequestOptions.EMPTY,
-            new ActionListenerResponseHandler<>(
-                ActionListener.wrap(response -> getMultipleReposSnapshotInfo(snapshotsInProgress, response.repositories(),
-                    request.snapshots(), request.ignoreUnavailable(), request.verbose(), listener), listener::onFailure),
-                GetRepositoriesResponse::new));
+        getMultipleReposSnapshotInfo(state.custom(SnapshotsInProgress.TYPE), TransportGetRepositoriesAction.getRepositories(
+                state, request.repositories()), request.snapshots(), request.ignoreUnavailable(), request.verbose(), listener);
     }
 
     private void getMultipleReposSnapshotInfo(@Nullable SnapshotsInProgress snapshotsInProgress, List<RepositoryMetadata> repos,