|
@@ -11,12 +11,13 @@ package org.elasticsearch.action.support;
|
|
|
import org.apache.logging.log4j.LogManager;
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
-import org.elasticsearch.common.util.concurrent.RunOnce;
|
|
|
import org.elasticsearch.core.Nullable;
|
|
|
import org.elasticsearch.tasks.CancellableTask;
|
|
|
import org.elasticsearch.tasks.Task;
|
|
|
|
|
|
import java.util.Iterator;
|
|
|
+import java.util.concurrent.Semaphore;
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
/**
|
|
|
* Allows an action to fan-out to several sub-actions and accumulate their results, but which reacts to a cancellation by releasing all
|
|
@@ -40,7 +41,7 @@ public abstract class CancellableFanOut<Item, ItemResponse, FinalResponse> {
|
|
|
* @param itemsIterator The items over which to fan out. Iterated on the calling thread.
|
|
|
* @param listener A listener for the final response, which is completed after all the fanned-out actions have completed. It is not
|
|
|
* completed promptly on cancellation. Completed on the thread that handles the final per-item response (or
|
|
|
- * the calling thread if there are no items), or on the cancelling thread if cancelled.
|
|
|
+ * the calling thread if there are no items).
|
|
|
*/
|
|
|
public final void run(@Nullable Task task, Iterator<Item> itemsIterator, ActionListener<FinalResponse> listener) {
|
|
|
|
|
@@ -50,9 +51,9 @@ public abstract class CancellableFanOut<Item, ItemResponse, FinalResponse> {
|
|
|
// outer listener, because we do not want to complete the outer listener until all sub-tasks are complete
|
|
|
final var resultListener = new SubscribableListener<FinalResponse>();
|
|
|
|
|
|
- // Completes resultListener (either on completion or on cancellation). Captures a reference to 'this', but within a 'RunOnce' so it
|
|
|
- // is released promptly when executed.
|
|
|
- final var resultListenerCompleter = new RunOnce(() -> {
|
|
|
+ // Completes resultListener (either on completion or on cancellation). Captures a reference to 'this', but within an
|
|
|
+ // 'AtomicReference' which is cleared, releasing the reference promptly, when executed.
|
|
|
+ final var resultListenerCompleter = new AtomicReference<Runnable>(() -> {
|
|
|
if (cancellableTask != null && cancellableTask.notifyIfCancelled(resultListener)) {
|
|
|
return;
|
|
|
}
|
|
@@ -66,17 +67,24 @@ public abstract class CancellableFanOut<Item, ItemResponse, FinalResponse> {
|
|
|
if (cancellableTask != null) {
|
|
|
cancellableTask.addListener(() -> {
|
|
|
assert cancellableTask.isCancelled();
|
|
|
- resultListenerCompleter.run();
|
|
|
+ // probably on a transport thread and we don't know if any of the callbacks are slow so we must avoid running them by
|
|
|
+ // blocking the thread which might add a subscriber to resultListener until after we've completed it
|
|
|
+ final var semaphore = new Semaphore(0);
|
|
|
+ // resultListenerCompleter is currently either a no-op, or else it immediately completes resultListener with a cancellation
|
|
|
+ // while it has no subscribers, so either way this semaphore is not held for long
|
|
|
+ resultListenerCompleter.getAndSet(semaphore::acquireUninterruptibly).run();
|
|
|
+ semaphore.release();
|
|
|
+ // finally, release refs to all the per-item listeners (without calling onItemFailure, so this is also fast)
|
|
|
cancellableTask.notifyIfCancelled(itemCancellationListener);
|
|
|
});
|
|
|
}
|
|
|
|
|
|
try (var refs = new RefCountingRunnable(() -> {
|
|
|
// When all sub-tasks are complete, pass the result from resultListener to the outer listener.
|
|
|
- resultListenerCompleter.run();
|
|
|
- // If not cancelled then resultListener is always complete by this point so the outer listener is completed on this thread.
|
|
|
- // If it's being concurrently cancelled then the outer listener may be completed with the TaskCancelledException on the
|
|
|
- // cancelling thread.
|
|
|
+ resultListenerCompleter.getAndSet(() -> {}).run();
|
|
|
+ // May block (very briefly) if there's a concurrent cancellation, so that we are sure the resultListener is now complete and
|
|
|
+ // therefore the outer listener is completed on this thread.
|
|
|
+ assert resultListener.isDone();
|
|
|
resultListener.addListener(listener);
|
|
|
})) {
|
|
|
while (itemsIterator.hasNext()) {
|