Browse Source

Allow passing several reserved state chunks in single process call (#124574)

This PR overloads the `process` method and allows it to be used with
several `ReservedStateChunks`. The purpose is to allow several state
chunks to be spread across several files but handled as a single cluster
state update by validating and merging them into a single representation
of the `ReservedStateChunk`.
Johannes Fredén 7 months ago
parent
commit
fc7fbdfe4c

+ 5 - 0
docs/changelog/124574.yaml

@@ -0,0 +1,5 @@
+pr: 124574
+summary: Allow passing several reserved state chunks in single process call
+area: Infra/Settings
+type: enhancement
+issues: []

+ 65 - 4
server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java

@@ -22,6 +22,7 @@ import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
 import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.core.FixForMultiProject;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.env.BuildVersion;
@@ -187,7 +188,7 @@ public class ReservedClusterStateService {
         process(namespace, stateChunk, versionCheck, errorListener);
     }
 
-    ReservedStateChunk parse(ProjectId projectId, String namespace, XContentParser parser) {
+    public ReservedStateChunk parse(ProjectId projectId, String namespace, XContentParser parser) {
         try {
             return stateChunkParser.apply(parser, null);
         } catch (Exception e) {
@@ -377,6 +378,7 @@ public class ReservedClusterStateService {
      * @param projectId the project state to modify
      * @param namespace the namespace under which we'll store the reserved keys in the cluster state metadata
      * @param reservedStateChunk a {@link ReservedStateChunk} composite state object to process
+     * @param versionCheck  Enum representing whether a reserved state should be processed based on the current and new versions
      * @param errorListener a consumer called with {@link IllegalStateException} if the content has errors and the
      *        cluster state cannot be correctly applied, null if successful or the state failed to apply because of incompatible version.
      */
@@ -387,17 +389,41 @@ public class ReservedClusterStateService {
         ReservedStateVersionCheck versionCheck,
         Consumer<Exception> errorListener
     ) {
-        Map<String, Object> reservedState = reservedStateChunk.state();
-        ReservedStateVersion reservedStateVersion = reservedStateChunk.metadata();
+        process(projectId, namespace, List.of(reservedStateChunk), versionCheck, errorListener);
+    }
 
+    /**
+     * Saves and reserves a chunk of the cluster state under a given 'namespace' from {@link XContentParser} by combining several chunks
+     * into one
+     *
+     * @param projectId the project state to modify
+     * @param namespace the namespace under which we'll store the reserved keys in the cluster state metadata
+     * @param reservedStateChunks a list of {@link ReservedStateChunk} composite state objects to process
+     * @param versionCheck  Enum representing whether a reserved state should be processed based on the current and new versions
+     * @param errorListener a consumer called with {@link IllegalStateException} if the content has errors and the
+     *        cluster state cannot be correctly applied, null if successful or the state failed to apply because of incompatible version.
+     */
+    public void process(
+        ProjectId projectId,
+        String namespace,
+        List<ReservedStateChunk> reservedStateChunks,
+        ReservedStateVersionCheck versionCheck,
+        Consumer<Exception> errorListener
+    ) {
+        ReservedStateChunk reservedStateChunk;
+        ReservedStateVersion reservedStateVersion;
         LinkedHashSet<String> orderedHandlers;
+
         try {
+            reservedStateChunk = mergeReservedStateChunks(reservedStateChunks);
+            Map<String, Object> reservedState = reservedStateChunk.state();
+            reservedStateVersion = reservedStateChunk.metadata();
             orderedHandlers = orderedProjectStateHandlers(reservedState.keySet());
         } catch (Exception e) {
             ErrorState errorState = new ErrorState(
                 projectId,
                 namespace,
-                reservedStateVersion.version(),
+                reservedStateChunks.getFirst().metadata().version(),
                 versionCheck,
                 e,
                 ReservedStateErrorMetadata.ErrorKind.PARSING
@@ -476,6 +502,36 @@ public class ReservedClusterStateService {
         );
     }
 
+    private static ReservedStateChunk mergeReservedStateChunks(List<ReservedStateChunk> chunks) {
+        if (chunks.isEmpty()) {
+            throw new IllegalArgumentException("No chunks provided");
+        }
+
+        if (chunks.size() == 1) {
+            return chunks.getFirst();
+        }
+
+        ReservedStateVersion reservedStateVersion = chunks.getFirst().metadata();
+        Map<String, Object> mergedChunks = new HashMap<>(chunks.size());
+        for (var chunk : chunks) {
+            Set<String> duplicateKeys = Sets.intersection(chunk.state().keySet(), mergedChunks.keySet());
+            if (chunk.metadata().equals(reservedStateVersion) == false) {
+                throw new IllegalStateException(
+                    "Failed to merge reserved state chunks because of version mismatch: ["
+                        + reservedStateVersion
+                        + "] != ["
+                        + chunk.metadata()
+                        + "]"
+                );
+            } else if (duplicateKeys.isEmpty() == false) {
+                throw new IllegalStateException("Failed to merge reserved state chunks because of duplicate keys: " + duplicateKeys);
+            }
+            mergedChunks.putAll(chunk.state());
+        }
+
+        return new ReservedStateChunk(mergedChunks, reservedStateVersion);
+    }
+
     // package private for testing
     Exception checkAndReportError(
         Optional<ProjectId> projectId,
@@ -518,6 +574,11 @@ public class ReservedClusterStateService {
             return;
         }
 
+        if (errorState.projectId().isPresent() && clusterService.state().metadata().hasProject(errorState.projectId().get()) == false) {
+            // Can't update error state for a project that doesn't exist yet
+            return;
+        }
+
         submitErrorUpdateTask(errorState);
     }
 

+ 190 - 0
server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java

@@ -24,6 +24,7 @@ import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
 import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Releasable;
@@ -41,6 +42,7 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatchers;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -60,6 +62,7 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.hamcrest.Matchers.startsWith;
 import static org.mockito.ArgumentMatchers.any;
@@ -67,6 +70,7 @@ import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -76,6 +80,20 @@ import static org.mockito.Mockito.when;
 
 public class ReservedClusterStateServiceTests extends ESTestCase {
 
+    private static final String TEST_CHUNK_TEMPLATE = """
+        {
+             "metadata": {
+                 "version": "%s",
+                 "compatibility": "8.4.0"
+             },
+             "state": {
+                 "%s": {
+                     "nothing": "useful"
+                 }
+             }
+        }
+        """;
+
     @SuppressWarnings("unchecked")
     private static <T extends ClusterStateTaskListener> MasterServiceTaskQueue<T> mockTaskQueue() {
         return (MasterServiceTaskQueue<T>) mock(MasterServiceTaskQueue.class);
@@ -349,6 +367,178 @@ public class ReservedClusterStateServiceTests extends ESTestCase {
         verify(rerouteService, times(1)).reroute(anyString(), any(), any());
     }
 
+    public void testUpdateErrorStateNonExistingProject() {
+        ClusterService clusterService = mock(ClusterService.class);
+        ClusterState state = ClusterState.builder(new ClusterName("test")).build();
+        when(clusterService.state()).thenReturn(state);
+
+        ReservedClusterStateService service = new ReservedClusterStateService(
+            clusterService,
+            mock(RerouteService.class),
+            List.of(),
+            List.of()
+        );
+
+        ErrorState error = new ErrorState(
+            Optional.of(randomUniqueProjectId()),
+            "namespace",
+            2L,
+            ReservedStateVersionCheck.HIGHER_VERSION_ONLY,
+            List.of("error"),
+            ReservedStateErrorMetadata.ErrorKind.TRANSIENT
+        );
+        service.updateErrorState(error);
+        verify(clusterService, never()).createTaskQueue(any(), any(), any());
+    }
+
+    public void testProcessMultipleChunks() throws Exception {
+        ClusterService clusterService = mock(ClusterService.class);
+        when(clusterService.createTaskQueue(any(), any(), any())).thenReturn(mockTaskQueue());
+        final ClusterName clusterName = new ClusterName("elasticsearch");
+
+        ClusterState state = ClusterState.builder(clusterName).build();
+        ProjectId projectId = randomProjectIdOrDefault();
+        state = setupProject(state, Optional.of(projectId));
+        when(clusterService.state()).thenReturn(state);
+
+        AtomicReference<Exception> exceptionRef = new AtomicReference<>();
+        List<ReservedStateChunk> chunks = new ArrayList<>();
+
+        String[] randomStateKeys = generateRandomStringArray(randomIntBetween(5, 10), randomIntBetween(10, 15), false);
+
+        List<ReservedClusterStateHandler<ProjectMetadata, ?>> projectHandlers = new ArrayList<>();
+        for (var key : randomStateKeys) {
+            projectHandlers.add(spy(new TestProjectStateHandler(key)));
+        }
+
+        ReservedClusterStateService controller = new ReservedClusterStateService(
+            clusterService,
+            mock(RerouteService.class),
+            List.of(),
+            projectHandlers
+        );
+
+        for (var testHandler : randomStateKeys) {
+            String testChunkJSON = Strings.format(TEST_CHUNK_TEMPLATE, 1, testHandler);
+            try (
+                XContentParser chunkParser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testChunkJSON);
+            ) {
+                chunks.add(controller.parse(projectId, "test", chunkParser));
+            }
+        }
+
+        controller.process(
+            projectId,
+            "test",
+            chunks,
+            randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION),
+            exceptionRef::set
+        );
+
+        assertThat(exceptionRef.get(), nullValue());
+
+        for (var projectHandler : projectHandlers) {
+            verify(projectHandler, times(1)).transform(any(), any());
+        }
+    }
+
+    public void testProcessMultipleChunksVersionMismatch() throws Exception {
+        ClusterService clusterService = mock(ClusterService.class);
+        when(clusterService.createTaskQueue(any(), any(), any())).thenReturn(mockTaskQueue());
+        final ClusterName clusterName = new ClusterName("elasticsearch");
+
+        ClusterState state = ClusterState.builder(clusterName).build();
+        ProjectId projectId = randomProjectIdOrDefault();
+        state = setupProject(state, Optional.of(projectId));
+        when(clusterService.state()).thenReturn(state);
+
+        String testJSON1 = Strings.format(TEST_CHUNK_TEMPLATE, 1, "test1");
+        String testJSON2 = Strings.format(TEST_CHUNK_TEMPLATE, 2, "test2");
+
+        AtomicReference<Exception> exceptionRef = new AtomicReference<>();
+        List<ReservedStateChunk> chunks = new ArrayList<>();
+
+        TestProjectStateHandler projectStateHandler1 = spy(new TestProjectStateHandler("test1"));
+        TestProjectStateHandler projectStateHandler2 = spy(new TestProjectStateHandler("test2"));
+
+        ReservedClusterStateService controller = new ReservedClusterStateService(
+            clusterService,
+            mock(RerouteService.class),
+            List.of(),
+            List.of(projectStateHandler1, projectStateHandler2)
+        );
+
+        try (
+            XContentParser chunkParser1 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON1);
+            XContentParser chunkParser2 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON2)
+        ) {
+            chunks.add(controller.parse(projectId, "test", chunkParser1));
+            chunks.add(controller.parse(projectId, "test", chunkParser2));
+        }
+
+        controller.process(
+            projectId,
+            "test",
+            chunks,
+            randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION),
+            exceptionRef::set
+        );
+
+        assertThat(exceptionRef.get(), notNullValue());
+        assertThat(exceptionRef.get().getMessage(), containsString("Failed to merge reserved state chunks because of version mismatch: ["));
+        verify(projectStateHandler1, times(0)).transform(any(), any());
+        verify(projectStateHandler2, times(0)).transform(any(), any());
+    }
+
+    public void testProcessMultipleChunksDuplicateKeys() throws Exception {
+        ClusterService clusterService = mock(ClusterService.class);
+        when(clusterService.createTaskQueue(any(), any(), any())).thenReturn(mockTaskQueue());
+        final ClusterName clusterName = new ClusterName("elasticsearch");
+
+        ClusterState state = ClusterState.builder(clusterName).build();
+        ProjectId projectId = randomProjectIdOrDefault();
+        state = setupProject(state, Optional.of(projectId));
+        when(clusterService.state()).thenReturn(state);
+
+        String testJSON1 = Strings.format(TEST_CHUNK_TEMPLATE, 1, "test");
+        String testJSON2 = Strings.format(TEST_CHUNK_TEMPLATE, 1, "test");
+
+        AtomicReference<Exception> exceptionRef = new AtomicReference<>();
+        List<ReservedStateChunk> chunks = new ArrayList<>();
+
+        TestProjectStateHandler projectStateHandler1 = spy(new TestProjectStateHandler("test"));
+
+        ReservedClusterStateService controller = new ReservedClusterStateService(
+            clusterService,
+            mock(RerouteService.class),
+            List.of(),
+            List.of(projectStateHandler1)
+        );
+
+        try (
+            XContentParser chunkParser1 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON1);
+            XContentParser chunkParser2 = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON2)
+        ) {
+            chunks.add(controller.parse(projectId, "test", chunkParser1));
+            chunks.add(controller.parse(projectId, "test", chunkParser2));
+        }
+
+        controller.process(
+            projectId,
+            "test",
+            chunks,
+            randomFrom(ReservedStateVersionCheck.HIGHER_VERSION_ONLY, ReservedStateVersionCheck.HIGHER_OR_SAME_VERSION),
+            exceptionRef::set
+        );
+
+        assertThat(exceptionRef.get(), notNullValue());
+        assertThat(
+            exceptionRef.get().getMessage(),
+            containsString("Failed to merge reserved state chunks because of duplicate keys: [test]")
+        );
+        verify(projectStateHandler1, times(0)).transform(any(), any());
+    }
+
     public void testUpdateErrorState() {
         ClusterService clusterService = mock(ClusterService.class);
         ClusterState state = ClusterState.builder(new ClusterName("test")).build();