|
@@ -431,7 +431,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
processStartedShards();
|
|
|
}
|
|
|
if (newMaster) {
|
|
|
- endCompletedSnapshots(event.state());
|
|
|
+ // Cleanup all snapshots that have no more work left:
|
|
|
+ // 1. Completed snapshots
|
|
|
+ // 2. Snapshots in state INIT that the previous master failed to start
|
|
|
+ // 3. Snapshots in any other state that have all their shard tasks completed
|
|
|
+ snapshotsInProgress.entries().stream().filter(
|
|
|
+ entry -> entry.state().completed() || entry.state() == State.INIT || completed(entry.shards().values())
|
|
|
+ ).forEach(entry -> endSnapshot(entry, event.state().metadata()));
|
|
|
}
|
|
|
}
|
|
|
if (newMaster) {
|
|
@@ -444,20 +450,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
assert assertConsistentWithClusterState(event.state());
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Cleanup all snapshots found in the given cluster state that have no more work left:
|
|
|
- * 1. Completed snapshots
|
|
|
- * 2. Snapshots in state INIT that a previous master of an older version failed to start
|
|
|
- * 3. Snapshots in any other state that have all their shard tasks completed
|
|
|
- */
|
|
|
- private void endCompletedSnapshots(ClusterState state) {
|
|
|
- SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
|
|
|
- assert snapshotsInProgress != null;
|
|
|
- snapshotsInProgress.entries().stream().filter(
|
|
|
- entry -> entry.state().completed() || entry.state() == State.INIT || completed(entry.shards().values())
|
|
|
- ).forEach(entry -> endSnapshot(entry, state.metadata()));
|
|
|
- }
|
|
|
-
|
|
|
private boolean assertConsistentWithClusterState(ClusterState state) {
|
|
|
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
|
|
|
if (snapshotsInProgress != null && snapshotsInProgress.entries().isEmpty() == false) {
|
|
@@ -501,7 +493,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
private void processSnapshotsOnRemovedNodes() {
|
|
|
clusterService.submitStateUpdateTask("update snapshot state after node removal", new ClusterStateUpdateTask() {
|
|
|
|
|
|
- private boolean changed = false;
|
|
|
+ private final Collection<SnapshotsInProgress.Entry> finishedSnapshots = new ArrayList<>();
|
|
|
|
|
|
@Override
|
|
|
public ClusterState execute(ClusterState currentState) {
|
|
@@ -510,6 +502,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
if (snapshots == null) {
|
|
|
return currentState;
|
|
|
}
|
|
|
+ boolean changed = false;
|
|
|
ArrayList<SnapshotsInProgress.Entry> entries = new ArrayList<>();
|
|
|
for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
|
|
|
SnapshotsInProgress.Entry updatedSnapshot = snapshot;
|
|
@@ -540,6 +533,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shardsMap = shards.build();
|
|
|
if (!snapshot.state().completed() && completed(shardsMap.values())) {
|
|
|
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shardsMap);
|
|
|
+ finishedSnapshots.add(updatedSnapshot);
|
|
|
} else {
|
|
|
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, snapshot.state(), shardsMap);
|
|
|
}
|
|
@@ -567,9 +561,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
|
|
|
@Override
|
|
|
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
|
|
- if (changed) {
|
|
|
- endCompletedSnapshots(newState);
|
|
|
- }
|
|
|
+ finishedSnapshots.forEach(entry -> endSnapshot(entry, newState.metadata()));
|
|
|
}
|
|
|
});
|
|
|
}
|
|
@@ -577,12 +569,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
private void processStartedShards() {
|
|
|
clusterService.submitStateUpdateTask("update snapshot state after shards started", new ClusterStateUpdateTask() {
|
|
|
|
|
|
- private boolean changed = false;
|
|
|
+ private final Collection<SnapshotsInProgress.Entry> finishedSnapshots = new ArrayList<>();
|
|
|
|
|
|
@Override
|
|
|
public ClusterState execute(ClusterState currentState) {
|
|
|
RoutingTable routingTable = currentState.routingTable();
|
|
|
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
|
|
|
+ boolean changed = false;
|
|
|
if (snapshots != null) {
|
|
|
ArrayList<SnapshotsInProgress.Entry> entries = new ArrayList<>();
|
|
|
for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
|
|
@@ -594,6 +587,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
changed = true;
|
|
|
if (!snapshot.state().completed() && completed(shards.values())) {
|
|
|
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shards);
|
|
|
+ finishedSnapshots.add(updatedSnapshot);
|
|
|
} else {
|
|
|
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, shards);
|
|
|
}
|
|
@@ -617,9 +611,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
|
|
|
@Override
|
|
|
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
|
|
- if (changed) {
|
|
|
- endCompletedSnapshots(newState);
|
|
|
- }
|
|
|
+ finishedSnapshots.forEach(entry -> endSnapshot(entry, newState.metadata()));
|
|
|
}
|
|
|
});
|
|
|
}
|
|
@@ -1427,7 +1419,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
try {
|
|
|
listener.onResponse(new UpdateIndexShardSnapshotStatusResponse());
|
|
|
} finally {
|
|
|
- endCompletedSnapshots(newState);
|
|
|
+ // Maybe this state update completed the snapshot. If we are not already ending it because of a concurrent
|
|
|
+ // state update we check if its state is completed and end it if it is.
|
|
|
+ if (endingSnapshots.contains(request.snapshot()) == false) {
|
|
|
+ final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE);
|
|
|
+ final SnapshotsInProgress.Entry updatedEntry = snapshotsInProgress.snapshot(request.snapshot());
|
|
|
+ if (updatedEntry.state().completed()) {
|
|
|
+ endSnapshot(updatedEntry, newState.metadata());
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
});
|