Browse Source

Fix a racing condition in MockTransportService#addUnresponsiveRule where a request can be delayed even if the rule was removed.

Relates to #21129

Also properly reset DiscoveryWithServiceDisruptionsIT#disableBeforeIndexDeletion
Boaz Leskes 9 years ago
parent
commit
523f7ea71e

+ 9 - 3
core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java

@@ -50,13 +50,13 @@ import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.discovery.zen.ElectMasterService;
-import org.elasticsearch.discovery.zen.ZenDiscovery;
 import org.elasticsearch.discovery.zen.FaultDetection;
 import org.elasticsearch.discovery.zen.MembershipAction;
+import org.elasticsearch.discovery.zen.PublishClusterStateAction;
+import org.elasticsearch.discovery.zen.UnicastZenPing;
+import org.elasticsearch.discovery.zen.ZenDiscovery;
 import org.elasticsearch.discovery.zen.ZenPing;
 import org.elasticsearch.discovery.zen.ZenPingService;
-import org.elasticsearch.discovery.zen.UnicastZenPing;
-import org.elasticsearch.discovery.zen.PublishClusterStateAction;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.indices.store.IndicesStoreIntegrationIT;
 import org.elasticsearch.monitor.jvm.HotThreads;
@@ -155,6 +155,12 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
 
     private boolean disableBeforeIndexDeletion;
 
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        disableBeforeIndexDeletion = false;
+    }
+
     @Override
     public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
         if (scheme instanceof NetworkDisruption &&

+ 16 - 6
test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

@@ -60,9 +60,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -299,7 +299,8 @@ public final class MockTransportService extends TransportService {
         final long startTime = System.currentTimeMillis();
 
         addDelegate(transportAddress, new ClearableTransport(original) {
-            private final Queue<Runnable> requestsToSendWhenCleared = new ConcurrentLinkedQueue<>();
+            private final Queue<Runnable> requestsToSendWhenCleared = new LinkedBlockingDeque<Runnable>();
+            private boolean cleared = false;
 
             TimeValue getDelay() {
                 return new TimeValue(duration.millis() - (System.currentTimeMillis() - startTime));
@@ -386,15 +387,24 @@ public final class MockTransportService extends TransportService {
                 };
 
                 // store the request to send it once the rule is cleared.
-                requestsToSendWhenCleared.add(runnable);
-
-                threadPool.schedule(delay, ThreadPool.Names.GENERIC, runnable);
+                synchronized (this) {
+                    if (cleared) {
+                        runnable.run();
+                    } else {
+                        requestsToSendWhenCleared.add(runnable);
+                        threadPool.schedule(delay, ThreadPool.Names.GENERIC, runnable);
+                    }
+                }
             }
 
 
             @Override
             public void clearRule() {
-                requestsToSendWhenCleared.forEach(Runnable::run);
+                synchronized (this) {
+                    assert cleared == false;
+                    cleared = true;
+                    requestsToSendWhenCleared.forEach(Runnable::run);
+                }
             }
         });
     }