Selaa lähdekoodia

Watcher: Ensure trigger service pauses execution (#30363)

When the watcher service pauses execution due to a cluster state update,
the trigger service and its engines also need to pause properly instead
of keeping going. This is also important when the .watches index is 
deleted, so that watches don't stay in a triggered mode.
Alexander Reelsen 7 vuotta sitten
vanhempi
commit
0e6cbbd811

+ 2 - 0
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java

@@ -167,6 +167,7 @@ public class WatcherService extends AbstractComponent {
     void stopExecutor() {
     void stopExecutor() {
         ThreadPool.terminate(executor, 10L, TimeUnit.SECONDS);
         ThreadPool.terminate(executor, 10L, TimeUnit.SECONDS);
     }
     }
+
     /**
     /**
      * Reload the watcher service, does not switch the state from stopped to started, just keep going
      * Reload the watcher service, does not switch the state from stopped to started, just keep going
      * @param state cluster state, which is needed to find out about local shards
      * @param state cluster state, which is needed to find out about local shards
@@ -231,6 +232,7 @@ public class WatcherService extends AbstractComponent {
      * manual watch execution, i.e. via the execute watch API
      * manual watch execution, i.e. via the execute watch API
      */
      */
     public void pauseExecution(String reason) {
     public void pauseExecution(String reason) {
+        triggerService.pauseExecution();
         int cancelledTaskCount = executionService.pause();
         int cancelledTaskCount = executionService.pause();
         logger.info("paused watch execution, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount);
         logger.info("paused watch execution, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount);
     }
     }

+ 34 - 0
x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java

@@ -43,10 +43,14 @@ import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.XPackSettings;
 import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.watcher.trigger.Trigger;
 import org.elasticsearch.xpack.core.watcher.watch.Watch;
 import org.elasticsearch.xpack.core.watcher.watch.Watch;
 import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
 import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
+import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
 import org.elasticsearch.xpack.watcher.execution.ExecutionService;
 import org.elasticsearch.xpack.watcher.execution.ExecutionService;
 import org.elasticsearch.xpack.watcher.execution.TriggeredWatchStore;
 import org.elasticsearch.xpack.watcher.execution.TriggeredWatchStore;
+import org.elasticsearch.xpack.watcher.input.none.ExecutableNoneInput;
+import org.elasticsearch.xpack.watcher.trigger.TriggerEngine;
 import org.elasticsearch.xpack.watcher.trigger.TriggerService;
 import org.elasticsearch.xpack.watcher.trigger.TriggerService;
 import org.elasticsearch.xpack.watcher.watch.WatchParser;
 import org.elasticsearch.xpack.watcher.watch.WatchParser;
 import org.joda.time.DateTime;
 import org.joda.time.DateTime;
@@ -204,6 +208,36 @@ public class WatcherServiceTests extends ESTestCase {
         assertThat(watches, hasSize(activeWatchCount));
         assertThat(watches, hasSize(activeWatchCount));
     }
     }
 
 
+    public void testPausingWatcherServiceAlsoPausesTriggerService() {
+        String engineType = "foo";
+        TriggerEngine triggerEngine = mock(TriggerEngine.class);
+        when(triggerEngine.type()).thenReturn(engineType);
+        TriggerService triggerService = new TriggerService(Settings.EMPTY, Collections.singleton(triggerEngine));
+
+        Trigger trigger = mock(Trigger.class);
+        when(trigger.type()).thenReturn(engineType);
+
+        Watch watch = mock(Watch.class);
+        when(watch.trigger()).thenReturn(trigger);
+        when(watch.condition()).thenReturn(InternalAlwaysCondition.INSTANCE);
+        ExecutableNoneInput noneInput = new ExecutableNoneInput(logger);
+        when(watch.input()).thenReturn(noneInput);
+
+        triggerService.add(watch);
+        assertThat(triggerService.count(), is(1L));
+
+        WatcherService service = new WatcherService(Settings.EMPTY, triggerService, mock(TriggeredWatchStore.class),
+            mock(ExecutionService.class), mock(WatchParser.class), mock(Client.class), executorService) {
+            @Override
+            void stopExecutor() {
+            }
+        };
+
+        service.pauseExecution("pausing");
+        assertThat(triggerService.count(), is(0L));
+        verify(triggerEngine).pauseExecution();
+    }
+
     private static DiscoveryNode newNode() {
     private static DiscoveryNode newNode() {
         return new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
         return new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
                 new HashSet<>(asList(DiscoveryNode.Role.values())), Version.CURRENT);
                 new HashSet<>(asList(DiscoveryNode.Role.values())), Version.CURRENT);