|
|
@@ -8,14 +8,7 @@
|
|
|
|
|
|
package org.elasticsearch.reservedstate.service;
|
|
|
|
|
|
-import org.elasticsearch.Build;
|
|
|
import org.elasticsearch.Version;
|
|
|
-import org.elasticsearch.action.ActionListener;
|
|
|
-import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
|
|
-import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
|
|
-import org.elasticsearch.client.internal.ClusterAdminClient;
|
|
|
-import org.elasticsearch.client.internal.node.NodeClient;
|
|
|
-import org.elasticsearch.cluster.ClusterChangedEvent;
|
|
|
import org.elasticsearch.cluster.ClusterName;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
@@ -25,12 +18,11 @@ import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.settings.ClusterSettings;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.env.Environment;
|
|
|
-import org.elasticsearch.ingest.IngestInfo;
|
|
|
-import org.elasticsearch.ingest.ProcessorInfo;
|
|
|
import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
|
|
|
import org.elasticsearch.test.ESTestCase;
|
|
|
import org.elasticsearch.threadpool.TestThreadPool;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
+import org.elasticsearch.xcontent.XContentParser;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Before;
|
|
|
import org.mockito.Mockito;
|
|
|
@@ -52,16 +44,11 @@ import java.util.List;
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.function.Consumer;
|
|
|
|
|
|
-import static java.util.Collections.emptyMap;
|
|
|
-import static java.util.Collections.emptySet;
|
|
|
-import static org.hamcrest.Matchers.allOf;
|
|
|
-import static org.hamcrest.Matchers.hasToString;
|
|
|
-import static org.hamcrest.Matchers.instanceOf;
|
|
|
import static org.mockito.ArgumentMatchers.any;
|
|
|
import static org.mockito.ArgumentMatchers.anyInt;
|
|
|
-import static org.mockito.Mockito.clearInvocations;
|
|
|
import static org.mockito.Mockito.doAnswer;
|
|
|
import static org.mockito.Mockito.doThrow;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
@@ -75,9 +62,6 @@ public class FileSettingsServiceTests extends ESTestCase {
|
|
|
private FileSettingsService fileSettingsService;
|
|
|
private ReservedClusterStateService controller;
|
|
|
private ThreadPool threadpool;
|
|
|
- private NodeClient nodeClient;
|
|
|
- private ClusterAdminClient clusterAdminClient;
|
|
|
- private NodeInfo nodeInfo;
|
|
|
|
|
|
@Before
|
|
|
@SuppressWarnings("unchecked")
|
|
|
@@ -109,42 +93,7 @@ public class FileSettingsServiceTests extends ESTestCase {
|
|
|
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
|
|
|
|
|
controller = new ReservedClusterStateService(clusterService, List.of(new ReservedClusterSettingsAction(clusterSettings)));
|
|
|
-
|
|
|
- DiscoveryNode discoveryNode = new DiscoveryNode(
|
|
|
- "_node_id",
|
|
|
- buildNewFakeTransportAddress(),
|
|
|
- emptyMap(),
|
|
|
- emptySet(),
|
|
|
- Version.CURRENT
|
|
|
- );
|
|
|
-
|
|
|
- nodeInfo = new NodeInfo(
|
|
|
- Version.CURRENT,
|
|
|
- Build.CURRENT,
|
|
|
- discoveryNode,
|
|
|
- Settings.EMPTY,
|
|
|
- null,
|
|
|
- null,
|
|
|
- null,
|
|
|
- null,
|
|
|
- null,
|
|
|
- null,
|
|
|
- null,
|
|
|
- new IngestInfo(Collections.singletonList(new ProcessorInfo("set"))),
|
|
|
- null,
|
|
|
- null
|
|
|
- );
|
|
|
- NodesInfoResponse response = new NodesInfoResponse(new ClusterName("elasticsearch"), List.of(nodeInfo), List.of());
|
|
|
-
|
|
|
- clusterAdminClient = mock(ClusterAdminClient.class);
|
|
|
- doAnswer(i -> {
|
|
|
- ((ActionListener<NodesInfoResponse>) i.getArgument(1)).onResponse(response);
|
|
|
- return null;
|
|
|
- }).when(clusterAdminClient).nodesInfo(any(), any());
|
|
|
-
|
|
|
- nodeClient = mock(NodeClient.class);
|
|
|
- fileSettingsService = spy(new FileSettingsService(clusterService, controller, env, nodeClient));
|
|
|
- doAnswer(i -> clusterAdminClient).when(fileSettingsService).clusterAdminClient();
|
|
|
+ fileSettingsService = spy(new FileSettingsService(clusterService, controller, env));
|
|
|
}
|
|
|
|
|
|
@After
|
|
|
@@ -188,6 +137,7 @@ public class FileSettingsServiceTests extends ESTestCase {
|
|
|
|
|
|
public void testStartStop() {
|
|
|
fileSettingsService.start();
|
|
|
+ fileSettingsService.startWatcher(clusterService.state());
|
|
|
assertTrue(fileSettingsService.watching());
|
|
|
fileSettingsService.stop();
|
|
|
assertFalse(fileSettingsService.watching());
|
|
|
@@ -204,6 +154,7 @@ public class FileSettingsServiceTests extends ESTestCase {
|
|
|
}).when(service).processFileSettings(any());
|
|
|
|
|
|
service.start();
|
|
|
+ service.startWatcher(clusterService.state());
|
|
|
assertTrue(service.watching());
|
|
|
|
|
|
Files.createDirectories(service.operatorSettingsDir());
|
|
|
@@ -214,8 +165,8 @@ public class FileSettingsServiceTests extends ESTestCase {
|
|
|
// on Linux is instantaneous. Windows is instantaneous too.
|
|
|
processFileLatch.await(30, TimeUnit.SECONDS);
|
|
|
|
|
|
- verify(service, Mockito.atLeast(1)).watchedFileChanged(any());
|
|
|
- verify(service, times(1)).processFileSettings(any());
|
|
|
+ verify(service, Mockito.atLeast(1)).processSettingsAndNotifyListeners();
|
|
|
+ verify(service, Mockito.atLeast(1)).processFileSettings(any());
|
|
|
|
|
|
service.stop();
|
|
|
assertFalse(service.watching());
|
|
|
@@ -223,50 +174,81 @@ public class FileSettingsServiceTests extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- public void testInitialFile() throws Exception {
|
|
|
+ public void testInitialFileError() throws Exception {
|
|
|
ReservedClusterStateService stateService = mock(ReservedClusterStateService.class);
|
|
|
|
|
|
doAnswer((Answer<Void>) invocation -> {
|
|
|
((Consumer<Exception>) invocation.getArgument(2)).accept(new IllegalStateException("Some exception"));
|
|
|
return null;
|
|
|
- }).when(stateService).process(any(), (ReservedStateChunk) any(), any());
|
|
|
+ }).when(stateService).process(any(), (XContentParser) any(), any());
|
|
|
|
|
|
- FileSettingsService service = spy(new FileSettingsService(clusterService, stateService, env, nodeClient));
|
|
|
- doAnswer(i -> clusterAdminClient).when(service).clusterAdminClient();
|
|
|
+ AtomicBoolean settingsChanged = new AtomicBoolean(false);
|
|
|
+ CountDownLatch latch = new CountDownLatch(1);
|
|
|
|
|
|
- Files.createDirectories(service.operatorSettingsDir());
|
|
|
+ final FileSettingsService service = spy(new FileSettingsService(clusterService, stateService, env));
|
|
|
+
|
|
|
+ service.addFileSettingsChangedListener(() -> settingsChanged.set(true));
|
|
|
+
|
|
|
+ doAnswer((Answer<Void>) invocation -> {
|
|
|
+ invocation.callRealMethod();
|
|
|
+ latch.countDown();
|
|
|
+ return null;
|
|
|
+ }).when(service).processSettingsAndNotifyListeners();
|
|
|
|
|
|
+ Files.createDirectories(service.operatorSettingsDir());
|
|
|
// contents of the JSON don't matter, we just need a file to exist
|
|
|
writeTestFile(service.operatorSettingsFile(), "{}");
|
|
|
|
|
|
- Exception startupException = expectThrows(IllegalStateException.class, () -> service.start());
|
|
|
- assertThat(
|
|
|
- startupException.getCause(),
|
|
|
- allOf(
|
|
|
- instanceOf(FileSettingsService.FileSettingsStartupException.class),
|
|
|
- hasToString(
|
|
|
- "org.elasticsearch.reservedstate.service.FileSettingsService$FileSettingsStartupException: "
|
|
|
- + "Error applying operator settings"
|
|
|
- )
|
|
|
- )
|
|
|
- );
|
|
|
+ service.start();
|
|
|
+ service.startWatcher(clusterService.state());
|
|
|
+
|
|
|
+ // wait until the watcher thread has started, and it has discovered the file
|
|
|
+ assertTrue(latch.await(20, TimeUnit.SECONDS));
|
|
|
|
|
|
verify(service, times(1)).processFileSettings(any());
|
|
|
+ // assert we never notified any listeners of successful application of file based settings
|
|
|
+ assertFalse(settingsChanged.get());
|
|
|
|
|
|
service.stop();
|
|
|
+ service.close();
|
|
|
+ }
|
|
|
|
|
|
- clearInvocations(service);
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ public void testInitialFileWorks() throws Exception {
|
|
|
+ ReservedClusterStateService stateService = mock(ReservedClusterStateService.class);
|
|
|
|
|
|
// Let's check that if we didn't throw an error that everything works
|
|
|
doAnswer((Answer<Void>) invocation -> {
|
|
|
((Consumer<Exception>) invocation.getArgument(2)).accept(null);
|
|
|
return null;
|
|
|
- }).when(stateService).process(any(), (ReservedStateChunk) any(), any());
|
|
|
+ }).when(stateService).process(any(), (XContentParser) any(), any());
|
|
|
+
|
|
|
+ AtomicBoolean settingsChanged = new AtomicBoolean(false);
|
|
|
+ CountDownLatch latch = new CountDownLatch(1);
|
|
|
+
|
|
|
+ final FileSettingsService service = spy(new FileSettingsService(clusterService, stateService, env));
|
|
|
+
|
|
|
+ service.addFileSettingsChangedListener(() -> settingsChanged.set(true));
|
|
|
+
|
|
|
+ doAnswer((Answer<Void>) invocation -> {
|
|
|
+ invocation.callRealMethod();
|
|
|
+ latch.countDown();
|
|
|
+ return null;
|
|
|
+ }).when(service).processSettingsAndNotifyListeners();
|
|
|
+
|
|
|
+ Files.createDirectories(service.operatorSettingsDir());
|
|
|
+ // contents of the JSON don't matter, we just need a file to exist
|
|
|
+ writeTestFile(service.operatorSettingsFile(), "{}");
|
|
|
|
|
|
service.start();
|
|
|
- service.startWatcher(clusterService.state(), true);
|
|
|
+ service.startWatcher(clusterService.state());
|
|
|
+
|
|
|
+ // wait until the watcher thread has started, and it has discovered the file
|
|
|
+ assertTrue(latch.await(20, TimeUnit.SECONDS));
|
|
|
|
|
|
verify(service, times(1)).processFileSettings(any());
|
|
|
+ // assert we notified the listeners the file settings have changed, they were successfully applied
|
|
|
+ assertTrue(settingsChanged.get());
|
|
|
|
|
|
service.stop();
|
|
|
service.close();
|
|
|
@@ -275,9 +257,8 @@ public class FileSettingsServiceTests extends ESTestCase {
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public void testStopWorksInMiddleOfProcessing() throws Exception {
|
|
|
var spiedController = spy(controller);
|
|
|
- var fsService = new FileSettingsService(clusterService, spiedController, env, nodeClient);
|
|
|
+ var fsService = new FileSettingsService(clusterService, spiedController, env);
|
|
|
FileSettingsService service = spy(fsService);
|
|
|
- doAnswer(i -> clusterAdminClient).when(service).clusterAdminClient();
|
|
|
|
|
|
CountDownLatch processFileLatch = new CountDownLatch(1);
|
|
|
CountDownLatch deadThreadLatch = new CountDownLatch(1);
|
|
|
@@ -297,6 +278,7 @@ public class FileSettingsServiceTests extends ESTestCase {
|
|
|
}).when(spiedController).parse(any(String.class), any());
|
|
|
|
|
|
service.start();
|
|
|
+ service.startWatcher(clusterService.state());
|
|
|
assertTrue(service.watching());
|
|
|
|
|
|
Files.createDirectories(service.operatorSettingsDir());
|
|
|
@@ -319,10 +301,8 @@ public class FileSettingsServiceTests extends ESTestCase {
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public void testStopWorksIfProcessingDidntReturnYet() throws Exception {
|
|
|
var spiedController = spy(controller);
|
|
|
- var fsService = new FileSettingsService(clusterService, spiedController, env, nodeClient);
|
|
|
+ var service = new FileSettingsService(clusterService, spiedController, env);
|
|
|
|
|
|
- FileSettingsService service = spy(fsService);
|
|
|
- doAnswer(i -> clusterAdminClient).when(service).clusterAdminClient();
|
|
|
CountDownLatch processFileLatch = new CountDownLatch(1);
|
|
|
CountDownLatch deadThreadLatch = new CountDownLatch(1);
|
|
|
|
|
|
@@ -343,6 +323,7 @@ public class FileSettingsServiceTests extends ESTestCase {
|
|
|
}).when(spiedController).parse(any(String.class), any());
|
|
|
|
|
|
service.start();
|
|
|
+ service.startWatcher(clusterService.state());
|
|
|
assertTrue(service.watching());
|
|
|
|
|
|
Files.createDirectories(service.operatorSettingsDir());
|
|
|
@@ -362,120 +343,6 @@ public class FileSettingsServiceTests extends ESTestCase {
|
|
|
deadThreadLatch.countDown();
|
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- public void testNodeInfosRefresh() throws Exception {
|
|
|
- var spiedController = spy(controller);
|
|
|
- var csAdminClient = spy(clusterAdminClient);
|
|
|
- var response = new NodesInfoResponse(new ClusterName("elasticsearch"), List.of(nodeInfo), List.of());
|
|
|
-
|
|
|
- doAnswer(i -> {
|
|
|
- ((ActionListener<NodesInfoResponse>) i.getArgument(1)).onResponse(response);
|
|
|
- return null;
|
|
|
- }).when(csAdminClient).nodesInfo(any(), any());
|
|
|
-
|
|
|
- var service = spy(new FileSettingsService(clusterService, spiedController, env, nodeClient));
|
|
|
- doAnswer(i -> csAdminClient).when(service).clusterAdminClient();
|
|
|
-
|
|
|
- doAnswer(
|
|
|
- (Answer<ReservedStateChunk>) invocation -> new ReservedStateChunk(
|
|
|
- Collections.emptyMap(),
|
|
|
- new ReservedStateVersion(1L, Version.CURRENT)
|
|
|
- )
|
|
|
- ).when(spiedController).parse(any(String.class), any());
|
|
|
-
|
|
|
- Files.createDirectories(service.operatorSettingsDir());
|
|
|
- // Make some fake settings file to cause the file settings service to process it
|
|
|
- writeTestFile(service.operatorSettingsFile(), "{}");
|
|
|
-
|
|
|
- clearInvocations(csAdminClient);
|
|
|
- clearInvocations(spiedController);
|
|
|
-
|
|
|
- // we haven't fetched the node infos ever, since we haven't done any file processing
|
|
|
- assertNull(service.nodeInfos());
|
|
|
-
|
|
|
- // call the processing twice
|
|
|
- service.processFileSettings(service.operatorSettingsFile()).whenComplete((o, e) -> {
|
|
|
- if (e != null) {
|
|
|
- fail("shouldn't get an exception");
|
|
|
- }
|
|
|
- });
|
|
|
- // after the first processing we should have node infos
|
|
|
- assertEquals(1, service.nodeInfos().getNodes().size());
|
|
|
-
|
|
|
- service.processFileSettings(service.operatorSettingsFile()).whenComplete((o, e) -> {
|
|
|
- if (e != null) {
|
|
|
- fail("shouldn't get an exception");
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
- // node infos should have been fetched only once
|
|
|
- verify(csAdminClient, times(1)).nodesInfo(any(), any());
|
|
|
- verify(spiedController, times(2)).process(any(), any(ReservedStateChunk.class), any());
|
|
|
-
|
|
|
- // pretend we added a new node
|
|
|
-
|
|
|
- final DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT);
|
|
|
-
|
|
|
- NodeInfo localNodeInfo = new NodeInfo(
|
|
|
- Version.CURRENT,
|
|
|
- Build.CURRENT,
|
|
|
- localNode,
|
|
|
- Settings.EMPTY,
|
|
|
- null,
|
|
|
- null,
|
|
|
- null,
|
|
|
- null,
|
|
|
- null,
|
|
|
- null,
|
|
|
- null,
|
|
|
- new IngestInfo(Collections.singletonList(new ProcessorInfo("set"))),
|
|
|
- null,
|
|
|
- null
|
|
|
- );
|
|
|
- var newResponse = new NodesInfoResponse(new ClusterName("elasticsearch"), List.of(nodeInfo, localNodeInfo), List.of());
|
|
|
-
|
|
|
- final ClusterState prevState = clusterService.state();
|
|
|
- final ClusterState clusterState = ClusterState.builder(prevState)
|
|
|
- .nodes(
|
|
|
- DiscoveryNodes.builder(prevState.getNodes()).add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId())
|
|
|
- )
|
|
|
- .build();
|
|
|
-
|
|
|
- ClusterChangedEvent event = new ClusterChangedEvent("transport", clusterState, prevState);
|
|
|
- assertTrue(event.nodesChanged());
|
|
|
- service.clusterChanged(event);
|
|
|
-
|
|
|
- doAnswer(i -> {
|
|
|
- ((ActionListener<NodesInfoResponse>) i.getArgument(1)).onResponse(newResponse);
|
|
|
- return null;
|
|
|
- }).when(csAdminClient).nodesInfo(any(), any());
|
|
|
-
|
|
|
- // this wouldn't change yet, node fetch transport action is invoked on demand, when we need to process file changes,
|
|
|
- // not every time we update the cluster state
|
|
|
- assertEquals(1, service.nodeInfos().getNodes().size());
|
|
|
-
|
|
|
- // call the processing twice
|
|
|
- service.processFileSettings(service.operatorSettingsFile()).whenComplete((o, e) -> {
|
|
|
- if (e != null) {
|
|
|
- fail("shouldn't get an exception");
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
- assertEquals(2, service.nodeInfos().getNodes().size());
|
|
|
-
|
|
|
- service.processFileSettings(service.operatorSettingsFile()).whenComplete((o, e) -> {
|
|
|
- if (e != null) {
|
|
|
- fail("shouldn't get an exception");
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
- assertEquals(2, service.nodeInfos().getNodes().size());
|
|
|
-
|
|
|
- // node infos should have been fetched one more time
|
|
|
- verify(csAdminClient, times(2)).nodesInfo(any(), any());
|
|
|
- verify(spiedController, times(4)).process(any(), any(ReservedStateChunk.class), any());
|
|
|
- }
|
|
|
-
|
|
|
public void testRegisterWatchKeyRetry() throws IOException, InterruptedException {
|
|
|
var service = spy(fileSettingsService);
|
|
|
doAnswer(i -> 0L).when(service).retryDelayMillis(anyInt());
|