Explorar el Código

Resolve concurrency with watcher trigger service (#39092)

The watcher trigger service could attempt to modify the perWatchStats
map simultaneously from multiple threads. This would cause the
internal state to become inconsistent, in particular the count()
method may return an incorrect value for the number of watches.

This changes replaces the implementation of the map with a
ConcurrentHashMap so that its internal state remains consistent even
when accessed from mutiple threads.

Resolves: #39087
Tim Vernum hace 6 años
padre
commit
e694473de4

+ 2 - 1
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerService.java

@@ -19,6 +19,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.Consumer;
 
@@ -29,7 +30,7 @@ public class TriggerService {
 
     private final GroupedConsumer consumer = new GroupedConsumer();
     private final Map<String, TriggerEngine> engines;
-    private final Map<String, TriggerWatchStats> perWatchStats = new HashMap<>();
+    private final Map<String, TriggerWatchStats> perWatchStats = new ConcurrentHashMap<>();
 
     public TriggerService(Set<TriggerEngine> engines) {
         Map<String, TriggerEngine> builder = new HashMap<>();

+ 2 - 0
x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java

@@ -231,8 +231,10 @@ public class WatcherServiceTests extends ESTestCase {
         Trigger trigger = mock(Trigger.class);
         when(trigger.type()).thenReturn(engineType);
 
+        final String id = randomAlphaOfLengthBetween(3, 12);
         Watch watch = mock(Watch.class);
         when(watch.trigger()).thenReturn(trigger);
+        when(watch.id()).thenReturn(id);
         when(watch.condition()).thenReturn(InternalAlwaysCondition.INSTANCE);
         ExecutableNoneInput noneInput = new ExecutableNoneInput();
         when(watch.input()).thenReturn(noneInput);

+ 0 - 1
x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java

@@ -141,7 +141,6 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
         });
     }
 
-    @AwaitsFix(bugUrl = "Supposedly fixed; https://github.com/elastic/x-pack-elasticsearch/issues/1915")
     public void testLoadExistingWatchesUponStartup() throws Exception {
         stopWatcher();