Browse Source

Release cancellation listeners on cancellation (#103266)

Today a `CancellableTask` retains references to its listeners even after
they have been invoked, which may keep hold of substantial unnecessary
heap. This commit makes sure to release the listeners as soon as
possible.
David Turner 1 year ago
parent
commit
bf0f09fc33

+ 16 - 11
server/src/main/java/org/elasticsearch/tasks/CancellableTask.java

@@ -9,10 +9,10 @@
 package org.elasticsearch.tasks;
 
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.core.Nullable;
 
 import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
  * A task that can be cancelled
@@ -21,7 +21,7 @@ public class CancellableTask extends Task {
 
     private volatile String reason;
     private volatile boolean isCancelled;
-    private final ConcurrentLinkedQueue<CancellationListener> listeners = new ConcurrentLinkedQueue<>();
+    private final SubscribableListener<Void> listeners = new SubscribableListener<>();
 
     public CancellableTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
         super(id, type, action, description, parentTaskId, headers);
@@ -39,7 +39,7 @@ public class CancellableTask extends Task {
             this.isCancelled = true;
             this.reason = reason;
         }
-        listeners.forEach(CancellationListener::onCancelled);
+        listeners.onResponse(null);
         onCancelled();
     }
 
@@ -74,14 +74,7 @@ public class CancellableTask extends Task {
      * This method adds a listener that needs to be notified if this task is cancelled.
      */
     public final void addListener(CancellationListener listener) {
-        synchronized (this) {
-            if (this.isCancelled == false) {
-                listeners.add(listener);
-            }
-        }
-        if (isCancelled) {
-            listener.onCancelled();
-        }
+        listeners.addListener(new CancellationListenerAdapter(listener));
     }
 
     /**
@@ -127,4 +120,16 @@ public class CancellableTask extends Task {
     public interface CancellationListener {
         void onCancelled();
     }
+
+    private record CancellationListenerAdapter(CancellationListener cancellationListener) implements ActionListener<Void> {
+        @Override
+        public void onResponse(Void unused) {
+            cancellationListener.onCancelled();
+        }
+
+        @Override
+        public void onFailure(Exception e) {
+            assert false : e;
+        }
+    }
 }

+ 13 - 0
server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java

@@ -31,6 +31,7 @@ import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.tasks.TaskInfo;
 import org.elasticsearch.tasks.TaskManager;
+import org.elasticsearch.test.ReachabilityChecker;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.FakeTcpChannel;
 import org.elasticsearch.transport.TestTransportChannels;
@@ -649,4 +650,16 @@ public class CancellableTasksTests extends TaskManagerTestCase {
         concurrentNotify.join();
     }
 
+    public void testReleaseListenersOnCancellation() {
+        final CancellableTask task = new CancellableTask(randomLong(), "transport", "action", "", TaskId.EMPTY_TASK_ID, emptyMap());
+        final AtomicBoolean cancelNotified = new AtomicBoolean();
+        final ReachabilityChecker reachabilityChecker = new ReachabilityChecker();
+        task.addListener(reachabilityChecker.register(() -> assertTrue(cancelNotified.compareAndSet(false, true))));
+
+        reachabilityChecker.checkReachable();
+        TaskCancelHelper.cancel(task, "simulated");
+        reachabilityChecker.ensureUnreachable();
+        assertTrue(cancelNotified.get());
+    }
+
 }