Kaynağa Gözat

Fix racy use of ConcurrentHashMap (#36603)

ConcurrentHashMap does not always behave correctly if removing elements and
concurrently checking for its emptyiness. Work around this by protecting all
usages with a mutex (there was only one usage unprotected by the mutex anyway)
and then we don't even need a ConcurrentHashMap at all.
David Turner 6 yıl önce
ebeveyn
işleme
44ba9ab04d

+ 8 - 18
server/src/main/java/org/elasticsearch/discovery/PeerFinder.java

@@ -55,6 +55,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -62,10 +63,9 @@ import java.util.Optional;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.emptyList;
 import static org.elasticsearch.cluster.coordination.Coordinator.isZen1Node;
 import static org.elasticsearch.cluster.coordination.DiscoveryUpgradeService.createDiscoveryNodeWithImpossiblyHighId;
-import static java.util.Collections.emptyList;
-import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
 
 public abstract class PeerFinder {
 
@@ -95,7 +95,7 @@ public abstract class PeerFinder {
     private volatile long currentTerm;
     private boolean active;
     private DiscoveryNodes lastAcceptedNodes;
-    private final Map<TransportAddress, Peer> peersByAddress = newConcurrentMap();
+    private final Map<TransportAddress, Peer> peersByAddress = new LinkedHashMap<>();
     private Optional<DiscoveryNode> leader = Optional.empty();
     private volatile List<TransportAddress> lastResolvedAddresses = emptyList();
 
@@ -150,6 +150,7 @@ public abstract class PeerFinder {
     }
 
     private boolean assertInactiveWithNoKnownPeers() {
+        assert holdsLock() : "PeerFinder mutex not held";
         assert active == false;
         assert peersByAddress.isEmpty() : peersByAddress.keySet();
         return true;
@@ -256,11 +257,7 @@ public abstract class PeerFinder {
     private boolean handleWakeUp() {
         assert holdsLock() : "PeerFinder mutex not held";
 
-        boolean peersRemoved = false;
-
-        for (final Peer peer : peersByAddress.values()) {
-            peersRemoved = peer.handleWakeUp() || peersRemoved; // care: avoid short-circuiting, each peer needs waking up
-        }
+        final boolean peersRemoved = peersByAddress.values().removeIf(Peer::handleWakeUp);
 
         if (active == false) {
             logger.trace("not active");
@@ -344,7 +341,6 @@ public abstract class PeerFinder {
             assert holdsLock() : "PeerFinder mutex not held";
 
             if (active == false) {
-                removePeer();
                 return true;
             }
 
@@ -358,7 +354,6 @@ public abstract class PeerFinder {
                     }
                 } else {
                     logger.trace("{} no longer connected", this);
-                    removePeer();
                     return true;
                 }
             }
@@ -394,18 +389,13 @@ public abstract class PeerFinder {
                 @Override
                 public void onFailure(Exception e) {
                     logger.debug(() -> new ParameterizedMessage("{} connection failed", Peer.this), e);
-                    removePeer();
+                    synchronized (mutex) {
+                        peersByAddress.remove(transportAddress);
+                    }
                 }
             });
         }
 
-        void removePeer() {
-            final Peer removed = peersByAddress.remove(transportAddress);
-            // assert removed == Peer.this : removed + " != " + Peer.this;
-            // ^ This assertion sometimes trips if we are deactivated and reactivated while a request is in flight.
-            // TODO be more careful about avoiding multiple active Peer objects for each address
-        }
-
         private void requestPeers() {
             assert holdsLock() : "PeerFinder mutex not held";
             assert peersRequestInFlight == false : "PeersRequest already in flight";