Browse Source

Ensure that mock transport does not swallow req (#108656)

Currently it is possible for the MockTransportService distrupt behavior
to swallow requests if either the connection is already closed (in which
case response pruning has already occurred) or if the behavior is added
after the clear callback has been triggered.
Tim Brooks 1 year ago
parent
commit
74ec90bf1d

+ 22 - 4
test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

@@ -29,8 +29,10 @@ import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.common.util.MockPageCacheRecycler;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.RunOnce;
+import org.elasticsearch.core.AbstractRefCounted;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.RefCounted;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.UpdateForV9;
@@ -48,6 +50,7 @@ import org.elasticsearch.transport.BytesTransportRequest;
 import org.elasticsearch.transport.ClusterConnectionManager;
 import org.elasticsearch.transport.ConnectTransportException;
 import org.elasticsearch.transport.ConnectionProfile;
+import org.elasticsearch.transport.NodeNotConnectedException;
 import org.elasticsearch.transport.RequestHandlerRegistry;
 import org.elasticsearch.transport.TcpTransport;
 import org.elasticsearch.transport.Transport;
@@ -428,7 +431,9 @@ public class MockTransportService extends TransportService {
         );
 
         transport().addSendBehavior(transportAddress, new StubbableTransport.SendRequestBehavior() {
+
             private final Set<Transport.Connection> toClose = ConcurrentHashMap.newKeySet();
+            private final RefCounted refs = AbstractRefCounted.of(this::closeConnections);
 
             @Override
             public void sendRequest(
@@ -437,19 +442,32 @@ public class MockTransportService extends TransportService {
                 String action,
                 TransportRequest request,
                 TransportRequestOptions options
-            ) {
-                // don't send anything, the receiving node is unresponsive
-                toClose.add(connection);
+            ) throws IOException {
+                if (connection.isClosed()) {
+                    throw new NodeNotConnectedException(connection.getNode(), "connection already closed");
+                } else if (refs.tryIncRef()) {
+                    // don't send anything, the receiving node is unresponsive
+                    toClose.add(connection);
+                    refs.decRef();
+                } else {
+                    connection.sendRequest(requestId, action, request, options);
+                }
             }
 
             @Override
             public void clearCallback() {
+                // close to simulate that tcp-ip eventually times out and closes connection (necessary to ensure transport eventually
+                // responds).
+                refs.decRef();
+            }
+
+            private void closeConnections() {
                 // close to simulate that tcp-ip eventually times out and closes connection (necessary to ensure transport eventually
                 // responds).
                 try {
                     IOUtils.close(toClose);
                 } catch (IOException e) {
-                    throw new RuntimeException(e);
+                    throw new AssertionError(e);
                 }
             }
         });