
Lower Memory Usage of SnapshotRetentionTask (#71092)

Sometimes a `SnapshotInfo` can be non-trivial in size when it contains
a large number of exceptions. This commit makes sure we don't retain
these objects all the way until the deletion finishes; only the
snapshot id and policy id are kept instead.
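In essence, the eligible snapshots are mapped down to just the two values retention needs before any deletes are queued, so the bulky `SnapshotInfo` instances become garbage-collectable early. A minimal sketch of that pattern, using hypothetical stand-in types rather than the real Elasticsearch classes:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-ins for the Elasticsearch types, only to show the shape of the change.
record SnapshotId(String name, String uuid) {}
record SnapshotInfo(SnapshotId snapshotId, String policyId, List<Exception> shardFailures) {}
record Tuple<A, B>(A v1, B v2) {}

class RetentionSketch {
    // Before the change, the full SnapshotInfo (including any failure exceptions) stayed
    // reachable from the async deletion listeners until every delete finished. After the
    // change, only the snapshot id and policy id are kept per eligible snapshot.
    static Map<String, List<Tuple<SnapshotId, String>>> toDelete(Map<String, List<SnapshotInfo>> eligiblePerRepo) {
        return eligiblePerRepo.entrySet().stream()
            .collect(Collectors.toMap(
                Map.Entry::getKey,
                e -> e.getValue().stream()
                    .map(info -> new Tuple<>(info.snapshotId(), info.policyId()))
                    .collect(Collectors.toList())));
    }
}
```

The real change uses `org.elasticsearch.common.collect.Tuple` and the existing `getPolicyId` helper, as shown in the diff below.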
Armin Braun · 4 years ago · commit 33c918600d

+20 -15  x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java

@@ -18,6 +18,7 @@ import org.elasticsearch.client.OriginSettingClient;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotInfo;
@@ -143,14 +144,17 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
                         logger.trace("retrieved snapshots: [{}]", formatSnapshots(allSnapshots));
                     }
                     // Find all the snapshots that are past their retention date
-                    final Map<String, List<SnapshotInfo>> snapshotsToBeDeleted = allSnapshots.entrySet().stream()
+                    final Map<String, List<Tuple<SnapshotId, String>>> snapshotsToBeDeleted = allSnapshots.entrySet().stream()
                             .collect(Collectors.toMap(Map.Entry::getKey,
                                     e -> e.getValue().stream()
                                             .filter(snapshot -> snapshotEligibleForDeletion(snapshot, allSnapshots, policiesWithRetention))
+                                            // SnapshotInfo instances can be quite large in case they contain e.g. a large collection of
+                                            // exceptions so we extract the only two things (id + policy id) here so they can be GCed
+                                            .map(snapshotInfo -> Tuple.tuple(snapshotInfo.snapshotId(), getPolicyId(snapshotInfo)))
                                             .collect(Collectors.toList())));
 
                     if (logger.isTraceEnabled()) {
-                        logger.trace("snapshots eligible for deletion: [{}]", formatSnapshots(snapshotsToBeDeleted));
+                        logger.trace("snapshots eligible for deletion: [{}]", snapshotsToBeDeleted);
                     }
 
                     // Finally, delete the snapshots that need to be deleted
@@ -272,7 +276,7 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
                 " to have a policy in its metadata, but it did not"));
     }
 
-    void deleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDelete,
+    void deleteSnapshots(Map<String, List<Tuple<SnapshotId, String>>> snapshotsToDelete,
                          SnapshotLifecycleStats slmStats,
                          ActionListener<Void> listener) {
         int count = snapshotsToDelete.values().stream().mapToInt(List::size).sum();
@@ -293,9 +297,9 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
                             logger.debug("total elapsed time for deletion of [{}] snapshots: {}", deleted, totalElapsedTime);
                             slmStats.deletionTime(totalElapsedTime);
                         }), snapshotsToDelete.size());
-        for (Map.Entry<String, List<SnapshotInfo>> entry : snapshotsToDelete.entrySet()) {
+        for (Map.Entry<String, List<Tuple<SnapshotId, String>>> entry : snapshotsToDelete.entrySet()) {
             String repo = entry.getKey();
-            List<SnapshotInfo> snapshots = entry.getValue();
+            List<Tuple<SnapshotId, String>> snapshots = entry.getValue();
             if (snapshots.isEmpty() == false) {
                 deleteSnapshots(slmStats, deleted, failed, repo, snapshots, allDeletesListener);
             }
@@ -303,34 +307,35 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
     }
 
     private void deleteSnapshots(SnapshotLifecycleStats slmStats, AtomicInteger deleted, AtomicInteger failed, String repo,
-                                 List<SnapshotInfo> snapshots, ActionListener<Void> listener) {
+                                 List<Tuple<SnapshotId, String>> snapshots, ActionListener<Void> listener) {
 
         final ActionListener<Void> allDeletesListener =
                 new GroupedActionListener<>(listener.map(v -> null), snapshots.size());
-        for (SnapshotInfo info : snapshots) {
-            if (runningDeletions.add(info.snapshotId()) == false) {
+        for (Tuple<SnapshotId, String> info : snapshots) {
+            final SnapshotId snapshotId = info.v1();
+            if (runningDeletions.add(snapshotId) == false) {
                 // snapshot is already being deleted, no need to start another delete job for it
                 allDeletesListener.onResponse(null);
                 continue;
             }
             boolean success = false;
             try {
-                final String policyId = getPolicyId(info);
+                final String policyId = info.v2();
                 final long deleteStartTime = nowNanoSupplier.getAsLong();
                 // TODO: Use snapshot multi-delete instead of this loop if all nodes in the cluster support it
                 //       i.e are newer or equal to SnapshotsService#MULTI_DELETE_VERSION
-                deleteSnapshot(policyId, repo, info.snapshotId(), slmStats, ActionListener.runAfter(
+                deleteSnapshot(policyId, repo, snapshotId, slmStats, ActionListener.runAfter(
                         ActionListener.wrap(acknowledgedResponse -> {
                             deleted.incrementAndGet();
                             assert acknowledgedResponse.isAcknowledged();
                             historyStore.putAsync(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(),
-                                    info.snapshotId().getName(), policyId, repo));
+                                    snapshotId.getName(), policyId, repo));
                             allDeletesListener.onResponse(null);
                         }, e -> {
                             failed.incrementAndGet();
                             try {
                                 final SnapshotHistoryItem result = SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(),
-                                        info.snapshotId().getName(), policyId, repo, e);
+                                    snapshotId.getName(), policyId, repo, e);
                                 historyStore.putAsync(result);
                             } catch (IOException ex) {
                                 // This shouldn't happen unless there's an issue with serializing the original exception
@@ -341,17 +346,17 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
                                 allDeletesListener.onFailure(e);
                             }
                         }), () -> {
-                            runningDeletions.remove(info.snapshotId());
+                            runningDeletions.remove(snapshotId);
                             long finishTime = nowNanoSupplier.getAsLong();
                             TimeValue deletionTime = TimeValue.timeValueNanos(finishTime - deleteStartTime);
-                            logger.debug("elapsed time for deletion of [{}] snapshot: {}", info.snapshotId(), deletionTime);
+                            logger.debug("elapsed time for deletion of [{}] snapshot: {}", snapshotId, deletionTime);
                         }));
                 success = true;
             } catch (Exception e) {
                 listener.onFailure(e);
             } finally {
                 if (success == false) {
-                    runningDeletions.remove(info.snapshotId());
+                    runningDeletions.remove(snapshotId);
                 }
             }
         }
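
Why extracting the values up front matters: each delete completes asynchronously, and the completion callbacks are lambdas. A lambda that references a `SnapshotInfo` keeps the whole object strongly reachable until the callback has run, which may be long after retention started. Copying the needed pieces into locals first means only those small values are captured. A hedged sketch of the difference, where `deleteAsync` and the types are invented purely for illustration:

```java
import java.util.function.Consumer;

class CaptureSketch {
    record SnapshotId(String name) {}
    record SnapshotInfo(SnapshotId snapshotId, byte[] bulkyFailureDetails) {}

    // Stand-in for an asynchronous delete that calls back when the delete completes.
    static void deleteAsync(SnapshotId id, Consumer<Boolean> onDone) {
        onDone.accept(true);
    }

    static void before(SnapshotInfo info) {
        // The lambda captures `info`, so the whole SnapshotInfo (and its bulky failure
        // payload) remains strongly reachable until the callback has run.
        deleteAsync(info.snapshotId(), ok -> System.out.println("deleted " + info.snapshotId().name()));
    }

    static void after(SnapshotInfo info) {
        // Copy only the small value we need into a local; `info` becomes unreachable
        // as soon as this method returns, regardless of when the callback fires.
        SnapshotId id = info.snapshotId();
        deleteAsync(id, ok -> System.out.println("deleted " + id.name()));
    }
}
```

In the actual change, `snapshotId` and the policy id are read out of the `Tuple` before any listeners are built, so no deletion listener closes over a `SnapshotInfo`.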