Browse Source

Change the ClusterService reroute reference to a supplier to match other uses (#102690)

Simon Cooper 1 year ago
parent
commit
97770aaab6

+ 32 - 8
server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java

@@ -31,6 +31,8 @@ import org.elasticsearch.node.Node;
 import org.elasticsearch.tasks.TaskManager;
 import org.elasticsearch.threadpool.ThreadPool;
 
+import java.util.function.Supplier;
+
 public class ClusterService extends AbstractLifecycleComponent {
     private final MasterService masterService;
 
@@ -54,13 +56,24 @@ public class ClusterService extends AbstractLifecycleComponent {
 
     private final String nodeName;
 
-    private RerouteService rerouteService;
+    private final Supplier<RerouteService> rerouteService;
 
     public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, TaskManager taskManager) {
+        this(settings, clusterSettings, threadPool, taskManager, () -> { throw new IllegalStateException("RerouteService not provided"); });
+    }
+
+    public ClusterService(
+        Settings settings,
+        ClusterSettings clusterSettings,
+        ThreadPool threadPool,
+        TaskManager taskManager,
+        Supplier<RerouteService> rerouteService
+    ) {
         this(
             settings,
             clusterSettings,
             new MasterService(settings, clusterSettings, threadPool, taskManager),
+            rerouteService,
             new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool)
         );
     }
@@ -70,10 +83,27 @@ public class ClusterService extends AbstractLifecycleComponent {
         ClusterSettings clusterSettings,
         MasterService masterService,
         ClusterApplierService clusterApplierService
+    ) {
+        this(
+            settings,
+            clusterSettings,
+            masterService,
+            () -> { throw new IllegalStateException("RerouteService not provided"); },
+            clusterApplierService
+        );
+    }
+
+    public ClusterService(
+        Settings settings,
+        ClusterSettings clusterSettings,
+        MasterService masterService,
+        Supplier<RerouteService> rerouteService,
+        ClusterApplierService clusterApplierService
     ) {
         this.settings = settings;
         this.nodeName = Node.NODE_NAME_SETTING.get(settings);
         this.masterService = masterService;
+        this.rerouteService = rerouteService;
         this.operationRouting = new OperationRouting(settings, clusterSettings);
         this.clusterSettings = clusterSettings;
         this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
@@ -90,14 +120,8 @@ public class ClusterService extends AbstractLifecycleComponent {
         clusterApplierService.setNodeConnectionsService(nodeConnectionsService);
     }
 
-    public void setRerouteService(RerouteService rerouteService) {
-        assert this.rerouteService == null : "RerouteService is already set";
-        this.rerouteService = rerouteService;
-    }
-
     public RerouteService getRerouteService() {
-        assert this.rerouteService != null : "RerouteService not set";
-        return rerouteService;
+        return rerouteService.get();
     }
 
     @Override

+ 10 - 5
server/src/main/java/org/elasticsearch/node/NodeConstruction.java

@@ -607,7 +607,8 @@ class NodeConstruction {
             telemetryProvider.getTracer()
         );
 
-        ClusterService clusterService = createClusterService(settingsModule, threadPool, taskManager);
+        final SetOnce<RerouteService> rerouteServiceReference = new SetOnce<>();
+        ClusterService clusterService = createClusterService(settingsModule, threadPool, taskManager, rerouteServiceReference::get);
         clusterService.addStateApplier(scriptService);
 
         Supplier<DocumentParsingObserver> documentParsingObserverSupplier = getDocumentParsingObserverSupplier();
@@ -627,7 +628,6 @@ class NodeConstruction {
         SystemIndices systemIndices = createSystemIndices(settings);
 
         final SetOnce<RepositoriesService> repositoriesServiceReference = new SetOnce<>();
-        final SetOnce<RerouteService> rerouteServiceReference = new SetOnce<>();
         final ClusterInfoService clusterInfoService = serviceProvider.newClusterInfoService(
             pluginsService,
             settings,
@@ -656,7 +656,6 @@ class NodeConstruction {
 
         RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
         rerouteServiceReference.set(rerouteService);
-        clusterService.setRerouteService(rerouteService);
 
         clusterInfoService.addListener(
             new DiskThresholdMonitor(
@@ -1075,12 +1074,18 @@ class NodeConstruction {
         postInjection(clusterModule, actionModule, clusterService, transportService, featureService);
     }
 
-    private ClusterService createClusterService(SettingsModule settingsModule, ThreadPool threadPool, TaskManager taskManager) {
+    private ClusterService createClusterService(
+        SettingsModule settingsModule,
+        ThreadPool threadPool,
+        TaskManager taskManager,
+        Supplier<RerouteService> rerouteService
+    ) {
         ClusterService clusterService = new ClusterService(
             settingsModule.getSettings(),
             settingsModule.getClusterSettings(),
             threadPool,
-            taskManager
+            taskManager,
+            rerouteService
         );
         resourcesToClose.add(clusterService);
 

+ 3 - 3
server/src/test/java/org/elasticsearch/reservedstate/service/FileSettingsServiceTests.java

@@ -62,18 +62,19 @@ public class FileSettingsServiceTests extends ESTestCase {
     private ThreadPool threadpool;
 
     @Before
-    @SuppressWarnings("unchecked")
     public void setUp() throws Exception {
         super.setUp();
 
         threadpool = new TestThreadPool("file_settings_service_tests");
 
+        var reroute = mock(RerouteService.class);
         clusterService = spy(
             new ClusterService(
                 Settings.builder().put(NODE_NAME_SETTING.getKey(), "test").build(),
                 new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
                 threadpool,
-                new TaskManager(Settings.EMPTY, threadpool, Set.of())
+                new TaskManager(Settings.EMPTY, threadpool, Set.of()),
+                () -> reroute
             )
         );
 
@@ -83,7 +84,6 @@ public class FileSettingsServiceTests extends ESTestCase {
             .build();
         doAnswer((Answer<ClusterState>) invocation -> clusterState).when(clusterService).state();
 
-        clusterService.setRerouteService(mock(RerouteService.class));
         clusterService.setNodeConnectionsService(mock(NodeConnectionsService.class));
         clusterService.getClusterApplierService().setInitialState(clusterState);
         clusterService.getMasterService().setClusterStatePublisher((e, pl, al) -> {

+ 1 - 1
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -1612,6 +1612,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                     settings,
                     clusterSettings,
                     masterService,
+                    () -> (reason, priority, listener) -> listener.onResponse(null),
                     new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) {
                         @Override
                         protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
@@ -1641,7 +1642,6 @@ public class SnapshotResiliencyTests extends ESTestCase {
                         }
                     }
                 );
-                clusterService.setRerouteService((reason, priority, listener) -> listener.onResponse(null));
                 recoverySettings = new RecoverySettings(settings, clusterSettings);
                 mockTransport = new DisruptableMockTransport(node, deterministicTaskQueue) {
                     @Override