1
0
Эх сурвалжийг харах

Fail tests on unknown actions (#81230)

Today in tests we handle requests for unknown actions much as in
production code, dropping the message and logging a warning. Tests may
fail if they hit such an error, but there might be many retries first
and and the failure might just be reported as a timeout which is hard to
diagnose. Some tests might even still pass if, for instance, the bad
message is something like a background stats request.

Nonetheless we should not be sending requests for unknown actions in
(almost all) tests. We should instead be making sure that the
destination node has a new enough version to understand the request
first. Therefore with this commit we fail tests immediately on the first
unknown action they encounter.

Relates #81116
David Turner 3 жил өмнө
parent
commit
1cf15a7ae3

+ 2 - 1
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java

@@ -56,7 +56,8 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
             threadPool::relativeTimeInMillis,
             transport.getInflightBreaker(),
             requestHandlers::getHandler,
-            transport::inboundMessage
+            transport::inboundMessage,
+            false
         );
     }
 

+ 2 - 1
plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpReadWriteHandler.java

@@ -44,7 +44,8 @@ public class TcpReadWriteHandler extends BytesWriteHandler {
             threadPool::relativeTimeInMillis,
             breaker,
             requestHandlers::getHandler,
-            transport::inboundMessage
+            transport::inboundMessage,
+            false
         );
     }
 

+ 3 - 1
server/src/main/java/org/elasticsearch/transport/InboundAggregator.java

@@ -37,11 +37,13 @@ public class InboundAggregator implements Releasable {
 
     public InboundAggregator(
         Supplier<CircuitBreaker> circuitBreaker,
-        Function<String, RequestHandlerRegistry<TransportRequest>> registryFunction
+        Function<String, RequestHandlerRegistry<TransportRequest>> registryFunction,
+        boolean ignoreDeserializationErrors
     ) {
         this(circuitBreaker, (Predicate<String>) actionName -> {
             final RequestHandlerRegistry<TransportRequest> reg = registryFunction.apply(actionName);
             if (reg == null) {
+                assert ignoreDeserializationErrors : actionName;
                 throw new ActionNotFoundTransportException(actionName);
             } else {
                 return reg.canTripCircuitBreaker();

+ 3 - 2
server/src/main/java/org/elasticsearch/transport/InboundPipeline.java

@@ -46,13 +46,14 @@ public class InboundPipeline implements Releasable {
         LongSupplier relativeTimeInMillis,
         Supplier<CircuitBreaker> circuitBreaker,
         Function<String, RequestHandlerRegistry<TransportRequest>> registryFunction,
-        BiConsumer<TcpChannel, InboundMessage> messageHandler
+        BiConsumer<TcpChannel, InboundMessage> messageHandler,
+        boolean ignoreDeserializationErrors
     ) {
         this(
             statsTracker,
             relativeTimeInMillis,
             new InboundDecoder(version, recycler),
-            new InboundAggregator(circuitBreaker, registryFunction),
+            new InboundAggregator(circuitBreaker, registryFunction, ignoreDeserializationErrors),
             messageHandler
         );
     }

+ 1 - 0
server/src/main/java/org/elasticsearch/transport/TransportService.java

@@ -901,6 +901,7 @@ public class TransportService extends AbstractLifecycleComponent
             @SuppressWarnings("unchecked")
             final RequestHandlerRegistry<TransportRequest> reg = (RequestHandlerRegistry<TransportRequest>) getRequestHandler(action);
             if (reg == null) {
+                assert false : action;
                 throw new ActionNotFoundTransportException("Action [" + action + "] not found");
             }
             final String executor = reg.getExecutor();

+ 5 - 1
server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java

@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.test.NodeRoles.onlyRole;
 import static org.elasticsearch.test.NodeRoles.removeRoles;
+import static org.elasticsearch.transport.AbstractSimpleTransportTestCase.IGNORE_DESERIALIZATION_ERRORS_SETTING;
 import static org.elasticsearch.transport.RemoteClusterConnectionTests.startTransport;
 import static org.hamcrest.Matchers.equalTo;
 
@@ -42,7 +43,10 @@ public class RemoteClusterClientTests extends ESTestCase {
     }
 
     public void testConnectAndExecuteRequest() throws Exception {
-        Settings remoteSettings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "foo_bar_cluster").build();
+        Settings remoteSettings = Settings.builder()
+            .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "foo_bar_cluster")
+            .put(IGNORE_DESERIALIZATION_ERRORS_SETTING.getKey(), true) // suppress assertions to test production error-handling
+            .build();
         try (
             MockTransportService remoteTransport = startTransport(
                 "remote_node",

+ 1 - 2
test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

@@ -895,13 +895,12 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
                         final String info = sender + "_B_" + iter;
                         serviceB.sendRequest(
                             nodeA,
-                            "test",
+                            "internal:test",
                             new TestRequest(info),
                             new ActionListenerResponseHandler<>(listener, TestResponse::new)
                         );
                         try {
                             listener.actionGet();
-
                         } catch (Exception e) {
                             logger.trace(
                                 (Supplier<?>) () -> new ParameterizedMessage("caught exception while sending to node {}", nodeA),

+ 4 - 1
test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java

@@ -73,6 +73,7 @@ import java.util.stream.Collectors;
 
 import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
 import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
+import static org.elasticsearch.transport.AbstractSimpleTransportTestCase.IGNORE_DESERIALIZATION_ERRORS_SETTING;
 
 public class MockNioTransport extends TcpTransport {
     private static final Logger logger = LogManager.getLogger(MockNioTransport.class);
@@ -330,7 +331,9 @@ public class MockNioTransport extends TcpTransport {
                 threadPool::relativeTimeInMillis,
                 breaker,
                 requestHandlers::getHandler,
-                transport::inboundMessage
+                transport::inboundMessage,
+                (transport instanceof MockNioTransport)
+                    && IGNORE_DESERIALIZATION_ERRORS_SETTING.get(((MockNioTransport) transport).settings)
             );
         }