Browse Source

Refactor TransportVersion loading to support external consumers (#132694) (#132862)

This change moves transport version loading out of TransportVersion.VersionsHolder, so that is can be
consumed elsewhere by projects using the same resource file structure.

Jira: ES-12401
Jack Conradson 2 months ago
parent
commit
54709c5112

+ 89 - 70
server/src/main/java/org/elasticsearch/TransportVersion.java

@@ -79,23 +79,51 @@ public record TransportVersion(String name, int id, TransportVersion nextPatchVe
         this(null, id, null);
     }
 
+    interface BufferedReaderParser<T> {
+        T parse(String component, String path, BufferedReader bufferedReader);
+    }
+
+    static <T> T parseFromBufferedReader(
+        String component,
+        String path,
+        Function<String, InputStream> nameToStream,
+        BufferedReaderParser<T> parser
+    ) {
+        try (InputStream inputStream = nameToStream.apply(path)) {
+            if (inputStream == null) {
+                return null;
+            }
+            try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+                return parser.parse(component, path, bufferedReader);
+            }
+        } catch (IOException ioe) {
+            throw new UncheckedIOException("parsing error [" + component + ":" + path + "]", ioe);
+        }
+    }
+
     /**
      * Constructs a named transport version along with its set of compatible patch versions from x-content.
      * This method takes in the parameter {@code latest} which is the highest valid transport version id
      * supported by this node. Versions newer than the current transport version id for this node are discarded.
      */
-    public static TransportVersion fromInputStream(String path, boolean nameInFile, InputStream stream, Integer latest) {
-        try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
-            String line = reader.readLine();
+    public static TransportVersion fromBufferedReader(
+        String component,
+        String path,
+        boolean nameInFile,
+        BufferedReader bufferedReader,
+        Integer latest
+    ) {
+        try {
+            String line = bufferedReader.readLine();
             String[] parts = line.replaceAll("\\s+", "").split(",");
             String check;
-            while ((check = reader.readLine()) != null) {
+            while ((check = bufferedReader.readLine()) != null) {
                 if (check.replaceAll("\\s+", "").isEmpty() == false) {
-                    throw new IllegalArgumentException("invalid transport version file format [" + path + "]");
+                    throw new IllegalArgumentException("invalid transport version file format [" + toComponentPath(component, path) + "]");
                 }
             }
             if (parts.length < (nameInFile ? 2 : 1)) {
-                throw new IllegalStateException("invalid transport version file format [" + path + "]");
+                throw new IllegalStateException("invalid transport version file format [" + toComponentPath(component, path) + "]");
             }
             String name = nameInFile ? parts[0] : path.substring(path.lastIndexOf('/') + 1, path.length() - 4);
             List<Integer> ids = new ArrayList<>();
@@ -103,12 +131,17 @@ public record TransportVersion(String name, int id, TransportVersion nextPatchVe
                 try {
                     ids.add(Integer.parseInt(parts[i]));
                 } catch (NumberFormatException nfe) {
-                    throw new IllegalStateException("invalid transport version file format [" + path + "]", nfe);
+                    throw new IllegalStateException(
+                        "invalid transport version file format [" + toComponentPath(component, path) + "]",
+                        nfe
+                    );
                 }
             }
-            ids.sort(Integer::compareTo);
             TransportVersion transportVersion = null;
-            for (int idIndex = 0; idIndex < ids.size(); ++idIndex) {
+            for (int idIndex = ids.size() - 1; idIndex >= 0; --idIndex) {
+                if (idIndex > 0 && ids.get(idIndex - 1) <= ids.get(idIndex)) {
+                    throw new IllegalStateException("invalid transport version file format [" + toComponentPath(component, path) + "]");
+                }
                 if (ids.get(idIndex) > latest) {
                     break;
                 }
@@ -116,10 +149,51 @@ public record TransportVersion(String name, int id, TransportVersion nextPatchVe
             }
             return transportVersion;
         } catch (IOException ioe) {
-            throw new UncheckedIOException("cannot parse transport version [" + path + "]", ioe);
+            throw new UncheckedIOException("invalid transport version file format [" + toComponentPath(component, path) + "]", ioe);
         }
     }
 
+    public static Map<String, TransportVersion> collectFromInputStreams(
+        String component,
+        Function<String, InputStream> nameToStream,
+        String latestFileName
+    ) {
+        TransportVersion latest = parseFromBufferedReader(
+            component,
+            "/transport/latest/" + latestFileName,
+            nameToStream,
+            (c, p, br) -> fromBufferedReader(c, p, true, br, Integer.MAX_VALUE)
+        );
+        if (latest != null) {
+            List<String> versionFilesNames = parseFromBufferedReader(
+                component,
+                "/transport/defined/manifest.txt",
+                nameToStream,
+                (c, p, br) -> br.lines().filter(line -> line.isBlank() == false).toList()
+            );
+            if (versionFilesNames != null) {
+                Map<String, TransportVersion> transportVersions = new HashMap<>();
+                for (String versionFileName : versionFilesNames) {
+                    TransportVersion transportVersion = parseFromBufferedReader(
+                        component,
+                        "/transport/defined/" + versionFileName,
+                        nameToStream,
+                        (c, p, br) -> fromBufferedReader(c, p, false, br, latest.id())
+                    );
+                    if (transportVersion != null) {
+                        transportVersions.put(versionFileName.substring(0, versionFileName.length() - 4), transportVersion);
+                    }
+                }
+                return transportVersions;
+            }
+        }
+        return Map.of();
+    }
+
+    private static String toComponentPath(String component, String path) {
+        return component + ":" + path;
+    }
+
     public static TransportVersion readVersion(StreamInput in) throws IOException {
         return fromId(in.readVInt());
     }
@@ -337,7 +411,11 @@ public record TransportVersion(String name, int id, TransportVersion nextPatchVe
         static {
             // collect all the transport versions from server and es modules/plugins (defined in server)
             List<TransportVersion> allVersions = new ArrayList<>(TransportVersions.DEFINED_VERSIONS);
-            Map<String, TransportVersion> allVersionsByName = loadTransportVersionsByName();
+            Map<String, TransportVersion> allVersionsByName = collectFromInputStreams(
+                "<server>",
+                TransportVersion.class::getResourceAsStream,
+                Version.CURRENT.major + "." + Version.CURRENT.minor + ".csv"
+            );
             addTransportVersions(allVersionsByName.values(), allVersions).sort(TransportVersion::compareTo);
 
             // set the transport version lookups
@@ -351,65 +429,6 @@ public record TransportVersion(String name, int id, TransportVersion nextPatchVe
             CURRENT = ALL_VERSIONS.get(ALL_VERSIONS.size() - 1);
         }
 
-        private static Map<String, TransportVersion> loadTransportVersionsByName() {
-            Map<String, TransportVersion> transportVersions = new HashMap<>();
-
-            String latestLocation = "/transport/latest/" + Version.CURRENT.major + "." + Version.CURRENT.minor + ".csv";
-            int latestId = -1;
-            try (InputStream inputStream = TransportVersion.class.getResourceAsStream(latestLocation)) {
-                // this check is required until bootstrapping for the new transport versions format is completed;
-                // when load is false, we will only use the transport versions in the legacy format;
-                // load becomes false if we don't find the latest or manifest files required for the new format
-                if (inputStream != null) {
-                    TransportVersion latest = fromInputStream(latestLocation, true, inputStream, Integer.MAX_VALUE);
-                    if (latest == null) {
-                        throw new IllegalStateException(
-                            "invalid latest transport version for minor version ["
-                                + Version.CURRENT.major
-                                + "."
-                                + Version.CURRENT.minor
-                                + "]"
-                        );
-                    }
-                    latestId = latest.id();
-                }
-            } catch (IOException ioe) {
-                throw new UncheckedIOException("latest transport version file not found at [" + latestLocation + "]", ioe);
-            }
-
-            String manifestLocation = "/transport/defined/manifest.txt";
-            List<String> versionFileNames = null;
-            if (latestId > -1) {
-                try (InputStream inputStream = TransportVersion.class.getResourceAsStream(manifestLocation)) {
-                    if (inputStream != null) {
-                        BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
-                        versionFileNames = reader.lines().filter(line -> line.isBlank() == false).toList();
-                    }
-                } catch (IOException ioe) {
-                    throw new UncheckedIOException("transport version manifest file not found at [" + manifestLocation + "]", ioe);
-                }
-            }
-
-            if (versionFileNames != null) {
-                for (String name : versionFileNames) {
-                    String versionLocation = "/transport/defined/" + name;
-                    try (InputStream inputStream = TransportVersion.class.getResourceAsStream(versionLocation)) {
-                        if (inputStream == null) {
-                            throw new IllegalStateException("transport version file not found at [" + versionLocation + "]");
-                        }
-                        TransportVersion transportVersion = TransportVersion.fromInputStream(versionLocation, false, inputStream, latestId);
-                        if (transportVersion != null) {
-                            transportVersions.put(transportVersion.name(), transportVersion);
-                        }
-                    } catch (IOException ioe) {
-                        throw new UncheckedIOException("transport version file not found at [ " + versionLocation + "]", ioe);
-                    }
-                }
-            }
-
-            return transportVersions;
-        }
-
         private static List<TransportVersion> addTransportVersions(Collection<TransportVersion> addFrom, List<TransportVersion> addTo) {
             for (TransportVersion transportVersion : addFrom) {
                 addTo.add(transportVersion);

+ 52 - 31
server/src/test/java/org/elasticsearch/TransportVersionTests.java

@@ -12,7 +12,11 @@ package org.elasticsearch;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.TransportVersionUtils;
 
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.InputStreamReader;
 import java.lang.reflect.Modifier;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
@@ -268,42 +272,38 @@ public class TransportVersionTests extends ESTestCase {
         }
     }
 
-    public void testFromName() {
-        assertThat(TransportVersion.fromName("test_0"), is(new TransportVersion("test_0", 3001000, null)));
-        assertThat(TransportVersion.fromName("test_1"), is(new TransportVersion("test_1", 3002000, null)));
-        assertThat(
-            TransportVersion.fromName("test_2"),
-            is(
-                new TransportVersion(
-                    "test_2",
-                    3003000,
-                    new TransportVersion("test_2", 2001001, new TransportVersion("test_2", 1001001, null))
-                )
-            )
-        );
-        assertThat(
-            TransportVersion.fromName("test_3"),
-            is(new TransportVersion("test_3", 3003001, new TransportVersion("test_3", 2001002, null)))
-        );
-        assertThat(
-            TransportVersion.fromName("test_4"),
-            is(
-                new TransportVersion(
-                    "test_4",
-                    3003002,
-                    new TransportVersion("test_4", 2001003, new TransportVersion("test_4", 1001002, null))
-                )
-            )
+    public void testLatest() {
+        TransportVersion latest = TransportVersion.parseFromBufferedReader(
+            "<test>",
+            "/transport/defined/" + Version.CURRENT.major + "." + Version.CURRENT.minor + ".csv",
+            TransportVersion.class::getResourceAsStream,
+            (c, p, br) -> TransportVersion.fromBufferedReader(c, p, true, br, Integer.MAX_VALUE)
         );
+        // TODO: once placeholder is removed, test the latest known version can be found fromName
+        // assertThat(latest, is(TransportVersion.fromName(latest.name())));
     }
 
     public void testSupports() {
-        TransportVersion test0 = TransportVersion.fromName("test_0");
+        byte[] data0 = "100001000,3001000".getBytes(StandardCharsets.UTF_8);
+        TransportVersion test0 = TransportVersion.fromBufferedReader(
+            "<test>",
+            "testSupports0",
+            false,
+            new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data0), StandardCharsets.UTF_8)),
+            5000000
+        );
         assertThat(new TransportVersion(null, 2003000, null).supports(test0), is(false));
         assertThat(new TransportVersion(null, 3001000, null).supports(test0), is(true));
         assertThat(new TransportVersion(null, 100001001, null).supports(test0), is(true));
 
-        TransportVersion test1 = TransportVersion.fromName("test_1");
+        byte[] data1 = "3002000".getBytes(StandardCharsets.UTF_8);
+        TransportVersion test1 = TransportVersion.fromBufferedReader(
+            "<test>",
+            "testSupports1",
+            false,
+            new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data1), StandardCharsets.UTF_8)),
+            5000000
+        );
         assertThat(new TransportVersion(null, 2003000, null).supports(test1), is(false));
         assertThat(new TransportVersion(null, 3001000, null).supports(test1), is(false));
         assertThat(new TransportVersion(null, 3001001, null).supports(test1), is(false));
@@ -311,7 +311,14 @@ public class TransportVersionTests extends ESTestCase {
         assertThat(new TransportVersion(null, 100001000, null).supports(test1), is(true));
         assertThat(new TransportVersion(null, 100001001, null).supports(test1), is(true));
 
-        TransportVersion test2 = TransportVersion.fromName("test_2");
+        byte[] data2 = "3003000,2001001,1001001".getBytes(StandardCharsets.UTF_8);
+        TransportVersion test2 = TransportVersion.fromBufferedReader(
+            "<test>",
+            "testSupports2",
+            false,
+            new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data2), StandardCharsets.UTF_8)),
+            5000000
+        );
         assertThat(new TransportVersion(null, 1001000, null).supports(test2), is(false));
         assertThat(new TransportVersion(null, 1001001, null).supports(test2), is(true));
         assertThat(new TransportVersion(null, 1001002, null).supports(test2), is(true));
@@ -331,7 +338,14 @@ public class TransportVersionTests extends ESTestCase {
         assertThat(new TransportVersion(null, 100001000, null).supports(test2), is(true));
         assertThat(new TransportVersion(null, 100001001, null).supports(test2), is(true));
 
-        TransportVersion test3 = TransportVersion.fromName("test_3");
+        byte[] data3 = "100002000,3003001,2001002".getBytes(StandardCharsets.UTF_8);
+        TransportVersion test3 = TransportVersion.fromBufferedReader(
+            "<test>",
+            "testSupports3",
+            false,
+            new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data3), StandardCharsets.UTF_8)),
+            5000000
+        );
         assertThat(new TransportVersion(null, 1001001, null).supports(test3), is(false));
         assertThat(new TransportVersion(null, 1001002, null).supports(test3), is(false));
         assertThat(new TransportVersion(null, 1001003, null).supports(test3), is(false));
@@ -352,7 +366,14 @@ public class TransportVersionTests extends ESTestCase {
         assertThat(new TransportVersion(null, 100001000, null).supports(test3), is(true));
         assertThat(new TransportVersion(null, 100001001, null).supports(test3), is(true));
 
-        TransportVersion test4 = TransportVersion.fromName("test_4");
+        byte[] data4 = "100002000,3003002,2001003,1001002".getBytes(StandardCharsets.UTF_8);
+        TransportVersion test4 = TransportVersion.fromBufferedReader(
+            "<test>",
+            "testSupports3",
+            false,
+            new BufferedReader(new InputStreamReader(new ByteArrayInputStream(data4), StandardCharsets.UTF_8)),
+            5000000
+        );
         assertThat(new TransportVersion(null, 1001001, null).supports(test4), is(false));
         assertThat(new TransportVersion(null, 1001002, null).supports(test4), is(true));
         assertThat(new TransportVersion(null, 1001003, null).supports(test4), is(true));

+ 0 - 5
server/src/test/resources/transport/defined/manifest.txt

@@ -1,5 +0,0 @@
-test_0.csv
-test_1.csv
-test_2.csv
-test_3.csv
-test_4.csv

+ 0 - 1
server/src/test/resources/transport/defined/test_0.csv

@@ -1 +0,0 @@
-100001000,3001000

+ 0 - 2
server/src/test/resources/transport/defined/test_1.csv

@@ -1,2 +0,0 @@
-3002000
-

+ 0 - 1
server/src/test/resources/transport/defined/test_2.csv

@@ -1 +0,0 @@
-3003000,2001001,1001001

+ 0 - 1
server/src/test/resources/transport/defined/test_3.csv

@@ -1 +0,0 @@
-100002000,3003001,2001002

+ 0 - 1
server/src/test/resources/transport/defined/test_4.csv

@@ -1 +0,0 @@
-100002000,3003002,2001003,1001002