Browse Source

Enable the health node and the disk health indicator #84811 (#90085)

Mary Gouseti 3 years ago
parent
commit
4901cf837a

+ 0 - 3
build-tools-internal/src/main/java/org/elasticsearch/gradle/internal/ElasticsearchTestBasePlugin.java

@@ -161,9 +161,6 @@ public class ElasticsearchTestBasePlugin implements Plugin<Project> {
             // TODO: remove this once cname is prepended to transport.publish_address by default in 8.0
             test.systemProperty("es.transport.cname_in_publish_address", "true");
 
-            // TODO: remove this once the disk usage indicator feature is finished #84811
-            test.systemProperty("es.health_node_feature_flag_enabled", true);
-
             // Set netty system properties to the properties we configure in jvm.options
             test.systemProperty("io.netty.noUnsafe", "true");
             test.systemProperty("io.netty.noKeySetOptimization", "true");

+ 0 - 2
docs/build.gradle

@@ -63,8 +63,6 @@ testClusters.matching { it.name == "yamlRestTest"}.configureEach {
     setting 'xpack.license.self_generated.type', 'trial'
     setting 'indices.lifecycle.history_index_enabled', 'false'
     keystorePassword 'keystore-password'
-    // TODO: remove this once the disk usage indicator feature is finished #84811
-    requiresFeature 'es.health_node_feature_flag_enabled', Version.fromString("8.4.0")
   }
 
   // enable regexes in painless so our tests don't complain about example snippets that use them

+ 5 - 0
docs/changelog/90085.yaml

@@ -0,0 +1,5 @@
+pr: 90085
+summary: "Enable the health node and the disk health indicator #84811"
+area: Health
+type: feature
+issues: []

+ 13 - 6
modules/ingest-geoip/qa/full-cluster-restart/src/test/java/org/elasticsearch/ingest/geoip/FullClusterRestartIT.java

@@ -15,6 +15,7 @@ import org.elasticsearch.upgrades.AbstractFullClusterRestartTestCase;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.Matchers.contains;
@@ -81,18 +82,24 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase {
         }
     }
 
+    @SuppressWarnings("unchecked")
     private void testDatabasesLoaded() throws IOException {
         Request getTaskState = new Request("GET", "/_cluster/state");
         ObjectPath state = ObjectPath.createFromResponse(client().performRequest(getTaskState));
 
-        Map<String, Object> databases = null;
-        try {
-            databases = state.evaluate("metadata.persistent_tasks.tasks.0.task.geoip-downloader.state.databases");
-        } catch (Exception e) {
-            // ObjectPath doesn't like the 0 above if the list of tasks is empty, and it throws rather than returning null,
-            // catch that and throw an AssertionError instead (which assertBusy will handle)
+        List<?> tasks = state.evaluate("metadata.persistent_tasks.tasks");
+        // Short-circuit to avoid using steams if the list is empty
+        if (tasks.isEmpty()) {
             fail();
         }
+        Map<String, Object> databases = (Map<String, Object>) tasks.stream().map(task -> {
+            try {
+                return ObjectPath.evaluate(task, "task.geoip-downloader.state.databases");
+            } catch (IOException e) {
+                return null;
+            }
+        }).filter(Objects::nonNull).findFirst().orElse(null);
+
         assertNotNull(databases);
 
         for (String name : List.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb")) {

+ 2 - 5
server/src/main/java/org/elasticsearch/action/ActionModule.java

@@ -265,7 +265,6 @@ import org.elasticsearch.health.GetHealthAction;
 import org.elasticsearch.health.RestGetHealthAction;
 import org.elasticsearch.health.node.FetchHealthInfoCacheAction;
 import org.elasticsearch.health.node.UpdateHealthInfoCacheAction;
-import org.elasticsearch.health.node.selection.HealthNode;
 import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
 import org.elasticsearch.index.seqno.RetentionLeaseActions;
 import org.elasticsearch.indices.SystemIndices;
@@ -703,10 +702,8 @@ public class ActionModule extends AbstractModule {
         actions.register(UpdateDesiredNodesAction.INSTANCE, TransportUpdateDesiredNodesAction.class);
         actions.register(DeleteDesiredNodesAction.INSTANCE, TransportDeleteDesiredNodesAction.class);
 
-        if (HealthNode.isEnabled()) {
-            actions.register(UpdateHealthInfoCacheAction.INSTANCE, UpdateHealthInfoCacheAction.TransportAction.class);
-            actions.register(FetchHealthInfoCacheAction.INSTANCE, FetchHealthInfoCacheAction.TransportAction.class);
-        }
+        actions.register(UpdateHealthInfoCacheAction.INSTANCE, UpdateHealthInfoCacheAction.TransportAction.class);
+        actions.register(FetchHealthInfoCacheAction.INSTANCE, FetchHealthInfoCacheAction.TransportAction.class);
 
         return unmodifiableMap(actions.getRegistry());
     }

+ 3 - 5
server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

@@ -61,7 +61,6 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.gateway.GatewayAllocator;
 import org.elasticsearch.health.metadata.HealthMetadataService;
-import org.elasticsearch.health.node.selection.HealthNode;
 import org.elasticsearch.health.node.selection.HealthNodeTaskExecutor;
 import org.elasticsearch.indices.SystemIndices;
 import org.elasticsearch.ingest.IngestMetadata;
@@ -176,10 +175,9 @@ public class ClusterModule extends AbstractModule {
         // Task Status (not Diffable)
         entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new));
 
-        if (HealthNode.isEnabled()) {
-            entries.addAll(HealthNodeTaskExecutor.getNamedWriteables());
-            entries.addAll(HealthMetadataService.getNamedWriteables());
-        }
+        // Health API
+        entries.addAll(HealthNodeTaskExecutor.getNamedWriteables());
+        entries.addAll(HealthMetadataService.getNamedWriteables());
         return entries;
     }
 

+ 5 - 9
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -69,7 +69,6 @@ import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.gateway.PersistedClusterStateService;
 import org.elasticsearch.health.node.LocalHealthMonitor;
 import org.elasticsearch.health.node.action.TransportHealthNodeAction;
-import org.elasticsearch.health.node.selection.HealthNode;
 import org.elasticsearch.health.node.selection.HealthNodeTaskExecutor;
 import org.elasticsearch.http.HttpTransportSettings;
 import org.elasticsearch.index.IndexModule;
@@ -118,11 +117,8 @@ import org.elasticsearch.watcher.ResourceWatcherService;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.Objects;
 import java.util.Set;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * Encapsulates all valid cluster level settings.
@@ -189,7 +185,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
         }
     }
 
-    public static Set<Setting<?>> BUILT_IN_CLUSTER_SETTINGS = Stream.of(
+    public static Set<Setting<?>> BUILT_IN_CLUSTER_SETTINGS = Set.of(
         AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
         AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING,
         BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING,
@@ -529,10 +525,10 @@ public final class ClusterSettings extends AbstractScopedSettings {
         CoordinationDiagnosticsService.NODE_HAS_MASTER_LOOKUP_TIMEFRAME_SETTING,
         MasterHistory.MAX_HISTORY_AGE_SETTING,
         ReadinessService.PORT,
-        HealthNode.isEnabled() ? HealthNodeTaskExecutor.ENABLED_SETTING : null,
-        HealthNode.isEnabled() ? LocalHealthMonitor.POLL_INTERVAL_SETTING : null,
-        HealthNode.isEnabled() ? TransportHealthNodeAction.HEALTH_NODE_TRANSPORT_ACTION_TIMEOUT : null
-    ).filter(Objects::nonNull).collect(Collectors.toSet());
+        HealthNodeTaskExecutor.ENABLED_SETTING,
+        LocalHealthMonitor.POLL_INTERVAL_SETTING,
+        TransportHealthNodeAction.HEALTH_NODE_TRANSPORT_ACTION_TIMEOUT
+    );
 
     static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.emptyList();
 

+ 26 - 38
server/src/main/java/org/elasticsearch/health/HealthService.java

@@ -16,7 +16,6 @@ import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.health.node.FetchHealthInfoCacheAction;
 import org.elasticsearch.health.node.HealthInfo;
-import org.elasticsearch.health.node.selection.HealthNode;
 
 import java.util.Collections;
 import java.util.HashSet;
@@ -101,43 +100,32 @@ public class HealthService {
             .filter(result -> indicatorName == null || result.name().equals(indicatorName));
 
         if (clusterHealthIsObtainable) {
-            if (HealthNode.isEnabled()) {
-                client.execute(FetchHealthInfoCacheAction.INSTANCE, new FetchHealthInfoCacheAction.Request(), new ActionListener<>() {
-                    @Override
-                    public void onResponse(FetchHealthInfoCacheAction.Response response) {
-                        HealthInfo healthInfo = response.getHealthInfo();
-                        validateResultsAndNotifyListener(
-                            indicatorName,
-                            Stream.concat(
-                                filteredPreflightResults,
-                                filteredIndicators.map(service -> service.calculate(explain, healthInfo))
-                            ).toList(),
-                            listener
-                        );
-                    }
-
-                    @Override
-                    public void onFailure(Exception e) {
-                        validateResultsAndNotifyListener(
-                            indicatorName,
-                            Stream.concat(
-                                filteredPreflightResults,
-                                filteredIndicators.map(service -> service.calculate(explain, HealthInfo.EMPTY_HEALTH_INFO))
-                            ).toList(),
-                            listener
-                        );
-                    }
-                });
-            } else {
-                validateResultsAndNotifyListener(
-                    indicatorName,
-                    Stream.concat(
-                        filteredPreflightResults,
-                        filteredIndicators.map(service -> service.calculate(explain, HealthInfo.EMPTY_HEALTH_INFO))
-                    ).toList(),
-                    listener
-                );
-            }
+
+            client.execute(FetchHealthInfoCacheAction.INSTANCE, new FetchHealthInfoCacheAction.Request(), new ActionListener<>() {
+                @Override
+                public void onResponse(FetchHealthInfoCacheAction.Response response) {
+                    HealthInfo healthInfo = response.getHealthInfo();
+                    validateResultsAndNotifyListener(
+                        indicatorName,
+                        Stream.concat(filteredPreflightResults, filteredIndicators.map(service -> service.calculate(explain, healthInfo)))
+                            .toList(),
+                        listener
+                    );
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    validateResultsAndNotifyListener(
+                        indicatorName,
+                        Stream.concat(
+                            filteredPreflightResults,
+                            filteredIndicators.map(service -> service.calculate(explain, HealthInfo.EMPTY_HEALTH_INFO))
+                        ).toList(),
+                        listener
+                    );
+                }
+            });
+
         } else {
             // Mark remaining indicators as UNKNOWN
             HealthIndicatorDetails unknownDetails = healthUnknownReason(preflightResults, explain);

+ 1 - 1
server/src/main/java/org/elasticsearch/health/metadata/HealthMetadata.java

@@ -50,7 +50,7 @@ public final class HealthMetadata extends AbstractNamedDiffable<ClusterState.Cus
 
     @Override
     public Version getMinimalSupportedVersion() {
-        return Version.V_8_4_0;
+        return Version.V_8_5_0;
     }
 
     @Override

+ 2 - 2
server/src/main/java/org/elasticsearch/health/metadata/HealthMetadataService.java

@@ -129,8 +129,8 @@ public class HealthMetadataService {
         } else if (isMaster == false) {
             readyToPublish = false;
         }
-        // Wait until every node in the cluster is upgraded to 8.4.0 or later
-        if (event.state().nodesIfRecovered().getMinNodeVersion().onOrAfter(Version.V_8_4_0)) {
+        // Wait until every node in the cluster is upgraded to 8.5.0 or later
+        if (event.state().nodesIfRecovered().getMinNodeVersion().onOrAfter(Version.V_8_5_0)) {
             if (readyToPublish) {
                 resetHealthMetadata("health-metadata-update-master-election");
                 readyToPublish = false;

+ 0 - 6
server/src/main/java/org/elasticsearch/health/node/selection/HealthNode.java

@@ -22,12 +22,6 @@ import java.util.Map;
  */
 public class HealthNode extends AllocatedPersistentTask {
 
-    public static final boolean FEATURE_FLAG_ENABLED = "true".equals(System.getProperty("es.health_node_feature_flag_enabled"));
-
-    public static boolean isEnabled() {
-        return FEATURE_FLAG_ENABLED;
-    }
-
     public static final String TASK_NAME = "health-node";
 
     HealthNode(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers) {

+ 1 - 1
server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskParams.java

@@ -47,7 +47,7 @@ public class HealthNodeTaskParams implements PersistentTaskParams {
 
     @Override
     public Version getMinimalSupportedVersion() {
-        return Version.V_8_4_0;
+        return Version.V_8_5_0;
     }
 
     @Override

+ 16 - 27
server/src/main/java/org/elasticsearch/node/Node.java

@@ -108,7 +108,6 @@ import org.elasticsearch.health.metadata.HealthMetadataService;
 import org.elasticsearch.health.node.DiskHealthIndicatorService;
 import org.elasticsearch.health.node.HealthInfoCache;
 import org.elasticsearch.health.node.LocalHealthMonitor;
-import org.elasticsearch.health.node.selection.HealthNode;
 import org.elasticsearch.health.node.selection.HealthNodeTaskExecutor;
 import org.elasticsearch.http.HttpServerTransport;
 import org.elasticsearch.index.IndexSettingProvider;
@@ -522,9 +521,6 @@ public class Node implements Closeable {
                 SystemIndexMigrationExecutor.getNamedWriteables().stream()
             ).flatMap(Function.identity()).toList();
             final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);
-            Stream<NamedXContentRegistry.Entry> healthNodeTaskNamedXContentParsers = HealthNode.isEnabled()
-                ? HealthNodeTaskExecutor.getNamedXContentParsers().stream()
-                : Stream.empty();
             NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(
                 Stream.of(
                     NetworkModule.getNamedXContents().stream(),
@@ -533,7 +529,7 @@ public class Node implements Closeable {
                     pluginsService.flatMap(Plugin::getNamedXContent),
                     ClusterModule.getNamedXWriteables().stream(),
                     SystemIndexMigrationExecutor.getNamedXContentParsers().stream(),
-                    healthNodeTaskNamedXContentParsers
+                    HealthNodeTaskExecutor.getNamedXContentParsers().stream()
                 ).flatMap(Function.identity()).collect(toList())
             );
             final List<SystemIndices.Feature> features = pluginsService.filterPlugins(SystemIndexPlugin.class).stream().map(plugin -> {
@@ -915,12 +911,13 @@ public class Node implements Closeable {
                 metadataCreateIndexService,
                 settingsModule.getIndexScopedSettings()
             );
-            final HealthNodeTaskExecutor healthNodeTaskExecutor = HealthNode.isEnabled()
-                ? HealthNodeTaskExecutor.create(clusterService, persistentTasksService, settings, clusterService.getClusterSettings())
-                : null;
-            final List<PersistentTasksExecutor<?>> builtinTaskExecutors = HealthNode.isEnabled()
-                ? List.of(systemIndexMigrationExecutor, healthNodeTaskExecutor)
-                : List.of(systemIndexMigrationExecutor);
+            final HealthNodeTaskExecutor healthNodeTaskExecutor = HealthNodeTaskExecutor.create(
+                clusterService,
+                persistentTasksService,
+                settings,
+                clusterService.getClusterSettings()
+            );
+            final List<PersistentTasksExecutor<?>> builtinTaskExecutors = List.of(systemIndexMigrationExecutor, healthNodeTaskExecutor);
             final List<PersistentTasksExecutor<?>> pluginTaskExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class)
                 .stream()
                 .map(
@@ -962,13 +959,9 @@ public class Node implements Closeable {
                 masterHistoryService
             );
             HealthService healthService = createHealthService(clusterService, clusterModule, coordinationDiagnosticsService);
-            HealthMetadataService healthMetadataService = HealthNode.isEnabled()
-                ? HealthMetadataService.create(clusterService, settings)
-                : null;
-            LocalHealthMonitor localHealthMonitor = HealthNode.isEnabled()
-                ? LocalHealthMonitor.create(settings, clusterService, nodeService, threadPool, client)
-                : null;
-            HealthInfoCache nodeHealthOverview = HealthNode.isEnabled() ? HealthInfoCache.create(clusterService) : null;
+            HealthMetadataService healthMetadataService = HealthMetadataService.create(clusterService, settings);
+            LocalHealthMonitor localHealthMonitor = LocalHealthMonitor.create(settings, clusterService, nodeService, threadPool, client);
+            HealthInfoCache nodeHealthOverview = HealthInfoCache.create(clusterService);
 
             modules.add(b -> {
                 b.bind(Node.class).toInstance(this);
@@ -1053,12 +1046,10 @@ public class Node implements Closeable {
                 b.bind(HealthService.class).toInstance(healthService);
                 b.bind(MasterHistoryService.class).toInstance(masterHistoryService);
                 b.bind(CoordinationDiagnosticsService.class).toInstance(coordinationDiagnosticsService);
-                if (HealthNode.isEnabled()) {
-                    b.bind(HealthNodeTaskExecutor.class).toInstance(healthNodeTaskExecutor);
-                    b.bind(HealthMetadataService.class).toInstance(healthMetadataService);
-                    b.bind(LocalHealthMonitor.class).toInstance(localHealthMonitor);
-                    b.bind(HealthInfoCache.class).toInstance(nodeHealthOverview);
-                }
+                b.bind(HealthNodeTaskExecutor.class).toInstance(healthNodeTaskExecutor);
+                b.bind(HealthMetadataService.class).toInstance(healthMetadataService);
+                b.bind(LocalHealthMonitor.class).toInstance(localHealthMonitor);
+                b.bind(HealthInfoCache.class).toInstance(nodeHealthOverview);
                 b.bind(Tracer.class).toInstance(tracer);
                 b.bind(FileSettingsService.class).toInstance(fileSettingsService);
             });
@@ -1171,9 +1162,7 @@ public class Node implements Closeable {
                 new ShardsAvailabilityHealthIndicatorService(clusterService, clusterModule.getAllocationService())
             )
         );
-        if (HealthNode.isEnabled()) {
-            serverHealthIndicatorServices.add(new DiskHealthIndicatorService(clusterService));
-        }
+        serverHealthIndicatorServices.add(new DiskHealthIndicatorService(clusterService));
         var pluginHealthIndicatorServices = pluginsService.filterPlugins(HealthPlugin.class)
             .stream()
             .flatMap(plugin -> plugin.getHealthIndicatorServices().stream())

+ 1 - 2
server/src/test/java/org/elasticsearch/health/HealthServiceTests.java

@@ -15,7 +15,6 @@ import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.health.node.DiskHealthInfo;
 import org.elasticsearch.health.node.FetchHealthInfoCacheAction;
 import org.elasticsearch.health.node.HealthInfo;
-import org.elasticsearch.health.node.selection.HealthNode;
 import org.elasticsearch.test.ESTestCase;
 
 import java.util.Collections;
@@ -370,7 +369,7 @@ public class HealthServiceTests extends ESTestCase {
 
             @Override
             public HealthIndicatorResult calculate(boolean explain, HealthInfo healthInfo) {
-                if (expectedHealthInfo != null && HealthNode.isEnabled()) {
+                if (expectedHealthInfo != null) {
                     assertThat(healthInfo, equalTo(expectedHealthInfo));
                 }
                 return result;

+ 12 - 5
x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java

@@ -18,6 +18,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.core.CheckedRunnable;
+import org.elasticsearch.health.node.selection.HealthNode;
 import org.elasticsearch.index.seqno.ReplicationTracker;
 import org.elasticsearch.test.rest.ObjectPath;
 
@@ -79,7 +80,7 @@ public class FollowIndexSecurityIT extends ESCCRRestTestCase {
             pauseFollow(allowedIndex);
             // Make sure that there are no other ccr relates operations running:
             assertBusy(() -> {
-                assertNoPersistentTasks();
+                assertNoPendingPersistentTasks();
                 assertThat(getCcrNodeTasks(), empty());
             });
 
@@ -88,7 +89,7 @@ public class FollowIndexSecurityIT extends ESCCRRestTestCase {
             pauseFollow(allowedIndex);
             // Make sure that there are no other ccr relates operations running:
             assertBusy(() -> {
-                assertNoPersistentTasks();
+                assertNoPendingPersistentTasks();
                 assertThat(getCcrNodeTasks(), empty());
             });
 
@@ -266,7 +267,7 @@ public class FollowIndexSecurityIT extends ESCCRRestTestCase {
             deleteIndex(client(), cleanFollower);
             // the shard follow task should have been cleaned up on behalf of the user, see ShardFollowTaskCleaner
             assertBusy(() -> {
-                assertNoPersistentTasks();
+                assertNoPendingPersistentTasks();
                 assertThat(getCcrNodeTasks(), empty());
             });
         }
@@ -340,9 +341,15 @@ public class FollowIndexSecurityIT extends ESCCRRestTestCase {
         }
     }
 
-    private static void assertNoPersistentTasks() throws IOException {
+    private static void assertNoPendingPersistentTasks() throws IOException {
         Map<String, Object> clusterState = toMap(adminClient().performRequest(new Request("GET", "/_cluster/state")));
-        List<?> tasks = (List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState);
+        List<?> tasks = ((List<?>) XContentMapValues.extractValue("metadata.persistent_tasks.tasks", clusterState)).stream()
+            .filter(
+                task -> (((task instanceof Map<?, ?> taskMap)
+                    && taskMap.containsKey("id")
+                    && taskMap.get("id").equals(HealthNode.TASK_NAME))) == false
+            )
+            .toList();
         assertThat(tasks, empty());
     }
 }

+ 2 - 0
x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java

@@ -253,8 +253,10 @@ public class Constants {
         "cluster:monitor/ccr/follow_stats",
         "cluster:monitor/ccr/stats",
         "cluster:monitor/eql/async/status",
+        "cluster:monitor/fetch/health/info",
         "cluster:monitor/health",
         "cluster:monitor/health_api",
+        "cluster:monitor/update/health/info",
         "cluster:monitor/ingest/geoip/stats",
         "cluster:monitor/main",
         "cluster:monitor/nodes/hot_threads",