浏览代码

Watcher dont add watches post index if stopped (#56556)

Watcher adds watches to the trigger service on the postIndex action
for the .watches index. This has the (intentional) side effect of also
adding the watches to the stats. The tests rely on these stats for their
assertions. The tests also start and stop Watcher between each test for
a clean slate.

When Watcher executes it updates the .watches index and upon this update
it will go through the postIndex method and end up added that watch to the
trigger service (and stats). Functionally this is not a problem, if Watcher
is stopping or stopped since Watcher is also paused and will not execute
the watch. However, with specific timing and expectations of a clean slate
can cause issues the test assertions against the stats.

This commit ensures that the postIndex action only adds to the trigger service
if the Watcher state is not stopping or stopped. When started back up it will
re-read index .watches.

This commit also un-mutes the tests related to #53177 and #56534
Jake Landis 5 年之前
父节点
当前提交
5aa36efa2d

+ 0 - 2
x-pack/plugin/watcher/qa/rest/src/test/java/org/elasticsearch/smoketest/WatcherYamlRestIT.java

@@ -5,14 +5,12 @@
  */
 package org.elasticsearch.smoketest;
 
-import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
 import org.elasticsearch.xpack.watcher.WatcherYamlSuiteTestCase;
 
 /**
  * Runs the YAML rest tests against an external cluster
  */
-@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/53177")
 public class WatcherYamlRestIT extends WatcherYamlSuiteTestCase {
     public WatcherYamlRestIT(ClientYamlTestCandidate testCandidate) {
         super(testCandidate);

+ 0 - 2
x-pack/plugin/watcher/qa/with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityClientYamlTestSuiteIT.java

@@ -7,7 +7,6 @@ package org.elasticsearch.smoketest;
 
 import com.carrotsearch.randomizedtesting.annotations.Name;
 import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
-import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.common.settings.SecureString;
 import org.elasticsearch.common.settings.Settings;
@@ -19,7 +18,6 @@ import org.junit.Before;
 
 import static org.elasticsearch.xpack.test.SecuritySettingsSourceField.basicAuthHeaderValue;
 
-@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/53177")
 public class SmokeTestWatcherWithSecurityClientYamlTestSuiteIT extends WatcherYamlSuiteTestCase {
 
     private static final String TEST_ADMIN_USERNAME = "test_admin";

+ 1 - 1
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java

@@ -423,7 +423,7 @@ public class Watcher extends Plugin implements SystemIndexPlugin, ScriptPlugin,
         final WatcherLifeCycleService watcherLifeCycleService =
                 new WatcherLifeCycleService(clusterService, watcherService);
 
-        listener = new WatcherIndexingListener(watchParser, getClock(), triggerService);
+        listener = new WatcherIndexingListener(watchParser, getClock(), triggerService, watcherLifeCycleService.getState());
         clusterService.addListener(listener);
 
         // note: clock is needed here until actions can be constructed directly instead of by guice

+ 10 - 4
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java

@@ -25,6 +25,7 @@ import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.IndexingOperationListener;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.xpack.core.watcher.WatcherState;
 import org.elasticsearch.xpack.core.watcher.watch.Watch;
 import org.elasticsearch.xpack.watcher.trigger.TriggerService;
 import org.elasticsearch.xpack.watcher.watch.WatchParser;
@@ -38,11 +39,13 @@ import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
@@ -66,12 +69,14 @@ final class WatcherIndexingListener implements IndexingOperationListener, Cluste
     private final WatchParser parser;
     private final Clock clock;
     private final TriggerService triggerService;
+    private final Supplier<WatcherState> watcherState;
     private volatile Configuration configuration = INACTIVE;
 
-    WatcherIndexingListener(WatchParser parser, Clock clock, TriggerService triggerService) {
+    WatcherIndexingListener(WatchParser parser, Clock clock, TriggerService triggerService, Supplier<WatcherState> watcherState) {
         this.parser = parser;
         this.clock = clock;
         this.triggerService = triggerService;
+        this.watcherState = watcherState;
     }
 
     // package private for testing
@@ -119,8 +124,9 @@ final class WatcherIndexingListener implements IndexingOperationListener, Cluste
                 }
 
                 boolean shouldBeTriggered = shardAllocationConfiguration.shouldBeTriggered(watch.id());
-                if (shouldBeTriggered) {
-                    if (watch.status().state().isActive()) {
+                WatcherState currentState = watcherState.get();
+                if (shouldBeTriggered && EnumSet.of(WatcherState.STOPPING, WatcherState.STOPPED).contains(currentState) == false) {
+                    if (watch.status().state().isActive() ) {
                         logger.debug("adding watch [{}] to trigger service", watch.id());
                         triggerService.add(watch);
                     } else {
@@ -128,7 +134,7 @@ final class WatcherIndexingListener implements IndexingOperationListener, Cluste
                         triggerService.remove(watch.id());
                     }
                 } else {
-                    logger.debug("watch [{}] should not be triggered", watch.id());
+                    logger.debug("watch [{}] should not be triggered. watcher state [{}]", watch.id(), currentState);
                 }
             } catch (IOException e) {
                 throw new ElasticsearchParseException("Could not parse watch with id [{}]", e, operation.id());

+ 3 - 2
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java

@@ -31,6 +31,7 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
@@ -203,7 +204,7 @@ public class WatcherLifeCycleService implements ClusterStateListener {
         return previousShardRoutings.get();
     }
 
-    public WatcherState getState() {
-        return state.get();
+    public Supplier<WatcherState> getState(){
+        return () -> state.get();
     }
 }

+ 1 - 1
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsAction.java

@@ -67,7 +67,7 @@ public class TransportWatcherStatsAction extends TransportNodesAction<WatcherSta
     @Override
     protected WatcherStatsResponse.Node nodeOperation(WatcherStatsRequest.Node request, Task task) {
         WatcherStatsResponse.Node statsResponse = new WatcherStatsResponse.Node(clusterService.localNode());
-        statsResponse.setWatcherState(lifeCycleService.getState());
+        statsResponse.setWatcherState(lifeCycleService.getState().get());
         statsResponse.setThreadPoolQueueSize(executionService.executionThreadPoolQueueSize());
         statsResponse.setThreadPoolMaxSize(executionService.executionThreadPoolMaxSize());
         if (request.includeCurrentWatches()) {

+ 25 - 1
x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java

@@ -33,6 +33,7 @@ import org.elasticsearch.index.Index;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.watcher.WatcherState;
 import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
 import org.elasticsearch.xpack.core.watcher.watch.Watch;
 import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
@@ -88,7 +89,7 @@ public class WatcherIndexingListenerTests extends ESTestCase {
     @Before
     public void setup() throws Exception {
         clock.freeze();
-        listener = new WatcherIndexingListener(parser, clock, triggerService);
+        listener = new WatcherIndexingListener(parser, clock, triggerService, () -> WatcherState.STARTED);
 
         Map<ShardId, ShardAllocationConfiguration> map = new HashMap<>();
         map.put(shardId, new ShardAllocationConfiguration(0, 1, Collections.singletonList("foo")));
@@ -139,6 +140,29 @@ public class WatcherIndexingListenerTests extends ESTestCase {
         }
     }
 
+    public void testPostIndexWhenStopped() throws Exception {
+        listener = new WatcherIndexingListener(parser, clock, triggerService, () -> WatcherState.STOPPED);
+        Map<ShardId, ShardAllocationConfiguration> map = new HashMap<>();
+        map.put(shardId, new ShardAllocationConfiguration(0, 1, Collections.singletonList("foo")));
+        listener.setConfiguration(new Configuration(Watch.INDEX, map));
+        when(operation.id()).thenReturn(randomAlphaOfLength(10));
+        when(operation.source()).thenReturn(BytesArray.EMPTY);
+        when(shardId.getIndexName()).thenReturn(Watch.INDEX);
+        List<Engine.Result.Type> types = new ArrayList<>(List.of(Engine.Result.Type.values()));
+        types.remove(Engine.Result.Type.FAILURE);
+        when(result.getResultType()).thenReturn(randomFrom(types));
+
+        boolean watchActive = randomBoolean();
+        boolean isNewWatch = randomBoolean();
+        Watch watch = mockWatch("_id", watchActive, isNewWatch);
+        when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())).thenReturn(watch);
+
+        listener.postIndex(shardId, operation, result);
+        ZonedDateTime now = DateUtils.nowWithMillisResolution(clock);
+        verify(parser).parseWithSecrets(eq(operation.id()), eq(true), eq(BytesArray.EMPTY), eq(now), anyObject(), anyLong(), anyLong());
+        verifyZeroInteractions(triggerService);
+    }
+
     // this test emulates an index with 10 shards, and ensures that triggering only happens on a
     // single shard
     public void testPostIndexWatchGetsOnlyTriggeredOnceAcrossAllShards() throws Exception {

+ 5 - 5
x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java

@@ -179,9 +179,9 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
         verify(watcherService, times(1))
             .stop(eq("watcher manually marked to shutdown by cluster state update"), captor.capture());
-        assertEquals(WatcherState.STOPPING, lifeCycleService.getState());
+        assertEquals(WatcherState.STOPPING, lifeCycleService.getState().get());
         captor.getValue().run();
-        assertEquals(WatcherState.STOPPED, lifeCycleService.getState());
+        assertEquals(WatcherState.STOPPED, lifeCycleService.getState().get());
 
         // Starting via cluster state update, as the watcher metadata block is removed/set to true
         reset(watcherService);
@@ -480,7 +480,7 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
                     new HashSet<>(roles), Version.CURRENT))).build();
 
         lifeCycleService.clusterChanged(new ClusterChangedEvent("test", state, state));
-        assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
+        assertThat(lifeCycleService.getState().get(), is(WatcherState.STARTED));
     }
 
     public void testDataNodeWithoutDataCanStart() {
@@ -494,7 +494,7 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
             .build();
 
         lifeCycleService.clusterChanged(new ClusterChangedEvent("test", state, state));
-        assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
+        assertThat(lifeCycleService.getState().get(), is(WatcherState.STARTED));
     }
 
     // this emulates a node outage somewhere in the cluster that carried a watcher shard
@@ -584,7 +584,7 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
         when(watcherService.validate(state)).thenReturn(true);
 
         lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, emptyState));
-        assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
+        assertThat(lifeCycleService.getState().get(), is(WatcherState.STARTED));
         verify(watcherService, times(1)).reload(eq(state), anyString());
         assertThat(lifeCycleService.shardRoutings(), hasSize(1));
 

+ 1 - 1
x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/stats/TransportWatcherStatsActionTests.java

@@ -58,7 +58,7 @@ public class TransportWatcherStatsActionTests extends ESTestCase {
         when(clusterService.state()).thenReturn(clusterState);
 
         WatcherLifeCycleService watcherLifeCycleService = mock(WatcherLifeCycleService.class);
-        when(watcherLifeCycleService.getState()).thenReturn(WatcherState.STARTED);
+        when(watcherLifeCycleService.getState()).thenReturn(() -> WatcherState.STARTED);
 
         ExecutionService executionService = mock(ExecutionService.class);
         when(executionService.executionThreadPoolQueueSize()).thenReturn(100L);