ReservedClusterStateServiceTests.java 48 KB


  1. /*
  2. * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
  3. * or more contributor license agreements. Licensed under the "Elastic License
  4. * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
  5. * Public License v 1"; you may not use this file except in compliance with, at
  6. * your election, the "Elastic License 2.0", the "GNU Affero General Public
  7. * License v3.0 only", or the "Server Side Public License, v 1".
  8. */
  9. package org.elasticsearch.reservedstate.service;
  10. import org.elasticsearch.action.ActionListener;
  11. import org.elasticsearch.cluster.ClusterName;
  12. import org.elasticsearch.cluster.ClusterState;
  13. import org.elasticsearch.cluster.ClusterStateAckListener;
  14. import org.elasticsearch.cluster.ClusterStateTaskExecutor;
  15. import org.elasticsearch.cluster.ClusterStateTaskListener;
  16. import org.elasticsearch.cluster.metadata.Metadata;
  17. import org.elasticsearch.cluster.metadata.ProjectId;
  18. import org.elasticsearch.cluster.metadata.ProjectMetadata;
  19. import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata;
  20. import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata;
  21. import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
  22. import org.elasticsearch.cluster.project.ProjectStateRegistry;
  23. import org.elasticsearch.cluster.routing.RerouteService;
  24. import org.elasticsearch.cluster.service.ClusterService;
  25. import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
  26. import org.elasticsearch.common.Strings;
  27. import org.elasticsearch.common.settings.ClusterSettings;
  28. import org.elasticsearch.common.settings.Settings;
  29. import org.elasticsearch.core.Releasable;
  30. import org.elasticsearch.env.BuildVersion;
  31. import org.elasticsearch.env.BuildVersionTests;
  32. import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
  33. import org.elasticsearch.reservedstate.ReservedProjectStateHandler;
  34. import org.elasticsearch.reservedstate.ReservedStateHandler;
  35. import org.elasticsearch.reservedstate.TransformState;
  36. import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
  37. import org.elasticsearch.test.ESTestCase;
  38. import org.elasticsearch.xcontent.XContentParser;
  39. import org.elasticsearch.xcontent.XContentParserConfiguration;
  40. import org.elasticsearch.xcontent.XContentType;
  41. import org.junit.Assert;
  42. import org.mockito.ArgumentCaptor;
  43. import org.mockito.ArgumentMatchers;
  44. import java.io.IOException;
  45. import java.util.ArrayList;
  46. import java.util.Collection;
  47. import java.util.LinkedHashSet;
  48. import java.util.List;
  49. import java.util.Map;
  50. import java.util.Optional;
  51. import java.util.Set;
  52. import java.util.concurrent.atomic.AtomicBoolean;
  53. import java.util.concurrent.atomic.AtomicReference;
  54. import java.util.function.Consumer;
  55. import java.util.function.LongFunction;
  56. import static org.hamcrest.Matchers.anyOf;
  57. import static org.hamcrest.Matchers.contains;
  58. import static org.hamcrest.Matchers.containsString;
  59. import static org.hamcrest.Matchers.equalTo;
  60. import static org.hamcrest.Matchers.instanceOf;
  61. import static org.hamcrest.Matchers.is;
  62. import static org.hamcrest.Matchers.not;
  63. import static org.hamcrest.Matchers.notNullValue;
  64. import static org.hamcrest.Matchers.nullValue;
  65. import static org.hamcrest.Matchers.sameInstance;
  66. import static org.hamcrest.Matchers.startsWith;
  67. import static org.mockito.ArgumentMatchers.any;
  68. import static org.mockito.ArgumentMatchers.anyString;
  69. import static org.mockito.Mockito.doNothing;
  70. import static org.mockito.Mockito.doReturn;
  71. import static org.mockito.Mockito.mock;
  72. import static org.mockito.Mockito.never;
  73. import static org.mockito.Mockito.spy;
  74. import static org.mockito.Mockito.times;
  75. import static org.mockito.Mockito.verify;
  76. import static org.mockito.Mockito.verifyNoInteractions;
  77. import static org.mockito.Mockito.verifyNoMoreInteractions;
  78. import static org.mockito.Mockito.when;
  79. public class ReservedClusterStateServiceTests extends ESTestCase {
  80. private static final String TEST_CHUNK_TEMPLATE = """
  81. {
  82. "metadata": {
  83. "version": "%s",
  84. "compatibility": "8.4.0"
  85. },
  86. "state": {
  87. "%s": {
  88. "nothing": "useful"
  89. }
  90. }
  91. }
  92. """;
  93. @SuppressWarnings("unchecked")
  94. private static <T extends ClusterStateTaskListener> MasterServiceTaskQueue<T> mockTaskQueue() {
  95. return (MasterServiceTaskQueue<T>) mock(MasterServiceTaskQueue.class);
  96. }
  97. private static class TestTaskContext<T extends ClusterStateTaskListener> implements ClusterStateTaskExecutor.TaskContext<T> {
  98. private final T task;
  99. private TestTaskContext(T task) {
  100. this.task = task;
  101. }
  102. @Override
  103. public T getTask() {
  104. return task;
  105. }
  106. @Override
  107. public void success(Runnable onPublicationSuccess) {
  108. onPublicationSuccess.run();
  109. }
  110. @Override
  111. public void success(Consumer<ClusterState> publishedStateConsumer) {}
  112. @Override
  113. public void success(Runnable onPublicationSuccess, ClusterStateAckListener clusterStateAckListener) {}
  114. @Override
  115. public void success(Consumer<ClusterState> publishedStateConsumer, ClusterStateAckListener clusterStateAckListener) {}
  116. @Override
  117. public void onFailure(Exception failure) {}
  118. @Override
  119. public Releasable captureResponseHeaders() {
  120. return null;
  121. }
  122. }
  123. private static class TestStateHandler implements ReservedStateHandler<Map<String, Object>> {
  124. private final String name;
  125. private TestStateHandler(String name) {
  126. this.name = name;
  127. }
  128. @Override
  129. public String name() {
  130. return name;
  131. }
  132. @Override
  133. public Map<String, Object> fromXContent(XContentParser parser) throws IOException {
  134. return parser.map();
  135. }
  136. }
  137. private static class TestClusterStateHandler extends TestStateHandler implements ReservedClusterStateHandler<Map<String, Object>> {
  138. private TestClusterStateHandler(String name) {
  139. super(name);
  140. }
  141. @Override
  142. public TransformState transform(Map<String, Object> source, TransformState prevState) throws Exception {
  143. ClusterState newState = new ClusterState.Builder(prevState.state()).build();
  144. return new TransformState(newState, prevState.keys());
  145. }
  146. }
  147. private static class TestProjectStateHandler extends TestStateHandler implements ReservedProjectStateHandler<Map<String, Object>> {
  148. private TestProjectStateHandler(String name) {
  149. super(name);
  150. }
  151. @Override
  152. public TransformState transform(ProjectId projectId, Map<String, Object> source, TransformState prevState) throws Exception {
  153. ClusterState newState = new ClusterState.Builder(prevState.state()).build();
  154. return new TransformState(newState, prevState.keys());
  155. }
  156. }
  157. private static ClusterState setupProject(ClusterState state, Optional<ProjectId> projectId) {
  158. return projectId.map(p -> ClusterState.builder(state).putProjectMetadata(ProjectMetadata.builder(p)).build()).orElse(state);
  159. }
  160. private static Map<String, ReservedStateMetadata> getMetadata(ClusterState state, Optional<ProjectId> projectId) {
  161. return projectId.map(p -> ProjectStateRegistry.get(state).reservedStateMetadata(p))
  162. .orElseGet(() -> state.metadata().reservedStateMetadata());
  163. }
  164. private static ReservedStateUpdateTask<?> createEmptyTask(
  165. Optional<ProjectId> projectId,
  166. String namespace,
  167. ReservedStateChunk stateChunk,
  168. ReservedStateVersionCheck versionCheck
  169. ) {
  170. return projectId.<ReservedStateUpdateTask<?>>map(
  171. p -> new ReservedProjectStateUpdateTask(
  172. p,
  173. namespace,
  174. stateChunk,
  175. versionCheck,
  176. Map.of(),
  177. Set.of(),
  178. errorState -> {},
  179. ActionListener.noop()
  180. )
  181. )
  182. .orElseGet(
  183. () -> new ReservedClusterStateUpdateTask(
  184. namespace,
  185. stateChunk,
  186. versionCheck,
  187. Map.of(),
  188. List.of(),
  189. errorState -> {},
  190. ActionListener.noop()
  191. )
  192. );
  193. }
  194. public void testOperatorController() throws IOException {
  195. ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
  196. ClusterService clusterService = mock(ClusterService.class);
  197. when(clusterService.createTaskQueue(any(), any(), any())).thenReturn(mockTaskQueue());
  198. final ClusterName clusterName = new ClusterName("elasticsearch");
  199. ClusterState state = ClusterState.builder(clusterName).build();
  200. when(clusterService.state()).thenReturn(state);
  201. ReservedClusterStateService controller = new ReservedClusterStateService(
  202. clusterService,
  203. mock(RerouteService.class),
  204. List.of(new ReservedClusterSettingsAction(clusterSettings)),
  205. List.of()
  206. );
  207. String testJSON = """
  208. {
  209. "metadata": {
  210. "version": "1234",
  211. "compatibility": "8.4.0"
  212. },
  213. "state": {
  214. "cluster_settings": {
  215. "indices.recovery.max_bytes_per_sec": "50mb"
  216. }
  217. }
  218. """;
  219. AtomicReference<Exception> x = new AtomicReference<>();
  220. try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON)) {
  221. controller.process(
  222. "operator",
  223. parser,
  224. randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION),
  225. x::set
  226. );
  227. assertThat(x.get(), instanceOf(IllegalStateException.class));
  228. assertThat(x.get().getMessage(), containsString("Error processing state change request for operator"));
  229. }
  230. testJSON = """
  231. {
  232. "metadata": {
  233. "version": "1234",
  234. "compatibility": "8.4.0"
  235. },
  236. "state": {
  237. "cluster_settings": {
  238. "indices.recovery.max_bytes_per_sec": "50mb",
  239. "cluster": {
  240. "remote": {
  241. "cluster_one": {
  242. "seeds": [
  243. "127.0.0.1:9300"
  244. ]
  245. }
  246. }
  247. }
  248. }
  249. }
  250. }
  251. """;
  252. try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON)) {
  253. controller.process(
  254. "operator",
  255. parser,
  256. randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION),
  257. Assert::assertNull
  258. );
  259. }
  260. }
  261. public void testInitEmptyTask() {
  262. ClusterService clusterService = mock(ClusterService.class);
  263. ArgumentCaptor<ReservedStateUpdateTask<?>> updateTask = ArgumentCaptor.captor();
  264. // grab the update task when it gets given to us
  265. when(clusterService.createTaskQueue(ArgumentMatchers.contains("reserved state update"), any(), any())).thenAnswer(i -> {
  266. MasterServiceTaskQueue<ReservedStateUpdateTask<?>> queue = mockTaskQueue();
  267. doNothing().when(queue).submitTask(any(), updateTask.capture(), any());
  268. return queue;
  269. });
  270. ReservedClusterStateService service = new ReservedClusterStateService(
  271. clusterService,
  272. mock(RerouteService.class),
  273. List.of(),
  274. List.of()
  275. );
  276. service.initEmpty("namespace", ActionListener.noop());
  277. assertThat(updateTask.getValue(), notNullValue());
  278. ClusterState state = ClusterState.builder(new ClusterName("test")).build();
  279. ClusterState updatedState = updateTask.getValue().execute(state);
  280. assertThat(
  281. updatedState.metadata().reservedStateMetadata(),
  282. equalTo(Map.of("namespace", new ReservedStateMetadata("namespace", ReservedStateMetadata.EMPTY_VERSION, Map.of(), null)))
  283. );
  284. }
  285. public void testUpdateStateTasks() throws Exception {
  286. RerouteService rerouteService = mock(RerouteService.class);
  287. ClusterState state = ClusterState.builder(new ClusterName("test")).build();
  288. ReservedStateUpdateTaskExecutor taskExecutor = new ReservedStateUpdateTaskExecutor(rerouteService);
  289. AtomicBoolean successCalled = new AtomicBoolean(false);
  290. ReservedStateUpdateTask<?> task = spy(
  291. createEmptyTask(
  292. randomBoolean() ? Optional.empty() : Optional.of(randomProjectIdOrDefault()),
  293. "test",
  294. null,
  295. ReservedStateVersionCheck.HIGHER_VERSION_ONLY
  296. )
  297. );
  298. doReturn(state).when(task).execute(any());
  299. ClusterStateTaskExecutor.TaskContext<ReservedStateUpdateTask<?>> taskContext = new TestTaskContext<>(task) {
  300. @Override
  301. public void success(Runnable onPublicationSuccess) {
  302. super.success(onPublicationSuccess);
  303. successCalled.set(true);
  304. }
  305. };
  306. ClusterState newState = taskExecutor.execute(
  307. new ClusterStateTaskExecutor.BatchExecutionContext<>(state, List.of(taskContext), () -> null)
  308. );
  309. assertEquals(state, newState);
  310. assertTrue(successCalled.get());
  311. verify(task, times(1)).execute(any());
  312. taskExecutor.clusterStatePublished(state);
  313. verify(rerouteService, times(1)).reroute(anyString(), any(), any());
  314. }
  315. public void testUpdateErrorStateNonExistingProject() {
  316. ClusterService clusterService = mock(ClusterService.class);
  317. ClusterState state = ClusterState.builder(new ClusterName("test")).build();
  318. when(clusterService.state()).thenReturn(state);
  319. ReservedClusterStateService service = new ReservedClusterStateService(
  320. clusterService,
  321. mock(RerouteService.class),
  322. List.of(),
  323. List.of()
  324. );
  325. ErrorState error = new ErrorState(
  326. Optional.of(randomUniqueProjectId()),
  327. "namespace",
  328. 2L,
  329. ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
  330. List.of("error"),
  331. ReservedStateErrorMetadata.ErrorKind.TRANSIENT
  332. );
  333. service.updateErrorState(error);
  334. verify(clusterService, never()).createTaskQueue(any(), any(), any());
  335. }
  336. public void testProcessMultipleChunks() throws Exception {
  337. ClusterService clusterService = mock(ClusterService.class);
  338. when(clusterService.createTaskQueue(any(), any(), any())).thenReturn(mockTaskQueue());
  339. final ClusterName clusterName = new ClusterName("elasticsearch");
  340. ClusterState state = ClusterState.builder(clusterName).build();
  341. ProjectId projectId = randomProjectIdOrDefault();
  342. state = setupProject(state, Optional.of(projectId));
  343. when(clusterService.state()).thenReturn(state);
  344. AtomicReference<Exception> exceptionRef = new AtomicReference<>();
  345. List<ReservedStateChunk> chunks = new ArrayList<>();
  346. String[] randomStateKeys = generateRandomStringArray(randomIntBetween(5, 10), randomIntBetween(10, 15), false, false);
  347. List<ReservedProjectStateHandler<?>> projectHandlers = new ArrayList<>();
  348. for (var key : randomStateKeys) {
  349. projectHandlers.add(spy(new TestProjectStateHandler(key)));
  350. }
  351. ReservedClusterStateService controller = new ReservedClusterStateService(
  352. clusterService,
  353. mock(RerouteService.class),
  354. List.of(),
  355. projectHandlers
  356. );
  357. for (var testHandler : randomStateKeys) {
  358. String testChunkJSON = Strings.format(TEST_CHUNK_TEMPLATE, 1, testHandler);
  359. try (
  360. XContentParser chunkParser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testChunkJSON);
  361. ) {
  362. chunks.add(controller.parse(projectId, "test", chunkParser));
  363. }
  364. }
  365. controller.process(
  366. projectId,
  367. "test",
  368. chunks,
  369. randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION),
  370. exceptionRef::set
  371. );
  372. assertThat(exceptionRef.get(), nullValue());
  373. for (var projectHandler : projectHandlers) {
  374. verify(projectHandler, times(1)).transform(any(), any(), any());
  375. }
  376. }
  377. public void testProcessMultipleChunksVersionMismatch() throws Exception {
  378. ClusterService clusterService = mock(ClusterService.class);
  379. when(clusterService.createTaskQueue(any(), any(), any())).thenReturn(mockTaskQueue());
  380. final ClusterName clusterName = new ClusterName("elasticsearch");
  381. ClusterState state = ClusterState.builder(clusterName).build();
  382. ProjectId projectId = randomProjectIdOrDefault();
  383. state = setupProject(state, Optional.of(projectId));
  384. when(clusterService.state()).thenReturn(state);
  385. String testJSON1 = Strings.format(TEST_CHUNK_TEMPLATE, 1, "test1");
  386. String testJSON2 = Strings.format(TEST_CHUNK_TEMPLATE, 2, "test2");
  387. AtomicReference<Exception> exceptionRef = new AtomicReference<>();
  388. List<ReservedStateChunk> chunks = new ArrayList<>();
  389. TestProjectStateHandler projectStateHandler1 = spy(new TestProjectStateHandler("test1"));
  390. TestProjectStateHandler projectStateHandler2 = spy(new TestProjectStateHandler("test2"));
  391. ReservedClusterStateService controller = new ReservedClusterStateService(
  392. clusterService,
  393. mock(RerouteService.class),
  394. List.of(),
  395. List.of(projectStateHandler1, projectStateHandler2)
  396. );
  397. try (
  398. XContentParser chunkParser1 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON1);
  399. XContentParser chunkParser2 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON2)
  400. ) {
  401. chunks.add(controller.parse(projectId, "test", chunkParser1));
  402. chunks.add(controller.parse(projectId, "test", chunkParser2));
  403. }
  404. controller.process(
  405. projectId,
  406. "test",
  407. chunks,
  408. randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION),
  409. exceptionRef::set
  410. );
  411. assertThat(exceptionRef.get(), notNullValue());
  412. assertThat(exceptionRef.get().getMessage(), containsString("Failed to merge reserved state chunks because of version mismatch: ["));
  413. verify(projectStateHandler1, times(0)).transform(any(), any(), any());
  414. verify(projectStateHandler2, times(0)).transform(any(), any(), any());
  415. }
  416. public void testProcessMultipleChunksDuplicateKeys() throws Exception {
  417. ClusterService clusterService = mock(ClusterService.class);
  418. when(clusterService.createTaskQueue(any(), any(), any())).thenReturn(mockTaskQueue());
  419. final ClusterName clusterName = new ClusterName("elasticsearch");
  420. ClusterState state = ClusterState.builder(clusterName).build();
  421. ProjectId projectId = randomProjectIdOrDefault();
  422. state = setupProject(state, Optional.of(projectId));
  423. when(clusterService.state()).thenReturn(state);
  424. String testJSON1 = Strings.format(TEST_CHUNK_TEMPLATE, 1, "test");
  425. String testJSON2 = Strings.format(TEST_CHUNK_TEMPLATE, 1, "test");
  426. AtomicReference<Exception> exceptionRef = new AtomicReference<>();
  427. List<ReservedStateChunk> chunks = new ArrayList<>();
  428. TestProjectStateHandler projectStateHandler1 = spy(new TestProjectStateHandler("test"));
  429. ReservedClusterStateService controller = new ReservedClusterStateService(
  430. clusterService,
  431. mock(RerouteService.class),
  432. List.of(),
  433. List.of(projectStateHandler1)
  434. );
  435. try (
  436. XContentParser chunkParser1 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON1);
  437. XContentParser chunkParser2 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON2)
  438. ) {
  439. chunks.add(controller.parse(projectId, "test", chunkParser1));
  440. chunks.add(controller.parse(projectId, "test", chunkParser2));
  441. }
  442. controller.process(
  443. projectId,
  444. "test",
  445. chunks,
  446. randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION),
  447. exceptionRef::set
  448. );
  449. assertThat(exceptionRef.get(), notNullValue());
  450. assertThat(
  451. exceptionRef.get().getMessage(),
  452. containsString("Failed to merge reserved state chunks because of duplicate keys: [test]")
  453. );
  454. verify(projectStateHandler1, times(0)).transform(any(), any(), any());
  455. }
  456. public void testUpdateErrorState() {
  457. ClusterService clusterService = mock(ClusterService.class);
  458. ClusterState state = ClusterState.builder(new ClusterName("test")).build();
  459. Optional<ProjectId> project = randomBoolean() ? Optional.empty() : Optional.of(randomProjectIdOrDefault());
  460. state = setupProject(state, project);
  461. ArgumentCaptor<ReservedStateErrorTask> updateTask = ArgumentCaptor.captor();
  462. MasterServiceTaskQueue<ReservedStateErrorTask> errorQueue = mockTaskQueue();
  463. doNothing().when(errorQueue).submitTask(any(), updateTask.capture(), any());
  464. // grab the update task when it gets given to us
  465. when(clusterService.<ReservedStateErrorTask>createTaskQueue(ArgumentMatchers.contains("reserved state error"), any(), any()))
  466. .thenReturn(errorQueue);
  467. when(clusterService.state()).thenReturn(state);
  468. ReservedClusterStateService service = new ReservedClusterStateService(
  469. clusterService,
  470. mock(RerouteService.class),
  471. List.of(),
  472. List.of()
  473. );
  474. ErrorState error = new ErrorState(
  475. project,
  476. "namespace",
  477. 2L,
  478. ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
  479. List.of("error"),
  480. ReservedStateErrorMetadata.ErrorKind.TRANSIENT
  481. );
  482. service.updateErrorState(error);
  483. assertThat(updateTask.getValue(), notNullValue());
  484. verify(errorQueue).submitTask(any(), any(), any());
  485. ClusterState updatedState = updateTask.getValue().execute(state);
  486. assertThat(
  487. getMetadata(updatedState, project).get("namespace"),
  488. equalTo(
  489. new ReservedStateMetadata(
  490. "namespace",
  491. ReservedStateMetadata.NO_VERSION,
  492. Map.of(),
  493. new ReservedStateErrorMetadata(2L, ReservedStateErrorMetadata.ErrorKind.TRANSIENT, List.of("error"))
  494. )
  495. )
  496. );
  497. // it should not update if the error version is less than the current version
  498. when(clusterService.state()).thenReturn(updatedState);
  499. ErrorState oldError = new ErrorState(
  500. project,
  501. "namespace",
  502. 1L,
  503. ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
  504. List.of("old error"),
  505. ReservedStateErrorMetadata.ErrorKind.TRANSIENT
  506. );
  507. service.updateErrorState(oldError);
  508. verifyNoMoreInteractions(errorQueue);
  509. }
  510. @SuppressWarnings("unchecked")
  511. public void testOneUpdateTaskPerQueue() {
  512. ClusterState state = ClusterState.builder(new ClusterName("test")).build();
  513. MasterServiceTaskQueue<ReservedStateErrorTask> queue1 = mockTaskQueue();
  514. MasterServiceTaskQueue<ReservedStateErrorTask> queue2 = mockTaskQueue();
  515. MasterServiceTaskQueue<ReservedStateErrorTask> unusedQueue = mockTaskQueue();
  516. ClusterService clusterService = mock(ClusterService.class);
  517. when(clusterService.<ReservedStateErrorTask>createTaskQueue(anyString(), any(), any())) // For non-update tasks
  518. .thenReturn(unusedQueue);
  519. when(clusterService.<ReservedStateErrorTask>createTaskQueue(ArgumentMatchers.contains("reserved state update"), any(), any()))
  520. .thenReturn(queue1, queue2, unusedQueue);
  521. when(clusterService.state()).thenReturn(state);
  522. ReservedClusterStateService service = new ReservedClusterStateService(
  523. clusterService,
  524. mock(RerouteService.class),
  525. List.of(),
  526. List.of()
  527. );
  528. LongFunction<ReservedStateUpdateTask<?>> update = version -> {
  529. ReservedStateUpdateTask<?> task = spy(
  530. new ReservedClusterStateUpdateTask(
  531. "test",
  532. new ReservedStateChunk(Map.of(), new ReservedStateVersion(version, BuildVersion.current())),
  533. ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
  534. Map.of(),
  535. Set.of(),
  536. errorState -> {},
  537. ActionListener.noop()
  538. )
  539. );
  540. doReturn(state).when(task).execute(any());
  541. return task;
  542. };
  543. service.submitUpdateTask("test", update.apply(2L));
  544. service.submitUpdateTask("test", update.apply(3L));
  545. // One task to each queue
  546. verify(queue1).submitTask(any(), any(), any());
  547. verify(queue2).submitTask(any(), any(), any());
  548. // No additional unexpected tasks
  549. verifyNoInteractions(unusedQueue);
  550. }
  551. @SuppressWarnings("unchecked")
  552. public void testOneErrorTaskPerQueue() {
  553. ClusterState state = ClusterState.builder(new ClusterName("test")).build();
  554. Optional<ProjectId> projectId = randomBoolean() ? Optional.empty() : Optional.of(randomProjectIdOrDefault());
  555. state = setupProject(state, projectId);
  556. MasterServiceTaskQueue<ReservedStateErrorTask> queue1 = mockTaskQueue();
  557. MasterServiceTaskQueue<ReservedStateErrorTask> queue2 = mockTaskQueue();
  558. MasterServiceTaskQueue<ReservedStateErrorTask> unusedQueue = mockTaskQueue();
  559. ClusterService clusterService = mock(ClusterService.class);
  560. when(clusterService.<ReservedStateErrorTask>createTaskQueue(anyString(), any(), any())) // For non-error tasks
  561. .thenReturn(unusedQueue);
  562. when(clusterService.<ReservedStateErrorTask>createTaskQueue(ArgumentMatchers.contains("reserved state error"), any(), any()))
  563. .thenReturn(queue1, queue2, unusedQueue);
  564. when(clusterService.state()).thenReturn(state);
  565. ReservedClusterStateService service = new ReservedClusterStateService(
  566. clusterService,
  567. mock(RerouteService.class),
  568. List.of(),
  569. List.of()
  570. );
  571. LongFunction<ErrorState> error = version -> new ErrorState(
  572. projectId,
  573. "namespace",
  574. version,
  575. ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
  576. List.of("error"),
  577. ReservedStateErrorMetadata.ErrorKind.TRANSIENT
  578. );
  579. service.updateErrorState(error.apply(2));
  580. service.updateErrorState(error.apply(3));
  581. // One task to each queue
  582. verify(queue1).submitTask(any(), any(), any());
  583. verify(queue2).submitTask(any(), any(), any());
  584. // No additional unexpected tasks
  585. verifyNoInteractions(unusedQueue);
  586. }
  587. public void testErrorStateTask() throws Exception {
  588. ClusterState state = ClusterState.builder(new ClusterName("test")).build();
  589. Optional<ProjectId> projectId = randomBoolean() ? Optional.empty() : Optional.of(randomProjectIdOrDefault());
  590. state = setupProject(state, projectId);
  591. final var listenerCompleted = new AtomicBoolean(false);
  592. ReservedStateErrorTask task = spy(
  593. new ReservedStateErrorTask(
  594. new ErrorState(
  595. projectId,
  596. "test",
  597. 1L,
  598. ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
  599. List.of("some parse error", "some io error"),
  600. ReservedStateErrorMetadata.ErrorKind.PARSING
  601. ),
  602. ActionListener.running(() -> listenerCompleted.set(true))
  603. )
  604. );
  605. ClusterState newState = new ReservedStateErrorTaskExecutor().execute(
  606. new ClusterStateTaskExecutor.BatchExecutionContext<>(state, List.of(new TestTaskContext<>(task)), () -> null)
  607. );
  608. verify(task, times(1)).execute(any());
  609. ReservedStateMetadata operatorMetadata = getMetadata(newState, projectId).get("test");
  610. assertNotNull(operatorMetadata);
  611. assertNotNull(operatorMetadata.errorMetadata());
  612. assertThat(operatorMetadata.errorMetadata().version(), is(1L));
  613. assertThat(operatorMetadata.errorMetadata().errorKind(), is(ReservedStateErrorMetadata.ErrorKind.PARSING));
  614. assertThat(operatorMetadata.errorMetadata().errors(), contains("some parse error", "some io error"));
  615. assertTrue(listenerCompleted.get());
  616. }
  617. public void testUpdateTaskDuplicateError() {
  618. ReservedStateHandlerMetadata hmOne = new ReservedStateHandlerMetadata("one", Set.of("a", "b"));
  619. ReservedStateErrorMetadata emOne = new ReservedStateErrorMetadata(
  620. 2L,
  621. ReservedStateErrorMetadata.ErrorKind.VALIDATION,
  622. List.of("Test error 1", "Test error 2")
  623. );
  624. final ReservedStateMetadata operatorMetadata = ReservedStateMetadata.builder("namespace_one")
  625. .errorMetadata(emOne)
  626. .version(1L)
  627. .putHandler(hmOne)
  628. .build();
  629. Optional<ProjectId> projectId = randomBoolean() ? Optional.empty() : Optional.of(randomProjectIdOrDefault());
  630. ClusterState.Builder builder = ClusterState.builder(new ClusterName("test"));
  631. if (projectId.isPresent()) {
  632. builder.putCustom(
  633. ProjectStateRegistry.TYPE,
  634. ProjectStateRegistry.builder().putReservedStateMetadata(projectId.get(), operatorMetadata).build()
  635. );
  636. } else {
  637. builder.metadata(Metadata.builder().put(operatorMetadata));
  638. }
  639. ClusterState state = builder.build();
  640. assertFalse(ReservedStateErrorTask.isNewError(operatorMetadata, 2L, ReservedStateVersionCheck.HIGHER_VERSION_ONLY));
  641. assertFalse(ReservedStateErrorTask.isNewError(operatorMetadata, 1L, ReservedStateVersionCheck.HIGHER_VERSION_ONLY));
  642. assertTrue(ReservedStateErrorTask.isNewError(operatorMetadata, 2L, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION));
  643. assertTrue(ReservedStateErrorTask.isNewError(operatorMetadata, 3L, ReservedStateVersionCheck.HIGHER_VERSION_ONLY));
  644. assertTrue(ReservedStateErrorTask.isNewError(null, 1L, ReservedStateVersionCheck.HIGHER_VERSION_ONLY));
  645. assertTrue(ReservedStateErrorTask.isNewError(null, 1L, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION));
  646. var chunk = new ReservedStateChunk(
  647. Map.of("one", Map.of(), "maker", Map.of()),
  648. new ReservedStateVersion(2L, BuildVersion.current())
  649. );
  650. // We submit a task with two handler, one will cause an exception, the other will create a new state.
  651. // When we fail to update the metadata because of version, we ensure that the returned state is equal to the
  652. // original state by pointer reference to avoid cluster state update task to run.
  653. ReservedStateUpdateTask<?> task;
  654. if (projectId.isPresent()) {
  655. ReservedProjectStateHandler<Map<String, Object>> newStateMaker = new TestProjectStateHandler("maker");
  656. ReservedProjectStateHandler<Map<String, Object>> exceptionThrower = new TestProjectStateHandler("one") {
  657. @Override
  658. public TransformState transform(ProjectId projectId1, Map<String, Object> source, TransformState prevState)
  659. throws Exception {
  660. throw new Exception("anything");
  661. }
  662. };
  663. var orderedHandlers = List.of(exceptionThrower.name(), newStateMaker.name());
  664. task = new ReservedProjectStateUpdateTask(
  665. projectId.get(),
  666. "namespace_one",
  667. chunk,
  668. ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
  669. Map.of(exceptionThrower.name(), exceptionThrower, newStateMaker.name(), newStateMaker),
  670. orderedHandlers,
  671. e -> assertFalse(ReservedStateErrorTask.isNewError(operatorMetadata, e.version(), e.versionCheck())),
  672. ActionListener.noop()
  673. );
  674. ClusterService clusterService = mock(ClusterService.class);
  675. final var controller = spy(
  676. new ReservedClusterStateService(
  677. clusterService,
  678. mock(RerouteService.class),
  679. List.of(),
  680. List.of(newStateMaker, exceptionThrower)
  681. )
  682. );
  683. var trialRunErrors = controller.trialRun(projectId.get(), "namespace_one", state, chunk, new LinkedHashSet<>(orderedHandlers));
  684. assertThat(trialRunErrors, contains(containsString("Error processing one state change:")));
  685. } else {
  686. ReservedClusterStateHandler<Map<String, Object>> newStateMaker = new TestClusterStateHandler("maker");
  687. ReservedClusterStateHandler<Map<String, Object>> exceptionThrower = new TestClusterStateHandler("one") {
  688. @Override
  689. public TransformState transform(Map<String, Object> source, TransformState prevState) throws Exception {
  690. throw new Exception("anything");
  691. }
  692. };
  693. var orderedHandlers = List.of(exceptionThrower.name(), newStateMaker.name());
  694. task = new ReservedClusterStateUpdateTask(
  695. "namespace_one",
  696. chunk,
  697. ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
  698. Map.of(exceptionThrower.name(), exceptionThrower, newStateMaker.name(), newStateMaker),
  699. orderedHandlers,
  700. e -> assertFalse(ReservedStateErrorTask.isNewError(operatorMetadata, e.version(), e.versionCheck())),
  701. ActionListener.noop()
  702. );
  703. ClusterService clusterService = mock(ClusterService.class);
  704. final var controller = spy(
  705. new ReservedClusterStateService(
  706. clusterService,
  707. mock(RerouteService.class),
  708. List.of(newStateMaker, exceptionThrower),
  709. List.of()
  710. )
  711. );
  712. var trialRunErrors = controller.trialRun("namespace_one", state, chunk, new LinkedHashSet<>(orderedHandlers));
  713. assertThat(trialRunErrors, contains(containsString("Error processing one state change:")));
  714. }
  715. // We exit on duplicate errors before we update the reserved state error metadata
  716. assertThat(
  717. expectThrows(IllegalStateException.class, () -> task.execute(state)).getMessage(),
  718. containsString("Error processing state change request for namespace_one")
  719. );
  720. emOne = new ReservedStateErrorMetadata(
  721. 0L,
  722. ReservedStateErrorMetadata.ErrorKind.VALIDATION,
  723. List.of("Test error 1", "Test error 2")
  724. );
  725. // If we are writing with older error metadata, we should get proper IllegalStateException
  726. ReservedStateMetadata opMetadata = ReservedStateMetadata.builder("namespace_one")
  727. .errorMetadata(emOne)
  728. .version(0L)
  729. .putHandler(hmOne)
  730. .build();
  731. builder = ClusterState.builder(new ClusterName("test"));
  732. if (projectId.isPresent()) {
  733. builder.putCustom(
  734. ProjectStateRegistry.TYPE,
  735. ProjectStateRegistry.builder().putReservedStateMetadata(projectId.get(), opMetadata).build()
  736. );
  737. } else {
  738. builder.metadata(Metadata.builder().put(opMetadata));
  739. }
  740. ClusterState newState = builder.build();
  741. // We exit on duplicate errors before we update the reserved state error metadata
  742. assertThat(
  743. expectThrows(IllegalStateException.class, () -> task.execute(newState)).getMessage(),
  744. containsString("Error processing state change request for namespace_one")
  745. );
  746. }
  747. public void testCheckMetadataVersion() {
  748. ReservedStateMetadata operatorMetadata = ReservedStateMetadata.builder("test").version(123L).build();
  749. Optional<ProjectId> projectId = randomBoolean() ? Optional.empty() : Optional.of(randomProjectIdOrDefault());
  750. ClusterState.Builder builder = ClusterState.builder(new ClusterName("test"));
  751. if (projectId.isPresent()) {
  752. builder.putCustom(
  753. ProjectStateRegistry.TYPE,
  754. ProjectStateRegistry.builder().putReservedStateMetadata(projectId.get(), operatorMetadata).build()
  755. );
  756. } else {
  757. builder.metadata(Metadata.builder().put(operatorMetadata));
  758. }
  759. ClusterState state = builder.build();
  760. ReservedStateUpdateTask<?> task = createEmptyTask(
  761. projectId,
  762. "test",
  763. new ReservedStateChunk(Map.of(), new ReservedStateVersion(124L, BuildVersion.current())),
  764. ReservedStateVersionCheck.HIGHER_VERSION_ONLY
  765. );
  766. assertThat("Cluster state should be modified", task.execute(state), not(sameInstance(state)));
  767. task = createEmptyTask(
  768. projectId,
  769. "test",
  770. new ReservedStateChunk(Map.of(), new ReservedStateVersion(124L, BuildVersion.current())),
  771. ReservedStateVersionCheck.HIGHER_VERSION_ONLY
  772. );
  773. assertThat("Cluster state should be modified", task.execute(state), not(sameInstance(state)));
  774. task = createEmptyTask(
  775. projectId,
  776. "test",
  777. new ReservedStateChunk(Map.of(), new ReservedStateVersion(123L, BuildVersion.current())),
  778. ReservedStateVersionCheck.HIGHER_VERSION_ONLY
  779. );
  780. assertThat("Cluster state should not be modified", task.execute(state), sameInstance(state));
  781. task = createEmptyTask(
  782. projectId,
  783. "test",
  784. new ReservedStateChunk(Map.of(), new ReservedStateVersion(123L, BuildVersion.current())),
  785. ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION
  786. );
  787. assertThat("Cluster state should be modified", task.execute(state), not(sameInstance(state)));
  788. task = createEmptyTask(
  789. projectId,
  790. "test",
  791. new ReservedStateChunk(Map.of(), new ReservedStateVersion(122L, BuildVersion.current())),
  792. ReservedStateVersionCheck.HIGHER_VERSION_ONLY
  793. );
  794. assertThat("Cluster state should not be modified", task.execute(state), sameInstance(state));
  795. task = createEmptyTask(
  796. projectId,
  797. "test",
  798. new ReservedStateChunk(Map.of(), new ReservedStateVersion(122L, BuildVersion.current())),
  799. ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION
  800. );
  801. assertThat("Cluster state should not be modified", task.execute(state), sameInstance(state));
  802. task = createEmptyTask(
  803. projectId,
  804. "test",
  805. new ReservedStateChunk(Map.of(), new ReservedStateVersion(124L, BuildVersionTests.increment(BuildVersion.current()))),
  806. ReservedStateVersionCheck.HIGHER_VERSION_ONLY
  807. );
  808. assertThat("Cluster state should not be modified", task.execute(state), sameInstance(state));
  809. task = createEmptyTask(
  810. projectId,
  811. "test",
  812. new ReservedStateChunk(Map.of(), new ReservedStateVersion(124L, BuildVersionTests.increment(BuildVersion.current()))),
  813. ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION
  814. );
  815. assertThat("Cluster state should not be modified", task.execute(state), sameInstance(state));
  816. }
  817. private ReservedClusterStateHandler<Map<String, Object>> makeClusterHandlerHelper(String name, List<String> deps) {
  818. return new TestClusterStateHandler(name) {
  819. @Override
  820. public TransformState transform(Map<String, Object> source, TransformState prevState) throws Exception {
  821. return null;
  822. }
  823. @Override
  824. public Collection<String> dependencies() {
  825. return deps;
  826. }
  827. };
  828. }
  829. private ReservedProjectStateHandler<Map<String, Object>> makeProjectHandlerHelper(String name, List<String> deps) {
  830. return new TestProjectStateHandler(name) {
  831. @Override
  832. public TransformState transform(ProjectId projectId, Map<String, Object> source, TransformState prevState) throws Exception {
  833. return null;
  834. }
  835. @Override
  836. public Collection<String> dependencies() {
  837. return deps;
  838. }
  839. };
  840. }
  841. public void testClusterHandlerOrdering() {
  842. ReservedClusterStateHandler<Map<String, Object>> oh1 = makeClusterHandlerHelper("one", List.of("two", "three"));
  843. ReservedClusterStateHandler<Map<String, Object>> oh2 = makeClusterHandlerHelper("two", List.of());
  844. ReservedClusterStateHandler<Map<String, Object>> oh3 = makeClusterHandlerHelper("three", List.of("two"));
  845. ClusterService clusterService = mock(ClusterService.class);
  846. final var controller = new ReservedClusterStateService(
  847. clusterService,
  848. mock(RerouteService.class),
  849. List.of(oh1, oh2, oh3),
  850. List.of()
  851. );
  852. Collection<String> ordered = controller.orderedClusterStateHandlers(Set.of("one", "two", "three"));
  853. assertThat(ordered, contains("two", "three", "one"));
  854. // assure that we bail on unknown handler
  855. assertThat(
  856. expectThrows(IllegalStateException.class, () -> controller.orderedClusterStateHandlers(Set.of("one", "two", "three", "four")))
  857. .getMessage(),
  858. is("Unknown handler type: four")
  859. );
  860. // assure that we bail on missing dependency link
  861. assertThat(
  862. expectThrows(IllegalStateException.class, () -> controller.orderedClusterStateHandlers(Set.of("one", "two"))).getMessage(),
  863. is("Missing handler dependency definition: one -> three")
  864. );
  865. // Change the second handler so that we create cycle
  866. oh2 = makeClusterHandlerHelper("two", List.of("one"));
  867. final var controller1 = new ReservedClusterStateService(clusterService, mock(RerouteService.class), List.of(oh1, oh2), List.of());
  868. assertThat(
  869. expectThrows(IllegalStateException.class, () -> controller1.orderedClusterStateHandlers(Set.of("one", "two"))).getMessage(),
  870. anyOf(
  871. is("Cycle found in settings dependencies: one -> two -> one"),
  872. is("Cycle found in settings dependencies: two -> one -> two")
  873. )
  874. );
  875. }
  876. public void testProjectHandlerOrdering() {
  877. ReservedProjectStateHandler<Map<String, Object>> oh1 = makeProjectHandlerHelper("one", List.of("two", "three"));
  878. ReservedProjectStateHandler<Map<String, Object>> oh2 = makeProjectHandlerHelper("two", List.of());
  879. ReservedProjectStateHandler<Map<String, Object>> oh3 = makeProjectHandlerHelper("three", List.of("two"));
  880. ClusterService clusterService = mock(ClusterService.class);
  881. final var controller = new ReservedClusterStateService(
  882. clusterService,
  883. mock(RerouteService.class),
  884. List.of(),
  885. List.of(oh1, oh2, oh3)
  886. );
  887. Collection<String> ordered = controller.orderedProjectStateHandlers(Set.of("one", "two", "three"));
  888. assertThat(ordered, contains("two", "three", "one"));
  889. // assure that we bail on unknown handler
  890. assertThat(
  891. expectThrows(IllegalStateException.class, () -> controller.orderedProjectStateHandlers(Set.of("one", "two", "three", "four")))
  892. .getMessage(),
  893. is("Unknown handler type: four")
  894. );
  895. // assure that we bail on missing dependency link
  896. assertThat(
  897. expectThrows(IllegalStateException.class, () -> controller.orderedProjectStateHandlers(Set.of("one", "two"))).getMessage(),
  898. is("Missing handler dependency definition: one -> three")
  899. );
  900. // Change the second handler so that we create cycle
  901. oh2 = makeProjectHandlerHelper("two", List.of("one"));
  902. final var controller1 = new ReservedClusterStateService(clusterService, mock(RerouteService.class), List.of(), List.of(oh1, oh2));
  903. assertThat(
  904. expectThrows(IllegalStateException.class, () -> controller1.orderedProjectStateHandlers(Set.of("one", "two"))).getMessage(),
  905. anyOf(
  906. is("Cycle found in settings dependencies: one -> two -> one"),
  907. is("Cycle found in settings dependencies: two -> one -> two")
  908. )
  909. );
  910. }
  911. public void testDuplicateHandlerNames() {
  912. ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
  913. ClusterService clusterService = mock(ClusterService.class);
  914. final ClusterName clusterName = new ClusterName("elasticsearch");
  915. ClusterState state = ClusterState.builder(clusterName).build();
  916. when(clusterService.state()).thenReturn(state);
  917. assertThat(
  918. expectThrows(
  919. IllegalArgumentException.class,
  920. () -> new ReservedClusterStateService(
  921. clusterService,
  922. mock(RerouteService.class),
  923. List.of(
  924. new ReservedClusterSettingsAction(clusterSettings),
  925. new TestClusterStateHandler(ReservedClusterSettingsAction.NAME)
  926. ),
  927. List.of()
  928. )
  929. ).getMessage(),
  930. startsWith("Duplicate handler name: [cluster_settings]")
  931. );
  932. assertThat(
  933. expectThrows(
  934. IllegalArgumentException.class,
  935. () -> new ReservedClusterStateService(
  936. clusterService,
  937. mock(RerouteService.class),
  938. List.of(new ReservedClusterSettingsAction(clusterSettings)),
  939. List.of(new TestProjectStateHandler(ReservedClusterSettingsAction.NAME))
  940. )
  941. ).getMessage(),
  942. startsWith("Duplicate handler name: [cluster_settings]")
  943. );
  944. }
  945. public void testCheckAndReportError() {
  946. ClusterService clusterService = mock(ClusterService.class);
  947. Optional<ProjectId> projectId = randomBoolean() ? Optional.empty() : Optional.of(randomProjectIdOrDefault());
  948. var state = ClusterState.builder(new ClusterName("elasticsearch")).build();
  949. state = setupProject(state, projectId);
  950. when(clusterService.state()).thenReturn(state);
  951. when(clusterService.createTaskQueue(any(), any(), any())).thenReturn(mockTaskQueue());
  952. final var controller = spy(new ReservedClusterStateService(clusterService, mock(RerouteService.class), List.of(), List.of()));
  953. assertNull(controller.checkAndReportError(projectId, "test", List.of(), null, ReservedStateVersionCheck.HIGHER_VERSION_ONLY));
  954. verify(controller, times(0)).updateErrorState(any());
  955. var version = new ReservedStateVersion(2L, BuildVersion.current());
  956. var error = controller.checkAndReportError(
  957. projectId,
  958. "test",
  959. List.of("test error"),
  960. version,
  961. ReservedStateVersionCheck.HIGHER_VERSION_ONLY
  962. );
  963. assertThat(error, instanceOf(IllegalStateException.class));
  964. assertThat(error.getMessage(), is("Error processing state change request for test, errors: test error"));
  965. verify(controller, times(1)).updateErrorState(any());
  966. }
  967. }