Browse Source

Fork calculating the health indicators to Management pool (#91628)

When a request for the health API comes in we first reach out to the
health node (transport request) to fetch the cluster health state. When
we get a response we calculate all the health indicators (except
`master_is_stable` which executes before we do anything else).

Calculating the health indicators can get expensive when a large number
of shards are present in the cluster and we shouldn't keep the transport
worker thread busy with this.

This forks the health indicators health computation to the Management
pool.
Andrei Dan 2 years ago
parent
commit
ec4e9cc0f6

+ 0 - 2
qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/HealthRestCancellationIT.java

@@ -9,7 +9,6 @@
 package org.elasticsearch.http;
 
 import org.apache.http.client.methods.HttpGet;
-import org.apache.lucene.tests.util.LuceneTestCase.AwaitsFix;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.client.Cancellable;
 import org.elasticsearch.client.Request;
@@ -40,7 +39,6 @@ import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefix;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.not;
 
-@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/91597")
 public class HealthRestCancellationIT extends HttpSmokeTestCase {
 
     @Override

+ 44 - 12
server/src/main/java/org/elasticsearch/health/HealthService.java

@@ -8,14 +8,15 @@
 
 package org.elasticsearch.health;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.health.node.FetchHealthInfoCacheAction;
 import org.elasticsearch.health.node.HealthInfo;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.Collections;
 import java.util.HashSet;
@@ -44,10 +45,10 @@ public class HealthService {
      * Detail map key that contains the reasons a result was marked as UNKNOWN
      */
     private static final String REASON = "reasons";
-    private static final Logger logger = LogManager.getLogger(HealthService.class);
 
     private final List<HealthIndicatorService> preflightHealthIndicatorServices;
     private final List<HealthIndicatorService> healthIndicatorServices;
+    private final ThreadPool threadPool;
 
     /**
      * Creates a new HealthService.
@@ -62,10 +63,12 @@ public class HealthService {
      */
     public HealthService(
         List<HealthIndicatorService> preflightHealthIndicatorServices,
-        List<HealthIndicatorService> healthIndicatorServices
+        List<HealthIndicatorService> healthIndicatorServices,
+        ThreadPool threadPool
     ) {
         this.preflightHealthIndicatorServices = preflightHealthIndicatorServices;
         this.healthIndicatorServices = healthIndicatorServices;
+        this.threadPool = threadPool;
     }
 
     /**
@@ -105,24 +108,53 @@ public class HealthService {
                 @Override
                 public void onResponse(FetchHealthInfoCacheAction.Response response) {
                     HealthInfo healthInfo = response.getHealthInfo();
-                    validateResultsAndNotifyListener(
+                    // fork off to the management pool as calculating the indicators can run for longer than is acceptable
+                    // on a transport thread in case of large numbers of indices
+                    ActionRunnable<List<HealthIndicatorResult>> calculateFilteredIndicatorsRunnable = calculateFilteredIndicatorsRunnable(
                         indicatorName,
-                        Stream.concat(filteredPreflightResults, filteredIndicators.map(service -> service.calculate(explain, healthInfo)))
-                            .toList(),
+                        healthInfo,
+                        explain,
                         listener
                     );
+
+                    try {
+                        threadPool.executor(ThreadPool.Names.MANAGEMENT).submit(calculateFilteredIndicatorsRunnable);
+                    } catch (EsRejectedExecutionException e) {
+                        calculateFilteredIndicatorsRunnable.onRejection(e);
+                    }
                 }
 
                 @Override
                 public void onFailure(Exception e) {
-                    validateResultsAndNotifyListener(
+                    // fork off to the management pool as calculating the indicators can run for longer than is acceptable
+                    // on a transport thread in case of large numbers of indices
+                    ActionRunnable<List<HealthIndicatorResult>> calculateFilteredIndicatorsRunnable = calculateFilteredIndicatorsRunnable(
                         indicatorName,
-                        Stream.concat(
-                            filteredPreflightResults,
-                            filteredIndicators.map(service -> service.calculate(explain, HealthInfo.EMPTY_HEALTH_INFO))
-                        ).toList(),
+                        HealthInfo.EMPTY_HEALTH_INFO,
+                        explain,
                         listener
                     );
+                    try {
+                        threadPool.executor(ThreadPool.Names.MANAGEMENT).submit(calculateFilteredIndicatorsRunnable);
+                    } catch (EsRejectedExecutionException esRejectedExecutionException) {
+                        calculateFilteredIndicatorsRunnable.onRejection(esRejectedExecutionException);
+                    }
+                }
+
+                private ActionRunnable<List<HealthIndicatorResult>> calculateFilteredIndicatorsRunnable(
+                    String indicatorName,
+                    HealthInfo healthInfo,
+                    boolean explain,
+                    ActionListener<List<HealthIndicatorResult>> listener
+                ) {
+                    return ActionRunnable.wrap(listener, l -> {
+                        List<HealthIndicatorResult> results = Stream.concat(
+                            filteredPreflightResults,
+                            filteredIndicators.map(service -> service.calculate(explain, healthInfo))
+                        ).toList();
+
+                        validateResultsAndNotifyListener(indicatorName, results, l);
+                    });
                 }
             });
 

+ 5 - 3
server/src/main/java/org/elasticsearch/node/Node.java

@@ -997,7 +997,7 @@ public class Node implements Closeable {
                 discoveryModule.getCoordinator(),
                 masterHistoryService
             );
-            HealthService healthService = createHealthService(clusterService, clusterModule, coordinationDiagnosticsService);
+            HealthService healthService = createHealthService(clusterService, clusterModule, coordinationDiagnosticsService, threadPool);
             HealthMetadataService healthMetadataService = HealthMetadataService.create(clusterService, settings);
             LocalHealthMonitor localHealthMonitor = LocalHealthMonitor.create(settings, clusterService, nodeService, threadPool, client);
             HealthInfoCache nodeHealthOverview = HealthInfoCache.create(clusterService);
@@ -1196,7 +1196,8 @@ public class Node implements Closeable {
     private HealthService createHealthService(
         ClusterService clusterService,
         ClusterModule clusterModule,
-        CoordinationDiagnosticsService coordinationDiagnosticsService
+        CoordinationDiagnosticsService coordinationDiagnosticsService,
+        ThreadPool threadPool
     ) {
         List<HealthIndicatorService> preflightHealthIndicatorServices = Collections.singletonList(
             new StableMasterHealthIndicatorService(coordinationDiagnosticsService, clusterService)
@@ -1214,7 +1215,8 @@ public class Node implements Closeable {
             .toList();
         return new HealthService(
             preflightHealthIndicatorServices,
-            concatLists(serverHealthIndicatorServices, pluginHealthIndicatorServices)
+            concatLists(serverHealthIndicatorServices, pluginHealthIndicatorServices),
+            threadPool
         );
     }
 

+ 27 - 29
server/src/test/java/org/elasticsearch/health/HealthServiceTests.java

@@ -16,6 +16,10 @@ import org.elasticsearch.health.node.DiskHealthInfo;
 import org.elasticsearch.health.node.FetchHealthInfoCacheAction;
 import org.elasticsearch.health.node.HealthInfo;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.After;
+import org.junit.Before;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -38,6 +42,19 @@ import static org.mockito.Mockito.mock;
 
 public class HealthServiceTests extends ESTestCase {
 
+    private ThreadPool threadPool;
+
+    @Before
+    public void setupThreadpool() {
+        threadPool = new TestThreadPool(HealthServiceTests.class.getSimpleName());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+        threadPool.shutdownNow();
+    }
+
     public void testShouldReturnGroupedIndicators() throws Exception {
 
         var networkLatency = new HealthIndicatorResult("network_latency", GREEN, null, null, null, null);
@@ -50,7 +67,8 @@ public class HealthServiceTests extends ESTestCase {
                 createMockHealthIndicatorService(networkLatency),
                 createMockHealthIndicatorService(slowTasks),
                 createMockHealthIndicatorService(shardsAvailable)
-            )
+            ),
+            threadPool
         );
 
         NodeClient client = getTestClient(HealthInfo.EMPTY_HEALTH_INFO);
@@ -94,30 +112,6 @@ public class HealthServiceTests extends ESTestCase {
         };
     }
 
-    public void testDuplicateIndicatorNames() throws Exception {
-        // Same indicator name, should throw exception:
-        var networkLatency = new HealthIndicatorResult(
-            "network_latency",
-            GREEN,
-            null,
-            null,
-            Collections.emptyList(),
-            Collections.emptyList()
-        );
-        var slowTasks = new HealthIndicatorResult("network_latency", YELLOW, null, null, Collections.emptyList(), Collections.emptyList());
-        var service = new HealthService(
-            Collections.emptyList(),
-            List.of(
-                createMockHealthIndicatorService(networkLatency),
-                createMockHealthIndicatorService(slowTasks),
-                createMockHealthIndicatorService(networkLatency)
-            )
-        );
-        NodeClient client = getTestClient(HealthInfo.EMPTY_HEALTH_INFO);
-        // This is testing an assertion, so we expect it to blow up in place rather than calling onFailure:
-        assertGetHealthThrowsException(service, client, null, true, AssertionError.class, null, false);
-    }
-
     public void testMissingIndicator() throws Exception {
         var networkLatency = new HealthIndicatorResult("network_latency", GREEN, null, null, null, null);
         var slowTasks = new HealthIndicatorResult("slow_task_assignment", YELLOW, null, null, null, null);
@@ -129,7 +123,8 @@ public class HealthServiceTests extends ESTestCase {
                 createMockHealthIndicatorService(networkLatency),
                 createMockHealthIndicatorService(slowTasks),
                 createMockHealthIndicatorService(shardsAvailable)
-            )
+            ),
+            threadPool
         );
         NodeClient client = getTestClient(HealthInfo.EMPTY_HEALTH_INFO);
         assertGetHealthThrowsException(
@@ -216,7 +211,8 @@ public class HealthServiceTests extends ESTestCase {
                 createMockHealthIndicatorService(networkLatency),
                 createMockHealthIndicatorService(slowTasks),
                 createMockHealthIndicatorService(shardsAvailable)
-            )
+            ),
+            threadPool
         );
         NodeClient client = getTestClient(HealthInfo.EMPTY_HEALTH_INFO);
 
@@ -255,7 +251,8 @@ public class HealthServiceTests extends ESTestCase {
                 createMockHealthIndicatorService(networkLatency, healthInfo),
                 createMockHealthIndicatorService(slowTasks, healthInfo),
                 createMockHealthIndicatorService(shardsAvailable, healthInfo)
-            )
+            ),
+            threadPool
         );
         NodeClient client = getTestClient(healthInfo);
 
@@ -283,7 +280,8 @@ public class HealthServiceTests extends ESTestCase {
                 createMockHealthIndicatorService(networkLatency),
                 createMockHealthIndicatorService(slowTasks),
                 createMockHealthIndicatorService(shardsAvailable)
-            )
+            ),
+            threadPool
         );
         NodeClient client = getTestClient(HealthInfo.EMPTY_HEALTH_INFO);
         {