|
@@ -2468,8 +2468,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
*/
|
|
|
static final ClusterStateTaskExecutor<ShardSnapshotUpdate> SHARD_STATE_EXECUTOR = (currentState, tasks) ->
|
|
|
ClusterStateTaskExecutor.ClusterTasksResult.<ShardSnapshotUpdate>builder().successes(tasks).build(
|
|
|
- new SnapshotShardsUpdateContext(currentState, tasks).applyToEntries(
|
|
|
- currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()).updatedState());
|
|
|
+ new SnapshotShardsUpdateContext(currentState, tasks).computeUpdatedState());
|
|
|
|
|
|
private static boolean isQueued(@Nullable ShardSnapshotStatus status) {
|
|
|
return status != null && status.state() == ShardState.QUEUED;
|
|
@@ -2490,9 +2489,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
// current cluster state
|
|
|
private final ClusterState currentState;
|
|
|
|
|
|
- // snapshot entries computed by applying tasks to existing snapshot entries
|
|
|
- private final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
|
|
|
-
|
|
|
// updates outstanding to be applied to existing snapshot entries
|
|
|
private final List<ShardSnapshotUpdate> unconsumedUpdates;
|
|
|
|
|
@@ -2504,29 +2500,29 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
unconsumedUpdates = new ArrayList<>(updates);
|
|
|
}
|
|
|
|
|
|
- public ClusterState updatedState() {
|
|
|
+ ClusterState computeUpdatedState() {
|
|
|
+ final List<SnapshotsInProgress.Entry> oldEntries
|
|
|
+ = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries();
|
|
|
+ final List<SnapshotsInProgress.Entry> newEntries = new ArrayList<>(oldEntries.size());
|
|
|
+ for (SnapshotsInProgress.Entry entry : oldEntries) {
|
|
|
+ newEntries.add(applyToEntry(entry));
|
|
|
+ }
|
|
|
+
|
|
|
if (changedCount > 0) {
|
|
|
logger.trace("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " +
|
|
|
"[{}] shard snapshots", changedCount, startedCount);
|
|
|
- return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build();
|
|
|
+ return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(newEntries)).build();
|
|
|
}
|
|
|
return currentState;
|
|
|
}
|
|
|
|
|
|
- SnapshotShardsUpdateContext applyToEntries(List<SnapshotsInProgress.Entry> snapshots) {
|
|
|
- for (SnapshotsInProgress.Entry entry : snapshots) {
|
|
|
- entries.add(applyToEntry(entry));
|
|
|
- }
|
|
|
- return this;
|
|
|
- }
|
|
|
-
|
|
|
private SnapshotsInProgress.Entry applyToEntry(SnapshotsInProgress.Entry entry) {
|
|
|
// Completed snapshots do not require any updates so we just add them to the output list and keep going.
|
|
|
// Also we short circuit if there are no more unconsumed updates to apply.
|
|
|
if (entry.state().completed() || unconsumedUpdates.isEmpty()) {
|
|
|
return entry;
|
|
|
}
|
|
|
- return new EntryContext(entry, unconsumedUpdates.iterator()).updatedEntry();
|
|
|
+ return new EntryContext(entry).computeUpdatedEntry();
|
|
|
}
|
|
|
|
|
|
// Per snapshot entry state
|
|
@@ -2543,12 +2539,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
// builder for updated shard clone status mappings if any could be computed
|
|
|
private ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> clonesBuilder = null;
|
|
|
|
|
|
- EntryContext(SnapshotsInProgress.Entry entry, Iterator<ShardSnapshotUpdate> iterator) {
|
|
|
+ EntryContext(SnapshotsInProgress.Entry entry) {
|
|
|
this.entry = entry;
|
|
|
- this.iterator = iterator;
|
|
|
+ this.iterator = unconsumedUpdates.iterator();
|
|
|
}
|
|
|
|
|
|
- SnapshotsInProgress.Entry updatedEntry() {
|
|
|
+ SnapshotsInProgress.Entry computeUpdatedEntry() {
|
|
|
+ assert shardsBuilder == null && clonesBuilder == null : "update context was already used";
|
|
|
+
|
|
|
// loop over all the shard updates that are potentially applicable to the current snapshot entry
|
|
|
while (iterator.hasNext()) {
|
|
|
final ShardSnapshotUpdate update = iterator.next();
|