Browse Source

Remove timeout task after completing cluster state publication (#40411)

Each cluster state publication schedules a cancellation task with the provided publication timeout
(30s by default). This scheduled cancellation keeps a reference to the publication, and therefore the
full cluster state that was published. In case of frequently updating a large cluster state, this results
in a large number of cancellation tasks keeping references to all previously published cluster states.
Yannick Welsch 6 years ago
parent
commit
61e8b280cd

+ 18 - 14
server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

@@ -62,6 +62,7 @@ import org.elasticsearch.discovery.HandshakingTransportAddressConnector;
 import org.elasticsearch.discovery.PeerFinder;
 import org.elasticsearch.discovery.SeedHostsProvider;
 import org.elasticsearch.discovery.SeedHostsResolver;
+import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.ThreadPool.Names;
 import org.elasticsearch.transport.TransportResponse.Empty;
 import org.elasticsearch.transport.TransportService;
@@ -983,20 +984,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
                     new ListenableFuture<>(), ackListener, publishListener);
                 currentPublication = Optional.of(publication);
 
-                transportService.getThreadPool().schedule(new Runnable() {
-                    @Override
-                    public void run() {
-                        synchronized (mutex) {
-                            publication.cancel("timed out after " + publishTimeout);
-                        }
-                    }
-
-                    @Override
-                    public String toString() {
-                        return "scheduled timeout for " + publication;
-                    }
-                }, publishTimeout, Names.GENERIC);
-
                 final DiscoveryNodes publishNodes = publishRequest.getAcceptedState().nodes();
                 leaderChecker.setCurrentNodes(publishNodes);
                 followersChecker.setCurrentNodes(publishNodes);
@@ -1161,6 +1148,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
         private final AckListener ackListener;
         private final ActionListener<Void> publishListener;
         private final PublicationTransportHandler.PublicationContext publicationContext;
+        private final Scheduler.ScheduledCancellable scheduledCancellable;
 
         // We may not have accepted our own state before receiving a join from another node, causing its join to be rejected (we cannot
         // safely accept a join whose last-accepted term/version is ahead of ours), so store them up and process them at the end.
@@ -1201,6 +1189,19 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
             this.localNodeAckEvent = localNodeAckEvent;
             this.ackListener = ackListener;
             this.publishListener = publishListener;
+            this.scheduledCancellable = transportService.getThreadPool().schedule(new Runnable() {
+                @Override
+                public void run() {
+                    synchronized (mutex) {
+                        cancel("timed out after " + publishTimeout);
+                    }
+                }
+
+                @Override
+                public String toString() {
+                    return "scheduled timeout for " + this;
+                }
+            }, publishTimeout, Names.GENERIC);
         }
 
         private void removePublicationAndPossiblyBecomeCandidate(String reason) {
@@ -1242,6 +1243,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
                                 synchronized (mutex) {
                                     removePublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState");
                                 }
+                                scheduledCancellable.cancel();
                                 ackListener.onNodeAck(getLocalNode(), e);
                                 publishListener.onFailure(e);
                             }
@@ -1271,6 +1273,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
                                     }
                                     lagDetector.startLagDetector(publishRequest.getAcceptedState().version());
                                 }
+                                scheduledCancellable.cancel();
                                 ackListener.onNodeAck(getLocalNode(), null);
                                 publishListener.onResponse(null);
                             }
@@ -1281,6 +1284,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
                 public void onFailure(Exception e) {
                     assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
                     removePublicationAndPossiblyBecomeCandidate("Publication.onCompletion(false)");
+                    scheduledCancellable.cancel();
 
                     final FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException("publication failed", e);
                     ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master.