浏览代码

Introduce new handshake version for v9 (#120109) (#120129)

This commit introduces `V9_HANDSHAKE_VERSION` which is the transport
version that will eventually be for handshakes sent by v9 nodes. It does
not adjust the handshake version yet, because we must first backport
this to the v8.18 branch so that v8.18 nodes can understand the new v9
handshakes.
David Turner 9 月之前
父节点
当前提交
898c78ee5d

+ 16 - 6
server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java

@@ -41,9 +41,14 @@ final class TransportHandshaker {
      * ignores the body of the request. After the handshake, the OutboundHandler uses the min(local,remote) protocol version for all later
      * messages.
      *
-     * This version supports two handshake protocols, v6080099 and v7170099, which respectively have the same message structure as the
-     * transport protocols of v6.8.0 and v7.17.0. This node only sends v7170099 requests, but it can send a valid response to any v6080099
-     * requests that it receives.
+     * This version supports three handshake protocols, v6080099, v7170099 and v8800000, which respectively have the same message structure
+     * as the transport protocols of v6.8.0, v7.17.0, and v8.18.0. This node only sends v7170099 requests, but it can send a valid response
+     * to any v6080099 or v8800000 requests that it receives.
+     *
+     * Note that these are not really TransportVersion constants as used elsewhere in ES, they're independent things that just happen to be
+     * stored in the same location in the message header and which roughly match the same ID numbering scheme. Older versions of ES did
+     * rely on them matching the real transport protocol (which itself matched the release version numbers), but these days that's no longer
+     * true.
      *
      * Here are some example messages, broken down to show their structure:
      *
@@ -79,7 +84,7 @@ final class TransportHandshaker {
      *    c3 f9 eb 03                   -- max acceptable protocol version (vInt: 00000011 11101011 11111001 11000011 == 8060099)
      *
      *
-     * ## v7170099 Request:
+     * ## v7170099 and v8800000 Requests:
      *
      * 45 53                            -- 'ES' marker
      * 00 00 00 31                      -- total message length
@@ -98,7 +103,7 @@ final class TransportHandshaker {
      *    04                            -- payload length
      *       c3 f9 eb 03                -- max acceptable protocol version (vInt: 00000011 11101011 11111001 11000011 == 8060099)
      *
-     * ## v7170099 Response:
+     * ## v7170099 and v8800000 Responses:
      *
      * 45 53                            -- 'ES' marker
      * 00 00 00 17                      -- total message length
@@ -118,7 +123,12 @@ final class TransportHandshaker {
 
     static final TransportVersion EARLIEST_HANDSHAKE_VERSION = TransportVersion.fromId(6080099);
     static final TransportVersion REQUEST_HANDSHAKE_VERSION = TransportVersions.MINIMUM_COMPATIBLE;
-    static final Set<TransportVersion> ALLOWED_HANDSHAKE_VERSIONS = Set.of(EARLIEST_HANDSHAKE_VERSION, REQUEST_HANDSHAKE_VERSION);
+    static final TransportVersion V9_HANDSHAKE_VERSION = TransportVersion.fromId(8_800_00_0);
+    static final Set<TransportVersion> ALLOWED_HANDSHAKE_VERSIONS = Set.of(
+        EARLIEST_HANDSHAKE_VERSION,
+        REQUEST_HANDSHAKE_VERSION,
+        V9_HANDSHAKE_VERSION
+    );
 
     static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake";
     private final ConcurrentMap<Long, HandshakeResponseHandler> pendingHandshakes = new ConcurrentHashMap<>();

+ 55 - 12
server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java

@@ -27,6 +27,7 @@ import java.util.ArrayList;
 
 import static org.elasticsearch.common.bytes.ReleasableBytesReferenceStreamInputTests.wrapAsReleasable;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.instanceOf;
 
@@ -182,7 +183,7 @@ public class InboundDecoderTests extends ESTestCase {
         }
     }
 
-    public void testDecodeHandshakeCompatibility() throws IOException {
+    public void testDecodeHandshakeV7Compatibility() throws IOException {
         String action = "test-request";
         long requestId = randomNonNegativeLong();
         final String headerKey = randomAlphaOfLength(10);
@@ -223,6 +224,55 @@ public class InboundDecoderTests extends ESTestCase {
 
     }
 
+    public void testDecodeHandshakeV8Compatibility() throws IOException {
+        doHandshakeCompatibilityTest(TransportHandshaker.REQUEST_HANDSHAKE_VERSION, null);
+        doHandshakeCompatibilityTest(TransportHandshaker.REQUEST_HANDSHAKE_VERSION, Compression.Scheme.DEFLATE);
+    }
+
+    public void testDecodeHandshakeV9Compatibility() throws IOException {
+        doHandshakeCompatibilityTest(TransportHandshaker.V9_HANDSHAKE_VERSION, null);
+        doHandshakeCompatibilityTest(TransportHandshaker.V9_HANDSHAKE_VERSION, Compression.Scheme.DEFLATE);
+    }
+
+    private void doHandshakeCompatibilityTest(TransportVersion transportVersion, Compression.Scheme compressionScheme) throws IOException {
+        String action = "test-request";
+        long requestId = randomNonNegativeLong();
+        final String headerKey = randomAlphaOfLength(10);
+        final String headerValue = randomAlphaOfLength(20);
+        threadContext.putHeader(headerKey, headerValue);
+        OutboundMessage message = new OutboundMessage.Request(
+            threadContext,
+            new TestRequest(randomAlphaOfLength(100)),
+            transportVersion,
+            action,
+            requestId,
+            true,
+            compressionScheme
+        );
+
+        try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
+            final BytesReference bytes = message.serialize(os);
+            int totalHeaderSize = TcpHeader.headerSize(transportVersion);
+
+            InboundDecoder decoder = new InboundDecoder(recycler);
+            final ArrayList<Object> fragments = new ArrayList<>();
+            final ReleasableBytesReference releasable1 = wrapAsReleasable(bytes);
+            int bytesConsumed = decoder.decode(releasable1, fragments::add);
+            assertThat(bytesConsumed, greaterThan(totalHeaderSize));
+            assertTrue(releasable1.hasReferences());
+
+            final Header header = (Header) fragments.get(0);
+            assertEquals(requestId, header.getRequestId());
+            assertEquals(transportVersion, header.getVersion());
+            assertEquals(compressionScheme == Compression.Scheme.DEFLATE, header.isCompressed());
+            assertTrue(header.isHandshake());
+            assertTrue(header.isRequest());
+            assertFalse(header.needsToReadVariableHeader());
+            assertEquals(headerValue, header.getRequestHeaders().get(headerKey));
+            fragments.clear();
+        }
+    }
+
     public void testClientChannelTypeFailsDecodingRequests() throws Exception {
         String action = "test-request";
         long requestId = randomNonNegativeLong();
@@ -488,23 +538,16 @@ public class InboundDecoderTests extends ESTestCase {
     }
 
     public void testCheckHandshakeCompatibility() {
-        try {
-            InboundDecoder.checkHandshakeVersionCompatibility(randomFrom(TransportHandshaker.ALLOWED_HANDSHAKE_VERSIONS));
-        } catch (IllegalStateException e) {
-            throw new AssertionError(e);
-        }
+        for (final var allowedHandshakeVersion : TransportHandshaker.ALLOWED_HANDSHAKE_VERSIONS) {
+            InboundDecoder.checkHandshakeVersionCompatibility(allowedHandshakeVersion); // should not throw
 
-        var invalid = TransportVersion.fromId(TransportHandshaker.EARLIEST_HANDSHAKE_VERSION.id() - 1);
-        try {
-            InboundDecoder.checkHandshakeVersionCompatibility(invalid);
-            fail();
-        } catch (IllegalStateException expected) {
+            var invalid = TransportVersion.fromId(allowedHandshakeVersion.id() + randomFrom(-1, +1));
             assertEquals(
                 "Received message from unsupported version: ["
                     + invalid
                     + "] allowed versions are: "
                     + TransportHandshaker.ALLOWED_HANDSHAKE_VERSIONS,
-                expected.getMessage()
+                expectThrows(IllegalStateException.class, () -> InboundDecoder.checkHandshakeVersionCompatibility(invalid)).getMessage()
             );
         }
     }