Selaa lähdekoodia

Add Functionality to Consistently Read RepositoryData For CS Updates (#55773)

Using optimistic locking, add the ability to run a repository state
update task with a consistent view of the current repository data.
Allows for a follow-up to remove the snapshot INIT state.
Armin Braun 5 vuotta sitten
vanhempi
commit
c1fca1255b

+ 8 - 0
server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

@@ -22,6 +22,7 @@ import org.apache.lucene.index.IndexCommit;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
@@ -42,6 +43,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
 public class FilterRepository implements Repository {
@@ -146,6 +148,12 @@ public class FilterRepository implements Repository {
         in.updateState(state);
     }
 
+    @Override
+    public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
+                                             Consumer<Exception> onFailure) {
+        in.executeConsistentStateUpdate(createUpdateTask, source, onFailure);
+    }
+
     @Override
     public Lifecycle.State lifecycleState() {
         return in.lifecycleState();

+ 17 - 0
server/src/main/java/org/elasticsearch/repositories/Repository.java

@@ -22,6 +22,7 @@ import org.apache.lucene.index.IndexCommit;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.SnapshotsInProgress;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
@@ -43,6 +44,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
 /**
@@ -256,6 +258,21 @@ public interface Repository extends LifecycleComponent {
      */
     void updateState(ClusterState state);
 
+    /**
+     * Execute a cluster state update with a consistent view of the current {@link RepositoryData}. The {@link ClusterState} passed to the
+     * task generated through {@code createUpdateTask} is guaranteed to point at the same state for this repository as the did the state
+     * at the time the {@code RepositoryData} was loaded.
+     * This allows for operations on the repository that need a consistent view of both the cluster state and the repository contents at
+     * one point in time like for example, checking if a snapshot is in the repository before adding the delete operation for it to the
+     * cluster state.
+     *
+     * @param createUpdateTask function to supply cluster state update task
+     * @param source           the source of the cluster state update task
+     * @param onFailure        error handler invoked on failure to get a consistent view of the current {@link RepositoryData}
+     */
+    void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
+                                      Consumer<Exception> onFailure);
+
     /**
      * Hook that allows a repository to filter the user supplied snapshot metadata in {@link SnapshotsInProgress.Entry#userMetadata()}
      * during snapshot initialization.

+ 62 - 0
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -76,6 +76,7 @@ import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -327,6 +328,67 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }
     }
 
+    @Override
+    public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
+                                             Consumer<Exception> onFailure) {
+        threadPool.generic().execute(new AbstractRunnable() {
+            @Override
+            protected void doRun() {
+                final RepositoryMetadata repositoryMetadataStart = metadata;
+                getRepositoryData(ActionListener.wrap(repositoryData -> {
+                    final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData);
+                    clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority()) {
+
+                        private boolean executedTask = false;
+
+                        @Override
+                        public ClusterState execute(ClusterState currentState) throws Exception {
+                            // Comparing the full metadata here on purpose instead of simply comparing the safe generation.
+                            // If the safe generation has changed, then we have to reload repository data and start over.
+                            // If the pending generation has changed we are in the midst of a write operation and might pick up the
+                            // updated repository data and state on the retry. We don't want to wait for the write to finish though
+                            // because it could fail for any number of reasons so we just retry instead of waiting on the cluster state
+                            // to change in any form.
+                            if (repositoryMetadataStart.equals(getRepoMetadata(currentState))) {
+                                executedTask = true;
+                                return updateTask.execute(currentState);
+                            }
+                            return currentState;
+                        }
+
+                        @Override
+                        public void onFailure(String source, Exception e) {
+                            if (executedTask) {
+                                updateTask.onFailure(source, e);
+                            } else {
+                                onFailure.accept(e);
+                            }
+                        }
+
+                        @Override
+                        public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                            if (executedTask) {
+                                updateTask.clusterStateProcessed(source, oldState, newState);
+                            } else {
+                                executeConsistentStateUpdate(createUpdateTask, source, onFailure);
+                            }
+                        }
+
+                        @Override
+                        public TimeValue timeout() {
+                            return updateTask.timeout();
+                        }
+                    });
+                }, onFailure));
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                onFailure.accept(e);
+            }
+        });
+    }
+
     // Inspects all cluster state elements that contain a hint about what the current repository generation is and updates
     // #latestKnownRepoGen if a newer than currently known generation is found
     @Override

+ 38 - 23
server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

@@ -988,7 +988,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
      * If deleting a single snapshot, first checks if a snapshot is still running and if so cancels the snapshot and then deletes it from
      * the repository.
      * If the snapshot is not running or multiple snapshot names are given, moves to trying to find a matching {@link Snapshot}s for the
-     * given names in the repository and deletes them by invoking {@link #deleteCompletedSnapshots}.
+     * given names in the repository and deletes them.
      *
      * @param request         delete snapshot request
      * @param listener        listener
@@ -1092,18 +1092,23 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
             @Override
             public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                 if (runningSnapshot == null) {
-                    threadPool.generic().execute(ActionRunnable.wrap(listener, l ->
-                            repositoriesService.repository(repositoryName).getRepositoryData(ActionListener.wrap(repositoryData ->
-                                    deleteCompletedSnapshots(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName),
-                                            repositoryName, repositoryData.getGenId(), Priority.NORMAL, l), l::onFailure))));
+                    try {
+                        repositoriesService.repository(repositoryName).executeConsistentStateUpdate(repositoryData ->
+                                createDeleteStateUpdate(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName), repositoryName,
+                                        repositoryData.getGenId(), request.masterNodeTimeout(), Priority.NORMAL, listener),
+                                        "delete completed snapshots", listener::onFailure);
+                    } catch (RepositoryMissingException e) {
+                        listener.onFailure(e);
+                    }
                     return;
                 }
                 logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish");
                 addListener(runningSnapshot, ActionListener.wrap(
                     result -> {
                         logger.debug("deleted snapshot completed - deleting files");
-                        deleteCompletedSnapshots(Collections.singletonList(result.v2().snapshotId()), repositoryName,
-                                result.v1().getGenId(), Priority.IMMEDIATE, listener);
+                        clusterService.submitStateUpdateTask("delete snapshot",
+                                createDeleteStateUpdate(Collections.singletonList(result.v2().snapshotId()), repositoryName,
+                                        result.v1().getGenId(), null, Priority.IMMEDIATE, listener));
                     },
                     e -> {
                         if (abortedDuringInit) {
@@ -1174,23 +1179,33 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         return snapshotEntry;
     }
 
-    /**
-     * Deletes snapshots that are assumed to be in the repository and not tracked as in-progress in the cluster state.
-     *
-     * @param snapshotIds       Snapshots to delete
-     * @param repoName          Repository name
-     * @param repositoryStateId Repository generation to base the delete on
-     * @param listener          Listener to complete when done
-     */
-    private void deleteCompletedSnapshots(List<SnapshotId> snapshotIds, String repoName, long repositoryStateId, Priority priority,
-                                          ActionListener<Void> listener) {
+    private ClusterStateUpdateTask createDeleteStateUpdate(List<SnapshotId> snapshotIds, String repoName, long repositoryStateId,
+                                                           @Nullable TimeValue timeout, Priority priority, ActionListener<Void> listener) {
+        // Short circuit to noop state update if there isn't anything to delete
         if (snapshotIds.isEmpty()) {
-            listener.onResponse(null);
-            return;
+            return new ClusterStateUpdateTask() {
+                @Override
+                public ClusterState execute(ClusterState currentState) {
+                    return currentState;
+                }
+
+                @Override
+                public void onFailure(String source, Exception e) {
+                    listener.onFailure(e);
+                }
+
+                @Override
+                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                    listener.onResponse(null);
+                }
+
+                @Override
+                public TimeValue timeout() {
+                    return timeout;
+                }
+            };
         }
-        logger.debug("deleting snapshots {} assuming repository generation [{}] and with priority [{}]", snapshotIds, repositoryStateId,
-            priority);
-        clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) {
+        return new ClusterStateUpdateTask(priority) {
             @Override
             public ClusterState execute(ClusterState currentState) {
                 SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
@@ -1246,7 +1261,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
             public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                 deleteSnapshotsFromRepository(repoName, snapshotIds, listener, repositoryStateId, newState.nodes().getMinNodeVersion());
             }
-        });
+        };
     }
 
     /**

+ 7 - 0
server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java

@@ -24,6 +24,7 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
@@ -53,6 +54,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
 import static org.mockito.Mockito.mock;
@@ -225,6 +227,11 @@ public class RepositoriesServiceTests extends ESTestCase {
         public void updateState(final ClusterState state) {
         }
 
+        @Override
+        public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
+                                                 Consumer<Exception> onFailure) {
+        }
+
         @Override
         public Lifecycle.State lifecycleState() {
             return null;

+ 59 - 1
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -202,6 +202,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -220,6 +221,7 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.iterableWithSize;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.mockito.Mockito.mock;
 
 public class SnapshotResiliencyTests extends ESTestCase {
@@ -506,7 +508,6 @@ public class SnapshotResiliencyTests extends ESTestCase {
         assertEquals(0, snapshotInfo.failedShards());
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55702")
     public void testConcurrentSnapshotCreateAndDeleteOther() {
         setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
 
@@ -738,6 +739,63 @@ public class SnapshotResiliencyTests extends ESTestCase {
         assertEquals(0, snapshotInfo.failedShards());
     }
 
+    public void testConcurrentDeletes() {
+        setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
+
+        String repoName = "repo";
+        String snapshotName = "snapshot";
+        final String index = "test";
+        final int shards = randomIntBetween(1, 10);
+
+        TestClusterNodes.TestClusterNode masterNode =
+                testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
+
+        final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
+
+        continueOrDie(createRepoAndIndex(repoName, index, shards),
+                createIndexResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
+                        .setWaitForCompletion(true).execute(createSnapshotResponseStepListener));
+
+        final Collection<StepListener<Boolean>> deleteSnapshotStepListeners = List.of(new StepListener<>(), new StepListener<>());
+
+        final AtomicInteger successfulDeletes = new AtomicInteger(0);
+
+        continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> {
+            for (StepListener<Boolean> deleteListener : deleteSnapshotStepListeners) {
+                client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute(
+                        ActionListener.wrap(
+                                resp -> deleteListener.onResponse(true),
+                                e -> {
+                                    final Throwable unwrapped = ExceptionsHelper.unwrap(
+                                            e, ConcurrentSnapshotExecutionException.class, SnapshotMissingException.class);
+                                    assertThat(unwrapped, notNullValue());
+                                    deleteListener.onResponse(false);
+                                }));
+            }
+        });
+
+        for (StepListener<Boolean> deleteListener : deleteSnapshotStepListeners) {
+            continueOrDie(deleteListener, deleted -> {
+                if (deleted) {
+                    successfulDeletes.incrementAndGet();
+                }
+            });
+        }
+
+        deterministicTaskQueue.runAllRunnableTasks();
+
+        SnapshotDeletionsInProgress deletionsInProgress = masterNode.clusterService.state().custom(SnapshotDeletionsInProgress.TYPE);
+        assertFalse(deletionsInProgress.hasDeletionsInProgress());
+        final Repository repository = masterNode.repositoriesService.repository(repoName);
+        final RepositoryData repositoryData = getRepositoryData(repository);
+        Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
+        // We end up with no snapshots since at least one of the deletes worked out
+        assertThat(snapshotIds, empty());
+        assertThat(successfulDeletes.get(), either(is(1)).or(is(2)));
+        // We did one snapshot and one delete so we went two steps from the empty generation (-1) to 1
+        assertThat(repositoryData.getGenId(), is(1L));
+    }
+
     /**
      * Simulates concurrent restarts of data and master nodes as well as relocating a primary shard, while starting and subsequently
      * deleting a snapshot.

+ 8 - 0
test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java

@@ -22,6 +22,7 @@ import org.apache.lucene.index.IndexCommit;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
@@ -44,6 +45,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
 import static java.util.Collections.emptyList;
@@ -153,4 +155,10 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
     @Override
     public void updateState(final ClusterState state) {
     }
+
+    @Override
+    public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
+                                             Consumer<Exception> onFailure) {
+        throw new UnsupportedOperationException("Unsupported for restore-only repository");
+    }
 }

+ 8 - 0
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

@@ -24,6 +24,7 @@ import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
@@ -92,6 +93,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.LongConsumer;
 import java.util.function.Supplier;
@@ -433,6 +435,12 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
     public void updateState(ClusterState state) {
     }
 
+    @Override
+    public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
+                                             Consumer<Exception> onFailure) {
+        throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
+    }
+
     private void updateMappings(Client leaderClient, Index leaderIndex, long leaderMappingVersion,
                                 Client followerClient, Index followerIndex) {
         final PlainActionFuture<IndexMetadata> indexMetadataFuture = new PlainActionFuture<>();