Ver código fonte

Add optional content checking to ResourceWatcher (#79423)

In some cloud environments, there may be frequent synchronization of
configuration files from the orchestration layer to the ES container.

This can trigger frequent, unnecessary reloading of files.

Previously, code that used the ResourceWatcherService / FileWatcher
would need to detect "no-op" file changes itself. With the addition of
this content checking support, it can be handled efficiently by the
Resource Watcher Service.
Tim Vernum 4 anos atrás
pai
commit
f7454b8fb5

+ 17 - 0
server/src/main/java/org/elasticsearch/common/hash/MessageDigests.java

@@ -13,6 +13,7 @@ import org.apache.lucene.util.BytesRefIterator;
 import org.elasticsearch.common.bytes.BytesReference;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Objects;
@@ -24,6 +25,8 @@ import java.util.Objects;
  */
 public final class MessageDigests {
 
+    static final int STREAM_DIGEST_BLOCK_SIZE = 1024;
+
     private static ThreadLocal<MessageDigest> createThreadLocalMessageDigest(String digest) {
         return ThreadLocal.withInitial(() -> {
             try {
@@ -142,4 +145,18 @@ public final class MessageDigests {
         return digest.digest();
     }
 
+    /**
+     * Reads bytes from the stream and updates the given digest. Returns the result of the digest.
+     * @return digest result
+     */
+    public static byte[] digest(InputStream stream, MessageDigest digest) throws IOException {
+        byte[] block = new byte[STREAM_DIGEST_BLOCK_SIZE];
+        int len = stream.read(block);
+        while (len > 0) {
+            digest.update(block, 0, len);
+            len = stream.read(block);
+        }
+        return digest.digest();
+    }
+
 }

+ 41 - 1
server/src/main/java/org/elasticsearch/watcher/FileWatcher.java

@@ -9,6 +9,7 @@ package org.elasticsearch.watcher;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.common.hash.MessageDigests;
 import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.util.CollectionUtils;
 
@@ -27,6 +28,7 @@ public class FileWatcher extends AbstractResourceWatcher<FileChangesListener> {
 
     private FileObserver rootFileObserver;
     private final Path path;
+    private final boolean checkFileContents;
 
     private static final Logger logger = LogManager.getLogger(FileWatcher.class);
 
@@ -35,7 +37,19 @@ public class FileWatcher extends AbstractResourceWatcher<FileChangesListener> {
      * @param path the directory to watch
      */
     public FileWatcher(Path path) {
+        this(path, false);
+    }
+
+    /**
+     * Creates new file watcher on the given directory
+     * @param path the directory to watch
+     * @param checkFileContents whether to inspect the content of the file for changes (via a message digest)
+     *                          - this is a "best efforts" check and will err on the side of sending extra change notifications if the file
+     *                          <em>might</em> have changed.
+     */
+    public FileWatcher(Path path, boolean checkFileContents) {
         this.path = path;
+        this.checkFileContents = checkFileContents;
         rootFileObserver = new FileObserver(path);
     }
 
@@ -65,11 +79,13 @@ public class FileWatcher extends AbstractResourceWatcher<FileChangesListener> {
 
     private class FileObserver {
         private final Path path;
+
         private boolean exists;
         private long length;
         private long lastModified;
         private boolean isDirectory;
         private FileObserver[] children;
+        private byte[] digest;
 
         FileObserver(Path path) {
             this.path = path;
@@ -80,6 +96,7 @@ public class FileWatcher extends AbstractResourceWatcher<FileChangesListener> {
             boolean prevIsDirectory = isDirectory;
             long prevLength = length;
             long prevLastModified = lastModified;
+            byte[] prevDigest = digest;
 
             exists = Files.exists(path);
             // TODO we might use the new NIO2 API to get real notification?
@@ -119,7 +136,14 @@ public class FileWatcher extends AbstractResourceWatcher<FileChangesListener> {
                         } else {
                             // Remained file
                             if (prevLastModified != lastModified || prevLength != length) {
-                                onFileChanged();
+                                if (checkFileContents) {
+                                    digest = calculateDigest();
+                                    if (digest == null || Arrays.equals(prevDigest, digest) == false) {
+                                        onFileChanged();
+                                    }
+                                } else {
+                                    onFileChanged();
+                                }
                             }
                         }
                     }
@@ -144,6 +168,19 @@ public class FileWatcher extends AbstractResourceWatcher<FileChangesListener> {
 
         }
 
+        private byte[] calculateDigest() {
+            try (var in = Files.newInputStream(path)) {
+                return MessageDigests.digest(in, MessageDigests.md5());
+            } catch (IOException e) {
+                logger.warn(
+                    "failed to read file [{}] while checking for file changes [{}], will assuming file has been modified",
+                    path,
+                    e.toString()
+                );
+                return null;
+            }
+        }
+
         private void init(boolean initial) throws IOException {
             exists = Files.exists(path);
             if (exists) {
@@ -154,6 +191,9 @@ public class FileWatcher extends AbstractResourceWatcher<FileChangesListener> {
                 } else {
                     length = attributes.size();
                     lastModified = attributes.lastModifiedTime().toMillis();
+                    if (checkFileContents) {
+                        digest = calculateDigest();
+                    }
                     onFileCreated(initial);
                 }
             }

+ 68 - 3
server/src/test/java/org/elasticsearch/common/hash/MessageDigestsTests.java

@@ -8,16 +8,35 @@
 
 package org.elasticsearch.common.hash;
 
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.test.ESTestCase;
 
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasLength;
+
 public class MessageDigestsTests extends ESTestCase {
-    private void assertHash(String expected, String test, MessageDigest messageDigest) {
-        String actual = MessageDigests.toHexString(messageDigest.digest(test.getBytes(StandardCharsets.UTF_8)));
-        assertEquals(expected, actual);
+
+    private void assertHexString(String expected, byte[] bytes) {
+        final String actualDirect = MessageDigests.toHexString(bytes);
+        assertThat(actualDirect, equalTo(expected));
+    }
+
+    private void assertHash(String expected, String test, MessageDigest messageDigest) throws IOException {
+        final byte[] testBytes = test.getBytes(StandardCharsets.UTF_8);
+
+        assertHexString(expected, messageDigest.digest(testBytes));
+        assertHexString(expected, MessageDigests.digest(new BytesArray(testBytes), messageDigest));
+        try (var in = new ByteArrayInputStream(testBytes)) {
+            assertHexString(expected, MessageDigests.digest(in, messageDigest));
+        }
     }
 
     public void testMd5() throws Exception {
@@ -66,4 +85,50 @@ public class MessageDigestsTests extends ESTestCase {
         BigInteger actual = new BigInteger(hex, 16);
         assertEquals(expected, actual);
     }
+
+    public void testDigestFromStreamWithMultipleBlocks() throws Exception {
+        final String longString = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".repeat(1000);
+        assertThat(longString, hasLength(26_000));
+
+        try (InputStream in = getInputStream(longString)) {
+            final byte[] md5 = MessageDigests.digest(in, MessageDigests.md5());
+            assertThat(MessageDigests.toHexString(md5), equalTo("5c48e92239a655cfe1762851c6708ddb"));
+        }
+        try (InputStream in = getInputStream(longString)) {
+            final byte[] md5 = MessageDigests.digest(in, MessageDigests.sha1());
+            assertThat(MessageDigests.toHexString(md5), equalTo("e363dfc35f4d170906aafcbb6b1f6fd1ae854808"));
+        }
+        try (InputStream in = getInputStream(longString)) {
+            final byte[] md5 = MessageDigests.digest(in, MessageDigests.sha256());
+            assertThat(MessageDigests.toHexString(md5), equalTo("e59a4d700410ce60f912bd6e5b24f77230cbc68b27838c5a9c06daef94737a8a"));
+        }
+    }
+
+    public void testDigestFromStreamWithExactlyOneBlock() throws Exception {
+        final String blockString = "ABCDEFGHIJKLMNOP".repeat(64);
+        assertThat(blockString, hasLength(MessageDigests.STREAM_DIGEST_BLOCK_SIZE));
+
+        try (InputStream in = getInputStream(blockString)) {
+            final byte[] md5 = MessageDigests.digest(in, MessageDigests.md5());
+            assertThat(MessageDigests.toHexString(md5), equalTo("2eda00073add15c6ee5c848797f8c0f4"));
+        }
+        try (InputStream in = getInputStream(blockString)) {
+            final byte[] md5 = MessageDigests.digest(in, MessageDigests.sha1());
+            assertThat(MessageDigests.toHexString(md5), equalTo("bb8275d97cb190cb02fd2c03e9bba2279955ace3"));
+        }
+        try (InputStream in = getInputStream(blockString)) {
+            final byte[] md5 = MessageDigests.digest(in, MessageDigests.sha256());
+            assertThat(MessageDigests.toHexString(md5), equalTo("36350546f9cc3cbd56d3b655ecae0e4281909d510687635b900ea7650976eb3b"));
+        }
+    }
+
+    private InputStream getInputStream(String str) {
+        InputStream in = randomBoolean()
+            ? new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8))
+            : new BytesArray(str).streamInput();
+        if (randomBoolean()) {
+            in = new BufferedInputStream(in);
+        }
+        return in;
+    }
 }

+ 147 - 0
server/src/test/java/org/elasticsearch/watcher/FileWatcherTests.java

@@ -14,13 +14,17 @@ import org.elasticsearch.test.ESTestCase;
 import java.io.BufferedWriter;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.FileTime;
 import java.util.ArrayList;
 import java.util.List;
 
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 
@@ -101,10 +105,62 @@ public class FileWatcherTests extends ESTestCase {
         fileWatcher.checkAndNotify();
         assertThat(changes.notifications(), hasSize(0));
 
+        // change modification date, but not contents [we set the time in the future to guarantee a change]
+        Files.setLastModifiedTime(testFile, FileTime.fromMillis(System.currentTimeMillis() + 1));
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), contains(equalTo("onFileChanged: test.txt")));
+
+        changes.notifications().clear();
         Files.delete(testFile);
         fileWatcher.checkAndNotify();
         assertThat(changes.notifications(), contains(equalTo("onFileDeleted: test.txt")));
+    }
+
+    public void testSimpleFileOperationsWithContentChecking() throws IOException {
+        Path tempDir = createTempDir();
+        RecordingChangeListener changes = new RecordingChangeListener(tempDir);
+        Path testFile = tempDir.resolve("test.txt");
+        touch(testFile);
+        FileWatcher fileWatcher = new FileWatcher(testFile, true);
+        fileWatcher.addListener(changes);
+        fileWatcher.init();
+        assertThat(changes.notifications(), contains(equalTo("onFileInit: test.txt")));
+
+        changes.notifications().clear();
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), empty());
+
+        append("Test", testFile, Charset.defaultCharset());
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), contains(equalTo("onFileChanged: test.txt")));
+
+        changes.notifications().clear();
+
+        // change modification date, but not contents
+        Files.setLastModifiedTime(testFile, FileTime.fromMillis(System.currentTimeMillis() + 1));
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), empty());
+
+        changes.notifications().clear();
+
+        // change modification date again, but not contents
+        Files.setLastModifiedTime(testFile, FileTime.fromMillis(System.currentTimeMillis() + 2));
+        fileWatcher.checkAndNotify();
+        // This will not trigger a notification because the hash was calculated last time
+        assertThat(changes.notifications(), empty());
+
+        // Change file length without changing modification time
+        final FileTime modifiedTime = Files.getLastModifiedTime(testFile);
+        append("Modified", testFile, Charset.defaultCharset());
+        Files.setLastModifiedTime(testFile, modifiedTime);
+
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), contains(equalTo("onFileChanged: test.txt")));
 
+        changes.notifications().clear();
+        Files.delete(testFile);
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), contains(equalTo("onFileDeleted: test.txt")));
     }
 
     public void testSimpleDirectoryOperations() throws IOException {
@@ -196,6 +252,97 @@ public class FileWatcherTests extends ESTestCase {
 
     }
 
+    public void testSimpleDirectoryOperationsWithContentChecking() throws IOException {
+        final long startTime = System.currentTimeMillis();
+
+        Path tempDir = createTempDir();
+        RecordingChangeListener changes = new RecordingChangeListener(tempDir);
+        Path testDir = tempDir.resolve("test-dir");
+        Files.createDirectories(testDir);
+        for (String fileName : List.of("test1.txt", "test2.txt", "test3.txt", "test4.txt")) {
+            Files.writeString(testDir.resolve(fileName), "initial", StandardCharsets.UTF_8);
+        }
+
+        FileWatcher fileWatcher = new FileWatcher(testDir, true);
+        fileWatcher.addListener(changes);
+        fileWatcher.init();
+        assertThat(changes.notifications(), contains(
+                equalTo("onDirectoryInit: test-dir/"),
+                equalTo("onFileInit: test-dir/test1.txt"),
+                equalTo("onFileInit: test-dir/test2.txt"),
+                equalTo("onFileInit: test-dir/test3.txt"),
+                equalTo("onFileInit: test-dir/test4.txt")
+        ));
+
+        changes.notifications().clear();
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), hasSize(0));
+
+        // Modify the length of file #1
+        append("Test-1", testDir.resolve("test1.txt"), Charset.defaultCharset());
+
+        // Change lastModified on file #2 (set it to before this test started so there's no chance of accidental matching)
+        // However the contents haven't changed, so it won't be notified
+        Files.setLastModifiedTime(testDir.resolve("test2.txt"), FileTime.fromMillis(startTime - 100));
+
+        // Add a new file
+        Files.writeString(testDir.resolve("test5.txt"), "abc", StandardCharsets.UTF_8);
+
+        fileWatcher.checkAndNotify();
+        assertThat(
+            changes.notifications(),
+            containsInAnyOrder(
+                "onFileChanged: test-dir/test1.txt",
+                "onFileCreated: test-dir/test5.txt"
+            )
+        );
+
+        // Change file #2 but don't change the size
+        Files.writeString(testDir.resolve("test2.txt"), "changed", StandardCharsets.UTF_8);
+        // Change lastModified on file #3 (newer than the last update, but still before the test started)
+        // But no change to contents, so no notification
+        Files.setLastModifiedTime(testDir.resolve("test3.txt"), FileTime.fromMillis(startTime - 50));
+
+        changes.notifications().clear();
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), containsInAnyOrder(
+            equalTo("onFileChanged: test-dir/test2.txt")
+        ));
+
+        // change the contents of files #2 (change in size) and #3 (same size)
+        Files.writeString(testDir.resolve("test2.txt"), "new contents", StandardCharsets.UTF_8);
+        Files.writeString(testDir.resolve("test3.txt"), "updated", StandardCharsets.UTF_8);
+
+        changes.notifications().clear();
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), containsInAnyOrder(
+            equalTo("onFileChanged: test-dir/test2.txt"),
+            equalTo("onFileChanged: test-dir/test3.txt")
+        ));
+
+        // Change lastModified on files #2 & #3, but not the contents
+        Files.setLastModifiedTime(testDir.resolve("test2.txt"), FileTime.fromMillis(System.currentTimeMillis() + 3));
+        Files.setLastModifiedTime(testDir.resolve("test3.txt"), FileTime.fromMillis(System.currentTimeMillis() + 3));
+
+        changes.notifications().clear();
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), hasSize(0));
+
+        // Do nothing
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), hasSize(0));
+
+        // Delete files
+        Files.delete(testDir.resolve("test1.txt"));
+        Files.delete(testDir.resolve("test2.txt"));
+
+        fileWatcher.checkAndNotify();
+        assertThat(changes.notifications(), contains(
+            equalTo("onFileDeleted: test-dir/test1.txt"),
+            equalTo("onFileDeleted: test-dir/test2.txt")
+        ));
+    }
+
     public void testNestedDirectoryOperations() throws IOException {
         Path tempDir = createTempDir();
         RecordingChangeListener changes = new RecordingChangeListener(tempDir);

+ 1 - 1
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/operator/FileOperatorUsersStore.java

@@ -54,7 +54,7 @@ public class FileOperatorUsersStore {
     public FileOperatorUsersStore(Environment env, ResourceWatcherService watcherService) {
         this.file =  XPackPlugin.resolveConfigFile(env, "operator_users.yml");
         this.operatorUsersDescriptor = parseFile(this.file, logger);
-        FileWatcher watcher = new FileWatcher(file.getParent());
+        FileWatcher watcher = new FileWatcher(file.getParent(), true);
         watcher.addListener(new FileOperatorUsersStore.FileListener());
         try {
             watcherService.add(watcher, ResourceWatcherService.Frequency.HIGH);