Просмотр исходного кода

Fix getDatabaseType for unusual MMDBs (#112888) (#112954)

Joe Gallo 1 год назад
Родитель
Сommit
7463b775fa

+ 5 - 0
docs/changelog/112888.yaml

@@ -0,0 +1,5 @@
+pr: 112888
+summary: Fix `getDatabaseType` for unusual MMDBs
+area: Ingest Node
+type: bug
+issues: []

+ 3 - 70
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java

@@ -34,9 +34,7 @@ import org.elasticsearch.core.SuppressForbidden;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.InetAddress;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Objects;
@@ -66,23 +64,16 @@ class DatabaseReaderLazyLoader implements GeoIpDatabase, Closeable {
     private final AtomicInteger currentUsages = new AtomicInteger(0);
 
     DatabaseReaderLazyLoader(GeoIpCache cache, Path databasePath, String md5) {
-        this(cache, databasePath, md5, createDatabaseLoader(databasePath));
-    }
-
-    DatabaseReaderLazyLoader(GeoIpCache cache, Path databasePath, String md5, CheckedSupplier<DatabaseReader, IOException> loader) {
         this.cache = cache;
         this.databasePath = Objects.requireNonNull(databasePath);
         this.md5 = md5;
-        this.loader = Objects.requireNonNull(loader);
+        this.loader = createDatabaseLoader(databasePath);
         this.databaseReader = new SetOnce<>();
         this.databaseType = new SetOnce<>();
     }
 
     /**
-     * Read the database type from the database. We do this manually instead of relying on the built-in mechanism to avoid reading the
-     * entire database into memory merely to read the type. This is especially important to maintain on master nodes where pipelines are
-     * validated. If we read the entire database into memory, we could potentially run into low-memory constraints on such nodes where
-     * loading this data would otherwise be wasteful if they are not also ingest nodes.
+     * Read the database type from the database and cache it for future calls.
      *
      * @return the database type
      * @throws IOException if an I/O exception occurs reading the database type
@@ -92,71 +83,13 @@ class DatabaseReaderLazyLoader implements GeoIpDatabase, Closeable {
         if (databaseType.get() == null) {
             synchronized (databaseType) {
                 if (databaseType.get() == null) {
-                    final long fileSize = databaseFileSize();
-                    if (fileSize <= 512) {
-                        throw new IOException("unexpected file length [" + fileSize + "] for [" + databasePath + "]");
-                    }
-                    final int[] databaseTypeMarker = { 'd', 'a', 't', 'a', 'b', 'a', 's', 'e', '_', 't', 'y', 'p', 'e' };
-                    try (InputStream in = databaseInputStream()) {
-                        // read the last 512 bytes
-                        final long skipped = in.skip(fileSize - 512);
-                        if (skipped != fileSize - 512) {
-                            throw new IOException("failed to skip [" + (fileSize - 512) + "] bytes while reading [" + databasePath + "]");
-                        }
-                        final byte[] tail = new byte[512];
-                        int read = 0;
-                        do {
-                            final int actualBytesRead = in.read(tail, read, 512 - read);
-                            if (actualBytesRead == -1) {
-                                throw new IOException("unexpected end of stream [" + databasePath + "] after reading [" + read + "] bytes");
-                            }
-                            read += actualBytesRead;
-                        } while (read != 512);
-
-                        // find the database_type header
-                        int metadataOffset = -1;
-                        int markerOffset = 0;
-                        for (int i = 0; i < tail.length; i++) {
-                            byte b = tail[i];
-
-                            if (b == databaseTypeMarker[markerOffset]) {
-                                markerOffset++;
-                            } else {
-                                markerOffset = 0;
-                            }
-                            if (markerOffset == databaseTypeMarker.length) {
-                                metadataOffset = i + 1;
-                                break;
-                            }
-                        }
-
-                        if (metadataOffset == -1) {
-                            throw new IOException("database type marker not found");
-                        }
-
-                        // read the database type
-                        final int offsetByte = tail[metadataOffset] & 0xFF;
-                        final int type = offsetByte >>> 5;
-                        if (type != 2) {
-                            throw new IOException("type must be UTF-8 string");
-                        }
-                        int size = offsetByte & 0x1f;
-                        databaseType.set(new String(tail, metadataOffset + 1, size, StandardCharsets.UTF_8));
-                    }
+                    databaseType.set(MMDBUtil.getDatabaseType(databasePath));
                 }
             }
         }
         return databaseType.get();
     }
 
-    long databaseFileSize() throws IOException {
-        return Files.size(databasePath);
-    }
-
-    InputStream databaseInputStream() throws IOException {
-        return Files.newInputStream(databasePath);
-    }
-
     @Nullable
     @Override
     public CityResponse getCity(InetAddress ipAddress) {

+ 101 - 0
modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/MMDBUtil.java

@@ -0,0 +1,101 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.ingest.geoip;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+public final class MMDBUtil {
+
+    private MMDBUtil() {
+        // utility class
+    }
+
+    private static final byte[] DATABASE_TYPE_MARKER = "database_type".getBytes(StandardCharsets.UTF_8);
+
+    // note: technically the metadata can be up to 128k long, but we only handle it correctly as long as it's less than
+    // or equal to this buffer size. in practice, that seems to be plenty for ordinary files.
+    private static final int BUFFER_SIZE = 2048;
+
+    /**
+     * Read the database type from the database. We do this manually instead of relying on the built-in mechanism to avoid reading the
+     * entire database into memory merely to read the type. This is especially important to maintain on master nodes where pipelines are
+     * validated. If we read the entire database into memory, we could potentially run into low-memory constraints on such nodes where
+     * loading this data would otherwise be wasteful if they are not also ingest nodes.
+     *
+     * @return the database type
+     * @throws IOException if an I/O exception occurs reading the database type
+     */
+    public static String getDatabaseType(final Path database) throws IOException {
+        final long fileSize = Files.size(database);
+        try (InputStream in = Files.newInputStream(database)) {
+            // read the last BUFFER_SIZE bytes (or the fileSize, whichever is smaller)
+            final long skip = fileSize > BUFFER_SIZE ? fileSize - BUFFER_SIZE : 0;
+            final long skipped = in.skip(skip);
+            if (skipped != skip) {
+                throw new IOException("failed to skip [" + skip + "] bytes while reading [" + database + "]");
+            }
+            final byte[] tail = new byte[BUFFER_SIZE];
+            int read = 0;
+            int actualBytesRead;
+            do {
+                actualBytesRead = in.read(tail, read, BUFFER_SIZE - read);
+                read += actualBytesRead;
+            } while (actualBytesRead > 0);
+
+            // find the database_type header
+            int metadataOffset = -1;
+            int markerOffset = 0;
+            for (int i = 0; i < tail.length; i++) {
+                byte b = tail[i];
+
+                if (b == DATABASE_TYPE_MARKER[markerOffset]) {
+                    markerOffset++;
+                } else {
+                    markerOffset = 0;
+                }
+                if (markerOffset == DATABASE_TYPE_MARKER.length) {
+                    metadataOffset = i + 1;
+                    break;
+                }
+            }
+
+            if (metadataOffset == -1) {
+                throw new IOException("database type marker not found");
+            }
+
+            // read the database type
+            final int offsetByte = fromBytes(tail[metadataOffset]);
+            final int type = offsetByte >>> 5;
+            if (type != 2) { // 2 is the type indicator in the mmdb format for a UTF-8 string
+                throw new IOException("type must be UTF-8 string");
+            }
+            int size = offsetByte & 0x1f;
+            if (size == 29) {
+                // then we need to read in yet another byte and add it onto this size
+                // this can actually occur in practice, a 29+ character type description isn't that hard to imagine
+                size = 29 + fromBytes(tail[metadataOffset + 1]);
+                metadataOffset += 1;
+            } else if (size >= 30) {
+                // we'd need to read two or three more bytes to get the size, but this means the type length is >=285
+                throw new IOException("database_type too long [size indicator == " + size + "]");
+            }
+
+            return new String(tail, metadataOffset + 1, size, StandardCharsets.UTF_8);
+        }
+    }
+
+    private static int fromBytes(byte b1) {
+        return b1 & 0xFF;
+    }
+}

+ 47 - 54
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java

@@ -9,27 +9,27 @@
 
 package org.elasticsearch.ingest.geoip;
 
-import com.maxmind.geoip2.DatabaseReader;
-
 import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.util.set.Sets;
-import org.elasticsearch.core.PathUtils;
+import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.ingest.IngestDocument;
 import org.elasticsearch.ingest.RandomDocumentPicks;
 import org.elasticsearch.ingest.geoip.Database.Property;
 import org.elasticsearch.test.ESTestCase;
+import org.junit.After;
+import org.junit.Before;
 
 import java.io.IOException;
-import java.io.InputStream;
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Supplier;
 
 import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
+import static org.elasticsearch.ingest.geoip.GeoIpTestUtils.copyDatabase;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
@@ -41,6 +41,19 @@ public class GeoIpProcessorTests extends ESTestCase {
 
     private static final Set<Property> ALL_PROPERTIES = Set.of(Property.values());
 
+    // a temporary directory that mmdb files can be copied to and read from
+    Path tmpDir;
+
+    @Before
+    public void setup() {
+        tmpDir = createTempDir();
+    }
+
+    @After
+    public void cleanup() throws IOException {
+        IOUtils.rm(tmpDir);
+    }
+
     public void testDatabasePropertyInvariants() {
         // the city database is like a specialization of the country database
         assertThat(Sets.difference(Database.Country.properties(), Database.City.properties()), is(empty()));
@@ -64,7 +77,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoLite2-City.mmdb"),
+            loader("GeoLite2-City.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -99,7 +112,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoLite2-City.mmdb"),
+            loader("GeoLite2-City.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -121,7 +134,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoLite2-City.mmdb"),
+            loader("GeoLite2-City.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -140,7 +153,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoLite2-City.mmdb"),
+            loader("GeoLite2-City.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -162,7 +175,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoLite2-City.mmdb"),
+            loader("GeoLite2-City.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -181,7 +194,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoLite2-City.mmdb"),
+            loader("GeoLite2-City.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -220,7 +233,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoLite2-City.mmdb"),
+            loader("GeoLite2-City.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -246,7 +259,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoLite2-Country.mmdb"),
+            loader("GeoLite2-Country.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -276,7 +289,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoLite2-Country.mmdb"),
+            loader("GeoLite2-Country.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -303,7 +316,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoLite2-ASN.mmdb"),
+            loader("GeoLite2-ASN.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -333,7 +346,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoIP2-Anonymous-IP-Test.mmdb"),
+            loader("GeoIP2-Anonymous-IP-Test.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -366,7 +379,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoIP2-Connection-Type-Test.mmdb"),
+            loader("GeoIP2-Connection-Type-Test.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -394,7 +407,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoIP2-Domain-Test.mmdb"),
+            loader("GeoIP2-Domain-Test.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -422,7 +435,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoIP2-Enterprise-Test.mmdb"),
+            loader("GeoIP2-Enterprise-Test.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -475,7 +488,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoIP2-ISP-Test.mmdb"),
+            loader("GeoIP2-ISP-Test.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -508,7 +521,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoLite2-City.mmdb"),
+            loader("GeoLite2-City.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -532,7 +545,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoLite2-City.mmdb"),
+            loader("GeoLite2-City.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -553,7 +566,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoLite2-City.mmdb"),
+            loader("GeoLite2-City.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -583,7 +596,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoLite2-City.mmdb"),
+            loader("GeoLite2-City.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -613,7 +626,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoLite2-City.mmdb"),
+            loader("GeoLite2-City.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -632,7 +645,7 @@ public class GeoIpProcessorTests extends ESTestCase {
 
     public void testListDatabaseReferenceCounting() throws Exception {
         AtomicBoolean closeCheck = new AtomicBoolean(false);
-        var loader = loader("/GeoLite2-City.mmdb", closeCheck);
+        var loader = loader("GeoLite2-City.mmdb", closeCheck);
         GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), null, "source_field", () -> {
             loader.preLookup();
             return loader;
@@ -664,7 +677,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoLite2-City.mmdb"),
+            loader("GeoLite2-City.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -692,7 +705,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoLite2-City.mmdb"),
+            loader("GeoLite2-City.mmdb"),
             () -> true,
             "target_field",
             ALL_PROPERTIES,
@@ -714,7 +727,7 @@ public class GeoIpProcessorTests extends ESTestCase {
             randomAlphaOfLength(10),
             null,
             "source_field",
-            loader("/GeoLite2-City.mmdb"),
+            loader("GeoLite2-City.mmdb"),
             () -> false,
             "target_field",
             ALL_PROPERTIES,
@@ -782,32 +795,12 @@ public class GeoIpProcessorTests extends ESTestCase {
         return () -> loader;
     }
 
-    private DatabaseReaderLazyLoader loader(final String path, final AtomicBoolean closed) {
-        final Supplier<InputStream> databaseInputStreamSupplier = () -> GeoIpProcessor.class.getResourceAsStream(path);
-        final CheckedSupplier<DatabaseReader, IOException> loader = () -> new DatabaseReader.Builder(databaseInputStreamSupplier.get())
-            .build();
-        final GeoIpCache cache = new GeoIpCache(1000);
-        return new DatabaseReaderLazyLoader(cache, PathUtils.get(path), null, loader) {
-
-            @Override
-            long databaseFileSize() throws IOException {
-                try (InputStream is = databaseInputStreamSupplier.get()) {
-                    long bytesRead = 0;
-                    do {
-                        final byte[] bytes = new byte[1 << 10];
-                        final int read = is.read(bytes);
-                        if (read == -1) break;
-                        bytesRead += read;
-                    } while (true);
-                    return bytesRead;
-                }
-            }
-
-            @Override
-            InputStream databaseInputStream() {
-                return databaseInputStreamSupplier.get();
-            }
+    private DatabaseReaderLazyLoader loader(final String databaseName, final AtomicBoolean closed) {
+        Path path = tmpDir.resolve(databaseName);
+        copyDatabase(databaseName, path);
 
+        final GeoIpCache cache = new GeoIpCache(1000);
+        return new DatabaseReaderLazyLoader(cache, path, null) {
             @Override
             protected void doClose() throws IOException {
                 if (closed != null) {

+ 69 - 0
modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/MMDBUtilTests.java

@@ -0,0 +1,69 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.ingest.geoip;
+
+import org.elasticsearch.core.IOUtils;
+import org.elasticsearch.test.ESTestCase;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static org.elasticsearch.ingest.geoip.GeoIpTestUtils.copyDatabase;
+import static org.hamcrest.Matchers.endsWith;
+import static org.hamcrest.Matchers.hasLength;
+import static org.hamcrest.Matchers.is;
+
+public class MMDBUtilTests extends ESTestCase {
+
+    // a temporary directory that mmdb files can be copied to and read from
+    Path tmpDir;
+
+    @Before
+    public void setup() {
+        tmpDir = createTempDir();
+    }
+
+    @After
+    public void cleanup() throws IOException {
+        IOUtils.rm(tmpDir);
+    }
+
+    public void testGetDatabaseTypeGeoIP2City() throws IOException {
+        Path database = tmpDir.resolve("GeoIP2-City.mmdb");
+        copyDatabase("GeoIP2-City-Test.mmdb", database);
+
+        String type = MMDBUtil.getDatabaseType(database);
+        assertThat(type, is("GeoIP2-City"));
+    }
+
+    public void testGetDatabaseTypeGeoLite2City() throws IOException {
+        Path database = tmpDir.resolve("GeoLite2-City.mmdb");
+        copyDatabase("GeoLite2-City-Test.mmdb", database);
+
+        String type = MMDBUtil.getDatabaseType(database);
+        assertThat(type, is("GeoLite2-City"));
+    }
+
+    public void testSmallFileWithALongDescription() throws IOException {
+        Path database = tmpDir.resolve("test-description.mmdb");
+        copyDatabase("test-description.mmdb", database);
+
+        // it was once the case that we couldn't read a database_type that was 29 characters or longer
+        String type = MMDBUtil.getDatabaseType(database);
+        assertThat(type, endsWith("long database_type"));
+        assertThat(type, hasLength(60)); // 60 is >= 29, ;)
+
+        // it was once the case that we couldn't process an mmdb that was smaller than 512 bytes
+        assertThat(Files.size(database), is(444L)); // 444 is <512
+    }
+}

BIN
modules/ingest-geoip/src/test/resources/test-description.mmdb