|
|
@@ -255,7 +255,15 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
}
|
|
|
|
|
|
SubscribableListener.<RepositoryData>newForked(l -> maybeGetRepositoryData(repoName, l))
|
|
|
- .<Void>andThen((l, repositoryData) -> loadSnapshotInfos(repoName, repositoryData, l))
|
|
|
+ .<Void>andThen((repositoryListener, repositoryData) -> {
|
|
|
+ assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.MANAGEMENT);
|
|
|
+ cancellableTask.ensureNotCancelled();
|
|
|
+ ensureRequiredNamesPresent(repoName, repositoryData);
|
|
|
+ loadSnapshotInfos(
|
|
|
+ getAsyncSnapshotInfoIterator(repositoriesService.repository(repoName), repositoryData),
|
|
|
+ repositoryListener
|
|
|
+ );
|
|
|
+ })
|
|
|
.addListener(listeners.acquire());
|
|
|
}
|
|
|
}
|
|
|
@@ -281,14 +289,6 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void loadSnapshotInfos(String repositoryName, @Nullable RepositoryData repositoryData, ActionListener<Void> listener) {
|
|
|
- assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.MANAGEMENT);
|
|
|
- cancellableTask.ensureNotCancelled();
|
|
|
- final var repository = repositoriesService.repository(repositoryName);
|
|
|
- ensureRequiredNamesPresent(repositoryName, repositoryData);
|
|
|
- loadSnapshotInfos(getAsyncSnapshotInfoIterator(repository, repositoryData), listener);
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Check that the repository contains every <i>required</i> name according to {@link #snapshotNamePredicate}.
|
|
|
*
|
|
|
@@ -459,54 +459,45 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
|
|
|
final List<SnapshotInfo> snapshots = new ArrayList<>();
|
|
|
final List<SnapshotInfo> syncSnapshots = Collections.synchronizedList(snapshots);
|
|
|
|
|
|
- SubscribableListener
|
|
|
-
|
|
|
- .<Void>newForked(l -> {
|
|
|
- try (var listeners = new RefCountingListener(l)) {
|
|
|
- ThrottledIterator.run(
|
|
|
- Iterators.failFast(asyncSnapshotInfoIterator, () -> cancellableTask.isCancelled() || listeners.isFailing()),
|
|
|
- (ref, asyncSnapshotInfo) -> {
|
|
|
- final var refListener = ActionListener.runBefore(listeners.acquire(), ref::close);
|
|
|
- asyncSnapshotInfo.getSnapshotInfo(new ActionListener<>() {
|
|
|
- @Override
|
|
|
- public void onResponse(SnapshotInfo snapshotInfo) {
|
|
|
- if (matchesPredicates(snapshotInfo)) {
|
|
|
- repositoryTotalCount.incrementAndGet();
|
|
|
- if (afterPredicate.test(snapshotInfo)) {
|
|
|
- syncSnapshots.add(snapshotInfo.maybeWithoutIndices(indices));
|
|
|
- }
|
|
|
- }
|
|
|
- refListener.onResponse(null);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onFailure(Exception e) {
|
|
|
- if (ignoreUnavailable) {
|
|
|
- logger.warn(Strings.format("failed to fetch snapshot info for [%s]", asyncSnapshotInfo), e);
|
|
|
- refListener.onResponse(null);
|
|
|
- } else {
|
|
|
- refListener.onFailure(e);
|
|
|
- }
|
|
|
+ try (var listeners = new RefCountingListener(listener)) {
|
|
|
+ final var iterationCompleteListener = listeners.acquire(ignored -> {
|
|
|
+ totalCount.addAndGet(repositoryTotalCount.get());
|
|
|
+ // no need to synchronize access to snapshots: all writes happen-before this read
|
|
|
+ resultsCount.addAndGet(snapshots.size());
|
|
|
+ allSnapshotInfos.add(snapshots);
|
|
|
+ });
|
|
|
+ ThrottledIterator.run(
|
|
|
+ Iterators.failFast(asyncSnapshotInfoIterator, () -> cancellableTask.isCancelled() || listeners.isFailing()),
|
|
|
+ (ref, asyncSnapshotInfo) -> {
|
|
|
+ final var refListener = ActionListener.runBefore(listeners.acquire(), ref::close);
|
|
|
+ asyncSnapshotInfo.getSnapshotInfo(new ActionListener<>() {
|
|
|
+ @Override
|
|
|
+ public void onResponse(SnapshotInfo snapshotInfo) {
|
|
|
+ if (matchesPredicates(snapshotInfo)) {
|
|
|
+ repositoryTotalCount.incrementAndGet();
|
|
|
+ if (afterPredicate.test(snapshotInfo)) {
|
|
|
+ syncSnapshots.add(snapshotInfo.maybeWithoutIndices(indices));
|
|
|
}
|
|
|
- });
|
|
|
- },
|
|
|
- getSnapshotInfoExecutor.getMaxRunningTasks(),
|
|
|
- () -> {},
|
|
|
- () -> {}
|
|
|
- );
|
|
|
- }
|
|
|
- })
|
|
|
-
|
|
|
- // no need to synchronize access to snapshots: all writes happen-before this read
|
|
|
- .andThenAccept(ignored -> addResults(repositoryTotalCount.get(), snapshots))
|
|
|
-
|
|
|
- .addListener(listener);
|
|
|
- }
|
|
|
+ }
|
|
|
+ refListener.onResponse(null);
|
|
|
+ }
|
|
|
|
|
|
- private void addResults(int repositoryTotalCount, List<SnapshotInfo> snapshots) {
|
|
|
- totalCount.addAndGet(repositoryTotalCount);
|
|
|
- resultsCount.addAndGet(snapshots.size());
|
|
|
- allSnapshotInfos.add(snapshots);
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception e) {
|
|
|
+ if (ignoreUnavailable) {
|
|
|
+ logger.warn(Strings.format("failed to fetch snapshot info for [%s]", asyncSnapshotInfo), e);
|
|
|
+ refListener.onResponse(null);
|
|
|
+ } else {
|
|
|
+ refListener.onFailure(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ },
|
|
|
+ getSnapshotInfoExecutor.getMaxRunningTasks(),
|
|
|
+ () -> {},
|
|
|
+ () -> iterationCompleteListener.onResponse(null)
|
|
|
+ );
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private GetSnapshotsResponse buildResponse() {
|