Browse Source

Ensure config reload on ..data symlink switch for CSI driver support (#127628)

* Reprocess all changed files in settings dir if symlink dir is updated
Johannes Fredén 5 months ago
parent
commit
cc5aa91119

+ 5 - 0
docs/changelog/127628.yaml

@@ -0,0 +1,5 @@
+pr: 127628
+summary: Ensure config reload on ..data symlink switch for CSI driver support
+area: Infra/Settings
+type: enhancement
+issues: []

+ 36 - 3
server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java

@@ -44,7 +44,6 @@ import java.util.stream.Stream;
 import static org.elasticsearch.health.HealthStatus.YELLOW;
 import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING;
 import static org.elasticsearch.node.Node.INITIAL_STATE_TIMEOUT_SETTING;
-import static org.elasticsearch.test.NodeRoles.dataNode;
 import static org.elasticsearch.test.NodeRoles.dataOnlyNode;
 import static org.elasticsearch.test.NodeRoles.masterNode;
 import static org.hamcrest.Matchers.allOf;
@@ -139,6 +138,11 @@ public class FileSettingsServiceIT extends ESIntegTestCase {
 
     public static void writeJSONFile(String node, String json, Logger logger, Long version) throws Exception {
         FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node);
+        writeJSONFile(node, json, logger, version, fileSettingsService.watchedFile());
+    }
+
+    public static void writeJSONFile(String node, String json, Logger logger, Long version, Path targetPath) throws Exception {
+        FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node);
 
         Files.createDirectories(fileSettingsService.watchedFileDir());
         Path tempFilePath = createTempFile();
@@ -152,8 +156,8 @@ public class FileSettingsServiceIT extends ESIntegTestCase {
         do {
             try {
                 // this can fail on Windows because of timing
-                Files.move(tempFilePath, fileSettingsService.watchedFile(), StandardCopyOption.ATOMIC_MOVE);
-                logger.info("--> after writing JSON config to node {} with path {}", node, tempFilePath);
+                Files.move(tempFilePath, targetPath, StandardCopyOption.ATOMIC_MOVE);
+                logger.info("--> after writing JSON config to node {} with path {}", node, targetPath);
                 return;
             } catch (IOException e) {
                 logger.info("--> retrying writing a settings file [{}]", retryCount);
@@ -503,6 +507,35 @@ public class FileSettingsServiceIT extends ESIntegTestCase {
         assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "43mb");
     }
 
+    public void testSymlinkUpdateTriggerReload() throws Exception {
+        internalCluster().setBootstrapMasterNodeIndex(0);
+        final String masterNode = internalCluster().startMasterOnlyNode();
+        FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);
+        Path baseDir = masterFileSettingsService.watchedFileDir();
+        assertBusy(() -> assertTrue(masterFileSettingsService.watching()));
+
+        {
+            var savedClusterState = setupClusterStateListener(masterNode);
+            // Create the settings.json as a symlink to simulate k8 setup
+            // settings.json -> ..data/settings.json
+            // ..data -> ..TIMESTAMP_TEMP_FOLDER_1
+            var fileDir = Files.createDirectories(baseDir.resolve("..TIMESTAMP_TEMP_FOLDER_1"));
+            writeJSONFile(masterNode, testJSON, logger, versionCounter.incrementAndGet(), fileDir.resolve("settings.json"));
+            var dataDir = Files.createSymbolicLink(baseDir.resolve("..data"), fileDir.getFileName());
+            Files.createSymbolicLink(baseDir.resolve("settings.json"), dataDir.getFileName().resolve("settings.json"));
+            assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "50mb");
+        }
+        {
+            var savedClusterState = setupClusterStateListener(masterNode);
+            // Update ..data symlink to ..data -> ..TIMESTAMP_TEMP_FOLDER_2 to simulate kubernetes secret update
+            var fileDir = Files.createDirectories(baseDir.resolve("..TIMESTAMP_TEMP_FOLDER_2"));
+            writeJSONFile(masterNode, testJSON43mb, logger, versionCounter.incrementAndGet(), fileDir.resolve("settings.json"));
+            Files.deleteIfExists(baseDir.resolve("..data"));
+            Files.createSymbolicLink(baseDir.resolve("..data"), fileDir.getFileName());
+            assertClusterStateSaveOK(savedClusterState.v1(), savedClusterState.v2(), "43mb");
+        }
+    }
+
     public void testHealthIndicatorWithSingleNode() throws Exception {
         internalCluster().setBootstrapMasterNodeIndex(0);
         logger.info("--> start the node");

+ 25 - 17
server/src/main/java/org/elasticsearch/common/file/AbstractFileWatchingService.java

@@ -27,11 +27,11 @@ import java.nio.file.WatchService;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.nio.file.attribute.FileTime;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
@@ -237,14 +237,16 @@ public abstract class AbstractFileWatchingService extends AbstractLifecycleCompo
                 key.reset();
 
                 if (key == settingsDirWatchKey) {
-                    // there may be multiple events for the same file - we only want to re-read once
-                    Set<Path> processedFiles = new HashSet<>();
-                    for (WatchEvent<?> e : events) {
-                        Path fullFile = settingsDir.resolve(e.context().toString());
-                        if (processedFiles.add(fullFile)) {
-                            if (fileChanged(fullFile)) {
-                                process(fullFile);
-                            }
+                    Set<Path> changedPaths = events.stream()
+                        .map(event -> settingsDir.resolve(event.context().toString()))
+                        .collect(Collectors.toSet());
+                    for (var changedPath : changedPaths) {
+                        // If a symlinked dir changed in the settings dir, it could be linked to other symlinks, so reprocess all files
+                        if (filesIsDirectory(changedPath) && filesIsSymbolicLink(changedPath)) {
+                            reprocessAllChangedFilesInSettingsDir();
+                            break;
+                        } else if (fileChanged(changedPath)) {
+                            process(changedPath);
                         }
                     }
                 } else if (key == configDirWatchKey) {
@@ -257,14 +259,7 @@ public abstract class AbstractFileWatchingService extends AbstractLifecycleCompo
                         settingsDirWatchKey = enableDirectoryWatcher(settingsDirWatchKey, settingsDir);
 
                         // re-read the settings directory, and ping for any changes
-                        try (Stream<Path> files = filesList(settingsDir)) {
-                            for (var f = files.iterator(); f.hasNext();) {
-                                Path file = f.next();
-                                if (fileChanged(file)) {
-                                    process(file);
-                                }
-                            }
-                        }
+                        reprocessAllChangedFilesInSettingsDir();
                     } else if (settingsDirWatchKey != null) {
                         settingsDirWatchKey.cancel();
                     }
@@ -279,6 +274,17 @@ public abstract class AbstractFileWatchingService extends AbstractLifecycleCompo
         }
     }
 
+    private void reprocessAllChangedFilesInSettingsDir() throws IOException, InterruptedException {
+        try (Stream<Path> files = filesList(settingsDir)) {
+            for (var f = files.iterator(); f.hasNext();) {
+                Path file = f.next();
+                if (fileChanged(file)) {
+                    process(file);
+                }
+            }
+        }
+    }
+
     protected final synchronized void stopWatcher() {
         if (watching()) {
             logger.debug("stopping watcher ...");
@@ -378,6 +384,8 @@ public abstract class AbstractFileWatchingService extends AbstractLifecycleCompo
 
     protected abstract boolean filesIsDirectory(Path path);
 
+    protected abstract boolean filesIsSymbolicLink(Path path);
+
     protected abstract <A extends BasicFileAttributes> A filesReadAttributes(Path path, Class<A> clazz) throws IOException;
 
     protected abstract Stream<Path> filesList(Path dir) throws IOException;

+ 5 - 0
server/src/main/java/org/elasticsearch/reservedstate/service/FileSettingsService.java

@@ -467,6 +467,11 @@ public class FileSettingsService extends MasterNodeFileWatchingService implement
         return Files.isDirectory(path);
     }
 
+    @Override
+    protected boolean filesIsSymbolicLink(Path path) {
+        return Files.isSymbolicLink(path);
+    }
+
     @Override
     protected <A extends BasicFileAttributes> A filesReadAttributes(Path path, Class<A> clazz) throws IOException {
         return Files.readAttributes(path, clazz);

+ 5 - 0
server/src/test/java/org/elasticsearch/common/file/AbstractFileWatchingServiceTests.java

@@ -98,6 +98,11 @@ public class AbstractFileWatchingServiceTests extends ESTestCase {
             return Files.isDirectory(path);
         }
 
+        @Override
+        protected boolean filesIsSymbolicLink(Path path) {
+            return Files.isSymbolicLink(path);
+        }
+
         @Override
         protected <A extends BasicFileAttributes> A filesReadAttributes(Path path, Class<A> clazz) throws IOException {
             return Files.readAttributes(path, clazz);

+ 5 - 0
server/src/test/java/org/elasticsearch/common/file/MasterNodeFileWatchingServiceTests.java

@@ -80,6 +80,11 @@ public class MasterNodeFileWatchingServiceTests extends ESTestCase {
                 return Files.isDirectory(path);
             }
 
+            @Override
+            protected boolean filesIsSymbolicLink(Path path) {
+                return Files.isSymbolicLink(path);
+            }
+
             @Override
             protected <A extends BasicFileAttributes> A filesReadAttributes(Path path, Class<A> clazz) throws IOException {
                 return Files.readAttributes(path, clazz);