|
@@ -106,9 +106,8 @@ import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
|
|
|
* <ul>
|
|
|
* <li>On the master node the {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} is called and makes sure that
|
|
|
* no snapshot is currently running and registers the new snapshot in cluster state</li>
|
|
|
- * <li>When cluster state is updated
|
|
|
- * the {@link #beginSnapshot} method kicks in and initializes
|
|
|
- * the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state</li>
|
|
|
+ * <li>When the cluster state is updated the {@link #beginSnapshot} method kicks in and populates the list of shards that need to be
|
|
|
+ * snapshotted in cluster state</li>
|
|
|
* <li>Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes
|
|
|
* start processing them through {@link SnapshotShardsService#startNewSnapshots} method</li>
|
|
|
* <li>Once shard snapshot is created data node updates state of the shard in the cluster state using
|
|
@@ -1006,10 +1005,16 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
if (endingSnapshots.add(entry.snapshot()) == false) {
|
|
|
return;
|
|
|
}
|
|
|
+ final Snapshot snapshot = entry.snapshot();
|
|
|
+ if (entry.repositoryStateId() == RepositoryData.UNKNOWN_REPO_GEN) {
|
|
|
+ logger.debug("[{}] was aborted before starting", snapshot);
|
|
|
+ removeSnapshotFromClusterState(entry.snapshot(), null,
|
|
|
+ new SnapshotException(snapshot, "Aborted on initialization"));
|
|
|
+ return;
|
|
|
+ }
|
|
|
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
|
|
|
@Override
|
|
|
protected void doRun() {
|
|
|
- final Snapshot snapshot = entry.snapshot();
|
|
|
final Repository repository = repositoriesService.repository(snapshot.getRepository());
|
|
|
final String failure = entry.failure();
|
|
|
logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure);
|
|
@@ -1205,12 +1210,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
*/
|
|
|
private void deleteSnapshot(final Snapshot snapshot, final ActionListener<Void> listener, final long repositoryStateId,
|
|
|
final boolean immediatePriority) {
|
|
|
- logger.info("deleting snapshot [{}]", snapshot);
|
|
|
Priority priority = immediatePriority ? Priority.IMMEDIATE : Priority.NORMAL;
|
|
|
+ logger.info("deleting snapshot [{}] assuming repository generation [{}] and with priority [{}]",
|
|
|
+ snapshot, repositoryStateId, priority);
|
|
|
clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) {
|
|
|
|
|
|
boolean waitForSnapshot = false;
|
|
|
|
|
|
+ boolean abortedDuringInit = false;
|
|
|
+
|
|
|
@Override
|
|
|
public ClusterState execute(ClusterState currentState) {
|
|
|
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
|
|
@@ -1270,6 +1278,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
shards = snapshotEntry.shards();
|
|
|
assert shards.isEmpty();
|
|
|
failure = "Snapshot was aborted during initialization";
|
|
|
+ abortedDuringInit = true;
|
|
|
} else if (state == State.STARTED) {
|
|
|
// snapshot is started - mark every non completed shard as aborted
|
|
|
final ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();
|
|
@@ -1334,19 +1343,21 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
);
|
|
|
},
|
|
|
e -> {
|
|
|
- logger.warn("deleted snapshot failed - deleting files", e);
|
|
|
- threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
|
|
|
- try {
|
|
|
- deleteSnapshot(snapshot.getRepository(), snapshot.getSnapshotId().getName(), listener, true);
|
|
|
- } catch (SnapshotMissingException smex) {
|
|
|
- logger.info(() -> new ParameterizedMessage(
|
|
|
- "Tried deleting in-progress snapshot [{}], but it could not be found after failing to abort.",
|
|
|
- smex.getSnapshotName()), e);
|
|
|
- listener.onFailure(new SnapshotException(snapshot,
|
|
|
- "Tried deleting in-progress snapshot [" + smex.getSnapshotName() + "], but it " +
|
|
|
- "could not be found after failing to abort.", smex));
|
|
|
+ if (abortedDuringInit) {
|
|
|
+ logger.debug(() -> new ParameterizedMessage("Snapshot [{}] was aborted during INIT", snapshot), e);
|
|
|
+ listener.onResponse(null);
|
|
|
+ } else {
|
|
|
+ if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class)
|
|
|
+ != null) {
|
|
|
+ logger.warn("master failover before deleted snapshot could complete", e);
|
|
|
+ // Just pass the exception to the transport handler as is so it is retried on the new master
|
|
|
+ listener.onFailure(e);
|
|
|
+ } else {
|
|
|
+ logger.warn("deleted snapshot failed", e);
|
|
|
+ listener.onFailure(
|
|
|
+ new SnapshotMissingException(snapshot.getRepository(), snapshot.getSnapshotId(), e));
|
|
|
}
|
|
|
- });
|
|
|
+ }
|
|
|
}
|
|
|
));
|
|
|
} else {
|