|
|
@@ -60,7 +60,6 @@ import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.cluster.service.TransportVersionsFixupListener;
|
|
|
import org.elasticsearch.cluster.version.CompatibilityVersions;
|
|
|
-import org.elasticsearch.common.breaker.CircuitBreaker;
|
|
|
import org.elasticsearch.common.component.LifecycleComponent;
|
|
|
import org.elasticsearch.common.inject.Injector;
|
|
|
import org.elasticsearch.common.inject.Key;
|
|
|
@@ -80,6 +79,7 @@ import org.elasticsearch.common.settings.SettingsModule;
|
|
|
import org.elasticsearch.common.util.BigArrays;
|
|
|
import org.elasticsearch.common.util.PageCacheRecycler;
|
|
|
import org.elasticsearch.core.IOUtils;
|
|
|
+import org.elasticsearch.core.Tuple;
|
|
|
import org.elasticsearch.discovery.DiscoveryModule;
|
|
|
import org.elasticsearch.env.Environment;
|
|
|
import org.elasticsearch.env.NodeEnvironment;
|
|
|
@@ -113,7 +113,6 @@ import org.elasticsearch.indices.ShardLimitValidator;
|
|
|
import org.elasticsearch.indices.SystemIndexMappingUpdateService;
|
|
|
import org.elasticsearch.indices.SystemIndices;
|
|
|
import org.elasticsearch.indices.analysis.AnalysisModule;
|
|
|
-import org.elasticsearch.indices.breaker.BreakerSettings;
|
|
|
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
|
|
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
|
|
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
|
|
@@ -672,19 +671,10 @@ class NodeConstruction {
|
|
|
IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class).toList());
|
|
|
modules.add(indicesModule);
|
|
|
|
|
|
- List<BreakerSettings> pluginCircuitBreakers = pluginsService.filterPlugins(CircuitBreakerPlugin.class)
|
|
|
- .map(plugin -> plugin.getCircuitBreaker(settings))
|
|
|
- .toList();
|
|
|
- final CircuitBreakerService circuitBreakerService = createCircuitBreakerService(
|
|
|
+ CircuitBreakerService circuitBreakerService = createCircuitBreakerService(
|
|
|
settingsModule.getSettings(),
|
|
|
- pluginCircuitBreakers,
|
|
|
settingsModule.getClusterSettings()
|
|
|
);
|
|
|
- pluginsService.filterPlugins(CircuitBreakerPlugin.class).forEach(plugin -> {
|
|
|
- CircuitBreaker breaker = circuitBreakerService.getBreaker(plugin.getCircuitBreaker(settings).getName());
|
|
|
- plugin.setCircuitBreaker(breaker);
|
|
|
- });
|
|
|
- resourcesToClose.add(circuitBreakerService);
|
|
|
modules.add(new GatewayModule());
|
|
|
|
|
|
CompatibilityVersions compatibilityVersions = new CompatibilityVersions(
|
|
|
@@ -1149,7 +1139,6 @@ class NodeConstruction {
|
|
|
|
|
|
modules.add(b -> {
|
|
|
b.bind(NodeService.class).toInstance(nodeService);
|
|
|
- b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
|
|
|
b.bind(BigArrays.class).toInstance(bigArrays);
|
|
|
b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
|
|
|
b.bind(IngestService.class).toInstance(ingestService);
|
|
|
@@ -1357,21 +1346,31 @@ class NodeConstruction {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Creates a new {@link CircuitBreakerService} based on the settings provided.
|
|
|
+ * Create and initialize a new {@link CircuitBreakerService} based on the settings provided.
|
|
|
*
|
|
|
* @see Node#BREAKER_TYPE_KEY
|
|
|
*/
|
|
|
- private static CircuitBreakerService createCircuitBreakerService(
|
|
|
- Settings settings,
|
|
|
- List<BreakerSettings> breakerSettings,
|
|
|
- ClusterSettings clusterSettings
|
|
|
- ) {
|
|
|
+ private CircuitBreakerService createCircuitBreakerService(Settings settings, ClusterSettings clusterSettings) {
|
|
|
+ var pluginBreakers = pluginsService.filterPlugins(CircuitBreakerPlugin.class)
|
|
|
+ .map(p -> Tuple.tuple(p, p.getCircuitBreaker(settings)))
|
|
|
+ .toList();
|
|
|
+
|
|
|
String type = Node.BREAKER_TYPE_KEY.get(settings);
|
|
|
- return switch (type) {
|
|
|
- case "hierarchy" -> new HierarchyCircuitBreakerService(settings, breakerSettings, clusterSettings);
|
|
|
+ CircuitBreakerService circuitBreakerService = switch (type) {
|
|
|
+ case "hierarchy" -> new HierarchyCircuitBreakerService(
|
|
|
+ settings,
|
|
|
+ pluginBreakers.stream().map(Tuple::v2).toList(),
|
|
|
+ clusterSettings
|
|
|
+ );
|
|
|
case "none" -> new NoneCircuitBreakerService();
|
|
|
default -> throw new IllegalArgumentException("Unknown circuit breaker type [" + type + "]");
|
|
|
};
|
|
|
+ resourcesToClose.add(circuitBreakerService);
|
|
|
+ modules.bindToInstance(CircuitBreakerService.class, circuitBreakerService);
|
|
|
+
|
|
|
+ pluginBreakers.forEach(t -> t.v1().setCircuitBreaker(circuitBreakerService.getBreaker(t.v2().getName())));
|
|
|
+
|
|
|
+ return circuitBreakerService;
|
|
|
}
|
|
|
|
|
|
/**
|