|
@@ -48,6 +48,7 @@ import java.util.Map;
|
|
|
import java.util.Optional;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.function.Consumer;
|
|
@@ -82,6 +83,15 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
|
|
|
this.threadPool = threadPool;
|
|
|
}
|
|
|
|
|
|
+ private static String formatSnapshots(Map<String, List<SnapshotInfo>> snapshotMap) {
|
|
|
+ return snapshotMap.entrySet().stream()
|
|
|
+ .map(e -> e.getKey() + ": [" + e.getValue().stream()
|
|
|
+ .map(si -> si.snapshotId().getName())
|
|
|
+ .collect(Collectors.joining(","))
|
|
|
+ + "]")
|
|
|
+ .collect(Collectors.joining(","));
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void triggered(SchedulerEngine.Event event) {
|
|
|
assert event.getJobName().equals(SnapshotRetentionService.SLM_RETENTION_JOB_ID) ||
|
|
@@ -109,6 +119,7 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
|
|
|
slmStats.retentionFailed();
|
|
|
updateStateWithStats(slmStats);
|
|
|
} finally {
|
|
|
+ logger.info("SLM retention snapshot cleanup task completed with error");
|
|
|
running.set(false);
|
|
|
}
|
|
|
};
|
|
@@ -120,18 +131,20 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
|
|
|
slmStats.retentionRun();
|
|
|
// Find all SLM policies that have retention enabled
|
|
|
final Map<String, SnapshotLifecyclePolicy> policiesWithRetention = getAllPoliciesWithRetentionEnabled(state);
|
|
|
+ logger.trace("policies with retention enabled: {}", policiesWithRetention.keySet());
|
|
|
|
|
|
// For those policies (there may be more than one for the same repo),
|
|
|
// return the repos that we need to get the snapshots for
|
|
|
final Set<String> repositioriesToFetch = policiesWithRetention.values().stream()
|
|
|
.map(SnapshotLifecyclePolicy::getRepository)
|
|
|
.collect(Collectors.toSet());
|
|
|
+ logger.trace("fetching snapshots from repositories: {}", repositioriesToFetch);
|
|
|
|
|
|
if (repositioriesToFetch.isEmpty()) {
|
|
|
running.set(false);
|
|
|
+ logger.info("there are no repositories to fetch, SLM retention snapshot cleanup task complete");
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
// Finally, asynchronously retrieve all the snapshots, deleting them serially,
|
|
|
// before updating the cluster state with the new metrics and setting 'running'
|
|
|
// back to false
|
|
@@ -139,6 +152,9 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
|
|
|
@Override
|
|
|
public void onResponse(Map<String, List<SnapshotInfo>> allSnapshots) {
|
|
|
try {
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace("retrieved snapshots: [{}]", formatSnapshots(allSnapshots));
|
|
|
+ }
|
|
|
// Find all the snapshots that are past their retention date
|
|
|
final Map<String, List<SnapshotInfo>> snapshotsToBeDeleted = allSnapshots.entrySet().stream()
|
|
|
.collect(Collectors.toMap(Map.Entry::getKey,
|
|
@@ -146,11 +162,16 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
|
|
|
.filter(snapshot -> snapshotEligibleForDeletion(snapshot, allSnapshots, policiesWithRetention))
|
|
|
.collect(Collectors.toList())));
|
|
|
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace("snapshots eligible for deletion: [{}]", formatSnapshots(snapshotsToBeDeleted));
|
|
|
+ }
|
|
|
+
|
|
|
// Finally, delete the snapshots that need to be deleted
|
|
|
maybeDeleteSnapshots(snapshotsToBeDeleted, maxDeletionTime, slmStats);
|
|
|
|
|
|
updateStateWithStats(slmStats);
|
|
|
} finally {
|
|
|
+ logger.info("SLM retention snapshot cleanup task complete");
|
|
|
running.set(false);
|
|
|
}
|
|
|
}
|
|
@@ -239,6 +260,12 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
|
|
|
.execute(new ActionListener<>() {
|
|
|
@Override
|
|
|
public void onResponse(final GetSnapshotsResponse resp) {
|
|
|
+ if (logger.isTraceEnabled()) {
|
|
|
+ logger.trace("retrieved snapshots: {}",
|
|
|
+ repositories.stream()
|
|
|
+ .flatMap(repo -> resp.getSnapshots(repo).stream().map(si -> si.snapshotId().getName()))
|
|
|
+ .collect(Collectors.toList()));
|
|
|
+ }
|
|
|
Map<String, List<SnapshotInfo>> snapshots = new HashMap<>();
|
|
|
final Set<SnapshotState> retainableStates = Set.of(SnapshotState.SUCCESS, SnapshotState.FAILED, SnapshotState.PARTIAL);
|
|
|
repositories.forEach(repo -> {
|
|
@@ -291,6 +318,8 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
|
|
|
|
|
|
ClusterState state = clusterService.state();
|
|
|
if (okayToDeleteSnapshots(state)) {
|
|
|
+ logger.trace("there are no snapshots currently running, proceeding with snapshot deletion of [{}]",
|
|
|
+ formatSnapshots(snapshotsToDelete));
|
|
|
deleteSnapshots(snapshotsToDelete, maximumTime, slmStats);
|
|
|
} else {
|
|
|
logger.debug("a snapshot is currently running, rescheduling SLM retention for after snapshot has completed");
|
|
@@ -300,6 +329,8 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
|
|
|
new NoSnapshotRunningListener(observer,
|
|
|
newState -> threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(() -> {
|
|
|
try {
|
|
|
+ logger.trace("received cluster state without running snapshots, proceeding with snapshot deletion of [{}]",
|
|
|
+ formatSnapshots(snapshotsToDelete));
|
|
|
deleteSnapshots(snapshotsToDelete, maximumTime, slmStats);
|
|
|
} finally {
|
|
|
latch.countDown();
|
|
@@ -310,7 +341,11 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
|
|
|
throw new ElasticsearchException(e);
|
|
|
}));
|
|
|
try {
|
|
|
- latch.await();
|
|
|
+ logger.trace("waiting for snapshot deletion to complete");
|
|
|
+ // Wait until we find a cluster state not running a snapshot operation.
|
|
|
+ // If we can't find one within a day, give up and throw an error.
|
|
|
+ latch.await(1, TimeUnit.DAYS);
|
|
|
+ logger.trace("deletion complete");
|
|
|
} catch (InterruptedException e) {
|
|
|
throw new ElasticsearchException(e);
|
|
|
}
|
|
@@ -429,24 +464,28 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
|
|
|
// Cannot delete during a snapshot
|
|
|
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
|
|
|
if (snapshotsInProgress != null && snapshotsInProgress.entries().size() > 0) {
|
|
|
+ logger.trace("deletion cannot proceed as there are snapshots in progress");
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
// Cannot delete during an existing delete
|
|
|
final SnapshotDeletionsInProgress deletionsInProgress = state.custom(SnapshotDeletionsInProgress.TYPE);
|
|
|
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
|
|
|
+ logger.trace("deletion cannot proceed as there are snapshot deletions in progress");
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
// Cannot delete while a repository is being cleaned
|
|
|
final RepositoryCleanupInProgress repositoryCleanupInProgress = state.custom(RepositoryCleanupInProgress.TYPE);
|
|
|
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
|
|
|
+ logger.trace("deletion cannot proceed as there are repository cleanups in progress");
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
// Cannot delete during a restore
|
|
|
final RestoreInProgress restoreInProgress = state.custom(RestoreInProgress.TYPE);
|
|
|
if (restoreInProgress != null && restoreInProgress.isEmpty() == false) {
|
|
|
+ logger.trace("deletion cannot proceed as there are snapshot restores in progress");
|
|
|
return false;
|
|
|
}
|
|
|
|