1
0
Эх сурвалжийг харах

Refactor FileSettingsService threading (#90417)

Use thread interruption rather than latches to signal the watcher thread on shutdown
General tidying up & simplifying the threading operations
Simon Cooper 3 жил өмнө
parent
commit
1bb34e0982

+ 105 - 118
server/src/main/java/org/elasticsearch/reservedstate/service/FileSettingsService.java

@@ -36,8 +36,8 @@ import java.nio.file.WatchService;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.nio.file.attribute.FileTime;
 import java.nio.file.attribute.FileTime;
 import java.time.Instant;
 import java.time.Instant;
-import java.util.concurrent.CountDownLatch;
-import java.util.function.Consumer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 
 import static org.elasticsearch.xcontent.XContentType.JSON;
 import static org.elasticsearch.xcontent.XContentType.JSON;
 
 
@@ -64,12 +64,10 @@ public class FileSettingsService extends AbstractLifecycleComponent implements C
     private final NodeClient nodeClient;
     private final NodeClient nodeClient;
 
 
     private WatchService watchService; // null;
     private WatchService watchService; // null;
-    private CountDownLatch watcherThreadLatch;
-    private volatile CountDownLatch processingLatch;
-
-    private volatile FileUpdateState fileUpdateState = null;
-    private volatile WatchKey settingsDirWatchKey = null;
-    private volatile WatchKey configDirWatchKey = null;
+    private Thread watcherThread;
+    private FileUpdateState fileUpdateState;
+    private WatchKey settingsDirWatchKey;
+    private WatchKey configDirWatchKey;
 
 
     private volatile boolean active = false;
     private volatile boolean active = false;
     private volatile boolean initialState = true;
     private volatile boolean initialState = true;
@@ -103,7 +101,7 @@ public class FileSettingsService extends AbstractLifecycleComponent implements C
     }
     }
 
 
     public Path operatorSettingsFile() {
     public Path operatorSettingsFile() {
-        return operatorSettingsDir().resolve(SETTINGS_FILE_NAME);
+        return operatorSettingsDir.resolve(SETTINGS_FILE_NAME);
     }
     }
 
 
     // platform independent way to tell if a file changed
     // platform independent way to tell if a file changed
@@ -218,18 +216,7 @@ public class FileSettingsService extends AbstractLifecycleComponent implements C
     }
     }
 
 
     public boolean watching() {
     public boolean watching() {
-        return this.watchService != null;
-    }
-
-    private void cleanupWatchKeys() {
-        if (settingsDirWatchKey != null) {
-            settingsDirWatchKey.cancel();
-            settingsDirWatchKey = null;
-        }
-        if (configDirWatchKey != null) {
-            configDirWatchKey.cancel();
-            configDirWatchKey = null;
-        }
+        return watcherThread != null;
     }
     }
 
 
     synchronized void startWatcher(ClusterState clusterState, boolean onStartup) {
     synchronized void startWatcher(ClusterState clusterState, boolean onStartup) {
@@ -241,43 +228,46 @@ public class FileSettingsService extends AbstractLifecycleComponent implements C
 
 
         logger.info("starting file settings watcher ...");
         logger.info("starting file settings watcher ...");
 
 
-        Path settingsDir = operatorSettingsDir();
-
         /*
         /*
          * We essentially watch for two things:
          * We essentially watch for two things:
          *  - the creation of the operator directory (if it doesn't exist), symlink changes to the operator directory
          *  - the creation of the operator directory (if it doesn't exist), symlink changes to the operator directory
          *  - any changes to files inside the operator directory if it exists, filtering for settings.json
          *  - any changes to files inside the operator directory if it exists, filtering for settings.json
          */
          */
         try {
         try {
-            this.watchService = operatorSettingsDir().getParent().getFileSystem().newWatchService();
-            if (Files.exists(settingsDir)) {
+            Path settingsDirPath = operatorSettingsDir();
+            this.watchService = settingsDirPath.getParent().getFileSystem().newWatchService();
+            if (Files.exists(settingsDirPath)) {
                 Path settingsFilePath = operatorSettingsFile();
                 Path settingsFilePath = operatorSettingsFile();
                 if (Files.exists(settingsFilePath)) {
                 if (Files.exists(settingsFilePath)) {
                     logger.debug("found initial operator settings file [{}], applying...", settingsFilePath);
                     logger.debug("found initial operator settings file [{}], applying...", settingsFilePath);
                     // we make a distinction here for startup, so that if we had operator settings before the node started
                     // we make a distinction here for startup, so that if we had operator settings before the node started
                     // we would fail startup.
                     // we would fail startup.
-                    processFileSettings(settingsFilePath, (e) -> {
+                    try {
+                        processFileSettings(settingsFilePath).get();
+                    } catch (ExecutionException e) {
                         if (onStartup) {
                         if (onStartup) {
-                            throw new FileSettingsStartupException("Error applying operator settings", e);
+                            throw new FileSettingsStartupException("Error applying operator settings", e.getCause());
                         } else {
                         } else {
-                            logger.error("Error processing operator settings json file", e);
+                            logger.error("Error processing operator settings json file", e.getCause());
                         }
                         }
-                    }).await();
+                    }
                 }
                 }
-                settingsDirWatchKey = enableSettingsWatcher(settingsDirWatchKey, settingsDir);
+                settingsDirWatchKey = enableSettingsWatcher(settingsDirWatchKey, settingsDirPath);
             } else {
             } else {
-                logger.debug("operator settings directory [{}] not found, will watch for its creation...", settingsDir);
+                logger.debug("operator settings directory [{}] not found, will watch for its creation...", settingsDirPath);
             }
             }
             // We watch the config directory always, even if initially we had an operator directory
             // We watch the config directory always, even if initially we had an operator directory
             // it can be deleted and created later. The config directory never goes away, we only
             // it can be deleted and created later. The config directory never goes away, we only
             // register it once for watching.
             // register it once for watching.
-            configDirWatchKey = enableSettingsWatcher(configDirWatchKey, operatorSettingsDir().getParent());
+            configDirWatchKey = enableSettingsWatcher(configDirWatchKey, settingsDirPath.getParent());
         } catch (Exception e) {
         } catch (Exception e) {
             if (watchService != null) {
             if (watchService != null) {
                 try {
                 try {
-                    cleanupWatchKeys();
+                    // this will also close any keys
                     this.watchService.close();
                     this.watchService.close();
-                } catch (Exception ignore) {} finally {
+                } catch (Exception ce) {
+                    e.addSuppressed(ce);
+                } finally {
                     this.watchService = null;
                     this.watchService = null;
                 }
                 }
             }
             }
@@ -285,95 +275,94 @@ public class FileSettingsService extends AbstractLifecycleComponent implements C
             throw new IllegalStateException("unable to launch a new watch service", e);
             throw new IllegalStateException("unable to launch a new watch service", e);
         }
         }
 
 
-        this.watcherThreadLatch = new CountDownLatch(1);
-
-        new Thread(() -> {
-            try {
-                logger.info("file settings service up and running [tid={}]", Thread.currentThread().getId());
-
-                WatchKey key;
-                while ((key = watchService.take()) != null) {
-                    /**
-                     * Reading and interpreting watch service events can vary from platform to platform. E.g:
-                     * MacOS symlink delete and set (rm -rf operator && ln -s <path to>/file_settings/ operator):
-                     *     ENTRY_MODIFY:operator
-                     *     ENTRY_CREATE:settings.json
-                     *     ENTRY_MODIFY:settings.json
-                     * Linux in Docker symlink delete and set (rm -rf operator && ln -s <path to>/file_settings/ operator):
-                     *     ENTRY_CREATE:operator
-                     * Windows
-                     *     ENTRY_CREATE:operator
-                     *     ENTRY_MODIFY:operator
-                     * After we get an indication that something has changed, we check the timestamp, file id,
-                     * real path of our desired file.
-                     */
-                    if (Files.exists(settingsDir)) {
-                        try {
-                            Path path = operatorSettingsFile();
-
-                            if (logger.isDebugEnabled()) {
-                                key.pollEvents().stream().forEach(e -> logger.debug("{}:{}", e.kind().toString(), e.context().toString()));
-                            }
+        watcherThread = new Thread(this::watcherThread, "elasticsearch[file-settings-watcher]");
+        watcherThread.start();
+    }
 
 
+    private void watcherThread() {
+        try {
+            logger.info("file settings service up and running [tid={}]", Thread.currentThread().getId());
+
+            WatchKey key;
+            while ((key = watchService.take()) != null) {
+                /*
+                 * Reading and interpreting watch service events can vary from platform to platform. E.g:
+                 * MacOS symlink delete and set (rm -rf operator && ln -s <path to>/file_settings/ operator):
+                 *     ENTRY_MODIFY:operator
+                 *     ENTRY_CREATE:settings.json
+                 *     ENTRY_MODIFY:settings.json
+                 * Linux in Docker symlink delete and set (rm -rf operator && ln -s <path to>/file_settings/ operator):
+                 *     ENTRY_CREATE:operator
+                 * Windows
+                 *     ENTRY_CREATE:operator
+                 *     ENTRY_MODIFY:operator
+                 * After we get an indication that something has changed, we check the timestamp, file id,
+                 * real path of our desired file. We don't actually care what changed, we just re-check ourselves.
+                 */
+                Path settingsPath = operatorSettingsDir();
+                if (Files.exists(settingsPath)) {
+                    try {
+                        Path path = operatorSettingsFile();
+
+                        if (logger.isDebugEnabled()) {
+                            key.pollEvents().forEach(e -> logger.debug("{}:{}", e.kind().toString(), e.context().toString()));
+                        } else {
                             key.pollEvents();
                             key.pollEvents();
-                            key.reset();
-
-                            // We re-register the settings directory watch key, because we don't know
-                            // if the file name maps to the same native file system file id. Symlinks
-                            // are one potential cause of inconsistency here, since their handling by
-                            // the WatchService is platform dependent.
-                            settingsDirWatchKey = enableSettingsWatcher(settingsDirWatchKey, settingsDir);
-
-                            if (watchedFileChanged(path)) {
-                                processingLatch = processFileSettings(
-                                    path,
-                                    (e) -> logger.error("Error processing operator settings json file", e)
-                                );
-                                // After we get and set the processing latch, we need to check if stop wasn't
-                                // invoked in the meantime. Stop will invalidate all watch keys.
-                                if (configDirWatchKey != null) {
-                                    processingLatch.await();
-                                }
-                            }
-                        } catch (IOException e) {
-                            logger.warn("encountered I/O error while watching file settings", e);
                         }
                         }
-                    } else {
-                        key.pollEvents();
                         key.reset();
                         key.reset();
+
+                        // We re-register the settings directory watch key, because we don't know
+                        // if the file name maps to the same native file system file id. Symlinks
+                        // are one potential cause of inconsistency here, since their handling by
+                        // the WatchService is platform dependent.
+                        settingsDirWatchKey = enableSettingsWatcher(settingsDirWatchKey, settingsPath);
+
+                        if (watchedFileChanged(path)) {
+                            try {
+                                processFileSettings(path).get();
+                            } catch (ExecutionException e) {
+                                logger.error("Error processing operator settings json file", e.getCause());
+                            }
+                        }
+                    } catch (IOException e) {
+                        logger.warn("encountered I/O error while watching file settings", e);
                     }
                     }
+                } else {
+                    key.pollEvents();
+                    key.reset();
                 }
                 }
-            } catch (ClosedWatchServiceException | InterruptedException expected) {
-                logger.info("shutting down watcher thread");
-            } catch (Exception e) {
-                logger.error("shutting down watcher thread with exception", e);
-            } finally {
-                watcherThreadLatch.countDown();
             }
             }
-        }, "elasticsearch[file-settings-watcher]").start();
+        } catch (ClosedWatchServiceException | InterruptedException expected) {
+            logger.info("shutting down watcher thread");
+        } catch (Exception e) {
+            logger.error("shutting down watcher thread with exception", e);
+        }
     }
     }
 
 
     synchronized void stopWatcher() {
     synchronized void stopWatcher() {
         logger.debug("stopping watcher ...");
         logger.debug("stopping watcher ...");
         if (watching()) {
         if (watching()) {
-            try {
-                // make sure the watcher thread hits the processing latch correctly
-                cleanupWatchKeys();
-                fileUpdateState = null;
-                watchService.close();
-                if (processingLatch != null) {
-                    processingLatch.countDown();
+            // make sure watch service is closed whatever
+            // this will also close any outstanding keys
+            try (var ws = watchService) {
+                watcherThread.interrupt();
+                watcherThread.join();
+
+                // make sure any keys are closed - if watchService.close() throws, it may not close the keys first
+                if (configDirWatchKey != null) {
+                    configDirWatchKey.cancel();
                 }
                 }
-                if (watcherThreadLatch != null) {
-                    watcherThreadLatch.await();
+                if (settingsDirWatchKey != null) {
+                    settingsDirWatchKey.cancel();
                 }
                 }
-                // the watcher thread might have snuck in behind us and re-created the settings watch again
-                cleanupWatchKeys();
             } catch (IOException e) {
             } catch (IOException e) {
                 logger.warn("encountered exception while closing watch service", e);
                 logger.warn("encountered exception while closing watch service", e);
             } catch (InterruptedException interruptedException) {
             } catch (InterruptedException interruptedException) {
                 logger.info("interrupted while closing the watch service", interruptedException);
                 logger.info("interrupted while closing the watch service", interruptedException);
             } finally {
             } finally {
+                watcherThread = null;
+                settingsDirWatchKey = null;
+                configDirWatchKey = null;
                 watchService = null;
                 watchService = null;
                 logger.info("watcher service stopped");
                 logger.info("watcher service stopped");
             }
             }
@@ -394,8 +383,8 @@ public class FileSettingsService extends AbstractLifecycleComponent implements C
         );
         );
     }
     }
 
 
-    CountDownLatch processFileSettings(Path path, Consumer<Exception> errorHandler) {
-        CountDownLatch waitForCompletion = new CountDownLatch(1);
+    CompletableFuture<Void> processFileSettings(Path path) {
+        CompletableFuture<Void> completion = new CompletableFuture<>();
         logger.info("processing path [{}] for [{}]", path, NAMESPACE);
         logger.info("processing path [{}] for [{}]", path, NAMESPACE);
         try (
         try (
             var fis = Files.newInputStream(path);
             var fis = Files.newInputStream(path);
@@ -412,22 +401,22 @@ public class FileSettingsService extends AbstractLifecycleComponent implements C
                         // stash the latest node infos response and continue with processing the file
                         // stash the latest node infos response and continue with processing the file
                         nodesInfoResponse = response;
                         nodesInfoResponse = response;
                         nodeInfosRefreshRequired = false;
                         nodeInfosRefreshRequired = false;
-                        stateService.process(NAMESPACE, parsedState, (e) -> completeProcessing(e, errorHandler, waitForCompletion));
+                        stateService.process(NAMESPACE, parsedState, (e) -> completeProcessing(e, completion));
                     }
                     }
 
 
                     @Override
                     @Override
                     public void onFailure(Exception e) {
                     public void onFailure(Exception e) {
-                        completeProcessing(e, errorHandler, waitForCompletion);
+                        completion.completeExceptionally(e);
                     }
                     }
                 });
                 });
             } else {
             } else {
-                stateService.process(NAMESPACE, parsedState, (e) -> completeProcessing(e, errorHandler, waitForCompletion));
+                stateService.process(NAMESPACE, parsedState, (e) -> completeProcessing(e, completion));
             }
             }
         } catch (Exception e) {
         } catch (Exception e) {
-            completeProcessing(e, errorHandler, waitForCompletion);
+            completion.completeExceptionally(e);
         }
         }
 
 
-        return waitForCompletion;
+        return completion;
     }
     }
 
 
     // package private for testing, separate method so that it can be mocked in tests
     // package private for testing, separate method so that it can be mocked in tests
@@ -435,13 +424,11 @@ public class FileSettingsService extends AbstractLifecycleComponent implements C
         return nodeClient.admin().cluster();
         return nodeClient.admin().cluster();
     }
     }
 
 
-    private void completeProcessing(Exception e, Consumer<Exception> errorHandler, CountDownLatch completionLatch) {
-        try {
-            if (e != null) {
-                errorHandler.accept(e);
-            }
-        } finally {
-            completionLatch.countDown();
+    private void completeProcessing(Exception e, CompletableFuture<Void> completion) {
+        if (e != null) {
+            completion.completeExceptionally(e);
+        } else {
+            completion.complete(null);
         }
         }
     }
     }
 
 

+ 15 - 15
server/src/test/java/org/elasticsearch/reservedstate/service/FileSettingsServiceTests.java

@@ -46,6 +46,7 @@ import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.time.ZoneOffset;
 import java.util.Collections;
 import java.util.Collections;
 import java.util.List;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Consumer;
@@ -192,10 +193,10 @@ public class FileSettingsServiceTests extends ESTestCase {
         FileSettingsService service = spy(fileSettingsService);
         FileSettingsService service = spy(fileSettingsService);
         CountDownLatch processFileLatch = new CountDownLatch(1);
         CountDownLatch processFileLatch = new CountDownLatch(1);
 
 
-        doAnswer((Answer<Void>) invocation -> {
+        doAnswer((Answer<CompletableFuture<Void>>) invocation -> {
             processFileLatch.countDown();
             processFileLatch.countDown();
-            return null;
-        }).when(service).processFileSettings(any(), any());
+            return CompletableFuture.completedFuture(null);
+        }).when(service).processFileSettings(any());
 
 
         service.start();
         service.start();
         assertTrue(service.watching());
         assertTrue(service.watching());
@@ -209,7 +210,7 @@ public class FileSettingsServiceTests extends ESTestCase {
         processFileLatch.await(30, TimeUnit.SECONDS);
         processFileLatch.await(30, TimeUnit.SECONDS);
 
 
         verify(service, Mockito.atLeast(1)).watchedFileChanged(any());
         verify(service, Mockito.atLeast(1)).watchedFileChanged(any());
-        verify(service, times(1)).processFileSettings(any(), any());
+        verify(service, times(1)).processFileSettings(any());
 
 
         service.stop();
         service.stop();
         assertFalse(service.watching());
         assertFalse(service.watching());
@@ -245,7 +246,7 @@ public class FileSettingsServiceTests extends ESTestCase {
             )
             )
         );
         );
 
 
-        verify(service, times(1)).processFileSettings(any(), any());
+        verify(service, times(1)).processFileSettings(any());
 
 
         service.stop();
         service.stop();
 
 
@@ -260,7 +261,7 @@ public class FileSettingsServiceTests extends ESTestCase {
         service.start();
         service.start();
         service.startWatcher(clusterService.state(), true);
         service.startWatcher(clusterService.state(), true);
 
 
-        verify(service, times(1)).processFileSettings(any(), any());
+        verify(service, times(1)).processFileSettings(any());
 
 
         service.stop();
         service.stop();
         service.close();
         service.close();
@@ -321,13 +322,12 @@ public class FileSettingsServiceTests extends ESTestCase {
         CountDownLatch deadThreadLatch = new CountDownLatch(1);
         CountDownLatch deadThreadLatch = new CountDownLatch(1);
 
 
         doAnswer((Answer<ReservedStateChunk>) invocation -> {
         doAnswer((Answer<ReservedStateChunk>) invocation -> {
-            processFileLatch.countDown();
             // allow the other thread to continue, but hold on a bit to avoid
             // allow the other thread to continue, but hold on a bit to avoid
-            // setting the count-down latch in the main watcher loop.
+            // completing the task immediately in the main watcher loop
             Thread.sleep(1_000);
             Thread.sleep(1_000);
+            processFileLatch.countDown();
             new Thread(() -> {
             new Thread(() -> {
-                // Simulate a thread that never comes back and decrements the
-                // countdown latch in FileSettingsService.processFileSettings
+                // Simulate a thread that never allows the completion to complete
                 try {
                 try {
                     deadThreadLatch.await();
                     deadThreadLatch.await();
                 } catch (InterruptedException e) {
                 } catch (InterruptedException e) {
@@ -349,7 +349,7 @@ public class FileSettingsServiceTests extends ESTestCase {
         // on Linux is instantaneous. Windows is instantaneous too.
         // on Linux is instantaneous. Windows is instantaneous too.
         assertTrue(processFileLatch.await(30, TimeUnit.SECONDS));
         assertTrue(processFileLatch.await(30, TimeUnit.SECONDS));
 
 
-        // Stopping the service should interrupt the watcher thread, we should be able to stop
+        // Stopping the service should interrupt the watcher thread, allowing the whole thing to exit
         service.stop();
         service.stop();
         assertFalse(service.watching());
         assertFalse(service.watching());
         service.close();
         service.close();
@@ -389,7 +389,7 @@ public class FileSettingsServiceTests extends ESTestCase {
         assertNull(service.nodeInfos());
         assertNull(service.nodeInfos());
 
 
         // call the processing twice
         // call the processing twice
-        service.processFileSettings(service.operatorSettingsFile(), (e) -> {
+        service.processFileSettings(service.operatorSettingsFile()).whenComplete((o, e) -> {
             if (e != null) {
             if (e != null) {
                 fail("shouldn't get an exception");
                 fail("shouldn't get an exception");
             }
             }
@@ -397,7 +397,7 @@ public class FileSettingsServiceTests extends ESTestCase {
         // after the first processing we should have node infos
         // after the first processing we should have node infos
         assertEquals(1, service.nodeInfos().getNodes().size());
         assertEquals(1, service.nodeInfos().getNodes().size());
 
 
-        service.processFileSettings(service.operatorSettingsFile(), (e) -> {
+        service.processFileSettings(service.operatorSettingsFile()).whenComplete((o, e) -> {
             if (e != null) {
             if (e != null) {
                 fail("shouldn't get an exception");
                 fail("shouldn't get an exception");
             }
             }
@@ -450,7 +450,7 @@ public class FileSettingsServiceTests extends ESTestCase {
         assertEquals(1, service.nodeInfos().getNodes().size());
         assertEquals(1, service.nodeInfos().getNodes().size());
 
 
         // call the processing twice
         // call the processing twice
-        service.processFileSettings(service.operatorSettingsFile(), (e) -> {
+        service.processFileSettings(service.operatorSettingsFile()).whenComplete((o, e) -> {
             if (e != null) {
             if (e != null) {
                 fail("shouldn't get an exception");
                 fail("shouldn't get an exception");
             }
             }
@@ -458,7 +458,7 @@ public class FileSettingsServiceTests extends ESTestCase {
 
 
         assertEquals(2, service.nodeInfos().getNodes().size());
         assertEquals(2, service.nodeInfos().getNodes().size());
 
 
-        service.processFileSettings(service.operatorSettingsFile(), (e) -> {
+        service.processFileSettings(service.operatorSettingsFile()).whenComplete((o, e) -> {
             if (e != null) {
             if (e != null) {
                 fail("shouldn't get an exception");
                 fail("shouldn't get an exception");
             }
             }