|
|
@@ -67,7 +67,6 @@ import java.util.function.BiPredicate;
|
|
|
import java.util.function.BooleanSupplier;
|
|
|
import java.util.function.Predicate;
|
|
|
import java.util.function.ToLongFunction;
|
|
|
-import java.util.stream.Stream;
|
|
|
|
|
|
/**
|
|
|
* Transport Action for get snapshots operation
|
|
|
@@ -181,7 +180,6 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
// results
|
|
|
private final Map<String, ElasticsearchException> failuresByRepository = ConcurrentCollections.newConcurrentMap();
|
|
|
private final Queue<List<SnapshotInfo>> allSnapshotInfos = ConcurrentCollections.newQueue();
|
|
|
- private final AtomicInteger remaining = new AtomicInteger();
|
|
|
private final AtomicInteger totalCount = new AtomicInteger();
|
|
|
|
|
|
GetSnapshotsOperation(
|
|
|
@@ -256,7 +254,6 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
@Override
|
|
|
public void onResponse(SnapshotsInRepo snapshotsInRepo) {
|
|
|
allSnapshotInfos.add(snapshotsInRepo.snapshotInfos());
|
|
|
- remaining.addAndGet(snapshotsInRepo.remaining());
|
|
|
totalCount.addAndGet(snapshotsInRepo.totalCount());
|
|
|
delegate.onResponse(null);
|
|
|
}
|
|
|
@@ -275,26 +272,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
}
|
|
|
})
|
|
|
|
|
|
- .addListener(listener.map(ignored -> {
|
|
|
- assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.MANAGEMENT);
|
|
|
- cancellableTask.ensureNotCancelled();
|
|
|
- final var sortedSnapshotsInRepos = sortSnapshots(
|
|
|
- allSnapshotInfos.stream().flatMap(Collection::stream),
|
|
|
- totalCount.get(),
|
|
|
- offset,
|
|
|
- size
|
|
|
- );
|
|
|
- final var snapshotInfos = sortedSnapshotsInRepos.snapshotInfos();
|
|
|
- assert indices || snapshotInfos.stream().allMatch(snapshotInfo -> snapshotInfo.indices().isEmpty());
|
|
|
- final int finalRemaining = sortedSnapshotsInRepos.remaining() + remaining.get();
|
|
|
- return new GetSnapshotsResponse(
|
|
|
- snapshotInfos,
|
|
|
- failuresByRepository,
|
|
|
- finalRemaining > 0 ? sortBy.encodeAfterQueryParam(snapshotInfos.get(snapshotInfos.size() - 1)) : null,
|
|
|
- totalCount.get(),
|
|
|
- finalRemaining
|
|
|
- );
|
|
|
- }));
|
|
|
+ .addListener(listener.map(ignored -> buildResponse()));
|
|
|
}
|
|
|
|
|
|
private boolean skipRepository(String repositoryName) {
|
|
|
@@ -486,30 +464,40 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
}
|
|
|
|
|
|
private SnapshotsInRepo applyAfterPredicate(List<SnapshotInfo> snapshotInfos) {
|
|
|
- return new SnapshotsInRepo(snapshotInfos.stream().filter(afterPredicate).toList(), snapshotInfos.size(), 0);
|
|
|
+ return new SnapshotsInRepo(snapshotInfos.stream().filter(afterPredicate).toList(), snapshotInfos.size());
|
|
|
}
|
|
|
|
|
|
- private SnapshotsInRepo sortSnapshots(Stream<SnapshotInfo> snapshotInfoStream, int totalCount, int offset, int size) {
|
|
|
+ private GetSnapshotsResponse buildResponse() {
|
|
|
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.MANAGEMENT);
|
|
|
- final var resultsStream = snapshotInfoStream.peek(this::assertSatisfiesAllPredicates)
|
|
|
+ cancellableTask.ensureNotCancelled();
|
|
|
+ int remaining = 0;
|
|
|
+ final var resultsStream = allSnapshotInfos.stream()
|
|
|
+ .flatMap(Collection::stream)
|
|
|
+ .peek(this::assertSatisfiesAllPredicates)
|
|
|
.sorted(sortBy.getSnapshotInfoComparator(order))
|
|
|
.skip(offset);
|
|
|
+ final List<SnapshotInfo> snapshotInfos;
|
|
|
if (size == GetSnapshotsRequest.NO_LIMIT) {
|
|
|
- return new SnapshotsInRepo(resultsStream.toList(), totalCount, 0);
|
|
|
+ snapshotInfos = resultsStream.toList();
|
|
|
} else {
|
|
|
final var allocateSize = Math.min(size, 1000); // ignore excessively-large sizes in request params
|
|
|
- final var results = new ArrayList<SnapshotInfo>(allocateSize);
|
|
|
- var remaining = 0;
|
|
|
+ snapshotInfos = new ArrayList<>(allocateSize);
|
|
|
for (var iterator = resultsStream.iterator(); iterator.hasNext();) {
|
|
|
final var snapshotInfo = iterator.next();
|
|
|
- if (results.size() < size) {
|
|
|
- results.add(snapshotInfo);
|
|
|
+ if (snapshotInfos.size() < size) {
|
|
|
+ snapshotInfos.add(snapshotInfo);
|
|
|
} else {
|
|
|
remaining += 1;
|
|
|
}
|
|
|
}
|
|
|
- return new SnapshotsInRepo(results, totalCount, remaining);
|
|
|
}
|
|
|
+ return new GetSnapshotsResponse(
|
|
|
+ snapshotInfos,
|
|
|
+ failuresByRepository,
|
|
|
+ remaining > 0 ? sortBy.encodeAfterQueryParam(snapshotInfos.get(snapshotInfos.size() - 1)) : null,
|
|
|
+ totalCount.get(),
|
|
|
+ remaining
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
private void assertSatisfiesAllPredicates(SnapshotInfo snapshotInfo) {
|
|
|
@@ -684,9 +672,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private record SnapshotsInRepo(List<SnapshotInfo> snapshotInfos, int totalCount, int remaining) {
|
|
|
- private static final SnapshotsInRepo EMPTY = new SnapshotsInRepo(List.of(), 0, 0);
|
|
|
- }
|
|
|
+ private record SnapshotsInRepo(List<SnapshotInfo> snapshotInfos, int totalCount) {}
|
|
|
|
|
|
/**
|
|
|
* Throttling executor for retrieving {@link SnapshotInfo} instances from the repository without spamming the SNAPSHOT_META threadpool
|