Browse Source

Introduce RefCountingListener (#92995)

Like `RefCountingRunnable` but it keeps track of a (bounded) set of
exceptions received by the acquired listeners too.
David Turner 2 years ago
parent
commit
e22e0242e1

+ 177 - 0
server/src/main/java/org/elasticsearch/action/support/RefCountingListener.java

@@ -0,0 +1,177 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.support;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.core.Releasable;
+
+import java.util.Objects;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A mechanism to complete a listener on the completion of some (dynamic) collection of other actions. Basic usage is as follows:
+ *
+ * <pre>
+ * try (var refs = new RefCountingListener(finalListener)) {
+ *     for (var item : collection) {
+ *         runAsyncAction(item, refs.acquire()); // completes the acquired listener on completion
+ *     }
+ * }
+ * </pre>
+ *
+ * The delegate listener is completed when execution leaves the try-with-resources block and every acquired reference is released. The
+ * {@link RefCountingListener} collects (a bounded number of) exceptions received by its subsidiary listeners, and completes the delegate
+ * listener with an exception if (and only if) any subsidiary listener fails. However, unlike a {@link GroupedActionListener} it leaves it
+ * to the caller to collect the results of successful completions by accumulating them in a data structure of its choice. Also unlike a
+ * {@link GroupedActionListener} there is no need to declare the number of subsidiary listeners up front: listeners can be acquired
+ * dynamically as needed. Finally, you can continue to acquire additional listeners even outside the try-with-resources block, perhaps in a
+ * separate thread, as long as there's at least one listener outstanding:
+ *
+ * <pre>
+ * try (var refs = new RefCountingListener(finalListener)) {
+ *     for (var item : collection) {
+ *         if (condition(item)) {
+ *             runAsyncAction(item, refs.acquire().map(results::add));
+ *         }
+ *     }
+ *     if (flag) {
+ *         runOneOffAsyncAction(refs.acquire().map(results::add));
+ *         return;
+ *     }
+ *     for (var item : otherCollection) {
+ *         var itemRef = refs.acquire(); // delays completion while the background action is pending
+ *         executorService.execute(() -> {
+ *             try {
+ *                 if (condition(item)) {
+ *                     runOtherAsyncAction(item, refs.acquire().map(results::add));
+ *                 }
+ *             } finally {
+ *                 itemRef.onResponse(null);
+ *             }
+ *         });
+ *     }
+ * }
+ * </pre>
+ *
+ * In particular (and also unlike a {@link GroupedActionListener}) this works even if you don't acquire any extra refs at all: in that case,
+ * the delegate listener is completed at the end of the try-with-resources block.
+ */
+public final class RefCountingListener implements Releasable {
+
+    private final ActionListener<Void> delegate;
+    private final RefCountingRunnable refs = new RefCountingRunnable(this::finish);
+
+    private final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
+    private final Semaphore exceptionPermits;
+    private final AtomicInteger droppedExceptionsRef = new AtomicInteger();
+
+    /**
+     * Construct a {@link RefCountingListener} which completes {@code delegate} when all refs are released.
+     * @param delegate The listener to complete when all refs are released. This listener must not throw any exception on completion. If all
+     *                 the acquired listeners completed successfully then so is the delegate. If any of the acquired listeners completed
+     *                 with failure then the delegate is completed with the first exception received, with other exceptions added to its
+     *                 collection of suppressed exceptions.
+     */
+    public RefCountingListener(ActionListener<Void> delegate) {
+        this(10, delegate);
+    }
+
+    /**
+     * Construct a {@link RefCountingListener} which completes {@code delegate} when all refs are released.
+     * @param delegate The listener to complete when all refs are released. This listener must not throw any exception on completion. If all
+     *                 the acquired listeners completed successfully then so is the delegate. If any of the acquired listeners completed
+     *                 with failure then the delegate is completed with the first exception received, with other exceptions added to its
+     *                 collection of suppressed exceptions.
+     * @param maxExceptions The maximum number of exceptions to accumulate on failure.
+     */
+    public RefCountingListener(int maxExceptions, ActionListener<Void> delegate) {
+        if (maxExceptions <= 0) {
+            assert false : maxExceptions;
+            throw new IllegalArgumentException("maxExceptions must be positive");
+        }
+        this.delegate = Objects.requireNonNull(delegate);
+        this.exceptionPermits = new Semaphore(maxExceptions);
+    }
+
+    /**
+     * Release the original reference to this object, which commpletes the delegate {@link ActionListener} if there are no other references.
+     *
+     * It is invalid to call this method more than once. Doing so will trip an assertion if assertions are enabled, but will be ignored
+     * otherwise. This deviates from the contract of {@link java.io.Closeable}.
+     */
+    @Override
+    public void close() {
+        refs.close();
+    }
+
+    private void finish() {
+        try {
+            var exception = exceptionRef.get();
+            if (exception == null) {
+                delegate.onResponse(null);
+            } else {
+                final var droppedExceptions = droppedExceptionsRef.getAndSet(0);
+                if (droppedExceptions > 0) {
+                    exception.addSuppressed(new ElasticsearchException(droppedExceptions + " further exceptions were dropped"));
+                }
+                delegate.onFailure(exception);
+            }
+        } catch (Exception e) {
+            assert false : e;
+            throw e;
+        }
+    }
+
+    /**
+     * Acquire a reference to this object and return a listener which releases it. The delegate {@link ActionListener} is called when all
+     * its references have been released.
+     *
+     * It is invalid to call this method once all references are released. Doing so will trip an assertion if assertions are enabled, and
+     * will throw an {@link IllegalStateException} otherwise.
+     *
+     * It is also invalid to complete the returned listener more than once. Doing so will trip an assertion if assertions are enabled, but
+     * will be ignored otherwise.
+     */
+    public <T> ActionListener<T> acquire() {
+        return new ActionListener<>() {
+            private final Releasable ref = refs.acquire();
+
+            @Override
+            public void onResponse(T unused) {
+                ref.close();
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                if (exceptionPermits.tryAcquire()) {
+                    final var firstException = exceptionRef.compareAndExchange(null, e);
+                    if (firstException != null && firstException != e) {
+                        firstException.addSuppressed(e);
+                    }
+                } else {
+                    droppedExceptionsRef.incrementAndGet();
+                }
+                ref.close();
+            }
+
+            @Override
+            public String toString() {
+                return RefCountingListener.this.toString();
+            }
+        };
+    }
+
+    @Override
+    public String toString() {
+        return "refCounting[" + delegate + "]";
+    }
+}

+ 8 - 3
server/src/main/java/org/elasticsearch/action/support/RefCountingRunnable.java

@@ -32,7 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * The delegate action is completed when execution leaves the try-with-resources block and every acquired reference is released. Unlike a
  * {@link CountDown} there is no need to declare the number of subsidiary actions up front (refs can be acquired dynamically as needed) nor
  * does the caller need to check for completion each time a reference is released. Moreover even outside the try-with-resources block you
- * can continue to acquire additional listeners, even in a separate thread, as long as there's at least one listener outstanding:
+ * can continue to acquire additional references, even in a separate thread, as long as there's at least one reference outstanding:
  *
  * <pre>
  * try (var refs = new RefCountingRunnable(finalRunnable)) {
@@ -95,7 +95,11 @@ public final class RefCountingRunnable implements Releasable {
      * Acquire a reference to this object and return an action which releases it. The delegate {@link Runnable} is called when all its
      * references have been released.
      *
-     * Callers must take care to close the returned resource exactly once. This deviates from the contract of {@link java.io.Closeable}.
+     * It is invalid to call this method once all references are released. Doing so will trip an assertion if assertions are enabled, and
+     * will throw an {@link IllegalStateException} otherwise.
+     *
+     * It is also invalid to release the acquired resource more than once. Doing so will trip an assertion if assertions are enabled, but
+     * will be ignored otherwise. This deviates from the contract of {@link java.io.Closeable}.
      */
     public Releasable acquire() {
         if (refCounted.tryIncRef()) {
@@ -116,7 +120,8 @@ public final class RefCountingRunnable implements Releasable {
     /**
      * Release the original reference to this object, which executes the delegate {@link Runnable} if there are no other references.
      *
-     * Callers must take care to close this resource exactly once. This deviates from the contract of {@link java.io.Closeable}.
+     * It is invalid to call this method more than once. Doing so will trip an assertion if assertions are enabled, but will be ignored
+     * otherwise. This deviates from the contract of {@link java.io.Closeable}.
      */
     @Override
     public void close() {

+ 46 - 45
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

@@ -27,10 +27,10 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.SingleResultDeduplicator;
 import org.elasticsearch.action.StepListener;
-import org.elasticsearch.action.support.CountDownActionListener;
 import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.action.support.ListenableActionFuture;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.RefCountingListener;
 import org.elasticsearch.action.support.RefCountingRunnable;
 import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.cluster.ClusterState;
@@ -1422,7 +1422,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 indexMetaIdentifiers = null;
             }
 
-            final ActionListener<Void> allMetaListener = new CountDownActionListener(2 + indices.size(), ActionListener.wrap(v -> {
+            try (var allMetaListeners = new RefCountingListener(ActionListener.wrap(v -> {
                 final String slmPolicy = slmPolicy(snapshotInfo);
                 final SnapshotDetails snapshotDetails = new SnapshotDetails(
                     snapshotInfo.state(),
@@ -1445,52 +1445,53 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                         }
                     }, onUpdateFailure)
                 );
-            }, onUpdateFailure));
-
-            // We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method will
-            // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the
-            // index or global metadata will be compatible with the segments written in this snapshot as well.
-            // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way
-            // that decrements the generation it points at
-            final Metadata clusterMetadata = finalizeSnapshotContext.clusterMetadata();
-            // Write Global MetaData
-            executor.execute(
-                ActionRunnable.run(
-                    allMetaListener,
-                    () -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress)
-                )
-            );
+            }, onUpdateFailure))) {
+
+                // We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method
+                // will mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of
+                // the index or global metadata will be compatible with the segments written in this snapshot as well.
+                // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way
+                // that decrements the generation it points at
+                final Metadata clusterMetadata = finalizeSnapshotContext.clusterMetadata();
+                // Write Global MetaData
+                executor.execute(
+                    ActionRunnable.run(
+                        allMetaListeners.acquire(),
+                        () -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress)
+                    )
+                );
 
-            // write the index metadata for each index in the snapshot
-            for (IndexId index : indices) {
-                executor.execute(ActionRunnable.run(allMetaListener, () -> {
-                    final IndexMetadata indexMetaData = clusterMetadata.index(index.getName());
-                    if (writeIndexGens) {
-                        final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData);
-                        String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers);
-                        if (metaUUID == null) {
-                            // We don't yet have this version of the metadata so we write it
-                            metaUUID = UUIDs.base64UUID();
-                            INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress);
-                            indexMetaIdentifiers.put(identifiers, metaUUID);
+                // write the index metadata for each index in the snapshot
+                for (IndexId index : indices) {
+                    executor.execute(ActionRunnable.run(allMetaListeners.acquire(), () -> {
+                        final IndexMetadata indexMetaData = clusterMetadata.index(index.getName());
+                        if (writeIndexGens) {
+                            final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData);
+                            String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers);
+                            if (metaUUID == null) {
+                                // We don't yet have this version of the metadata so we write it
+                                metaUUID = UUIDs.base64UUID();
+                                INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress);
+                                indexMetaIdentifiers.put(identifiers, metaUUID);
+                            }
+                            indexMetas.put(index, identifiers);
+                        } else {
+                            INDEX_METADATA_FORMAT.write(
+                                clusterMetadata.index(index.getName()),
+                                indexContainer(index),
+                                snapshotId.getUUID(),
+                                compress
+                            );
                         }
-                        indexMetas.put(index, identifiers);
-                    } else {
-                        INDEX_METADATA_FORMAT.write(
-                            clusterMetadata.index(index.getName()),
-                            indexContainer(index),
-                            snapshotId.getUUID(),
-                            compress
-                        );
-                    }
-                }));
+                    }));
+                }
+                executor.execute(
+                    ActionRunnable.run(
+                        allMetaListeners.acquire(),
+                        () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress)
+                    )
+                );
             }
-            executor.execute(
-                ActionRunnable.run(
-                    allMetaListener,
-                    () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress)
-                )
-            );
         }, onUpdateFailure);
     }
 

+ 207 - 0
server/src/test/java/org/elasticsearch/action/support/RefCountingListenerTests.java

@@ -0,0 +1,207 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.support;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.elasticsearch.common.util.concurrent.EsExecutors.DIRECT_EXECUTOR_SERVICE;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+
+public class RefCountingListenerTests extends ESTestCase {
+
+    public void testBasicOperation() throws InterruptedException {
+        final var executed = new AtomicBoolean();
+        final var exceptionCount = new AtomicInteger();
+        final var threads = new Thread[between(0, 3)];
+        final var exceptionLimit = Math.max(1, between(0, threads.length));
+
+        boolean async = false;
+        final var startLatch = new CountDownLatch(1);
+
+        try (var refs = new RefCountingListener(exceptionLimit, new ActionListener<>() {
+            @Override
+            public void onResponse(Void unused) {
+                assertTrue(executed.compareAndSet(false, true));
+                assertEquals(0, exceptionCount.get());
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                assertTrue(executed.compareAndSet(false, true));
+                assertThat(exceptionCount.get(), greaterThan(0));
+                Throwable[] suppressed = e.getSuppressed();
+                if (exceptionCount.get() > exceptionLimit) {
+                    assertEquals(exceptionLimit, suppressed.length);
+                    for (int i = 0; i < suppressed.length; i++) {
+                        Throwable throwable = suppressed[i];
+                        if (i == suppressed.length - 1) {
+                            assertThat(
+                                throwable.getMessage(),
+                                equalTo((exceptionCount.get() - exceptionLimit) + " further exceptions were dropped")
+                            );
+                        } else {
+                            assertThat(throwable.getMessage(), equalTo("simulated"));
+                        }
+                    }
+                } else {
+                    assertEquals(exceptionCount.get() - 1, suppressed.length);
+                    for (Throwable throwable : suppressed) {
+                        assertThat(throwable.getMessage(), equalTo("simulated"));
+                    }
+                }
+            }
+
+            @Override
+            public String toString() {
+                return "test listener";
+            }
+        })) {
+            assertEquals("refCounting[test listener]", refs.toString());
+            var listener = refs.acquire();
+            assertThat(listener.toString(), containsString("refCounting[test listener]"));
+            listener.onResponse(null);
+
+            for (int i = 0; i < threads.length; i++) {
+                if (randomBoolean()) {
+                    async = true;
+                    var ref = refs.acquire();
+                    threads[i] = new Thread(() -> {
+                        try {
+                            assertTrue(startLatch.await(10, TimeUnit.SECONDS));
+                        } catch (InterruptedException e) {
+                            throw new AssertionError(e);
+                        }
+                        assertFalse(executed.get());
+                        if (randomBoolean()) {
+                            ref.onResponse(null);
+                        } else {
+                            exceptionCount.incrementAndGet();
+                            ref.onFailure(new ElasticsearchException("simulated"));
+                        }
+                    });
+                }
+            }
+
+            assertFalse(executed.get());
+        }
+
+        assertNotEquals(async, executed.get());
+
+        for (Thread thread : threads) {
+            if (thread != null) {
+                thread.start();
+            }
+        }
+
+        startLatch.countDown();
+
+        for (Thread thread : threads) {
+            if (thread != null) {
+                thread.join();
+            }
+        }
+
+        assertTrue(executed.get());
+    }
+
+    @SuppressWarnings("resource")
+    public void testNullCheck() {
+        expectThrows(NullPointerException.class, () -> new RefCountingListener(between(1, 10), null));
+    }
+
+    public void testValidation() {
+        final var callCount = new AtomicInteger();
+        final var refs = new RefCountingListener(Integer.MAX_VALUE, ActionListener.wrap(callCount::incrementAndGet));
+        refs.close();
+        assertEquals(1, callCount.get());
+
+        for (int i = between(1, 5); i > 0; i--) {
+            final ThrowingRunnable throwingRunnable;
+            final String expectedMessage;
+            if (randomBoolean()) {
+                throwingRunnable = refs::acquire;
+                expectedMessage = RefCountingRunnable.ALREADY_CLOSED_MESSAGE;
+            } else {
+                throwingRunnable = refs::close;
+                expectedMessage = "already closed";
+            }
+
+            assertEquals(expectedMessage, expectThrows(AssertionError.class, throwingRunnable).getMessage());
+            assertEquals(1, callCount.get());
+        }
+    }
+
+    public void testJavaDocExample() {
+        final var flag = new AtomicBoolean();
+        runExample(ActionListener.wrap(() -> assertTrue(flag.compareAndSet(false, true))));
+        assertTrue(flag.get());
+    }
+
+    private void runExample(ActionListener<Void> finalListener) {
+        final var collection = randomList(10, Object::new);
+        final var otherCollection = randomList(10, Object::new);
+        final var flag = randomBoolean();
+        @SuppressWarnings("UnnecessaryLocalVariable")
+        final var executorService = DIRECT_EXECUTOR_SERVICE;
+        final var results = new ArrayList<>();
+
+        try (var refs = new RefCountingListener(finalListener)) {
+            for (var item : collection) {
+                if (condition(item)) {
+                    runAsyncAction(item, refs.acquire().map(results::add));
+                }
+            }
+            if (flag) {
+                runOneOffAsyncAction(refs.acquire().map(results::add));
+                return;
+            }
+            for (var item : otherCollection) {
+                var itemRef = refs.acquire(); // delays completion while the background action is pending
+                executorService.execute(() -> {
+                    try {
+                        if (condition(item)) {
+                            runOtherAsyncAction(item, refs.acquire().map(results::add));
+                        }
+                    } finally {
+                        itemRef.onResponse(null);
+                    }
+                });
+            }
+        }
+    }
+
+    @SuppressWarnings("unused")
+    private boolean condition(Object item) {
+        return randomBoolean();
+    }
+
+    @SuppressWarnings("unused")
+    private void runAsyncAction(Object item, ActionListener<Void> listener) {
+        listener.onResponse(null);
+    }
+
+    @SuppressWarnings("unused")
+    private void runOtherAsyncAction(Object item, ActionListener<Void> listener) {
+        listener.onResponse(null);
+    }
+
+    private void runOneOffAsyncAction(ActionListener<Void> listener) {
+        listener.onResponse(null);
+    }
+}

+ 20 - 42
x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/SparseFileTracker.java

@@ -9,7 +9,7 @@ package org.elasticsearch.blobcache.common;
 
 import org.elasticsearch.Assertions;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.support.GroupedActionListener;
+import org.elasticsearch.action.support.RefCountingListener;
 import org.elasticsearch.core.Nullable;
 
 import java.util.ArrayList;
@@ -243,30 +243,7 @@ public class SparseFileTracker {
                     .collect(Collectors.toList());
         }
 
-        // NB we work with ranges outside the mutex here, but only to interact with their completion listeners which are `final` so
-        // there is no risk of concurrent modification.
-
-        switch (requiredRanges.size()) {
-            case 0 ->
-                // no need to wait for the gaps to be filled, the listener can be executed immediately
-                wrappedListener.onResponse(null);
-            case 1 -> {
-                final Range requiredRange = requiredRanges.get(0);
-                requiredRange.completionListener.addListener(
-                    wrappedListener.map(progress -> null),
-                    Math.min(requiredRange.completionListener.end, subRange.end())
-                );
-            }
-            default -> {
-                final GroupedActionListener<Long> groupedActionListener = new GroupedActionListener<>(
-                    requiredRanges.size(),
-                    wrappedListener.map(progress -> null)
-                );
-                requiredRanges.forEach(
-                    r -> r.completionListener.addListener(groupedActionListener, Math.min(r.completionListener.end, subRange.end()))
-                );
-            }
-        }
+        subscribeToCompletionListeners(requiredRanges, subRange.end(), wrappedListener);
 
         return Collections.unmodifiableList(gaps);
     }
@@ -332,31 +309,32 @@ public class SparseFileTracker {
             assert invariant();
         }
 
+        subscribeToCompletionListeners(pendingRanges, range.end(), wrappedListener);
+        return true;
+    }
+
+    private void subscribeToCompletionListeners(List<Range> requiredRanges, long rangeEnd, ActionListener<Void> listener) {
         // NB we work with ranges outside the mutex here, but only to interact with their completion listeners which are `final` so
         // there is no risk of concurrent modification.
-
-        switch (pendingRanges.size()) {
-            case 0 -> wrappedListener.onResponse(null);
+        switch (requiredRanges.size()) {
+            case 0 ->
+                // no need to wait for the gaps to be filled, the listener can be executed immediately
+                listener.onResponse(null);
             case 1 -> {
-                final Range pendingRange = pendingRanges.get(0);
-                pendingRange.completionListener.addListener(
-                    wrappedListener.map(progress -> null),
-                    Math.min(pendingRange.completionListener.end, range.end())
+                final Range requiredRange = requiredRanges.get(0);
+                requiredRange.completionListener.addListener(
+                    listener.map(progress -> null),
+                    Math.min(requiredRange.completionListener.end, rangeEnd)
                 );
-                return true;
             }
             default -> {
-                final GroupedActionListener<Long> groupedActionListener = new GroupedActionListener<>(
-                    pendingRanges.size(),
-                    wrappedListener.map(progress -> null)
-                );
-                pendingRanges.forEach(
-                    r -> r.completionListener.addListener(groupedActionListener, Math.min(r.completionListener.end, range.end()))
-                );
-                return true;
+                try (var listeners = new RefCountingListener(listener)) {
+                    for (Range range : requiredRanges) {
+                        range.completionListener.addListener(listeners.acquire(), Math.min(range.completionListener.end, rangeEnd));
+                    }
+                }
             }
         }
-        return true;
     }
 
     private ActionListener<Void> wrapWithAssertions(ActionListener<Void> listener) {