|
@@ -115,7 +115,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
|
|
|
private final ThreadPool threadPool;
|
|
|
|
|
|
- private final Map<Snapshot, List<ActionListener<SnapshotInfo>>> snapshotCompletionListeners = new ConcurrentHashMap<>();
|
|
|
+ private final Map<Snapshot, List<ActionListener<Tuple<RepositoryData, SnapshotInfo>>>> snapshotCompletionListeners =
|
|
|
+ new ConcurrentHashMap<>();
|
|
|
|
|
|
// Set of snapshots that are currently being initialized by this node
|
|
|
private final Set<Snapshot> initializingSnapshots = Collections.synchronizedSet(new HashSet<>());
|
|
@@ -144,7 +145,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
* @param listener snapshot completion listener
|
|
|
*/
|
|
|
public void executeSnapshot(final CreateSnapshotRequest request, final ActionListener<SnapshotInfo> listener) {
|
|
|
- createSnapshot(request, ActionListener.wrap(snapshot -> addListener(snapshot, listener), listener::onFailure));
|
|
|
+ createSnapshot(request,
|
|
|
+ ActionListener.wrap(snapshot -> addListener(snapshot, ActionListener.map(listener, Tuple::v2)), listener::onFailure));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -432,7 +434,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- private static class CleanupAfterErrorListener implements ActionListener<SnapshotInfo> {
|
|
|
+ private static class CleanupAfterErrorListener {
|
|
|
|
|
|
private final ActionListener<Snapshot> userCreateSnapshotListener;
|
|
|
private final Exception e;
|
|
@@ -442,15 +444,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
this.e = e;
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void onResponse(SnapshotInfo snapshotInfo) {
|
|
|
- userCreateSnapshotListener.onFailure(e);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onFailure(Exception e) {
|
|
|
- e.addSuppressed(this.e);
|
|
|
- userCreateSnapshotListener.onFailure(e);
|
|
|
+ public void onFailure(@Nullable Exception e) {
|
|
|
+ userCreateSnapshotListener.onFailure(ExceptionsHelper.useOrSuppress(e, this.e));
|
|
|
}
|
|
|
|
|
|
public void onNoLongerMaster() {
|
|
@@ -850,8 +845,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
metadataForSnapshot(entry, metadata),
|
|
|
entry.userMetadata(),
|
|
|
entry.version(),
|
|
|
- ActionListener.wrap(snapshotInfo -> {
|
|
|
- removeSnapshotFromClusterState(snapshot, snapshotInfo, null);
|
|
|
+ ActionListener.wrap(result -> {
|
|
|
+ final SnapshotInfo snapshotInfo = result.v2();
|
|
|
+ removeSnapshotFromClusterState(snapshot, result, null);
|
|
|
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
|
|
|
}, this::onFailure));
|
|
|
}
|
|
@@ -877,11 +873,12 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
/**
|
|
|
* Removes record of running snapshot from cluster state
|
|
|
* @param snapshot snapshot
|
|
|
- * @param snapshotInfo snapshot info if snapshot was successful
|
|
|
+ * @param snapshotResult new {@link RepositoryData} and {@link SnapshotInfo} info if snapshot was successful
|
|
|
* @param e exception if snapshot failed, {@code null} otherwise
|
|
|
*/
|
|
|
- private void removeSnapshotFromClusterState(final Snapshot snapshot, final SnapshotInfo snapshotInfo, @Nullable Exception e) {
|
|
|
- removeSnapshotFromClusterState(snapshot, snapshotInfo, e, null);
|
|
|
+ private void removeSnapshotFromClusterState(final Snapshot snapshot, @Nullable Tuple<RepositoryData, SnapshotInfo> snapshotResult,
|
|
|
+ @Nullable Exception e) {
|
|
|
+ removeSnapshotFromClusterState(snapshot, snapshotResult, e, null);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -890,9 +887,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
* @param failure exception if snapshot failed, {@code null} otherwise
|
|
|
* @param listener listener to notify when snapshot information is removed from the cluster state
|
|
|
*/
|
|
|
- private void removeSnapshotFromClusterState(final Snapshot snapshot, @Nullable SnapshotInfo snapshotInfo, @Nullable Exception failure,
|
|
|
- @Nullable CleanupAfterErrorListener listener) {
|
|
|
- assert snapshotInfo != null || failure != null : "Either snapshotInfo or failure must be supplied";
|
|
|
+ private void removeSnapshotFromClusterState(final Snapshot snapshot, @Nullable Tuple<RepositoryData, SnapshotInfo> snapshotResult,
|
|
|
+ @Nullable Exception failure, @Nullable CleanupAfterErrorListener listener) {
|
|
|
+ assert snapshotResult != null || failure != null : "Either snapshotInfo or failure must be supplied";
|
|
|
clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() {
|
|
|
|
|
|
@Override
|
|
@@ -937,13 +934,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
|
|
|
@Override
|
|
|
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
|
|
- if (snapshotInfo == null) {
|
|
|
+ if (snapshotResult == null) {
|
|
|
failSnapshotCompletionListeners(snapshot, failure);
|
|
|
} else {
|
|
|
- final List<ActionListener<SnapshotInfo>> completionListeners = snapshotCompletionListeners.remove(snapshot);
|
|
|
+ final List<ActionListener<Tuple<RepositoryData, SnapshotInfo>>> completionListeners =
|
|
|
+ snapshotCompletionListeners.remove(snapshot);
|
|
|
if (completionListeners != null) {
|
|
|
try {
|
|
|
- ActionListener.onResponse(completionListeners, snapshotInfo);
|
|
|
+ ActionListener.onResponse(completionListeners, snapshotResult);
|
|
|
} catch (Exception e) {
|
|
|
logger.warn("Failed to notify listeners", e);
|
|
|
}
|
|
@@ -951,14 +949,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
endingSnapshots.remove(snapshot);
|
|
|
}
|
|
|
if (listener != null) {
|
|
|
- listener.onResponse(snapshotInfo);
|
|
|
+ listener.onFailure(null);
|
|
|
}
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
|
|
|
private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) {
|
|
|
- final List<ActionListener<SnapshotInfo>> completionListeners = snapshotCompletionListeners.remove(snapshot);
|
|
|
+ final List<ActionListener<Tuple<RepositoryData, SnapshotInfo>>> completionListeners = snapshotCompletionListeners.remove(snapshot);
|
|
|
if (completionListeners != null) {
|
|
|
try {
|
|
|
ActionListener.onFailure(completionListeners, e);
|
|
@@ -1059,14 +1057,30 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
@Override
|
|
|
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
|
|
if (runningSnapshot == null) {
|
|
|
- tryDeleteExisting(Priority.NORMAL);
|
|
|
+ threadPool.generic().execute(ActionRunnable.wrap(listener, l ->
|
|
|
+ repositoriesService.repository(repositoryName).getRepositoryData(
|
|
|
+ ActionListener.wrap(repositoryData -> {
|
|
|
+ Optional<SnapshotId> matchedEntry = repositoryData.getSnapshotIds()
|
|
|
+ .stream()
|
|
|
+ .filter(s -> s.getName().equals(snapshotName))
|
|
|
+ .findFirst();
|
|
|
+ // If we can't find the snapshot by the given name in the repository at all or if the snapshot we find in
|
|
|
+ // the repository is not the one we expected to find when waiting for a finishing snapshot we fail.
|
|
|
+ if (matchedEntry.isPresent()) {
|
|
|
+ deleteCompletedSnapshot(
|
|
|
+ new Snapshot(repositoryName, matchedEntry.get()), repositoryData.getGenId(), Priority.NORMAL, l);
|
|
|
+ } else {
|
|
|
+ l.onFailure(new SnapshotMissingException(repositoryName, snapshotName));
|
|
|
+ }
|
|
|
+ }, l::onFailure))));
|
|
|
return;
|
|
|
}
|
|
|
logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish");
|
|
|
addListener(runningSnapshot, ActionListener.wrap(
|
|
|
- snapshotInfo -> {
|
|
|
+ result -> {
|
|
|
logger.debug("deleted snapshot completed - deleting files");
|
|
|
- tryDeleteExisting(Priority.IMMEDIATE);
|
|
|
+ deleteCompletedSnapshot(
|
|
|
+ new Snapshot(repositoryName, result.v2().snapshotId()), result.v1().getGenId(), Priority.IMMEDIATE, listener);
|
|
|
},
|
|
|
e -> {
|
|
|
if (abortedDuringInit) {
|
|
@@ -1087,33 +1101,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
}
|
|
|
));
|
|
|
}
|
|
|
-
|
|
|
- private void tryDeleteExisting(Priority priority) {
|
|
|
- threadPool.generic().execute(ActionRunnable.wrap(listener, l ->
|
|
|
- repositoriesService.repository(repositoryName).getRepositoryData(ActionListener.wrap(repositoryData -> {
|
|
|
- Optional<SnapshotId> matchedEntry = repositoryData.getSnapshotIds()
|
|
|
- .stream()
|
|
|
- .filter(s -> s.getName().equals(snapshotName))
|
|
|
- .findFirst();
|
|
|
- // If we can't find the snapshot by the given name in the repository at all or if the snapshot we find in the
|
|
|
- // repository is not the one we expected to find when waiting for a finishing snapshot we fail.
|
|
|
- // Note: Not finding a snapshot we expected to find is practically impossible as it would imply that the snapshot
|
|
|
- // we waited for was concurrently deleted and another snapshot by the same name concurrently created
|
|
|
- // during the context switch from the cluster state thread to the snapshot thread. We still guard against the
|
|
|
- // possibility as a safety measure.
|
|
|
- if (matchedEntry.isPresent() == false
|
|
|
- || (runningSnapshot != null && matchedEntry.get().equals(runningSnapshot.getSnapshotId()) == false)) {
|
|
|
- if (runningSnapshot != null && matchedEntry.isPresent()) {
|
|
|
- logger.warn("Waited for snapshot [{}}] but found snapshot [{}] in repository [{}]",
|
|
|
- runningSnapshot.getSnapshotId(), matchedEntry.get(), repositoryName);
|
|
|
- }
|
|
|
- l.onFailure(new SnapshotMissingException(repositoryName, snapshotName));
|
|
|
- } else {
|
|
|
- deleteCompletedSnapshot(
|
|
|
- new Snapshot(repositoryName, matchedEntry.get()), repositoryData.getGenId(), priority, l);
|
|
|
- }
|
|
|
- }, l::onFailure))));
|
|
|
- }
|
|
|
});
|
|
|
}
|
|
|
|
|
@@ -1421,7 +1408,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|
|
* @param snapshot Snapshot to listen for
|
|
|
* @param listener listener
|
|
|
*/
|
|
|
- private void addListener(Snapshot snapshot, ActionListener<SnapshotInfo> listener) {
|
|
|
+ private void addListener(Snapshot snapshot, ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
|
|
|
snapshotCompletionListeners.computeIfAbsent(snapshot, k -> new CopyOnWriteArrayList<>()).add(listener);
|
|
|
}
|
|
|
|