Explorar el Código

Move executeConsistentStateUpdate method from Repository to SnapshotsService (#94020)

This refactoring helps make https://github.com/elastic/elasticsearch/pull/90332 easier to read.
Also, even in isolation it saves a lot of code and seems to generally be the right thing to have this
method in the snapshots service.
Armin Braun hace 2 años
padre
commit
d27102fa7e

+ 0 - 12
server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

@@ -10,7 +10,6 @@ package org.elasticsearch.repositories;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
@@ -28,8 +27,6 @@ import org.elasticsearch.snapshots.SnapshotId;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Set;
-import java.util.function.Consumer;
-import java.util.function.Function;
 
 public class FilterRepository implements Repository {
 
@@ -145,15 +142,6 @@ public class FilterRepository implements Repository {
         in.updateState(state);
     }
 
-    @Override
-    public void executeConsistentStateUpdate(
-        Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
-        String source,
-        Consumer<Exception> onFailure
-    ) {
-        in.executeConsistentStateUpdate(createUpdateTask, source, onFailure);
-    }
-
     @Override
     public void cloneShardSnapshot(
         SnapshotId source,

+ 0 - 12
server/src/main/java/org/elasticsearch/repositories/InvalidRepository.java

@@ -11,7 +11,6 @@ package org.elasticsearch.repositories;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
@@ -26,8 +25,6 @@ import org.elasticsearch.snapshots.SnapshotId;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.function.Consumer;
-import java.util.function.Function;
 
 /**
  * Represents a repository that exists in the cluster state but could not be instantiated on a node, typically due to invalid configuration.
@@ -148,15 +145,6 @@ public class InvalidRepository extends AbstractLifecycleComponent implements Rep
 
     }
 
-    @Override
-    public void executeConsistentStateUpdate(
-        Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
-        String source,
-        Consumer<Exception> onFailure
-    ) {
-        onFailure.accept(createCreationException());
-    }
-
     @Override
     public void cloneShardSnapshot(
         SnapshotId source,

+ 0 - 20
server/src/main/java/org/elasticsearch/repositories/Repository.java

@@ -10,7 +10,6 @@ package org.elasticsearch.repositories;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
@@ -31,7 +30,6 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
-import java.util.function.Consumer;
 import java.util.function.Function;
 
 /**
@@ -262,24 +260,6 @@ public interface Repository extends LifecycleComponent {
      */
     void updateState(ClusterState state);
 
-    /**
-     * Execute a cluster state update with a consistent view of the current {@link RepositoryData}. The {@link ClusterState} passed to the
-     * task generated through {@code createUpdateTask} is guaranteed to point at the same state for this repository as the did the state
-     * at the time the {@code RepositoryData} was loaded.
-     * This allows for operations on the repository that need a consistent view of both the cluster state and the repository contents at
-     * one point in time like for example, checking if a snapshot is in the repository before adding the delete operation for it to the
-     * cluster state.
-     *
-     * @param createUpdateTask function to supply cluster state update task
-     * @param source           the source of the cluster state update task
-     * @param onFailure        error handler invoked on failure to get a consistent view of the current {@link RepositoryData}
-     */
-    void executeConsistentStateUpdate(
-        Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
-        String source,
-        Consumer<Exception> onFailure
-    );
-
     /**
      * Clones a shard snapshot.
      *

+ 0 - 12
server/src/main/java/org/elasticsearch/repositories/UnknownTypeRepository.java

@@ -11,7 +11,6 @@ package org.elasticsearch.repositories;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
@@ -26,8 +25,6 @@ import org.elasticsearch.snapshots.SnapshotId;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.function.Consumer;
-import java.util.function.Function;
 
 /**
  * This class represents a repository that could not be initialized due to unknown type.
@@ -146,15 +143,6 @@ public class UnknownTypeRepository extends AbstractLifecycleComponent implements
 
     }
 
-    @Override
-    public void executeConsistentStateUpdate(
-        Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
-        String source,
-        Consumer<Exception> onFailure
-    ) {
-        onFailure.accept(createUnknownTypeException());
-    }
-
     @Override
     public void cloneShardSnapshot(
         SnapshotId source,

+ 2 - 51
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -488,55 +488,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         FutureUtils.get(future);
     }
 
-    @Override
-    public void executeConsistentStateUpdate(
-        Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
-        String source,
-        Consumer<Exception> onFailure
-    ) {
-        final RepositoryMetadata repositoryMetadataStart = metadata;
-        getRepositoryData(ActionListener.wrap(repositoryData -> {
-            final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData);
-            submitUnbatchedTask(source, new ClusterStateUpdateTask(updateTask.priority(), updateTask.timeout()) {
-
-                private boolean executedTask = false;
-
-                @Override
-                public ClusterState execute(ClusterState currentState) throws Exception {
-                    // Comparing the full metadata here on purpose instead of simply comparing the safe generation.
-                    // If the safe generation has changed, then we have to reload repository data and start over.
-                    // If the pending generation has changed we are in the midst of a write operation and might pick up the
-                    // updated repository data and state on the retry. We don't want to wait for the write to finish though
-                    // because it could fail for any number of reasons so we just retry instead of waiting on the cluster state
-                    // to change in any form.
-                    if (repositoryMetadataStart.equals(getRepoMetadata(currentState))) {
-                        executedTask = true;
-                        return updateTask.execute(currentState);
-                    }
-                    return currentState;
-                }
-
-                @Override
-                public void onFailure(Exception e) {
-                    if (executedTask) {
-                        updateTask.onFailure(e);
-                    } else {
-                        onFailure.accept(e);
-                    }
-                }
-
-                @Override
-                public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
-                    if (executedTask) {
-                        updateTask.clusterStateProcessed(oldState, newState);
-                    } else {
-                        executeConsistentStateUpdate(createUpdateTask, source, onFailure);
-                    }
-                }
-            });
-        }, onFailure));
-    }
-
     @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
     private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
         clusterService.submitUnbatchedStateUpdateTask(source, task);
@@ -1825,8 +1776,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     /**
      * Method used to set the current repository generation in the cluster state's {@link RepositoryMetadata} to the latest generation that
      * can be physically found in the repository before passing the latest {@link RepositoryData} to the given listener.
-     * This ensures that operations using {@link #executeConsistentStateUpdate} right after mounting a fresh repository will have a
-     * consistent view of the {@link RepositoryData} before any data has been written to the repository.
+     * This ensures that operations using {@link SnapshotsService#executeConsistentStateUpdate} right after mounting a fresh repository will
+     * have a consistent view of the {@link RepositoryData} before any data has been written to the repository.
      *
      * @param listener listener to resolve with new repository data
      */

+ 71 - 4
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -48,6 +48,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
+import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
@@ -294,7 +295,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
             featureStatesSet = Collections.emptySet();
         }
 
-        repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(request.masterNodeTimeout()) {
+        executeConsistentStateUpdate(repository, repositoryData -> new ClusterStateUpdateTask(request.masterNodeTimeout()) {
 
             private SnapshotsInProgress.Entry newEntry;
 
@@ -462,7 +463,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID());
         final Snapshot snapshot = new Snapshot(repositoryName, snapshotId);
         initializingClones.add(snapshot);
-        repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(request.masterNodeTimeout()) {
+        executeConsistentStateUpdate(repository, repositoryData -> new ClusterStateUpdateTask(request.masterNodeTimeout()) {
 
             private SnapshotsInProgress.Entry newEntry;
 
@@ -627,7 +628,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         }, onFailure);
 
         // 3. step, we have all the shard counts, now update the cluster state to have clone jobs in the snap entry
-        allShardCountsListener.whenComplete(counts -> repository.executeConsistentStateUpdate(repoData -> new ClusterStateUpdateTask() {
+        allShardCountsListener.whenComplete(counts -> executeConsistentStateUpdate(repository, repoData -> new ClusterStateUpdateTask() {
 
             private SnapshotsInProgress.Entry updatedEntry;
 
@@ -2066,7 +2067,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         );
 
         final Repository repository = repositoriesService.repository(repositoryName);
-        repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(request.masterNodeTimeout()) {
+        executeConsistentStateUpdate(repository, repositoryData -> new ClusterStateUpdateTask(request.masterNodeTimeout()) {
 
             private SnapshotDeletionsInProgress.Entry newDelete = null;
 
@@ -2399,6 +2400,72 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         clusterService.submitUnbatchedStateUpdateTask(source, task);
     }
 
+    /**
+     * Execute a cluster state update with a consistent view of the current {@link RepositoryData}. The {@link ClusterState} passed to the
+     * task generated through {@code createUpdateTask} is guaranteed to point at the same state for this repository as the did the state
+     * at the time the {@code RepositoryData} was loaded.
+     * This allows for operations on the repository that need a consistent view of both the cluster state and the repository contents at
+     * one point in time like for example, checking if a snapshot is in the repository before adding the delete operation for it to the
+     * cluster state.
+     *
+     * @param repository       repository to execute update for
+     * @param createUpdateTask function to supply cluster state update task
+     * @param source           the source of the cluster state update task
+     * @param onFailure        error handler invoked on failure to get a consistent view of the current {@link RepositoryData}
+     */
+    private void executeConsistentStateUpdate(
+        Repository repository,
+        Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
+        String source,
+        Consumer<Exception> onFailure
+    ) {
+        final RepositoryMetadata repositoryMetadataStart = repository.getMetadata();
+        repository.getRepositoryData(ActionListener.wrap(repositoryData -> {
+            final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData);
+            submitUnbatchedTask(source, new ClusterStateUpdateTask(updateTask.priority(), updateTask.timeout()) {
+
+                private boolean executedTask = false;
+
+                @Override
+                public ClusterState execute(ClusterState currentState) throws Exception {
+                    // Comparing the full metadata here on purpose instead of simply comparing the safe generation.
+                    // If the safe generation has changed, then we have to reload repository data and start over.
+                    // If the pending generation has changed we are in the midst of a write operation and might pick up the
+                    // updated repository data and state on the retry. We don't want to wait for the write to finish though
+                    // because it could fail for any number of reasons so we just retry instead of waiting on the cluster state
+                    // to change in any form.
+                    if (repositoryMetadataStart.equals(
+                        currentState.getMetadata()
+                            .<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE)
+                            .repository(repository.getMetadata().name())
+                    )) {
+                        executedTask = true;
+                        return updateTask.execute(currentState);
+                    }
+                    return currentState;
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    if (executedTask) {
+                        updateTask.onFailure(e);
+                    } else {
+                        onFailure.accept(e);
+                    }
+                }
+
+                @Override
+                public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
+                    if (executedTask) {
+                        updateTask.clusterStateProcessed(oldState, newState);
+                    } else {
+                        executeConsistentStateUpdate(repository, createUpdateTask, source, onFailure);
+                    }
+                }
+            });
+        }, onFailure));
+    }
+
     /** Deletes snapshot from repository
      *
      * @param deleteEntry       delete entry in cluster state

+ 0 - 10
server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java

@@ -15,7 +15,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
@@ -50,8 +49,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Consumer;
-import java.util.function.Function;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.isA;
@@ -413,13 +410,6 @@ public class RepositoriesServiceTests extends ESTestCase {
         @Override
         public void updateState(final ClusterState state) {}
 
-        @Override
-        public void executeConsistentStateUpdate(
-            Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
-            String source,
-            Consumer<Exception> onFailure
-        ) {}
-
         @Override
         public void cloneShardSnapshot(
             SnapshotId source,

+ 0 - 12
test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java

@@ -10,7 +10,6 @@ package org.elasticsearch.index.shard;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
@@ -33,8 +32,6 @@ import org.elasticsearch.snapshots.SnapshotId;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.function.Consumer;
-import java.util.function.Function;
 
 import static java.util.Collections.emptyList;
 import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
@@ -149,15 +146,6 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
     @Override
     public void awaitIdle() {}
 
-    @Override
-    public void executeConsistentStateUpdate(
-        Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
-        String source,
-        Consumer<Exception> onFailure
-    ) {
-        throw new UnsupportedOperationException("Unsupported for restore-only repository");
-    }
-
     @Override
     public void cloneShardSnapshot(
         SnapshotId source,

+ 0 - 12
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

@@ -27,7 +27,6 @@ import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
@@ -102,8 +101,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.function.LongConsumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -523,15 +520,6 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
     @Override
     public void updateState(ClusterState state) {}
 
-    @Override
-    public void executeConsistentStateUpdate(
-        Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
-        String source,
-        Consumer<Exception> onFailure
-    ) {
-        throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
-    }
-
     @Override
     public void cloneShardSnapshot(
         SnapshotId source,