Explorar el Código

Decouple DiskThresholdMonitor & ClusterInfoService (#44105)

Today the `ClusterInfoService` requires the `DiskThresholdMonitor` at
construction time so that it can notify it when nodes report changes in their
disk usage, but this is awkward to construct: the `DiskThresholdMonitor`
requires a `RerouteService` which requires an `AllocationService` which comees
from the `ClusterModule` which requires the `ClusterInfoService`.

Today we break the cycle with a `LazilyInitializedRerouteService` which is
itself a little ugly. This commit replaces this with a more traditional
subject/observer relationship between the `ClusterInfoService` and the
`DiskThresholdMonitor`.
David Turner hace 6 años
padre
commit
af512f9dcf

+ 14 - 3
server/src/main/java/org/elasticsearch/cluster/ClusterInfoService.java

@@ -19,12 +19,23 @@
 
 package org.elasticsearch.cluster;
 
+import java.util.function.Consumer;
+
 /**
- * Interface for a class used to gather information about a cluster at
- * regular intervals
+ * Interface for a class used to gather information about a cluster periodically.
  */
+@FunctionalInterface
 public interface ClusterInfoService {
 
-    /** The latest cluster information */
+    /**
+     * @return the latest cluster information
+     */
     ClusterInfo getClusterInfo();
+
+    /**
+     * Add a listener for new cluster information
+     */
+    default void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {
+        throw new UnsupportedOperationException();
+    }
 }

+ 8 - 1
server/src/main/java/org/elasticsearch/cluster/EmptyClusterInfoService.java

@@ -19,8 +19,10 @@
 
 package org.elasticsearch.cluster;
 
+import java.util.function.Consumer;
+
 /**
- * ClusterInfoService that provides empty maps for disk usage and shard sizes
+ * {@link ClusterInfoService} that provides empty maps for disk usage and shard sizes
  */
 public class EmptyClusterInfoService implements ClusterInfoService {
     public static final EmptyClusterInfoService INSTANCE = new EmptyClusterInfoService();
@@ -29,4 +31,9 @@ public class EmptyClusterInfoService implements ClusterInfoService {
     public ClusterInfo getClusterInfo() {
         return ClusterInfo.EMPTY;
     }
+
+    @Override
+    public void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {
+        // never updated, so we can discard the listener
+    }
 }

+ 20 - 8
server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

@@ -21,6 +21,7 @@ package org.elasticsearch.cluster;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.LatchedActionListener;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@@ -46,6 +47,8 @@ import org.elasticsearch.monitor.fs.FsInfo;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.ReceiveTimeoutTransportException;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -85,10 +88,9 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
     private final ClusterService clusterService;
     private final ThreadPool threadPool;
     private final NodeClient client;
-    private final Consumer<ClusterInfo> listener;
+    private final List<Consumer<ClusterInfo>> listeners = Collections.synchronizedList(new ArrayList<>(1));
 
-    public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client,
-                                      Consumer<ClusterInfo> listener) {
+    public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
         this.leastAvailableSpaceUsages = ImmutableOpenMap.of();
         this.mostAvailableSpaceUsages = ImmutableOpenMap.of();
         this.shardRoutingToDataPath = ImmutableOpenMap.of();
@@ -109,7 +111,6 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
         this.clusterService.addLocalNodeMasterListener(this);
         // Add to listen for state changes (when nodes are added)
         this.clusterService.addListener(this);
-        this.listener = listener;
     }
 
     private void setEnabled(boolean enabled) {
@@ -356,14 +357,25 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
             Thread.currentThread().interrupt(); // restore interrupt status
         }
         ClusterInfo clusterInfo = getClusterInfo();
-        try {
-            listener.accept(clusterInfo);
-        } catch (Exception e) {
-            logger.info("Failed executing ClusterInfoService listener", e);
+        boolean anyListeners = false;
+        for (final Consumer<ClusterInfo> listener : listeners) {
+            anyListeners = true;
+            try {
+                logger.trace("notifying [{}] of new cluster info", listener);
+                listener.accept(clusterInfo);
+            } catch (Exception e) {
+                logger.info(new ParameterizedMessage("failed to notify [{}] of new cluster info", listener), e);
+            }
         }
+        assert anyListeners : "expected to notify at least one listener";
         return clusterInfo;
     }
 
+    @Override
+    public void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {
+        listeners.add(clusterInfoConsumer);
+    }
+
     static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder<String, Long> newShardSizes,
                                     ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath) {
         for (ShardStats s : stats) {

+ 0 - 42
server/src/main/java/org/elasticsearch/cluster/routing/LazilyInitializedRerouteService.java

@@ -1,42 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.cluster.routing;
-
-import org.apache.lucene.util.SetOnce;
-import org.elasticsearch.action.ActionListener;
-
-/**
- * A {@link RerouteService} that can be initialized lazily. The real reroute service, {@link BatchedRerouteService}, depends on components
- * constructed quite late in the construction of the node, but other components constructed earlier eventually need access to the reroute
- * service too.
- */
-public class LazilyInitializedRerouteService implements RerouteService {
-
-    private final SetOnce<RerouteService> delegate = new SetOnce<>();
-
-    @Override
-    public void reroute(String reason, ActionListener<Void> listener) {
-        assert delegate.get() != null;
-        delegate.get().reroute(reason, listener);
-    }
-
-    public void setRerouteService(RerouteService rerouteService) {
-        delegate.set(rerouteService);
-    }
-}

+ 8 - 12
server/src/main/java/org/elasticsearch/node/Node.java

@@ -26,8 +26,8 @@ import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.Build;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchTimeoutException;
-import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.ActionModule;
+import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus;
 import org.elasticsearch.action.search.SearchExecutionStatsCollector;
 import org.elasticsearch.action.search.SearchPhaseController;
@@ -38,7 +38,6 @@ import org.elasticsearch.bootstrap.BootstrapCheck;
 import org.elasticsearch.bootstrap.BootstrapContext;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.node.NodeClient;
-import org.elasticsearch.cluster.ClusterInfo;
 import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.ClusterModule;
 import org.elasticsearch.cluster.ClusterName;
@@ -57,7 +56,6 @@ import org.elasticsearch.cluster.metadata.TemplateUpgradeService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.routing.BatchedRerouteService;
-import org.elasticsearch.cluster.routing.LazilyInitializedRerouteService;
 import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -177,7 +175,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
@@ -371,11 +368,7 @@ public class Node implements Closeable {
                             .newHashPublisher());
             final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
                 scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
-            final LazilyInitializedRerouteService lazilyInitializedRerouteService = new LazilyInitializedRerouteService();
-            final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,
-                clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, lazilyInitializedRerouteService);
-            final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client,
-                diskThresholdMonitor::onNewInfo);
+            final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
             final UsageService usageService = new UsageService();
 
             ModulesBuilder modules = new ModulesBuilder();
@@ -508,7 +501,10 @@ public class Node implements Closeable {
 
             final RerouteService rerouteService
                 = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
-            lazilyInitializedRerouteService.setRerouteService(rerouteService);
+            final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,
+                clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, rerouteService);
+            clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);
+
             final DiscoveryModule discoveryModule = new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry,
                 networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
                 clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
@@ -1017,8 +1013,8 @@ public class Node implements Closeable {
 
     /** Constructs a ClusterInfoService which may be mocked for tests. */
     protected ClusterInfoService newClusterInfoService(Settings settings, ClusterService clusterService,
-                                                       ThreadPool threadPool, NodeClient client, Consumer<ClusterInfo> listeners) {
-        return new InternalClusterInfoService(settings, clusterService, threadPool, client, listeners);
+                                                       ThreadPool threadPool, NodeClient client) {
+        return new InternalClusterInfoService(settings, clusterService, threadPool, client);
     }
 
     /** Constructs a {@link org.elasticsearch.http.HttpServerTransport} which may be mocked for tests. */

+ 25 - 49
server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java

@@ -100,12 +100,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
                         new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
                         makeDecider(diskSettings))));
 
-        ClusterInfoService cis = new ClusterInfoService() {
-            @Override
-            public ClusterInfo getClusterInfo() {
-                logger.info("--> calling fake getClusterInfo");
-                return clusterInfo;
-            }
+        ClusterInfoService cis = () -> {
+            logger.info("--> calling fake getClusterInfo");
+            return clusterInfo;
         };
         AllocationService strategy = new AllocationService(deciders,
                 new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), cis);
@@ -278,12 +275,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
                         new SameShardAllocationDecider(Settings.EMPTY, clusterSettings),
                         makeDecider(diskSettings))));
 
-        ClusterInfoService cis = new ClusterInfoService() {
-            @Override
-            public ClusterInfo getClusterInfo() {
-                logger.info("--> calling fake getClusterInfo");
-                return clusterInfo;
-            }
+        ClusterInfoService cis = () -> {
+            logger.info("--> calling fake getClusterInfo");
+            return clusterInfo;
         };
 
         AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(),
@@ -328,12 +322,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
         usagesBuilder.put(nodeWithoutPrimary, new DiskUsage(nodeWithoutPrimary, "", "/dev/null", 100, 35)); // 65% used
         usages = usagesBuilder.build();
         final ClusterInfo clusterInfo2 = new DevNullClusterInfo(usages, usages, shardSizes);
-        cis = new ClusterInfoService() {
-            @Override
-            public ClusterInfo getClusterInfo() {
-                logger.info("--> calling fake getClusterInfo");
-                return clusterInfo2;
-            }
+        cis = () -> {
+            logger.info("--> calling fake getClusterInfo");
+            return clusterInfo2;
         };
         strategy = new AllocationService(deciders, new TestGatewayAllocator(),
                 new BalancedShardsAllocator(Settings.EMPTY), cis);
@@ -516,12 +507,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
                         ),
                         makeDecider(diskSettings))));
 
-        ClusterInfoService cis = new ClusterInfoService() {
-            @Override
-            public ClusterInfo getClusterInfo() {
-                logger.info("--> calling fake getClusterInfo");
-                return clusterInfo;
-            }
+        ClusterInfoService cis = () -> {
+            logger.info("--> calling fake getClusterInfo");
+            return clusterInfo;
         };
 
         AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(),
@@ -580,12 +568,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
                         ),
                         makeDecider(diskSettings))));
 
-        ClusterInfoService cis = new ClusterInfoService() {
-            @Override
-            public ClusterInfo getClusterInfo() {
-                logger.info("--> calling fake getClusterInfo");
-                return clusterInfo;
-            }
+        ClusterInfoService cis = () -> {
+            logger.info("--> calling fake getClusterInfo");
+            return clusterInfo;
         };
 
         AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(),
@@ -677,12 +662,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
                     Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
                 ), decider)));
 
-        ClusterInfoService cis = new ClusterInfoService() {
-            @Override
-            public ClusterInfo getClusterInfo() {
-                logger.info("--> calling fake getClusterInfo");
-                return clusterInfo;
-            }
+        ClusterInfoService cis = () -> {
+            logger.info("--> calling fake getClusterInfo");
+            return clusterInfo;
         };
 
         AllocationService strategy = new AllocationService(deciders, new TestGatewayAllocator(),
@@ -856,12 +838,9 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
         }
 
         // Creating AllocationService instance and the services it depends on...
-        ClusterInfoService cis = new ClusterInfoService() {
-            @Override
-            public ClusterInfo getClusterInfo() {
-                logger.info("--> calling fake getClusterInfo");
-                return clusterInfo;
-            }
+        ClusterInfoService cis = () -> {
+            logger.info("--> calling fake getClusterInfo");
+            return clusterInfo;
         };
         AllocationDeciders deciders = new AllocationDeciders(new HashSet<>(Arrays.asList(
                 new SameShardAllocationDecider(
@@ -948,13 +927,10 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
 
         // Two shards should start happily
         assertThat(decision.type(), equalTo(Decision.Type.YES));
-        assertThat(((Decision.Single) decision).getExplanation(), containsString("there is only a single data node present"));
-        ClusterInfoService cis = new ClusterInfoService() {
-            @Override
-            public ClusterInfo getClusterInfo() {
-                logger.info("--> calling fake getClusterInfo");
-                return clusterInfo;
-            }
+        assertThat(decision.getExplanation(), containsString("there is only a single data node present"));
+        ClusterInfoService cis = () -> {
+            logger.info("--> calling fake getClusterInfo");
+            return clusterInfo;
         };
 
         AllocationDeciders deciders = new AllocationDeciders(new HashSet<>(Arrays.asList(

+ 6 - 8
test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java

@@ -18,11 +18,6 @@
  */
 package org.elasticsearch.cluster;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.concurrent.CountDownLatch;
-import java.util.function.Consumer;
-
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@@ -40,6 +35,10 @@ import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
 
@@ -71,9 +70,8 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService {
             null, null, null, null);
     }
 
-    public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client,
-                                          Consumer<ClusterInfo> listener) {
-        super(settings, clusterService, threadPool, client, listener);
+    public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
+        super(settings, clusterService, threadPool, client);
         this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
         stats[0] = makeStats("node_t1", new DiskUsage("node_t1", "n1", "/dev/null", 100, 100));
         stats[1] = makeStats("node_t2", new DiskUsage("node_t2", "n2", "/dev/null", 100, 100));

+ 3 - 5
test/framework/src/main/java/org/elasticsearch/node/MockNode.java

@@ -20,7 +20,6 @@
 package org.elasticsearch.node;
 
 import org.elasticsearch.client.node.NodeClient;
-import org.elasticsearch.cluster.ClusterInfo;
 import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.MockInternalClusterInfoService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -55,7 +54,6 @@ import java.nio.file.Path;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Set;
-import java.util.function.Consumer;
 import java.util.function.Function;
 
 /**
@@ -159,11 +157,11 @@ public class MockNode extends Node {
 
     @Override
     protected ClusterInfoService newClusterInfoService(Settings settings, ClusterService clusterService,
-                                                       ThreadPool threadPool, NodeClient client, Consumer<ClusterInfo> listener) {
+                                                       ThreadPool threadPool, NodeClient client) {
         if (getPluginsService().filterPlugins(MockInternalClusterInfoService.TestPlugin.class).isEmpty()) {
-            return super.newClusterInfoService(settings, clusterService, threadPool, client, listener);
+            return super.newClusterInfoService(settings, clusterService, threadPool, client);
         } else {
-            return new MockInternalClusterInfoService(settings, clusterService, threadPool, client, listener);
+            return new MockInternalClusterInfoService(settings, clusterService, threadPool, client);
         }
     }