|
@@ -140,22 +140,30 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
) {
|
|
|
// short-circuit if there are no repos, because we can not create GroupedActionListener of size 0
|
|
|
if (repos.isEmpty()) {
|
|
|
- listener.onResponse(new GetSnapshotsResponse(Collections.emptyList(), Collections.emptyMap()));
|
|
|
+ listener.onResponse(new GetSnapshotsResponse(Collections.emptyList(), Collections.emptyMap(), null));
|
|
|
return;
|
|
|
}
|
|
|
- final GroupedActionListener<Tuple<Tuple<String, ElasticsearchException>, List<SnapshotInfo>>> groupedActionListener =
|
|
|
+ final GroupedActionListener<Tuple<Tuple<String, ElasticsearchException>, SnapshotsInRepo>> groupedActionListener =
|
|
|
new GroupedActionListener<>(listener.map(responses -> {
|
|
|
assert repos.size() == responses.size();
|
|
|
final List<SnapshotInfo> allSnapshots = responses.stream()
|
|
|
.map(Tuple::v2)
|
|
|
.filter(Objects::nonNull)
|
|
|
- .flatMap(Collection::stream)
|
|
|
+ .flatMap(snapshotsInRepo -> snapshotsInRepo.snapshotInfos.stream())
|
|
|
.collect(Collectors.toUnmodifiableList());
|
|
|
final Map<String, ElasticsearchException> failures = responses.stream()
|
|
|
.map(Tuple::v1)
|
|
|
.filter(Objects::nonNull)
|
|
|
.collect(Collectors.toMap(Tuple::v1, Tuple::v2));
|
|
|
- return new GetSnapshotsResponse(sortSnapshots(allSnapshots, sortBy, after, size, order), failures);
|
|
|
+ final SnapshotsInRepo snInfos = sortSnapshots(allSnapshots, sortBy, after, size, order);
|
|
|
+ final List<SnapshotInfo> snapshotInfos = snInfos.snapshotInfos;
|
|
|
+ return new GetSnapshotsResponse(
|
|
|
+ snapshotInfos,
|
|
|
+ failures,
|
|
|
+ snInfos.hasMore || responses.stream().anyMatch(r -> r.v2() != null && r.v2().hasMore)
|
|
|
+ ? GetSnapshotsRequest.After.from(snapshotInfos.get(snapshotInfos.size() - 1), sortBy).asQueryParam()
|
|
|
+ : null
|
|
|
+ );
|
|
|
}), repos.size());
|
|
|
|
|
|
for (final RepositoryMetadata repo : repos) {
|
|
@@ -193,11 +201,11 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
@Nullable final GetSnapshotsRequest.After after,
|
|
|
int size,
|
|
|
SortOrder order,
|
|
|
- ActionListener<List<SnapshotInfo>> listener
|
|
|
+ ActionListener<SnapshotsInRepo> listener
|
|
|
) {
|
|
|
final Map<String, Snapshot> allSnapshotIds = new HashMap<>();
|
|
|
final List<SnapshotInfo> currentSnapshots = new ArrayList<>();
|
|
|
- for (SnapshotInfo snapshotInfo : sortedCurrentSnapshots(snapshotsInProgress, repo, sortBy, after, size, order)) {
|
|
|
+ for (SnapshotInfo snapshotInfo : sortedCurrentSnapshots(snapshotsInProgress, repo, sortBy, after, size, order).snapshotInfos) {
|
|
|
Snapshot snapshot = snapshotInfo.snapshot();
|
|
|
allSnapshotIds.put(snapshot.getSnapshotId().getName(), snapshot);
|
|
|
currentSnapshots.add(snapshotInfo);
|
|
@@ -238,7 +246,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
* @param repositoryName repository name
|
|
|
* @return list of snapshots
|
|
|
*/
|
|
|
- private static List<SnapshotInfo> sortedCurrentSnapshots(
|
|
|
+ private static SnapshotsInRepo sortedCurrentSnapshots(
|
|
|
SnapshotsInProgress snapshotsInProgress,
|
|
|
String repositoryName,
|
|
|
GetSnapshotsRequest.SortBy sortBy,
|
|
@@ -272,7 +280,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
@Nullable final GetSnapshotsRequest.After after,
|
|
|
int size,
|
|
|
SortOrder order,
|
|
|
- ActionListener<List<SnapshotInfo>> listener
|
|
|
+ ActionListener<SnapshotsInRepo> listener
|
|
|
) {
|
|
|
if (task.isCancelled()) {
|
|
|
listener.onFailure(new TaskCancelledException("task cancelled"));
|
|
@@ -326,7 +334,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
listener
|
|
|
);
|
|
|
} else {
|
|
|
- final List<SnapshotInfo> snapshotInfos;
|
|
|
+ final SnapshotsInRepo snapshotInfos;
|
|
|
if (repositoryData != null) {
|
|
|
// want non-current snapshots as well, which are found in the repository data
|
|
|
snapshotInfos = buildSimpleSnapshotInfos(toResolve, repo, repositoryData, currentSnapshots, sortBy, after, size, order);
|
|
@@ -361,7 +369,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
@Nullable GetSnapshotsRequest.After after,
|
|
|
int size,
|
|
|
SortOrder order,
|
|
|
- ActionListener<List<SnapshotInfo>> listener
|
|
|
+ ActionListener<SnapshotsInRepo> listener
|
|
|
) {
|
|
|
if (task.isCancelled()) {
|
|
|
listener.onFailure(new TaskCancelledException("task cancelled"));
|
|
@@ -433,7 +441,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
return (snapshots.length == 1 && GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshots[0]));
|
|
|
}
|
|
|
|
|
|
- private static List<SnapshotInfo> buildSimpleSnapshotInfos(
|
|
|
+ private static SnapshotsInRepo buildSimpleSnapshotInfos(
|
|
|
final Set<Snapshot> toResolve,
|
|
|
final String repoName,
|
|
|
final RepositoryData repositoryData,
|
|
@@ -485,7 +493,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
|
|
|
private static final Comparator<SnapshotInfo> BY_NAME = Comparator.comparing(sni -> sni.snapshotId().getName());
|
|
|
|
|
|
- private static List<SnapshotInfo> sortSnapshots(
|
|
|
+ private static SnapshotsInRepo sortSnapshots(
|
|
|
List<SnapshotInfo> snapshotInfos,
|
|
|
GetSnapshotsRequest.SortBy sortBy,
|
|
|
@Nullable GetSnapshotsRequest.After after,
|
|
@@ -514,19 +522,34 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
|
|
|
if (after != null) {
|
|
|
final Predicate<SnapshotInfo> isAfter;
|
|
|
- final String name = after.snapshotName();
|
|
|
+ final String snapshotName = after.snapshotName();
|
|
|
+ final String repoName = after.repoName();
|
|
|
switch (sortBy) {
|
|
|
case START_TIME:
|
|
|
- isAfter = filterByLongOffset(SnapshotInfo::startTime, Long.parseLong(after.value()), name, order);
|
|
|
+ isAfter = filterByLongOffset(SnapshotInfo::startTime, Long.parseLong(after.value()), snapshotName, repoName, order);
|
|
|
break;
|
|
|
case NAME:
|
|
|
- isAfter = order == SortOrder.ASC ? (info -> compareName(name, info) < 0) : (info -> compareName(name, info) > 0);
|
|
|
+ isAfter = order == SortOrder.ASC
|
|
|
+ ? (info -> compareName(snapshotName, repoName, info) < 0)
|
|
|
+ : (info -> compareName(snapshotName, repoName, info) > 0);
|
|
|
break;
|
|
|
case DURATION:
|
|
|
- isAfter = filterByLongOffset(info -> info.endTime() - info.startTime(), Long.parseLong(after.value()), name, order);
|
|
|
+ isAfter = filterByLongOffset(
|
|
|
+ info -> info.endTime() - info.startTime(),
|
|
|
+ Long.parseLong(after.value()),
|
|
|
+ snapshotName,
|
|
|
+ repoName,
|
|
|
+ order
|
|
|
+ );
|
|
|
break;
|
|
|
case INDICES:
|
|
|
- isAfter = filterByLongOffset(info -> info.indices().size(), Integer.parseInt(after.value()), name, order);
|
|
|
+ isAfter = filterByLongOffset(
|
|
|
+ info -> info.indices().size(),
|
|
|
+ Integer.parseInt(after.value()),
|
|
|
+ snapshotName,
|
|
|
+ repoName,
|
|
|
+ order
|
|
|
+ );
|
|
|
break;
|
|
|
default:
|
|
|
throw new AssertionError("unexpected sort column [" + sortBy + "]");
|
|
@@ -535,27 +558,47 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
}
|
|
|
infos = infos.sorted(order == SortOrder.DESC ? comparator.reversed() : comparator);
|
|
|
if (size != GetSnapshotsRequest.NO_LIMIT) {
|
|
|
- infos = infos.limit(size);
|
|
|
+ infos = infos.limit(size + 1);
|
|
|
}
|
|
|
- return infos.collect(Collectors.toUnmodifiableList());
|
|
|
+ final List<SnapshotInfo> snapshots = infos.collect(Collectors.toUnmodifiableList());
|
|
|
+ boolean hasMore = size != GetSnapshotsRequest.NO_LIMIT && size < snapshots.size();
|
|
|
+ return new SnapshotsInRepo(hasMore ? snapshots.subList(0, size) : snapshots, hasMore);
|
|
|
}
|
|
|
|
|
|
private static Predicate<SnapshotInfo> filterByLongOffset(
|
|
|
ToLongFunction<SnapshotInfo> extractor,
|
|
|
long after,
|
|
|
- String name,
|
|
|
+ String snapshotName,
|
|
|
+ String repoName,
|
|
|
SortOrder order
|
|
|
) {
|
|
|
return order == SortOrder.ASC ? info -> {
|
|
|
final long val = extractor.applyAsLong(info);
|
|
|
- return after < val || (after == val && compareName(name, info) < 0);
|
|
|
+
|
|
|
+ return after < val || (after == val && compareName(snapshotName, repoName, info) < 0);
|
|
|
} : info -> {
|
|
|
final long val = extractor.applyAsLong(info);
|
|
|
- return after > val || (after == val && compareName(name, info) > 0);
|
|
|
+ return after > val || (after == val && compareName(snapshotName, repoName, info) > 0);
|
|
|
};
|
|
|
}
|
|
|
|
|
|
- private static int compareName(String name, SnapshotInfo info) {
|
|
|
- return name.compareTo(info.snapshotId().getName());
|
|
|
+ private static int compareName(String name, String repoName, SnapshotInfo info) {
|
|
|
+ final int res = name.compareTo(info.snapshotId().getName());
|
|
|
+ if (res != 0) {
|
|
|
+ return res;
|
|
|
+ }
|
|
|
+ return repoName.compareTo(info.repository());
|
|
|
+ }
|
|
|
+
|
|
|
+ private static final class SnapshotsInRepo {
|
|
|
+
|
|
|
+ private final boolean hasMore;
|
|
|
+
|
|
|
+ private final List<SnapshotInfo> snapshotInfos;
|
|
|
+
|
|
|
+ SnapshotsInRepo(List<SnapshotInfo> snapshotInfos, boolean hasMore) {
|
|
|
+ this.hasMore = hasMore;
|
|
|
+ this.snapshotInfos = snapshotInfos;
|
|
|
+ }
|
|
|
}
|
|
|
}
|