|
@@ -46,7 +46,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
import java.util.function.Consumer;
|
|
|
|
|
|
-import static org.elasticsearch.reservedstate.service.ReservedStateUpdateTask.checkMetadataVersion;
|
|
|
import static org.hamcrest.Matchers.anyOf;
|
|
|
import static org.hamcrest.Matchers.contains;
|
|
|
import static org.hamcrest.Matchers.containsString;
|
|
@@ -54,7 +53,9 @@ import static org.hamcrest.Matchers.empty;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
import static org.hamcrest.Matchers.instanceOf;
|
|
|
import static org.hamcrest.Matchers.is;
|
|
|
+import static org.hamcrest.Matchers.not;
|
|
|
import static org.hamcrest.Matchers.notNullValue;
|
|
|
+import static org.hamcrest.Matchers.sameInstance;
|
|
|
import static org.hamcrest.Matchers.startsWith;
|
|
|
import static org.mockito.ArgumentMatchers.any;
|
|
|
import static org.mockito.ArgumentMatchers.anyString;
|
|
@@ -74,6 +75,65 @@ public class ReservedClusterStateServiceTests extends ESTestCase {
|
|
|
return (MasterServiceTaskQueue<T>) mock(MasterServiceTaskQueue.class);
|
|
|
}
|
|
|
|
|
|
+ private static class TestTaskContext<T extends ClusterStateTaskListener> implements ClusterStateTaskExecutor.TaskContext<T> {
|
|
|
+ private final T task;
|
|
|
+
|
|
|
+ private TestTaskContext(T task) {
|
|
|
+ this.task = task;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public T getTask() {
|
|
|
+ return task;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void success(Runnable onPublicationSuccess) {
|
|
|
+ onPublicationSuccess.run();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void success(Consumer<ClusterState> publishedStateConsumer) {}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void success(Runnable onPublicationSuccess, ClusterStateAckListener clusterStateAckListener) {}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void success(Consumer<ClusterState> publishedStateConsumer, ClusterStateAckListener clusterStateAckListener) {}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onFailure(Exception failure) {}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Releasable captureResponseHeaders() {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class TestStateHandler implements ReservedClusterStateHandler<Map<String, Object>> {
|
|
|
+ private final String name;
|
|
|
+
|
|
|
+ private TestStateHandler(String name) {
|
|
|
+ this.name = name;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String name() {
|
|
|
+ return name;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public TransformState transform(Object source, TransformState prevState) throws Exception {
|
|
|
+ ClusterState newState = new ClusterState.Builder(prevState.state()).build();
|
|
|
+ return new TransformState(newState, prevState.keys());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Map<String, Object> fromXContent(XContentParser parser) throws IOException {
|
|
|
+ return parser.map();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public void testOperatorController() throws IOException {
|
|
|
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
|
|
ClusterService clusterService = mock(ClusterService.class);
|
|
@@ -147,8 +207,7 @@ public class ReservedClusterStateServiceTests extends ESTestCase {
|
|
|
|
|
|
// grab the update task when it gets given to us
|
|
|
when(clusterService.createTaskQueue(ArgumentMatchers.contains("reserved state update"), any(), any())).thenAnswer(i -> {
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- MasterServiceTaskQueue<ReservedStateUpdateTask> queue = mock(MasterServiceTaskQueue.class);
|
|
|
+ MasterServiceTaskQueue<ReservedStateUpdateTask> queue = mockTaskQueue();
|
|
|
doNothing().when(queue).submitTask(any(), updateTask.capture(), any());
|
|
|
return queue;
|
|
|
});
|
|
@@ -181,34 +240,12 @@ public class ReservedClusterStateServiceTests extends ESTestCase {
|
|
|
|
|
|
doReturn(state).when(task).execute(any());
|
|
|
|
|
|
- ClusterStateTaskExecutor.TaskContext<ReservedStateUpdateTask> taskContext = new ClusterStateTaskExecutor.TaskContext<>() {
|
|
|
- @Override
|
|
|
- public ReservedStateUpdateTask getTask() {
|
|
|
- return task;
|
|
|
- }
|
|
|
-
|
|
|
+ ClusterStateTaskExecutor.TaskContext<ReservedStateUpdateTask> taskContext = new TestTaskContext<>(task) {
|
|
|
@Override
|
|
|
public void success(Runnable onPublicationSuccess) {
|
|
|
- onPublicationSuccess.run();
|
|
|
+ super.success(onPublicationSuccess);
|
|
|
successCalled.set(true);
|
|
|
}
|
|
|
-
|
|
|
- @Override
|
|
|
- public void success(Consumer<ClusterState> publishedStateConsumer) {}
|
|
|
-
|
|
|
- @Override
|
|
|
- public void success(Runnable onPublicationSuccess, ClusterStateAckListener clusterStateAckListener) {}
|
|
|
-
|
|
|
- @Override
|
|
|
- public void success(Consumer<ClusterState> publishedStateConsumer, ClusterStateAckListener clusterStateAckListener) {}
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onFailure(Exception failure) {}
|
|
|
-
|
|
|
- @Override
|
|
|
- public Releasable captureResponseHeaders() {
|
|
|
- return null;
|
|
|
- }
|
|
|
};
|
|
|
|
|
|
ClusterState newState = taskExecutor.execute(
|
|
@@ -227,8 +264,7 @@ public class ReservedClusterStateServiceTests extends ESTestCase {
|
|
|
ClusterState state = ClusterState.builder(new ClusterName("test")).build();
|
|
|
|
|
|
ArgumentCaptor<ReservedStateErrorTask> updateTask = ArgumentCaptor.captor();
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- MasterServiceTaskQueue<ReservedStateErrorTask> errorQueue = mock(MasterServiceTaskQueue.class);
|
|
|
+ MasterServiceTaskQueue<ReservedStateErrorTask> errorQueue = mockTaskQueue();
|
|
|
doNothing().when(errorQueue).submitTask(any(), updateTask.capture(), any());
|
|
|
|
|
|
// grab the update task when it gets given to us
|
|
@@ -276,40 +312,8 @@ public class ReservedClusterStateServiceTests extends ESTestCase {
|
|
|
)
|
|
|
);
|
|
|
|
|
|
- ReservedStateErrorTaskExecutor.TaskContext<ReservedStateErrorTask> taskContext =
|
|
|
- new ReservedStateErrorTaskExecutor.TaskContext<>() {
|
|
|
- @Override
|
|
|
- public ReservedStateErrorTask getTask() {
|
|
|
- return task;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void success(Runnable onPublicationSuccess) {
|
|
|
- onPublicationSuccess.run();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void success(Consumer<ClusterState> publishedStateConsumer) {}
|
|
|
-
|
|
|
- @Override
|
|
|
- public void success(Runnable onPublicationSuccess, ClusterStateAckListener clusterStateAckListener) {}
|
|
|
-
|
|
|
- @Override
|
|
|
- public void success(Consumer<ClusterState> publishedStateConsumer, ClusterStateAckListener clusterStateAckListener) {}
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onFailure(Exception failure) {}
|
|
|
-
|
|
|
- @Override
|
|
|
- public Releasable captureResponseHeaders() {
|
|
|
- return null;
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- ReservedStateErrorTaskExecutor executor = new ReservedStateErrorTaskExecutor();
|
|
|
-
|
|
|
- ClusterState newState = executor.execute(
|
|
|
- new ClusterStateTaskExecutor.BatchExecutionContext<>(state, List.of(taskContext), () -> null)
|
|
|
+ ClusterState newState = new ReservedStateErrorTaskExecutor().execute(
|
|
|
+ new ClusterStateTaskExecutor.BatchExecutionContext<>(state, List.of(new TestTaskContext<>(task)), () -> null)
|
|
|
);
|
|
|
|
|
|
verify(task, times(1)).execute(any());
|
|
@@ -324,39 +328,12 @@ public class ReservedClusterStateServiceTests extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testUpdateTaskDuplicateError() {
|
|
|
- ReservedClusterStateHandler<Map<String, Object>> newStateMaker = new ReservedClusterStateHandler<>() {
|
|
|
- @Override
|
|
|
- public String name() {
|
|
|
- return "maker";
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public TransformState transform(Object source, TransformState prevState) throws Exception {
|
|
|
- ClusterState newState = new ClusterState.Builder(prevState.state()).build();
|
|
|
- return new TransformState(newState, prevState.keys());
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public Map<String, Object> fromXContent(XContentParser parser) throws IOException {
|
|
|
- return parser.map();
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- ReservedClusterStateHandler<Map<String, Object>> exceptionThrower = new ReservedClusterStateHandler<>() {
|
|
|
- @Override
|
|
|
- public String name() {
|
|
|
- return "one";
|
|
|
- }
|
|
|
-
|
|
|
+ ReservedClusterStateHandler<Map<String, Object>> newStateMaker = new TestStateHandler("maker");
|
|
|
+ ReservedClusterStateHandler<Map<String, Object>> exceptionThrower = new TestStateHandler("one") {
|
|
|
@Override
|
|
|
public TransformState transform(Object source, TransformState prevState) throws Exception {
|
|
|
throw new Exception("anything");
|
|
|
}
|
|
|
-
|
|
|
- @Override
|
|
|
- public Map<String, Object> fromXContent(XContentParser parser) throws IOException {
|
|
|
- return parser.map();
|
|
|
- }
|
|
|
};
|
|
|
|
|
|
ReservedStateHandlerMetadata hmOne = new ReservedStateHandlerMetadata("one", Set.of("a", "b"));
|
|
@@ -435,22 +412,40 @@ public class ReservedClusterStateServiceTests extends ESTestCase {
|
|
|
public void testCheckMetadataVersion() {
|
|
|
ReservedStateMetadata operatorMetadata = ReservedStateMetadata.builder("test").version(123L).build();
|
|
|
|
|
|
- assertTrue(checkMetadataVersion("operator", operatorMetadata, new ReservedStateVersion(124L, Version.CURRENT)));
|
|
|
-
|
|
|
- assertFalse(checkMetadataVersion("operator", operatorMetadata, new ReservedStateVersion(123L, Version.CURRENT)));
|
|
|
-
|
|
|
- assertFalse(
|
|
|
- checkMetadataVersion("operator", operatorMetadata, new ReservedStateVersion(124L, Version.fromId(Version.CURRENT.id + 1)))
|
|
|
+ ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(Metadata.builder().put(operatorMetadata)).build();
|
|
|
+ ReservedStateUpdateTask task = new ReservedStateUpdateTask(
|
|
|
+ "test",
|
|
|
+ new ReservedStateChunk(Map.of(), new ReservedStateVersion(124L, Version.CURRENT)),
|
|
|
+ Map.of(),
|
|
|
+ List.of(),
|
|
|
+ e -> {},
|
|
|
+ ActionListener.noop()
|
|
|
+ );
|
|
|
+ assertThat("Cluster state should be modified", task.execute(state), not(sameInstance(state)));
|
|
|
+
|
|
|
+ task = new ReservedStateUpdateTask(
|
|
|
+ "test",
|
|
|
+ new ReservedStateChunk(Map.of(), new ReservedStateVersion(123L, Version.CURRENT)),
|
|
|
+ Map.of(),
|
|
|
+ List.of(),
|
|
|
+ e -> {},
|
|
|
+ ActionListener.noop()
|
|
|
+ );
|
|
|
+ assertThat("Cluster state should not be modified", task.execute(state), sameInstance(state));
|
|
|
+
|
|
|
+ task = new ReservedStateUpdateTask(
|
|
|
+ "test",
|
|
|
+ new ReservedStateChunk(Map.of(), new ReservedStateVersion(124L, Version.fromId(Version.CURRENT.id + 1))),
|
|
|
+ Map.of(),
|
|
|
+ List.of(),
|
|
|
+ e -> {},
|
|
|
+ ActionListener.noop()
|
|
|
);
|
|
|
+ assertThat("Cluster state should not be modified", task.execute(state), sameInstance(state));
|
|
|
}
|
|
|
|
|
|
- private ReservedClusterStateHandler<Map<String, Object>> makeHandlerHelper(final String name, final List<String> deps) {
|
|
|
- return new ReservedClusterStateHandler<>() {
|
|
|
- @Override
|
|
|
- public String name() {
|
|
|
- return name;
|
|
|
- }
|
|
|
-
|
|
|
+ private ReservedClusterStateHandler<Map<String, Object>> makeHandlerHelper(String name, List<String> deps) {
|
|
|
+ return new TestStateHandler(name) {
|
|
|
@Override
|
|
|
public TransformState transform(Object source, TransformState prevState) throws Exception {
|
|
|
return null;
|
|
@@ -460,11 +455,6 @@ public class ReservedClusterStateServiceTests extends ESTestCase {
|
|
|
public Collection<String> dependencies() {
|
|
|
return deps;
|
|
|
}
|
|
|
-
|
|
|
- @Override
|
|
|
- public Map<String, Object> fromXContent(XContentParser parser) throws IOException {
|
|
|
- return parser.map();
|
|
|
- }
|
|
|
};
|
|
|
}
|
|
|
|
|
@@ -519,7 +509,12 @@ public class ReservedClusterStateServiceTests extends ESTestCase {
|
|
|
() -> new ReservedClusterStateService(
|
|
|
clusterService,
|
|
|
mock(RerouteService.class),
|
|
|
- List.of(new ReservedClusterSettingsAction(clusterSettings), new TestHandler())
|
|
|
+ List.of(new ReservedClusterSettingsAction(clusterSettings), new TestStateHandler(ReservedClusterSettingsAction.NAME) {
|
|
|
+ @Override
|
|
|
+ public TransformState transform(Object source, TransformState prevState) throws Exception {
|
|
|
+ return prevState;
|
|
|
+ }
|
|
|
+ })
|
|
|
)
|
|
|
).getMessage(),
|
|
|
startsWith("Duplicate key cluster_settings")
|
|
@@ -545,39 +540,12 @@ public class ReservedClusterStateServiceTests extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
public void testTrialRunExtractsNonStateActions() {
|
|
|
- ReservedClusterStateHandler<Map<String, Object>> newStateMaker = new ReservedClusterStateHandler<>() {
|
|
|
- @Override
|
|
|
- public String name() {
|
|
|
- return "maker";
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public TransformState transform(Object source, TransformState prevState) throws Exception {
|
|
|
- ClusterState newState = new ClusterState.Builder(prevState.state()).build();
|
|
|
- return new TransformState(newState, prevState.keys());
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public Map<String, Object> fromXContent(XContentParser parser) throws IOException {
|
|
|
- return parser.map();
|
|
|
- }
|
|
|
- };
|
|
|
-
|
|
|
- ReservedClusterStateHandler<Map<String, Object>> exceptionThrower = new ReservedClusterStateHandler<>() {
|
|
|
- @Override
|
|
|
- public String name() {
|
|
|
- return "non-state";
|
|
|
- }
|
|
|
-
|
|
|
+ ReservedClusterStateHandler<Map<String, Object>> newStateMaker = new TestStateHandler("maker");
|
|
|
+ ReservedClusterStateHandler<Map<String, Object>> exceptionThrower = new TestStateHandler("non-state") {
|
|
|
@Override
|
|
|
public TransformState transform(Object source, TransformState prevState) {
|
|
|
return new TransformState(prevState.state(), prevState.keys());
|
|
|
}
|
|
|
-
|
|
|
- @Override
|
|
|
- public Map<String, Object> fromXContent(XContentParser parser) throws IOException {
|
|
|
- return parser.map();
|
|
|
- }
|
|
|
};
|
|
|
|
|
|
ReservedStateHandlerMetadata hmOne = new ReservedStateHandlerMetadata("non-state", Set.of("a", "b"));
|
|
@@ -607,22 +575,4 @@ public class ReservedClusterStateServiceTests extends ESTestCase {
|
|
|
var trialRunErrors = controller.trialRun("namespace_one", state, chunk, new LinkedHashSet<>(orderedHandlers));
|
|
|
assertThat(trialRunErrors, empty());
|
|
|
}
|
|
|
-
|
|
|
- static class TestHandler implements ReservedClusterStateHandler<Map<String, Object>> {
|
|
|
-
|
|
|
- @Override
|
|
|
- public String name() {
|
|
|
- return ReservedClusterSettingsAction.NAME;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public TransformState transform(Object source, TransformState prevState) {
|
|
|
- return prevState;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public Map<String, Object> fromXContent(XContentParser parser) throws IOException {
|
|
|
- return parser.map();
|
|
|
- }
|
|
|
- }
|
|
|
}
|