|  | @@ -33,7 +33,9 @@ import org.elasticsearch.common.io.stream.StreamInput;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.io.stream.StreamOutput;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.unit.ByteSizeValue;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 | 
	
		
			
				|  |  | +import org.elasticsearch.common.util.concurrent.ThrottledIterator;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.xcontent.StatusToXContentObject;
 | 
	
		
			
				|  |  | +import org.elasticsearch.core.Releasable;
 | 
	
		
			
				|  |  |  import org.elasticsearch.core.TimeValue;
 | 
	
		
			
				|  |  |  import org.elasticsearch.repositories.RepositoriesService;
 | 
	
		
			
				|  |  |  import org.elasticsearch.repositories.Repository;
 | 
	
	
		
			
				|  | @@ -56,6 +58,7 @@ import java.util.Arrays;
 | 
	
		
			
				|  |  |  import java.util.Collection;
 | 
	
		
			
				|  |  |  import java.util.Collections;
 | 
	
		
			
				|  |  |  import java.util.HashSet;
 | 
	
		
			
				|  |  | +import java.util.Iterator;
 | 
	
		
			
				|  |  |  import java.util.List;
 | 
	
		
			
				|  |  |  import java.util.Map;
 | 
	
		
			
				|  |  |  import java.util.Queue;
 | 
	
	
		
			
				|  | @@ -63,6 +66,7 @@ import java.util.Random;
 | 
	
		
			
				|  |  |  import java.util.Set;
 | 
	
		
			
				|  |  |  import java.util.concurrent.Semaphore;
 | 
	
		
			
				|  |  |  import java.util.concurrent.atomic.AtomicReference;
 | 
	
		
			
				|  |  | +import java.util.function.Consumer;
 | 
	
		
			
				|  |  |  import java.util.function.LongSupplier;
 | 
	
		
			
				|  |  |  import java.util.stream.IntStream;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -357,7 +361,7 @@ public class RepositoryAnalyzeAction extends ActionType<RepositoryAnalyzeAction.
 | 
	
		
			
				|  |  |          // choose the blob path nondeterministically to avoid clashes, assuming that the actual path doesn't matter for reproduction
 | 
	
		
			
				|  |  |          private final String blobPath = "temp-analysis-" + UUIDs.randomBase64UUID();
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        private final Queue<VerifyBlobTask> queue = ConcurrentCollections.newQueue();
 | 
	
		
			
				|  |  | +        private final Queue<Consumer<Releasable>> queue = ConcurrentCollections.newQueue();
 | 
	
		
			
				|  |  |          private final AtomicReference<Exception> failure = new AtomicReference<>();
 | 
	
		
			
				|  |  |          private final Semaphore innerFailures = new Semaphore(5); // limit the number of suppressed failures
 | 
	
		
			
				|  |  |          private final RefCountingRunnable requestRefs = new RefCountingRunnable(this::runCleanUp);
 | 
	
	
		
			
				|  | @@ -447,80 +451,87 @@ public class RepositoryAnalyzeAction extends ActionType<RepositoryAnalyzeAction.
 | 
	
		
			
				|  |  |                  final long targetLength = blobSizes.get(i);
 | 
	
		
			
				|  |  |                  final boolean smallBlob = targetLength <= MAX_ATOMIC_WRITE_SIZE; // avoid the atomic API for larger blobs
 | 
	
		
			
				|  |  |                  final boolean abortWrite = smallBlob && request.isAbortWritePermitted() && rarely(random);
 | 
	
		
			
				|  |  | -                final VerifyBlobTask verifyBlobTask = new VerifyBlobTask(
 | 
	
		
			
				|  |  | -                    nodes.get(random.nextInt(nodes.size())),
 | 
	
		
			
				|  |  | -                    new BlobAnalyzeAction.Request(
 | 
	
		
			
				|  |  | -                        request.getRepositoryName(),
 | 
	
		
			
				|  |  | -                        blobPath,
 | 
	
		
			
				|  |  | -                        "test-blob-" + i + "-" + UUIDs.randomBase64UUID(random),
 | 
	
		
			
				|  |  | -                        targetLength,
 | 
	
		
			
				|  |  | -                        random.nextLong(),
 | 
	
		
			
				|  |  | -                        nodes,
 | 
	
		
			
				|  |  | -                        request.getReadNodeCount(),
 | 
	
		
			
				|  |  | -                        request.getEarlyReadNodeCount(),
 | 
	
		
			
				|  |  | -                        smallBlob && rarely(random),
 | 
	
		
			
				|  |  | -                        repository.supportURLRepo()
 | 
	
		
			
				|  |  | -                            && repository.hasAtomicOverwrites()
 | 
	
		
			
				|  |  | -                            && smallBlob
 | 
	
		
			
				|  |  | -                            && rarely(random)
 | 
	
		
			
				|  |  | -                            && abortWrite == false,
 | 
	
		
			
				|  |  | -                        abortWrite
 | 
	
		
			
				|  |  | -                    )
 | 
	
		
			
				|  |  | +                final BlobAnalyzeAction.Request blobAnalyzeRequest = new BlobAnalyzeAction.Request(
 | 
	
		
			
				|  |  | +                    this.request.getRepositoryName(),
 | 
	
		
			
				|  |  | +                    blobPath,
 | 
	
		
			
				|  |  | +                    "test-blob-" + i + "-" + UUIDs.randomBase64UUID(random),
 | 
	
		
			
				|  |  | +                    targetLength,
 | 
	
		
			
				|  |  | +                    random.nextLong(),
 | 
	
		
			
				|  |  | +                    nodes,
 | 
	
		
			
				|  |  | +                    this.request.getReadNodeCount(),
 | 
	
		
			
				|  |  | +                    this.request.getEarlyReadNodeCount(),
 | 
	
		
			
				|  |  | +                    smallBlob && rarely(random),
 | 
	
		
			
				|  |  | +                    repository.supportURLRepo() && repository.hasAtomicOverwrites() && smallBlob && rarely(random) && abortWrite == false,
 | 
	
		
			
				|  |  | +                    abortWrite
 | 
	
		
			
				|  |  |                  );
 | 
	
		
			
				|  |  | -                queue.add(verifyBlobTask);
 | 
	
		
			
				|  |  | +                final DiscoveryNode node = nodes.get(random.nextInt(nodes.size()));
 | 
	
		
			
				|  |  | +                queue.add(ref -> runBlobAnalysis(ref, blobAnalyzeRequest, node));
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            try (var ignored = requestRefs) {
 | 
	
		
			
				|  |  | -                for (int i = 0; i < request.getConcurrency(); i++) {
 | 
	
		
			
				|  |  | -                    processNextTask();
 | 
	
		
			
				|  |  | -                }
 | 
	
		
			
				|  |  | -            }
 | 
	
		
			
				|  |  | +            ThrottledIterator.run(
 | 
	
		
			
				|  |  | +                getQueueIterator(),
 | 
	
		
			
				|  |  | +                (ref, task) -> task.accept(ref),
 | 
	
		
			
				|  |  | +                request.getConcurrency(),
 | 
	
		
			
				|  |  | +                () -> {},
 | 
	
		
			
				|  |  | +                requestRefs::close
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          private boolean rarely(Random random) {
 | 
	
		
			
				|  |  |              return random.nextDouble() < request.getRareActionProbability();
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        private void processNextTask() {
 | 
	
		
			
				|  |  | -            final VerifyBlobTask thisTask = queue.poll();
 | 
	
		
			
				|  |  | -            if (isRunning() && thisTask != null) {
 | 
	
		
			
				|  |  | -                logger.trace("processing [{}]", thisTask);
 | 
	
		
			
				|  |  | -                // NB although all this is on the SAME thread, the per-blob verification runs on a SNAPSHOT thread so we don't have to worry
 | 
	
		
			
				|  |  | -                // about local requests resulting in a stack overflow here
 | 
	
		
			
				|  |  | -                final TransportRequestOptions transportRequestOptions = TransportRequestOptions.timeout(
 | 
	
		
			
				|  |  | -                    TimeValue.timeValueMillis(timeoutTimeMillis - currentTimeMillisSupplier.getAsLong())
 | 
	
		
			
				|  |  | -                );
 | 
	
		
			
				|  |  | -                transportService.sendChildRequest(
 | 
	
		
			
				|  |  | -                    thisTask.node,
 | 
	
		
			
				|  |  | -                    BlobAnalyzeAction.NAME,
 | 
	
		
			
				|  |  | -                    thisTask.request,
 | 
	
		
			
				|  |  | -                    task,
 | 
	
		
			
				|  |  | -                    transportRequestOptions,
 | 
	
		
			
				|  |  | -                    new ActionListenerResponseHandler<>(ActionListener.releaseAfter(new ActionListener<>() {
 | 
	
		
			
				|  |  | -                        @Override
 | 
	
		
			
				|  |  | -                        public void onResponse(BlobAnalyzeAction.Response response) {
 | 
	
		
			
				|  |  | -                            logger.trace("finished [{}]", thisTask);
 | 
	
		
			
				|  |  | -                            if (thisTask.request.getAbortWrite() == false) {
 | 
	
		
			
				|  |  | -                                expectedBlobs.add(thisTask.request.getBlobName()); // each task cleans up its own mess on failure
 | 
	
		
			
				|  |  | -                            }
 | 
	
		
			
				|  |  | -                            if (request.detailed) {
 | 
	
		
			
				|  |  | -                                synchronized (responses) {
 | 
	
		
			
				|  |  | -                                    responses.add(response);
 | 
	
		
			
				|  |  | -                                }
 | 
	
		
			
				|  |  | -                            }
 | 
	
		
			
				|  |  | -                            summary.add(response);
 | 
	
		
			
				|  |  | -                            processNextTask();
 | 
	
		
			
				|  |  | -                        }
 | 
	
		
			
				|  |  | +        private Iterator<Consumer<Releasable>> getQueueIterator() {
 | 
	
		
			
				|  |  | +            return new Iterator<>() {
 | 
	
		
			
				|  |  | +                Consumer<Releasable> nextItem = queue.poll();
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                        @Override
 | 
	
		
			
				|  |  | -                        public void onFailure(Exception exp) {
 | 
	
		
			
				|  |  | -                            logger.debug(() -> "failed [" + thisTask + "]", exp);
 | 
	
		
			
				|  |  | -                            fail(exp);
 | 
	
		
			
				|  |  | +                @Override
 | 
	
		
			
				|  |  | +                public boolean hasNext() {
 | 
	
		
			
				|  |  | +                    return isRunning() && nextItem != null;
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                @Override
 | 
	
		
			
				|  |  | +                public Consumer<Releasable> next() {
 | 
	
		
			
				|  |  | +                    assert nextItem != null;
 | 
	
		
			
				|  |  | +                    final var currentItem = nextItem;
 | 
	
		
			
				|  |  | +                    nextItem = queue.poll();
 | 
	
		
			
				|  |  | +                    return currentItem;
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            };
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        private void runBlobAnalysis(Releasable ref, final BlobAnalyzeAction.Request request, DiscoveryNode node) {
 | 
	
		
			
				|  |  | +            logger.trace("processing [{}] on [{}]", request, node);
 | 
	
		
			
				|  |  | +            // NB although all this is on the SAME thread, the per-blob verification runs on a SNAPSHOT thread so we don't have to worry
 | 
	
		
			
				|  |  | +            // about local requests resulting in a stack overflow here
 | 
	
		
			
				|  |  | +            transportService.sendChildRequest(
 | 
	
		
			
				|  |  | +                node,
 | 
	
		
			
				|  |  | +                BlobAnalyzeAction.NAME,
 | 
	
		
			
				|  |  | +                request,
 | 
	
		
			
				|  |  | +                task,
 | 
	
		
			
				|  |  | +                TransportRequestOptions.timeout(TimeValue.timeValueMillis(timeoutTimeMillis - currentTimeMillisSupplier.getAsLong())),
 | 
	
		
			
				|  |  | +                new ActionListenerResponseHandler<>(ActionListener.releaseAfter(new ActionListener<>() {
 | 
	
		
			
				|  |  | +                    @Override
 | 
	
		
			
				|  |  | +                    public void onResponse(BlobAnalyzeAction.Response response) {
 | 
	
		
			
				|  |  | +                        logger.trace("finished [{}] on [{}]", request, node);
 | 
	
		
			
				|  |  | +                        if (request.getAbortWrite() == false) {
 | 
	
		
			
				|  |  | +                            expectedBlobs.add(request.getBlobName()); // each task cleans up its own mess on failure
 | 
	
		
			
				|  |  |                          }
 | 
	
		
			
				|  |  | -                    }, requestRefs.acquire()), BlobAnalyzeAction.Response::new)
 | 
	
		
			
				|  |  | -                );
 | 
	
		
			
				|  |  | -            }
 | 
	
		
			
				|  |  | +                        if (AsyncAction.this.request.detailed) {
 | 
	
		
			
				|  |  | +                            synchronized (responses) {
 | 
	
		
			
				|  |  | +                                responses.add(response);
 | 
	
		
			
				|  |  | +                            }
 | 
	
		
			
				|  |  | +                        }
 | 
	
		
			
				|  |  | +                        summary.add(response);
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +                    @Override
 | 
	
		
			
				|  |  | +                    public void onFailure(Exception exp) {
 | 
	
		
			
				|  |  | +                        logger.debug(() -> "failed [" + request + "] on [" + node + "]", exp);
 | 
	
		
			
				|  |  | +                        fail(exp);
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                }, ref), BlobAnalyzeAction.Response::new)
 | 
	
		
			
				|  |  | +            );
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          private BlobContainer getBlobContainer() {
 | 
	
	
		
			
				|  | @@ -634,8 +645,6 @@ public class RepositoryAnalyzeAction extends ActionType<RepositoryAnalyzeAction.
 | 
	
		
			
				|  |  |                  );
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        private record VerifyBlobTask(DiscoveryNode node, BlobAnalyzeAction.Request request) {}
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public static class Request extends ActionRequest {
 |