|
|
@@ -164,8 +164,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
@Nullable
|
|
|
private final String fromSortValue;
|
|
|
private final int offset;
|
|
|
- @Nullable
|
|
|
- private final SnapshotSortKey.After after;
|
|
|
+ private final Predicate<SnapshotInfo> afterPredicate;
|
|
|
private final int size;
|
|
|
|
|
|
// current state
|
|
|
@@ -210,7 +209,6 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
this.order = order;
|
|
|
this.fromSortValue = fromSortValue;
|
|
|
this.offset = offset;
|
|
|
- this.after = after;
|
|
|
this.size = size;
|
|
|
this.snapshotsInProgress = snapshotsInProgress;
|
|
|
this.verbose = verbose;
|
|
|
@@ -219,6 +217,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
this.snapshotNamePredicate = SnapshotNamePredicate.forSnapshots(ignoreUnavailable, snapshots);
|
|
|
this.fromSortValuePredicates = SnapshotPredicates.forFromSortValue(fromSortValue, sortBy, order);
|
|
|
this.slmPolicyPredicate = SlmPolicyPredicate.forPolicies(policies);
|
|
|
+ this.afterPredicate = sortBy.getAfterPredicate(after, order);
|
|
|
|
|
|
this.getSnapshotInfoExecutor = new GetSnapshotInfoExecutor(
|
|
|
threadPool.info(ThreadPool.Names.SNAPSHOT_META).getMax(),
|
|
|
@@ -344,20 +343,15 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
} else {
|
|
|
assert fromSortValuePredicates.isMatchAll() : "filtering is not supported in non-verbose mode";
|
|
|
assert slmPolicyPredicate == SlmPolicyPredicate.MATCH_ALL_POLICIES : "filtering is not supported in non-verbose mode";
|
|
|
- final var currentSnapshots = snapshotsInProgress.forRepo(repo)
|
|
|
- .stream()
|
|
|
- .map(entry -> SnapshotInfo.inProgress(entry).basic())
|
|
|
- .toList();
|
|
|
-
|
|
|
- final SnapshotsInRepo snapshotInfos;
|
|
|
- if (repositoryData != null) {
|
|
|
- // want non-current snapshots as well, which are found in the repository data
|
|
|
- snapshotInfos = buildSimpleSnapshotInfos(toResolve, repo, repositoryData, currentSnapshots);
|
|
|
- } else {
|
|
|
- // only want current snapshots
|
|
|
- snapshotInfos = sortSnapshotsWithNoOffsetOrLimit(currentSnapshots);
|
|
|
- }
|
|
|
- listener.onResponse(snapshotInfos);
|
|
|
+
|
|
|
+ listener.onResponse(
|
|
|
+ buildSimpleSnapshotInfos(
|
|
|
+ toResolve,
|
|
|
+ repo,
|
|
|
+ repositoryData,
|
|
|
+ snapshotsInProgress.forRepo(repo).stream().map(entry -> SnapshotInfo.inProgress(entry).basic()).toList()
|
|
|
+ )
|
|
|
+ );
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -446,7 +440,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
|
|
|
.addListener(listener.safeMap(v ->
|
|
|
// no need to synchronize access to snapshots: Repository#getSnapshotInfo fails fast but we're on the success path here
|
|
|
- sortSnapshotsWithNoOffsetOrLimit(snapshots)), executor, threadPool.getThreadContext());
|
|
|
+ applyAfterPredicate(snapshots)), executor, threadPool.getThreadContext());
|
|
|
}
|
|
|
|
|
|
private SnapshotsInRepo buildSimpleSnapshotInfos(
|
|
|
@@ -455,6 +449,11 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
final RepositoryData repositoryData,
|
|
|
final List<SnapshotInfo> currentSnapshots
|
|
|
) {
|
|
|
+ if (repositoryData == null) {
|
|
|
+ // only want current snapshots
|
|
|
+ return applyAfterPredicate(currentSnapshots);
|
|
|
+ } // else want non-current snapshots as well, which are found in the repository data
|
|
|
+
|
|
|
List<SnapshotInfo> snapshotInfos = new ArrayList<>();
|
|
|
for (SnapshotInfo snapshotInfo : currentSnapshots) {
|
|
|
assert snapshotInfo.startTime() == 0L && snapshotInfo.endTime() == 0L && snapshotInfo.totalShards() == 0L : snapshotInfo;
|
|
|
@@ -483,16 +482,16 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
)
|
|
|
);
|
|
|
}
|
|
|
- return sortSnapshotsWithNoOffsetOrLimit(snapshotInfos);
|
|
|
+ return applyAfterPredicate(snapshotInfos);
|
|
|
}
|
|
|
|
|
|
- private SnapshotsInRepo sortSnapshotsWithNoOffsetOrLimit(List<SnapshotInfo> snapshotInfos) {
|
|
|
- return sortSnapshots(snapshotInfos.stream(), snapshotInfos.size(), 0, GetSnapshotsRequest.NO_LIMIT);
|
|
|
+ private SnapshotsInRepo applyAfterPredicate(List<SnapshotInfo> snapshotInfos) {
|
|
|
+ return new SnapshotsInRepo(snapshotInfos.stream().filter(afterPredicate).toList(), snapshotInfos.size(), 0);
|
|
|
}
|
|
|
|
|
|
private SnapshotsInRepo sortSnapshots(Stream<SnapshotInfo> snapshotInfoStream, int totalCount, int offset, int size) {
|
|
|
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.MANAGEMENT);
|
|
|
- final var resultsStream = snapshotInfoStream.filter(sortBy.getAfterPredicate(after, order))
|
|
|
+ final var resultsStream = snapshotInfoStream.peek(this::assertSatisfiesAllPredicates)
|
|
|
.sorted(sortBy.getSnapshotInfoComparator(order))
|
|
|
.skip(offset);
|
|
|
if (size == GetSnapshotsRequest.NO_LIMIT) {
|
|
|
@@ -513,6 +512,12 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void assertSatisfiesAllPredicates(SnapshotInfo snapshotInfo) {
|
|
|
+ assert matchesPredicates(snapshotInfo);
|
|
|
+ assert afterPredicate.test(snapshotInfo);
|
|
|
+ assert indices || snapshotInfo.indices().isEmpty();
|
|
|
+ }
|
|
|
+
|
|
|
private boolean matchesPredicates(SnapshotId snapshotId, RepositoryData repositoryData) {
|
|
|
if (fromSortValuePredicates.test(snapshotId, repositoryData) == false) {
|
|
|
return false;
|