|
@@ -316,7 +316,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
ensureRepositoryExists(repositoryName, currentState);
|
|
|
ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
|
|
|
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "clone snapshot");
|
|
|
- final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
|
|
|
+ final SnapshotsInProgress snapshots = SnapshotsInProgress.get(currentState);
|
|
|
ensureSnapshotNameNotRunning(snapshots, repositoryName, snapshotName);
|
|
|
validate(repositoryName, snapshotName, currentState);
|
|
|
|
|
@@ -325,10 +325,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
.filter(src -> src.getName().equals(request.source()))
|
|
|
.findAny()
|
|
|
.orElseThrow(() -> new SnapshotMissingException(repositoryName, request.source()));
|
|
|
- final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(
|
|
|
- SnapshotDeletionsInProgress.TYPE,
|
|
|
- SnapshotDeletionsInProgress.EMPTY
|
|
|
- );
|
|
|
+ final SnapshotDeletionsInProgress deletionsInProgress = SnapshotDeletionsInProgress.get(currentState);
|
|
|
if (deletionsInProgress.getEntries().stream().anyMatch(entry -> entry.getSnapshots().contains(sourceSnapshotId))) {
|
|
|
throw new ConcurrentSnapshotExecutionException(
|
|
|
repositoryName,
|
|
@@ -391,10 +388,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
final String snapshotName,
|
|
|
final String reason
|
|
|
) {
|
|
|
- final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(
|
|
|
- RepositoryCleanupInProgress.TYPE,
|
|
|
- RepositoryCleanupInProgress.EMPTY
|
|
|
- );
|
|
|
+ final RepositoryCleanupInProgress repositoryCleanupInProgress = RepositoryCleanupInProgress.get(currentState);
|
|
|
if (repositoryCleanupInProgress.hasCleanupInProgress()) {
|
|
|
throw new ConcurrentSnapshotExecutionException(
|
|
|
repositoryName,
|
|
@@ -479,10 +473,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
|
|
|
@Override
|
|
|
public ClusterState execute(ClusterState currentState) {
|
|
|
- final SnapshotsInProgress snapshotsInProgress = currentState.custom(
|
|
|
- SnapshotsInProgress.TYPE,
|
|
|
- SnapshotsInProgress.EMPTY
|
|
|
- );
|
|
|
+ final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(currentState);
|
|
|
final String repoName = cloneEntry.repository();
|
|
|
final List<SnapshotsInProgress.Entry> existingEntries = snapshotsInProgress.forRepo(repoName);
|
|
|
final List<SnapshotsInProgress.Entry> updatedEntries = new ArrayList<>(existingEntries.size());
|
|
@@ -492,10 +483,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
if (cloneEntry.snapshot().getSnapshotId().equals(existing.snapshot().getSnapshotId())) {
|
|
|
final ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> clonesBuilder = ImmutableOpenMap
|
|
|
.builder();
|
|
|
- final boolean readyToExecute = currentState.custom(
|
|
|
- SnapshotDeletionsInProgress.TYPE,
|
|
|
- SnapshotDeletionsInProgress.EMPTY
|
|
|
- ).hasExecutingDeletion(repoName) == false;
|
|
|
+ final boolean readyToExecute = SnapshotDeletionsInProgress.get(currentState)
|
|
|
+ .hasExecutingDeletion(repoName) == false;
|
|
|
final InFlightShardSnapshotStates inFlightShardStates;
|
|
|
if (readyToExecute) {
|
|
|
inFlightShardStates = InFlightShardSnapshotStates.forEntries(snapshotsInProgress.forRepo(repoName));
|
|
@@ -666,7 +655,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
* Throws {@link RepositoryMissingException} if no repository by the given name is found in the given cluster state.
|
|
|
*/
|
|
|
public static void ensureRepositoryExists(String repoName, ClusterState state) {
|
|
|
- if (state.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY).repository(repoName) == null) {
|
|
|
+ if (RepositoriesMetadata.get(state).repository(repoName) == null) {
|
|
|
throw new RepositoryMissingException(repoName);
|
|
|
}
|
|
|
}
|
|
@@ -679,8 +668,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
* @param state current cluster state
|
|
|
*/
|
|
|
private static void validate(String repositoryName, String snapshotName, ClusterState state) {
|
|
|
- RepositoriesMetadata repositoriesMetadata = state.getMetadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY);
|
|
|
- if (repositoriesMetadata.repository(repositoryName) == null) {
|
|
|
+ if (RepositoriesMetadata.get(state).repository(repositoryName) == null) {
|
|
|
throw new RepositoryMissingException(repositoryName);
|
|
|
}
|
|
|
validate(repositoryName, snapshotName);
|
|
@@ -717,15 +705,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, Metadata metadata) {
|
|
|
ShardGenerations.Builder builder = ShardGenerations.builder();
|
|
|
if (snapshot.isClone()) {
|
|
|
- snapshot.shardsByRepoShardId().entrySet().forEach(c -> builder.put(c.getKey().index(), c.getKey().shardId(), c.getValue()));
|
|
|
+ snapshot.shardsByRepoShardId().forEach((key, value) -> builder.put(key.index(), key.shardId(), value));
|
|
|
} else {
|
|
|
- snapshot.shardsByRepoShardId().entrySet().forEach(c -> {
|
|
|
- final Index index = snapshot.indexByName(c.getKey().indexName());
|
|
|
+ snapshot.shardsByRepoShardId().forEach((key, value) -> {
|
|
|
+ final Index index = snapshot.indexByName(key.indexName());
|
|
|
if (metadata.index(index) == null) {
|
|
|
assert snapshot.partial() : "Index [" + index + "] was deleted during a snapshot but snapshot was not partial.";
|
|
|
return;
|
|
|
}
|
|
|
- builder.put(c.getKey().index(), c.getKey().shardId(), c.getValue());
|
|
|
+ builder.put(key.index(), key.shardId(), value);
|
|
|
});
|
|
|
}
|
|
|
return builder.build();
|
|
@@ -816,7 +804,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
try {
|
|
|
if (event.localNodeMaster()) {
|
|
|
// We don't remove old master when master flips anymore. So, we need to check for change in master
|
|
|
- SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
|
|
|
+ SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(event.state());
|
|
|
final boolean newMaster = event.previousState().nodes().isLocalNodeElectedMaster() == false;
|
|
|
processExternalChanges(
|
|
|
newMaster || removedNodesCleanupNeeded(snapshotsInProgress, event.nodesDelta().removedNodes()),
|
|
@@ -850,7 +838,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
}
|
|
|
|
|
|
private boolean assertConsistentWithClusterState(ClusterState state) {
|
|
|
- final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
|
|
|
+ final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(state);
|
|
|
if (snapshotsInProgress.isEmpty() == false) {
|
|
|
synchronized (endingSnapshots) {
|
|
|
final Set<Snapshot> runningSnapshots = Stream.concat(
|
|
@@ -865,10 +853,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
+ runningSnapshots;
|
|
|
}
|
|
|
}
|
|
|
- final SnapshotDeletionsInProgress snapshotDeletionsInProgress = state.custom(
|
|
|
- SnapshotDeletionsInProgress.TYPE,
|
|
|
- SnapshotDeletionsInProgress.EMPTY
|
|
|
- );
|
|
|
+ final SnapshotDeletionsInProgress snapshotDeletionsInProgress = SnapshotDeletionsInProgress.get(state);
|
|
|
if (snapshotDeletionsInProgress.hasDeletionsInProgress()) {
|
|
|
synchronized (repositoryOperations.runningDeletions) {
|
|
|
final Set<String> runningDeletes = Stream.concat(
|
|
@@ -886,11 +871,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
// Assert that there are no snapshots that have a shard that is waiting to be assigned even though the cluster state would allow for it
|
|
|
// to be assigned
|
|
|
private static boolean assertNoDanglingSnapshots(ClusterState state) {
|
|
|
- final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
|
|
|
- final SnapshotDeletionsInProgress snapshotDeletionsInProgress = state.custom(
|
|
|
- SnapshotDeletionsInProgress.TYPE,
|
|
|
- SnapshotDeletionsInProgress.EMPTY
|
|
|
- );
|
|
|
+ final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(state);
|
|
|
+ final SnapshotDeletionsInProgress snapshotDeletionsInProgress = SnapshotDeletionsInProgress.get(state);
|
|
|
final Set<String> reposWithRunningDelete = snapshotDeletionsInProgress.getEntries()
|
|
|
.stream()
|
|
|
.filter(entry -> entry.state() == SnapshotDeletionsInProgress.State.STARTED)
|
|
@@ -943,11 +925,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
@Override
|
|
|
public ClusterState execute(ClusterState currentState) {
|
|
|
RoutingTable routingTable = currentState.routingTable();
|
|
|
- final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
|
|
|
- final SnapshotDeletionsInProgress deletes = currentState.custom(
|
|
|
- SnapshotDeletionsInProgress.TYPE,
|
|
|
- SnapshotDeletionsInProgress.EMPTY
|
|
|
- );
|
|
|
+ final SnapshotsInProgress snapshots = SnapshotsInProgress.get(currentState);
|
|
|
+ final SnapshotDeletionsInProgress deletes = SnapshotDeletionsInProgress.get(currentState);
|
|
|
DiscoveryNodes nodes = currentState.nodes();
|
|
|
final EnumSet<State> statesToUpdate;
|
|
|
// If we are reacting to a change in the cluster node configuration we have to update the shard states of both started
|
|
@@ -1070,10 +1049,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
? ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, updated).build()
|
|
|
: currentState
|
|
|
).v1();
|
|
|
- for (SnapshotDeletionsInProgress.Entry delete : res.custom(
|
|
|
- SnapshotDeletionsInProgress.TYPE,
|
|
|
- SnapshotDeletionsInProgress.EMPTY
|
|
|
- ).getEntries()) {
|
|
|
+ for (SnapshotDeletionsInProgress.Entry delete : SnapshotDeletionsInProgress.get(res).getEntries()) {
|
|
|
if (delete.state() == SnapshotDeletionsInProgress.State.STARTED) {
|
|
|
deletionsToExecute.add(delete);
|
|
|
}
|
|
@@ -1088,10 +1064,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
|
|
|
@Override
|
|
|
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
|
|
|
- final SnapshotDeletionsInProgress snapshotDeletionsInProgress = newState.custom(
|
|
|
- SnapshotDeletionsInProgress.TYPE,
|
|
|
- SnapshotDeletionsInProgress.EMPTY
|
|
|
- );
|
|
|
+ final SnapshotDeletionsInProgress snapshotDeletionsInProgress = SnapshotDeletionsInProgress.get(newState);
|
|
|
if (finishedSnapshots.isEmpty() == false) {
|
|
|
// If we found snapshots that should be finalized as a result of the CS update we try to initiate finalization for
|
|
|
// them
|
|
@@ -1110,7 +1083,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- startExecutableClones(newState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY), null);
|
|
|
+ startExecutableClones(SnapshotsInProgress.get(newState), null);
|
|
|
// run newly ready deletes
|
|
|
for (SnapshotDeletionsInProgress.Entry entry : deletionsToExecute) {
|
|
|
if (tryEnterRepoLoop(entry.repository())) {
|
|
@@ -1328,9 +1301,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, RepositoryData repositoryData) {
|
|
|
assert currentlyFinalizing.contains(snapshot.getRepository());
|
|
|
try {
|
|
|
- SnapshotsInProgress.Entry entry = clusterService.state()
|
|
|
- .custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
|
|
|
- .snapshot(snapshot);
|
|
|
+ SnapshotsInProgress.Entry entry = SnapshotsInProgress.get(clusterService.state()).snapshot(snapshot);
|
|
|
final String failure = entry.failure();
|
|
|
logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure);
|
|
|
final ShardGenerations shardGenerations = buildGenerations(entry, metadata);
|
|
@@ -1483,10 +1454,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
|
|
|
// Figure out which indices have unsuccessful shards
|
|
|
Set<String> indicesWithUnsuccessfulShards = new HashSet<>();
|
|
|
- entry.shardsByRepoShardId().entrySet().forEach(shard -> {
|
|
|
- final ShardState shardState = shard.getValue().state();
|
|
|
+ entry.shardsByRepoShardId().forEach((key, value) -> {
|
|
|
+ final ShardState shardState = value.state();
|
|
|
if (shardState.failed() || shardState.completed() == false) {
|
|
|
- indicesWithUnsuccessfulShards.add(shard.getKey().indexName());
|
|
|
+ indicesWithUnsuccessfulShards.add(key.indexName());
|
|
|
}
|
|
|
});
|
|
|
|
|
@@ -1573,10 +1544,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
public ClusterState execute(ClusterState currentState) {
|
|
|
assert readyDeletions(currentState).v1() == currentState
|
|
|
: "Deletes should have been set to ready by finished snapshot deletes and finalizations";
|
|
|
- for (SnapshotDeletionsInProgress.Entry entry : currentState.custom(
|
|
|
- SnapshotDeletionsInProgress.TYPE,
|
|
|
- SnapshotDeletionsInProgress.EMPTY
|
|
|
- ).getEntries()) {
|
|
|
+ for (SnapshotDeletionsInProgress.Entry entry : SnapshotDeletionsInProgress.get(currentState).getEntries()) {
|
|
|
if (entry.repository().equals(repository) && entry.state() == SnapshotDeletionsInProgress.State.STARTED) {
|
|
|
deletionToRun = entry;
|
|
|
break;
|
|
@@ -1613,10 +1581,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
* @return tuple of an updated cluster state and currently executable snapshot delete operations
|
|
|
*/
|
|
|
private static Tuple<ClusterState, List<SnapshotDeletionsInProgress.Entry>> readyDeletions(ClusterState currentState) {
|
|
|
- final SnapshotDeletionsInProgress deletions = currentState.custom(
|
|
|
- SnapshotDeletionsInProgress.TYPE,
|
|
|
- SnapshotDeletionsInProgress.EMPTY
|
|
|
- );
|
|
|
+ final SnapshotDeletionsInProgress deletions = SnapshotDeletionsInProgress.get(currentState);
|
|
|
if (deletions.hasDeletionsInProgress() == false) {
|
|
|
return Tuple.tuple(currentState, List.of());
|
|
|
}
|
|
@@ -1659,7 +1624,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
* @return updated cluster state
|
|
|
*/
|
|
|
public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot) {
|
|
|
- final SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
|
|
|
+ final SnapshotsInProgress snapshots = SnapshotsInProgress.get(state);
|
|
|
ClusterState result = state;
|
|
|
int indexOfEntry = -1;
|
|
|
final List<SnapshotsInProgress.Entry> entryList = snapshots.forRepo(snapshot.getRepository());
|
|
@@ -1833,7 +1798,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
updatedState,
|
|
|
null,
|
|
|
deletionsWithoutSnapshots(
|
|
|
- updatedState.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY),
|
|
|
+ SnapshotDeletionsInProgress.get(updatedState),
|
|
|
Collections.singletonList(snapshot.getSnapshotId()),
|
|
|
snapshot.getRepository()
|
|
|
)
|
|
@@ -1941,7 +1906,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
final Set<SnapshotId> snapshotIds = new HashSet<>();
|
|
|
|
|
|
// find in-progress snapshots to delete in cluster state
|
|
|
- final SnapshotsInProgress snapshotsInProgress = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
|
|
|
+ final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(currentState);
|
|
|
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(repositoryName)) {
|
|
|
final SnapshotId snapshotId = entry.snapshot().getSnapshotId();
|
|
|
if (Regex.simpleMatch(snapshotNames, snapshotId.getName())) {
|
|
@@ -1996,12 +1961,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
"delete snapshot"
|
|
|
);
|
|
|
|
|
|
- final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(
|
|
|
- SnapshotDeletionsInProgress.TYPE,
|
|
|
- SnapshotDeletionsInProgress.EMPTY
|
|
|
- );
|
|
|
+ final SnapshotDeletionsInProgress deletionsInProgress = SnapshotDeletionsInProgress.get(currentState);
|
|
|
|
|
|
- final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY);
|
|
|
+ final RestoreInProgress restoreInProgress = RestoreInProgress.get(currentState);
|
|
|
// don't allow snapshot deletions while a restore is taking place,
|
|
|
// otherwise we could end up deleting a snapshot that is being restored
|
|
|
// and the files the restore depends on would all be gone
|
|
@@ -2293,9 +2255,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
// because it could fail for any number of reasons so we just retry instead of waiting on the cluster state
|
|
|
// to change in any form.
|
|
|
if (repositoryMetadataStart.equals(
|
|
|
- currentState.getMetadata()
|
|
|
- .<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE)
|
|
|
- .repository(repository.getMetadata().name())
|
|
|
+ RepositoriesMetadata.get(currentState).repository(repository.getMetadata().name())
|
|
|
)) {
|
|
|
executedTask = true;
|
|
|
return updateTask.execute(currentState);
|
|
@@ -2552,7 +2512,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
}
|
|
|
// TODO: be more efficient here, we could collect newly ready shard clones as we compute them and then directly start them
|
|
|
// instead of looping over all possible clones to execute
|
|
|
- startExecutableClones(newState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY), null);
|
|
|
+ startExecutableClones(SnapshotsInProgress.get(newState), null);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2582,7 +2542,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
*/
|
|
|
@Nullable
|
|
|
private SnapshotsInProgress updatedSnapshotsInProgress(ClusterState currentState, SnapshotDeletionsInProgress updatedDeletions) {
|
|
|
- final SnapshotsInProgress snapshotsInProgress = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
|
|
|
+ final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(currentState);
|
|
|
final List<SnapshotsInProgress.Entry> snapshotEntries = new ArrayList<>();
|
|
|
|
|
|
// Keep track of shardIds that we started snapshots for as a result of removing this delete so we don't assign
|
|
@@ -2703,9 +2663,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
for (IndexId indexIdToRefresh : newIndexIdsToRefresh) {
|
|
|
updatedIndexIds.put(indexIdToRefresh, new IndexId(indexIdToRefresh.getName(), UUIDs.randomBase64UUID()));
|
|
|
}
|
|
|
- for (int i = 0; i < snapshotEntries.size(); i++) {
|
|
|
- snapshotEntries.set(i, snapshotEntries.get(i).withUpdatedIndexIds(updatedIndexIds));
|
|
|
- }
|
|
|
+ snapshotEntries.replaceAll(entry -> entry.withUpdatedIndexIds(updatedIndexIds));
|
|
|
}
|
|
|
return changed ? snapshotsInProgress.withUpdatedEntriesForRepo(repoName, snapshotEntries) : null;
|
|
|
}
|
|
@@ -2771,7 +2729,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
*
|
|
|
* @param indices Indices to snapshot
|
|
|
* @param useShardGenerations whether to write {@link ShardGenerations} during the snapshot
|
|
|
- * @return list of shard to be included into current snapshot
|
|
|
+ * @return map of shard-id to snapshot-status of all shards included into current snapshot
|
|
|
*/
|
|
|
private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(
|
|
|
SnapshotsInProgress snapshotsInProgress,
|
|
@@ -2863,7 +2821,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
*/
|
|
|
public static Set<String> snapshottingDataStreams(final ClusterState currentState, final Set<String> dataStreamsToCheck) {
|
|
|
Map<String, DataStream> dataStreams = currentState.metadata().dataStreams();
|
|
|
- return currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
|
|
|
+ return SnapshotsInProgress.get(currentState)
|
|
|
.asStream()
|
|
|
.filter(e -> e.partial() == false)
|
|
|
.flatMap(e -> e.dataStreams().stream())
|
|
@@ -2876,8 +2834,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
*/
|
|
|
public static Set<Index> snapshottingIndices(final ClusterState currentState, final Set<Index> indicesToCheck) {
|
|
|
final Set<Index> indices = new HashSet<>();
|
|
|
- for (List<SnapshotsInProgress.Entry> snapshotsInRepo : currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
|
|
|
- .entriesByRepo()) {
|
|
|
+ for (List<SnapshotsInProgress.Entry> snapshotsInRepo : SnapshotsInProgress.get(currentState).entriesByRepo()) {
|
|
|
for (final SnapshotsInProgress.Entry entry : snapshotsInRepo) {
|
|
|
if (entry.partial() == false && entry.isClone() == false) {
|
|
|
for (String indexName : entry.indices().keySet()) {
|
|
@@ -3014,7 +2971,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
}
|
|
|
|
|
|
SnapshotsInProgress computeUpdatedState() {
|
|
|
- final SnapshotsInProgress existing = initialState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
|
|
|
+ final SnapshotsInProgress existing = SnapshotsInProgress.get(initialState);
|
|
|
SnapshotsInProgress updated = existing;
|
|
|
for (Map.Entry<String, List<ShardSnapshotUpdate>> updates : updatesByRepo.entrySet()) {
|
|
|
final String repoName = updates.getKey();
|
|
@@ -3484,10 +3441,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
|
|
|
@Override
|
|
|
public ClusterState execute(ClusterState currentState) {
|
|
|
- final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(
|
|
|
- SnapshotDeletionsInProgress.TYPE,
|
|
|
- SnapshotDeletionsInProgress.EMPTY
|
|
|
- );
|
|
|
+ final SnapshotDeletionsInProgress deletionsInProgress = SnapshotDeletionsInProgress.get(currentState);
|
|
|
boolean changed = false;
|
|
|
final List<SnapshotDeletionsInProgress.Entry> remainingEntries = deletionsInProgress.getEntries();
|
|
|
List<SnapshotDeletionsInProgress.Entry> updatedEntries = new ArrayList<>(remainingEntries.size());
|
|
@@ -3500,7 +3454,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
}
|
|
|
}
|
|
|
final SnapshotDeletionsInProgress updatedDeletions = changed ? SnapshotDeletionsInProgress.of(updatedEntries) : null;
|
|
|
- final SnapshotsInProgress snapshotsInProgress = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
|
|
|
+ final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(currentState);
|
|
|
boolean changedSnapshots = false;
|
|
|
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(repository)) {
|
|
|
// We failed to read repository data for this delete, it is not the job of SnapshotsService to
|
|
@@ -3652,14 +3606,12 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
public ClusterState execute(BatchExecutionContext<SnapshotTask> batchExecutionContext) throws Exception {
|
|
|
final ClusterState state = batchExecutionContext.initialState();
|
|
|
final SnapshotShardsUpdateContext shardsUpdateContext = new SnapshotShardsUpdateContext(batchExecutionContext);
|
|
|
- final SnapshotsInProgress initialSnapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
|
|
|
+ final SnapshotsInProgress initialSnapshots = SnapshotsInProgress.get(state);
|
|
|
SnapshotsInProgress snapshotsInProgress = shardsUpdateContext.computeUpdatedState();
|
|
|
for (final var taskContext : batchExecutionContext.taskContexts()) {
|
|
|
if (taskContext.getTask() instanceof CreateSnapshotTask task) {
|
|
|
try {
|
|
|
- final var repoMeta = state.metadata()
|
|
|
- .custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY)
|
|
|
- .repository(task.snapshot.getRepository());
|
|
|
+ final var repoMeta = RepositoriesMetadata.get(state).repository(task.snapshot.getRepository());
|
|
|
if (Objects.equals(task.initialRepositoryMetadata, repoMeta)) {
|
|
|
snapshotsInProgress = createSnapshot(task, taskContext, state, snapshotsInProgress);
|
|
|
} else {
|
|
@@ -3701,10 +3653,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
|
|
|
ensureSnapshotNameNotRunning(snapshotsInProgress, repositoryName, snapshotName);
|
|
|
validate(repositoryName, snapshotName, currentState);
|
|
|
- final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(
|
|
|
- SnapshotDeletionsInProgress.TYPE,
|
|
|
- SnapshotDeletionsInProgress.EMPTY
|
|
|
- );
|
|
|
+ final SnapshotDeletionsInProgress deletionsInProgress = SnapshotDeletionsInProgress.get(currentState);
|
|
|
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "create snapshot");
|
|
|
ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshotsInProgress, deletionsInProgress);
|
|
|
|