NodeConstruction.java 69 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530
  1. /*
  2. * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
  3. * or more contributor license agreements. Licensed under the Elastic License
  4. * 2.0 and the Server Side Public License, v 1; you may not use this file except
  5. * in compliance with, at your election, the Elastic License 2.0 or the Server
  6. * Side Public License, v 1.
  7. */
  8. package org.elasticsearch.node;
  9. import org.apache.logging.log4j.LogManager;
  10. import org.apache.logging.log4j.Logger;
  11. import org.apache.lucene.search.IndexSearcher;
  12. import org.apache.lucene.util.Constants;
  13. import org.apache.lucene.util.SetOnce;
  14. import org.elasticsearch.Build;
  15. import org.elasticsearch.ElasticsearchException;
  16. import org.elasticsearch.TransportVersion;
  17. import org.elasticsearch.action.ActionModule;
  18. import org.elasticsearch.action.ActionRequest;
  19. import org.elasticsearch.action.ActionResponse;
  20. import org.elasticsearch.action.ActionType;
  21. import org.elasticsearch.action.admin.cluster.repositories.reservedstate.ReservedRepositoryAction;
  22. import org.elasticsearch.action.admin.indices.template.reservedstate.ReservedComposableIndexTemplateAction;
  23. import org.elasticsearch.action.ingest.ReservedPipelineAction;
  24. import org.elasticsearch.action.search.SearchExecutionStatsCollector;
  25. import org.elasticsearch.action.search.SearchPhaseController;
  26. import org.elasticsearch.action.search.SearchTransportAPMMetrics;
  27. import org.elasticsearch.action.search.SearchTransportService;
  28. import org.elasticsearch.action.support.TransportAction;
  29. import org.elasticsearch.action.update.UpdateHelper;
  30. import org.elasticsearch.client.internal.Client;
  31. import org.elasticsearch.client.internal.node.NodeClient;
  32. import org.elasticsearch.cluster.ClusterInfoService;
  33. import org.elasticsearch.cluster.ClusterModule;
  34. import org.elasticsearch.cluster.ClusterName;
  35. import org.elasticsearch.cluster.ClusterState;
  36. import org.elasticsearch.cluster.coordination.CoordinationDiagnosticsService;
  37. import org.elasticsearch.cluster.coordination.Coordinator;
  38. import org.elasticsearch.cluster.coordination.MasterHistoryService;
  39. import org.elasticsearch.cluster.coordination.StableMasterHealthIndicatorService;
  40. import org.elasticsearch.cluster.metadata.IndexMetadataVerifier;
  41. import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
  42. import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService;
  43. import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
  44. import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
  45. import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
  46. import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService;
  47. import org.elasticsearch.cluster.metadata.SystemIndexMetadataUpgradeService;
  48. import org.elasticsearch.cluster.metadata.TemplateUpgradeService;
  49. import org.elasticsearch.cluster.node.DiscoveryNode;
  50. import org.elasticsearch.cluster.node.DiscoveryNodeRole;
  51. import org.elasticsearch.cluster.routing.BatchedRerouteService;
  52. import org.elasticsearch.cluster.routing.RerouteService;
  53. import org.elasticsearch.cluster.routing.allocation.AllocationService;
  54. import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
  55. import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
  56. import org.elasticsearch.cluster.service.ClusterService;
  57. import org.elasticsearch.cluster.service.TransportVersionsFixupListener;
  58. import org.elasticsearch.cluster.version.CompatibilityVersions;
  59. import org.elasticsearch.common.breaker.CircuitBreaker;
  60. import org.elasticsearch.common.component.LifecycleComponent;
  61. import org.elasticsearch.common.inject.Injector;
  62. import org.elasticsearch.common.inject.Key;
  63. import org.elasticsearch.common.inject.Module;
  64. import org.elasticsearch.common.inject.ModulesBuilder;
  65. import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
  66. import org.elasticsearch.common.logging.DeprecationCategory;
  67. import org.elasticsearch.common.logging.DeprecationLogger;
  68. import org.elasticsearch.common.logging.HeaderWarning;
  69. import org.elasticsearch.common.network.NetworkModule;
  70. import org.elasticsearch.common.network.NetworkService;
  71. import org.elasticsearch.common.settings.ClusterSettings;
  72. import org.elasticsearch.common.settings.ConsistentSettingsService;
  73. import org.elasticsearch.common.settings.Setting;
  74. import org.elasticsearch.common.settings.Settings;
  75. import org.elasticsearch.common.settings.SettingsModule;
  76. import org.elasticsearch.common.util.BigArrays;
  77. import org.elasticsearch.common.util.PageCacheRecycler;
  78. import org.elasticsearch.core.IOUtils;
  79. import org.elasticsearch.core.TimeValue;
  80. import org.elasticsearch.core.Tuple;
  81. import org.elasticsearch.discovery.DiscoveryModule;
  82. import org.elasticsearch.env.Environment;
  83. import org.elasticsearch.env.NodeEnvironment;
  84. import org.elasticsearch.features.FeatureService;
  85. import org.elasticsearch.features.FeatureSpecification;
  86. import org.elasticsearch.gateway.GatewayAllocator;
  87. import org.elasticsearch.gateway.GatewayMetaState;
  88. import org.elasticsearch.gateway.GatewayModule;
  89. import org.elasticsearch.gateway.MetaStateService;
  90. import org.elasticsearch.gateway.PersistedClusterStateService;
  91. import org.elasticsearch.health.HealthPeriodicLogger;
  92. import org.elasticsearch.health.HealthService;
  93. import org.elasticsearch.health.metadata.HealthMetadataService;
  94. import org.elasticsearch.health.node.DiskHealthIndicatorService;
  95. import org.elasticsearch.health.node.HealthInfoCache;
  96. import org.elasticsearch.health.node.LocalHealthMonitor;
  97. import org.elasticsearch.health.node.ShardsCapacityHealthIndicatorService;
  98. import org.elasticsearch.health.node.selection.HealthNodeTaskExecutor;
  99. import org.elasticsearch.health.node.tracker.DiskHealthTracker;
  100. import org.elasticsearch.health.node.tracker.HealthTracker;
  101. import org.elasticsearch.health.node.tracker.RepositoriesHealthTracker;
  102. import org.elasticsearch.health.stats.HealthApiStats;
  103. import org.elasticsearch.http.HttpServerTransport;
  104. import org.elasticsearch.index.IndexSettingProvider;
  105. import org.elasticsearch.index.IndexSettingProviders;
  106. import org.elasticsearch.index.IndexingPressure;
  107. import org.elasticsearch.index.analysis.AnalysisRegistry;
  108. import org.elasticsearch.indices.ExecutorSelector;
  109. import org.elasticsearch.indices.IndicesModule;
  110. import org.elasticsearch.indices.IndicesService;
  111. import org.elasticsearch.indices.IndicesServiceBuilder;
  112. import org.elasticsearch.indices.ShardLimitValidator;
  113. import org.elasticsearch.indices.SystemIndexMappingUpdateService;
  114. import org.elasticsearch.indices.SystemIndices;
  115. import org.elasticsearch.indices.analysis.AnalysisModule;
  116. import org.elasticsearch.indices.breaker.CircuitBreakerMetrics;
  117. import org.elasticsearch.indices.breaker.CircuitBreakerService;
  118. import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
  119. import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
  120. import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
  121. import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
  122. import org.elasticsearch.indices.recovery.RecoverySettings;
  123. import org.elasticsearch.indices.recovery.SnapshotFilesProvider;
  124. import org.elasticsearch.indices.recovery.plan.PeerOnlyRecoveryPlannerService;
  125. import org.elasticsearch.indices.recovery.plan.RecoveryPlannerService;
  126. import org.elasticsearch.indices.recovery.plan.ShardSnapshotsService;
  127. import org.elasticsearch.inference.InferenceServiceRegistry;
  128. import org.elasticsearch.inference.ModelRegistry;
  129. import org.elasticsearch.ingest.IngestService;
  130. import org.elasticsearch.monitor.MonitorService;
  131. import org.elasticsearch.monitor.fs.FsHealthService;
  132. import org.elasticsearch.monitor.jvm.JvmInfo;
  133. import org.elasticsearch.monitor.metrics.NodeMetrics;
  134. import org.elasticsearch.node.internal.TerminationHandler;
  135. import org.elasticsearch.node.internal.TerminationHandlerProvider;
  136. import org.elasticsearch.persistent.PersistentTasksClusterService;
  137. import org.elasticsearch.persistent.PersistentTasksExecutor;
  138. import org.elasticsearch.persistent.PersistentTasksExecutorRegistry;
  139. import org.elasticsearch.persistent.PersistentTasksService;
  140. import org.elasticsearch.plugins.ActionPlugin;
  141. import org.elasticsearch.plugins.AnalysisPlugin;
  142. import org.elasticsearch.plugins.CircuitBreakerPlugin;
  143. import org.elasticsearch.plugins.ClusterCoordinationPlugin;
  144. import org.elasticsearch.plugins.ClusterPlugin;
  145. import org.elasticsearch.plugins.DiscoveryPlugin;
  146. import org.elasticsearch.plugins.HealthPlugin;
  147. import org.elasticsearch.plugins.InferenceRegistryPlugin;
  148. import org.elasticsearch.plugins.IngestPlugin;
  149. import org.elasticsearch.plugins.MapperPlugin;
  150. import org.elasticsearch.plugins.MetadataUpgrader;
  151. import org.elasticsearch.plugins.NetworkPlugin;
  152. import org.elasticsearch.plugins.PersistentTaskPlugin;
  153. import org.elasticsearch.plugins.Plugin;
  154. import org.elasticsearch.plugins.PluginsService;
  155. import org.elasticsearch.plugins.RecoveryPlannerPlugin;
  156. import org.elasticsearch.plugins.ReloadablePlugin;
  157. import org.elasticsearch.plugins.RepositoryPlugin;
  158. import org.elasticsearch.plugins.ScriptPlugin;
  159. import org.elasticsearch.plugins.SearchPlugin;
  160. import org.elasticsearch.plugins.ShutdownAwarePlugin;
  161. import org.elasticsearch.plugins.SystemIndexPlugin;
  162. import org.elasticsearch.plugins.TelemetryPlugin;
  163. import org.elasticsearch.plugins.internal.DocumentParsingProvider;
  164. import org.elasticsearch.plugins.internal.DocumentParsingProviderPlugin;
  165. import org.elasticsearch.plugins.internal.ReloadAwarePlugin;
  166. import org.elasticsearch.plugins.internal.RestExtension;
  167. import org.elasticsearch.plugins.internal.SettingsExtension;
  168. import org.elasticsearch.readiness.ReadinessService;
  169. import org.elasticsearch.repositories.RepositoriesModule;
  170. import org.elasticsearch.repositories.RepositoriesService;
  171. import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
  172. import org.elasticsearch.reservedstate.ReservedClusterStateHandlerProvider;
  173. import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
  174. import org.elasticsearch.reservedstate.service.FileSettingsService;
  175. import org.elasticsearch.rest.action.search.SearchResponseMetrics;
  176. import org.elasticsearch.script.ScriptModule;
  177. import org.elasticsearch.script.ScriptService;
  178. import org.elasticsearch.search.SearchModule;
  179. import org.elasticsearch.search.SearchService;
  180. import org.elasticsearch.search.SearchUtils;
  181. import org.elasticsearch.search.aggregations.support.AggregationUsageService;
  182. import org.elasticsearch.shutdown.PluginShutdownService;
  183. import org.elasticsearch.snapshots.InternalSnapshotsInfoService;
  184. import org.elasticsearch.snapshots.RepositoryIntegrityHealthIndicatorService;
  185. import org.elasticsearch.snapshots.RestoreService;
  186. import org.elasticsearch.snapshots.SnapshotShardsService;
  187. import org.elasticsearch.snapshots.SnapshotsInfoService;
  188. import org.elasticsearch.snapshots.SnapshotsService;
  189. import org.elasticsearch.tasks.Task;
  190. import org.elasticsearch.tasks.TaskManager;
  191. import org.elasticsearch.telemetry.TelemetryProvider;
  192. import org.elasticsearch.telemetry.metric.MeterRegistry;
  193. import org.elasticsearch.telemetry.tracing.Tracer;
  194. import org.elasticsearch.threadpool.ExecutorBuilder;
  195. import org.elasticsearch.threadpool.ThreadPool;
  196. import org.elasticsearch.transport.Transport;
  197. import org.elasticsearch.transport.TransportService;
  198. import org.elasticsearch.upgrades.SystemIndexMigrationExecutor;
  199. import org.elasticsearch.usage.UsageService;
  200. import org.elasticsearch.watcher.ResourceWatcherService;
  201. import org.elasticsearch.xcontent.NamedXContentRegistry;
  202. import java.io.Closeable;
  203. import java.io.IOException;
  204. import java.io.UncheckedIOException;
  205. import java.util.ArrayList;
  206. import java.util.Arrays;
  207. import java.util.Collection;
  208. import java.util.LinkedHashSet;
  209. import java.util.List;
  210. import java.util.Locale;
  211. import java.util.Map;
  212. import java.util.Objects;
  213. import java.util.Optional;
  214. import java.util.Set;
  215. import java.util.concurrent.TimeUnit;
  216. import java.util.function.Function;
  217. import java.util.function.Supplier;
  218. import java.util.stream.Collectors;
  219. import java.util.stream.Stream;
  220. import static org.elasticsearch.core.Types.forciblyCast;
  221. /**
  222. * Class used to perform all the operations needed to construct a {@link Node} instance.
  223. * <p>
  224. * Constructing a {@link Node} is a complex operation, involving many interdependent services.
  225. * Separating out this logic into a dedicated class is a lot clearer and more flexible than
  226. * doing all this logic inside a constructor in {@link Node}.
  227. */
  228. class NodeConstruction {
  229. /**
  230. * Prepare everything needed to create a {@link Node} instance.
  231. *
  232. * @param initialEnvironment the initial environment for this node, which will be added to by plugins
  233. * @param serviceProvider provides various service implementations that could be mocked
  234. * @param forbidPrivateIndexSettings whether or not private index settings are forbidden when creating an index; this is used in the
  235. * test framework for tests that rely on being able to set private settings
  236. */
  237. static NodeConstruction prepareConstruction(
  238. Environment initialEnvironment,
  239. NodeServiceProvider serviceProvider,
  240. boolean forbidPrivateIndexSettings
  241. ) {
  242. List<Closeable> closeables = new ArrayList<>();
  243. try {
  244. NodeConstruction constructor = new NodeConstruction(closeables);
  245. Settings settings = constructor.createEnvironment(initialEnvironment, serviceProvider);
  246. TelemetryProvider telemetryProvider = constructor.createTelemetryProvider(settings);
  247. ThreadPool threadPool = constructor.createThreadPool(settings, telemetryProvider.getMeterRegistry());
  248. SettingsModule settingsModule = constructor.validateSettings(initialEnvironment.settings(), settings, threadPool);
  249. SearchModule searchModule = constructor.createSearchModule(settingsModule.getSettings(), threadPool);
  250. constructor.createClientAndRegistries(settingsModule.getSettings(), threadPool, searchModule);
  251. ScriptService scriptService = constructor.createScriptService(settingsModule, threadPool, serviceProvider);
  252. constructor.construct(
  253. threadPool,
  254. settingsModule,
  255. searchModule,
  256. scriptService,
  257. constructor.createAnalysisRegistry(),
  258. serviceProvider,
  259. forbidPrivateIndexSettings,
  260. telemetryProvider
  261. );
  262. return constructor;
  263. } catch (IOException e) {
  264. IOUtils.closeWhileHandlingException(closeables);
  265. throw new ElasticsearchException("Failed to bind service", e);
  266. } catch (Throwable t) {
  267. IOUtils.closeWhileHandlingException(closeables);
  268. throw t;
  269. }
  270. }
  271. /**
  272. * See comments on Node#logger for why this is not static
  273. */
  274. private final Logger logger = LogManager.getLogger(Node.class);
  275. private final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(Node.class);
  276. private final List<Closeable> resourcesToClose;
  277. private final ModulesBuilder modules = new ModulesBuilder();
  278. /*
  279. * References for storing in a Node
  280. */
  281. private Injector injector;
  282. private Environment environment;
  283. private NodeEnvironment nodeEnvironment;
  284. private PluginsService pluginsService;
  285. private NodeClient client;
  286. private Collection<LifecycleComponent> pluginLifecycleComponents;
  287. private Node.LocalNodeFactory localNodeFactory;
  288. private NodeService nodeService;
  289. private TerminationHandler terminationHandler;
  290. private NamedWriteableRegistry namedWriteableRegistry;
  291. private NamedXContentRegistry xContentRegistry;
  /**
   * Creates an empty construction; the remaining fields are filled in by the individual construction steps.
   *
   * @param resourcesToClose list to which construction steps append the closeable resources they create
   */
  private NodeConstruction(List<Closeable> resourcesToClose) {
    this.resourcesToClose = resourcesToClose;
  }
  295. Injector injector() {
  296. return injector;
  297. }
  298. Environment environment() {
  299. return environment;
  300. }
  301. NodeEnvironment nodeEnvironment() {
  302. return nodeEnvironment;
  303. }
  304. PluginsService pluginsService() {
  305. return pluginsService;
  306. }
  307. NodeClient client() {
  308. return client;
  309. }
  310. Collection<LifecycleComponent> pluginLifecycleComponents() {
  311. return pluginLifecycleComponents;
  312. }
  313. Node.LocalNodeFactory localNodeFactory() {
  314. return localNodeFactory;
  315. }
  316. NodeService nodeService() {
  317. return nodeService;
  318. }
  319. TerminationHandler terminationHandler() {
  320. return terminationHandler;
  321. }
  322. NamedWriteableRegistry namedWriteableRegistry() {
  323. return namedWriteableRegistry;
  324. }
  325. NamedXContentRegistry namedXContentRegistry() {
  326. return xContentRegistry;
  327. }
  328. private <T> Optional<T> getSinglePlugin(Class<T> pluginClass) {
  329. return getSinglePlugin(pluginsService.filterPlugins(pluginClass), pluginClass);
  330. }
  331. private static <T> Optional<T> getSinglePlugin(Stream<T> plugins, Class<T> pluginClass) {
  332. var it = plugins.iterator();
  333. if (it.hasNext() == false) {
  334. return Optional.empty();
  335. }
  336. T plugin = it.next();
  337. if (it.hasNext()) {
  338. List<T> allPlugins = new ArrayList<>();
  339. allPlugins.add(plugin);
  340. it.forEachRemaining(allPlugins::add);
  341. throw new IllegalStateException("A single " + pluginClass.getName() + " was expected but got " + allPlugins);
  342. }
  343. return Optional.of(plugin);
  344. }
  345. private Settings createEnvironment(Environment initialEnvironment, NodeServiceProvider serviceProvider) {
  346. // Pass the node settings to the DeprecationLogger class so that it can have the deprecation.skip_deprecated_settings setting:
  347. Settings envSettings = initialEnvironment.settings();
  348. DeprecationLogger.initialize(envSettings);
  349. JvmInfo jvmInfo = JvmInfo.jvmInfo();
  350. logger.info(
  351. "version[{}], pid[{}], build[{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
  352. Build.current().qualifiedVersion(),
  353. jvmInfo.pid(),
  354. Build.current().type().displayName(),
  355. Build.current().hash(),
  356. Build.current().date(),
  357. Constants.OS_NAME,
  358. Constants.OS_VERSION,
  359. Constants.OS_ARCH,
  360. Constants.JVM_VENDOR,
  361. Constants.JVM_NAME,
  362. Constants.JAVA_VERSION,
  363. Constants.JVM_VERSION
  364. );
  365. logger.info("JVM home [{}], using bundled JDK [{}]", System.getProperty("java.home"), jvmInfo.getUsingBundledJdk());
  366. logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));
  367. logger.info("Default Locale [{}]", Locale.getDefault());
  368. if (Build.current().isProductionRelease() == false) {
  369. logger.warn(
  370. "version [{}] is a pre-release version of Elasticsearch and is not suitable for production",
  371. Build.current().qualifiedVersion()
  372. );
  373. }
  374. if (Environment.PATH_SHARED_DATA_SETTING.exists(envSettings)) {
  375. // NOTE: this must be done with an explicit check here because the deprecation property on a path setting will
  376. // cause ES to fail to start since logging is not yet initialized on first read of the setting
  377. deprecationLogger.warn(
  378. DeprecationCategory.SETTINGS,
  379. "shared-data-path",
  380. "setting [path.shared_data] is deprecated and will be removed in a future release"
  381. );
  382. }
  383. if (initialEnvironment.dataFiles().length > 1) {
  384. // NOTE: we use initialEnvironment here, but assertEquivalent below ensures the data paths do not change
  385. deprecationLogger.warn(
  386. DeprecationCategory.SETTINGS,
  387. "multiple-data-paths",
  388. "Configuring multiple [path.data] paths is deprecated. Use RAID or other system level features for utilizing "
  389. + "multiple disks. This feature will be removed in a future release."
  390. );
  391. }
  392. if (Environment.dataPathUsesList(envSettings)) {
  393. // already checked for multiple values above, so if this is a list it is a single valued list
  394. deprecationLogger.warn(
  395. DeprecationCategory.SETTINGS,
  396. "multiple-data-paths-list",
  397. "Configuring [path.data] with a list is deprecated. Instead specify as a string value."
  398. );
  399. }
  400. if (logger.isDebugEnabled()) {
  401. logger.debug(
  402. "using config [{}], data [{}], logs [{}], plugins [{}]",
  403. initialEnvironment.configFile(),
  404. Arrays.toString(initialEnvironment.dataFiles()),
  405. initialEnvironment.logsFile(),
  406. initialEnvironment.pluginsFile()
  407. );
  408. }
  409. Node.deleteTemporaryApmConfig(
  410. jvmInfo,
  411. (e, apmConfig) -> logger.error("failed to delete temporary APM config file [{}], reason: [{}]", apmConfig, e.getMessage())
  412. );
  413. pluginsService = serviceProvider.newPluginService(initialEnvironment, envSettings);
  414. modules.bindToInstance(PluginsService.class, pluginsService);
  415. Settings settings = Node.mergePluginSettings(pluginsService.pluginMap(), envSettings);
  416. /*
  417. * Create the environment based on the finalized view of the settings. This is to ensure that components get the same setting
  418. * values, no matter they ask for them from.
  419. */
  420. environment = new Environment(settings, initialEnvironment.configFile());
  421. Environment.assertEquivalent(initialEnvironment, environment);
  422. modules.bindToInstance(Environment.class, environment);
  423. return settings;
  424. }
  425. private TelemetryProvider createTelemetryProvider(Settings settings) {
  426. return getSinglePlugin(TelemetryPlugin.class).map(p -> p.getTelemetryProvider(settings)).orElse(TelemetryProvider.NOOP);
  427. }
  428. private ThreadPool createThreadPool(Settings settings, MeterRegistry meterRegistry) throws IOException {
  429. ThreadPool threadPool = new ThreadPool(
  430. settings,
  431. meterRegistry,
  432. pluginsService.flatMap(p -> p.getExecutorBuilders(settings)).toArray(ExecutorBuilder<?>[]::new)
  433. );
  434. resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
  435. modules.bindToInstance(ThreadPool.class, threadPool);
  436. // adds the context to the DeprecationLogger so that it does not need to be injected everywhere
  437. HeaderWarning.setThreadContext(threadPool.getThreadContext());
  438. resourcesToClose.add(() -> HeaderWarning.removeThreadContext(threadPool.getThreadContext()));
  439. return threadPool;
  440. }
  441. private SettingsModule validateSettings(Settings envSettings, Settings settings, ThreadPool threadPool) throws IOException {
  442. // register the node.data, node.ingest, node.master, node.remote_cluster_client settings here so we can mark them private
  443. List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.flatMap(Plugin::getSettings).toList());
  444. for (final ExecutorBuilder<?> builder : threadPool.builders()) {
  445. additionalSettings.addAll(builder.getRegisteredSettings());
  446. }
  447. SettingsExtension.load().forEach(e -> additionalSettings.addAll(e.getSettings()));
  448. // this is as early as we can validate settings at this point. we already pass them to ThreadPool
  449. // so we might be late here already
  450. SettingsModule settingsModule = new SettingsModule(
  451. settings,
  452. additionalSettings,
  453. pluginsService.flatMap(Plugin::getSettingsFilter).toList()
  454. );
  455. modules.add(settingsModule);
  456. // creating `NodeEnvironment` breaks the ability to rollback to 7.x on an 8.0 upgrade (`upgradeLegacyNodeFolders`) so do this
  457. // after settings validation.
  458. nodeEnvironment = new NodeEnvironment(envSettings, environment);
  459. logger.info(
  460. "node name [{}], node ID [{}], cluster name [{}], roles {}",
  461. Node.NODE_NAME_SETTING.get(envSettings),
  462. nodeEnvironment.nodeId(),
  463. ClusterName.CLUSTER_NAME_SETTING.get(envSettings).value(),
  464. DiscoveryNode.getRolesFromSettings(settings)
  465. .stream()
  466. .map(DiscoveryNodeRole::roleName)
  467. .collect(Collectors.toCollection(LinkedHashSet::new))
  468. );
  469. resourcesToClose.add(nodeEnvironment);
  470. modules.bindToInstance(NodeEnvironment.class, nodeEnvironment);
  471. return settingsModule;
  472. }
  473. private SearchModule createSearchModule(Settings settings, ThreadPool threadPool) {
  474. IndexSearcher.setMaxClauseCount(SearchUtils.calculateMaxClauseValue(threadPool));
  475. return new SearchModule(settings, pluginsService.filterPlugins(SearchPlugin.class).toList());
  476. }
  477. /**
  478. * Create various objects that are stored as member variables. This is so they are accessible as soon as possible.
  479. */
  480. private void createClientAndRegistries(Settings settings, ThreadPool threadPool, SearchModule searchModule) {
  481. client = new NodeClient(settings, threadPool);
  482. modules.add(b -> {
  483. b.bind(Client.class).toInstance(client);
  484. b.bind(NodeClient.class).toInstance(client);
  485. });
  486. localNodeFactory = new Node.LocalNodeFactory(settings, nodeEnvironment.nodeId());
  487. namedWriteableRegistry = new NamedWriteableRegistry(
  488. Stream.of(
  489. NetworkModule.getNamedWriteables().stream(),
  490. IndicesModule.getNamedWriteables().stream(),
  491. searchModule.getNamedWriteables().stream(),
  492. pluginsService.flatMap(Plugin::getNamedWriteables),
  493. ClusterModule.getNamedWriteables().stream(),
  494. SystemIndexMigrationExecutor.getNamedWriteables().stream()
  495. ).flatMap(Function.identity()).toList()
  496. );
  497. xContentRegistry = new NamedXContentRegistry(
  498. Stream.of(
  499. NetworkModule.getNamedXContents().stream(),
  500. IndicesModule.getNamedXContents().stream(),
  501. searchModule.getNamedXContents().stream(),
  502. pluginsService.flatMap(Plugin::getNamedXContent),
  503. ClusterModule.getNamedXWriteables().stream(),
  504. SystemIndexMigrationExecutor.getNamedXContentParsers().stream(),
  505. HealthNodeTaskExecutor.getNamedXContentParsers().stream()
  506. ).flatMap(Function.identity()).toList()
  507. );
  508. modules.add(b -> {
  509. b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
  510. b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
  511. });
  512. }
  513. private ScriptService createScriptService(SettingsModule settingsModule, ThreadPool threadPool, NodeServiceProvider serviceProvider) {
  514. Settings settings = settingsModule.getSettings();
  515. ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class).toList());
  516. ScriptService scriptService = serviceProvider.newScriptService(
  517. pluginsService,
  518. settings,
  519. scriptModule.engines,
  520. scriptModule.contexts,
  521. threadPool::absoluteTimeInMillis
  522. );
  523. ScriptModule.registerClusterSettingsListeners(scriptService, settingsModule.getClusterSettings());
  524. modules.add(b -> {
  525. b.bind(ScriptService.class).toInstance(scriptService);
  526. b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptService));
  527. });
  528. return scriptService;
  529. }
  530. private AnalysisRegistry createAnalysisRegistry() throws IOException {
  531. AnalysisRegistry registry = new AnalysisModule(
  532. environment,
  533. pluginsService.filterPlugins(AnalysisPlugin.class).toList(),
  534. pluginsService.getStablePluginRegistry()
  535. ).getAnalysisRegistry();
  536. modules.bindToInstance(AnalysisRegistry.class, registry);
  537. return registry;
  538. }
  539. private void construct(
  540. ThreadPool threadPool,
  541. SettingsModule settingsModule,
  542. SearchModule searchModule,
  543. ScriptService scriptService,
  544. AnalysisRegistry analysisRegistry,
  545. NodeServiceProvider serviceProvider,
  546. boolean forbidPrivateIndexSettings,
  547. TelemetryProvider telemetryProvider
  548. ) throws IOException {
  549. Settings settings = settingsModule.getSettings();
  550. modules.bindToInstance(Tracer.class, telemetryProvider.getTracer());
  551. TaskManager taskManager = new TaskManager(
  552. settings,
  553. threadPool,
  554. Stream.concat(
  555. pluginsService.filterPlugins(ActionPlugin.class).flatMap(p -> p.getTaskHeaders().stream()),
  556. Task.HEADERS_TO_COPY.stream()
  557. ).collect(Collectors.toSet()),
  558. telemetryProvider.getTracer()
  559. );
  560. final Tracer tracer = telemetryProvider.getTracer();
  561. ClusterService clusterService = createClusterService(settingsModule, threadPool, taskManager);
  562. clusterService.addStateApplier(scriptService);
  563. DocumentParsingProvider documentParsingProvider = getDocumentParsingSupplier();
  564. modules.bindToInstance(DocumentParsingProvider.class, documentParsingProvider);
  565. final IngestService ingestService = new IngestService(
  566. clusterService,
  567. threadPool,
  568. environment,
  569. scriptService,
  570. analysisRegistry,
  571. pluginsService.filterPlugins(IngestPlugin.class).toList(),
  572. client,
  573. IngestService.createGrokThreadWatchdog(environment, threadPool),
  574. documentParsingProvider
  575. );
  576. SystemIndices systemIndices = createSystemIndices(settings);
  577. final SetOnce<RepositoriesService> repositoriesServiceReference = new SetOnce<>();
  578. final SetOnce<RerouteService> rerouteServiceReference = new SetOnce<>();
  579. final ClusterInfoService clusterInfoService = serviceProvider.newClusterInfoService(
  580. pluginsService,
  581. settings,
  582. clusterService,
  583. threadPool,
  584. client
  585. );
  586. final InternalSnapshotsInfoService snapshotsInfoService = new InternalSnapshotsInfoService(
  587. settings,
  588. clusterService,
  589. repositoriesServiceReference::get,
  590. rerouteServiceReference::get
  591. );
  592. final ClusterModule clusterModule = new ClusterModule(
  593. settings,
  594. clusterService,
  595. pluginsService.filterPlugins(ClusterPlugin.class).toList(),
  596. clusterInfoService,
  597. snapshotsInfoService,
  598. threadPool,
  599. systemIndices,
  600. getWriteLoadForecaster(threadPool, settings, clusterService.getClusterSettings()),
  601. telemetryProvider
  602. );
  603. modules.add(clusterModule);
  604. RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
  605. rerouteServiceReference.set(rerouteService);
  606. clusterInfoService.addListener(
  607. new DiskThresholdMonitor(
  608. settings,
  609. clusterService::state,
  610. clusterService.getClusterSettings(),
  611. client,
  612. threadPool::relativeTimeInMillis,
  613. rerouteService
  614. )::onNewInfo
  615. );
  616. IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class).toList());
  617. modules.add(indicesModule);
  618. CircuitBreakerService circuitBreakerService = createCircuitBreakerService(
  619. new CircuitBreakerMetrics(telemetryProvider),
  620. settingsModule.getSettings(),
  621. settingsModule.getClusterSettings()
  622. );
  623. modules.add(new GatewayModule());
  624. CompatibilityVersions compatibilityVersions = new CompatibilityVersions(
  625. TransportVersion.current(),
  626. systemIndices.getMappingsVersions()
  627. );
  628. modules.add(loadPersistedClusterStateService(clusterService.getClusterSettings(), threadPool, compatibilityVersions));
  629. PageCacheRecycler pageCacheRecycler = serviceProvider.newPageCacheRecycler(pluginsService, settings);
  630. BigArrays bigArrays = serviceProvider.newBigArrays(pluginsService, pageCacheRecycler, circuitBreakerService);
  631. final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry);
  632. FeatureService featureService = new FeatureService(pluginsService.loadServiceProviders(FeatureSpecification.class));
  633. if (DiscoveryNode.isMasterNode(settings)) {
  634. clusterService.addListener(new SystemIndexMappingUpdateService(systemIndices, client));
  635. clusterService.addListener(
  636. new TransportVersionsFixupListener(clusterService, client.admin().cluster(), featureService, threadPool)
  637. );
  638. }
  639. IndicesService indicesService = new IndicesServiceBuilder().settings(settings)
  640. .pluginsService(pluginsService)
  641. .nodeEnvironment(nodeEnvironment)
  642. .xContentRegistry(xContentRegistry)
  643. .analysisRegistry(analysisRegistry)
  644. .indexNameExpressionResolver(clusterModule.getIndexNameExpressionResolver())
  645. .mapperRegistry(indicesModule.getMapperRegistry())
  646. .namedWriteableRegistry(namedWriteableRegistry)
  647. .threadPool(threadPool)
  648. .indexScopedSettings(settingsModule.getIndexScopedSettings())
  649. .circuitBreakerService(circuitBreakerService)
  650. .bigArrays(bigArrays)
  651. .scriptService(scriptService)
  652. .clusterService(clusterService)
  653. .client(client)
  654. .featureService(featureService)
  655. .metaStateService(metaStateService)
  656. .valuesSourceRegistry(searchModule.getValuesSourceRegistry())
  657. .requestCacheKeyDifferentiator(searchModule.getRequestCacheKeyDifferentiator())
  658. .build();
  659. final var parameters = new IndexSettingProvider.Parameters(indicesService::createIndexMapperServiceForValidation);
  660. IndexSettingProviders indexSettingProviders = new IndexSettingProviders(
  661. pluginsService.flatMap(p -> p.getAdditionalIndexSettingProviders(parameters)).collect(Collectors.toSet())
  662. );
  663. final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService);
  664. final MetadataCreateIndexService metadataCreateIndexService = new MetadataCreateIndexService(
  665. settings,
  666. clusterService,
  667. indicesService,
  668. clusterModule.getAllocationService(),
  669. shardLimitValidator,
  670. environment,
  671. settingsModule.getIndexScopedSettings(),
  672. threadPool,
  673. xContentRegistry,
  674. systemIndices,
  675. forbidPrivateIndexSettings,
  676. indexSettingProviders
  677. );
  678. modules.bindToInstance(
  679. MetadataCreateDataStreamService.class,
  680. new MetadataCreateDataStreamService(threadPool, clusterService, metadataCreateIndexService)
  681. );
  682. modules.bindToInstance(MetadataDataStreamsService.class, new MetadataDataStreamsService(clusterService, indicesService));
  683. final MetadataUpdateSettingsService metadataUpdateSettingsService = new MetadataUpdateSettingsService(
  684. clusterService,
  685. clusterModule.getAllocationService(),
  686. settingsModule.getIndexScopedSettings(),
  687. indicesService,
  688. shardLimitValidator,
  689. threadPool
  690. );
  691. record PluginServiceInstances(
  692. Client client,
  693. ClusterService clusterService,
  694. RerouteService rerouteService,
  695. ThreadPool threadPool,
  696. ResourceWatcherService resourceWatcherService,
  697. ScriptService scriptService,
  698. NamedXContentRegistry xContentRegistry,
  699. Environment environment,
  700. NodeEnvironment nodeEnvironment,
  701. NamedWriteableRegistry namedWriteableRegistry,
  702. IndexNameExpressionResolver indexNameExpressionResolver,
  703. Supplier<RepositoriesService> repositoriesServiceSupplier,
  704. TelemetryProvider telemetryProvider,
  705. AllocationService allocationService,
  706. IndicesService indicesService,
  707. FeatureService featureService,
  708. SystemIndices systemIndices
  709. ) implements Plugin.PluginServices {}
  710. PluginServiceInstances pluginServices = new PluginServiceInstances(
  711. client,
  712. clusterService,
  713. rerouteService,
  714. threadPool,
  715. createResourceWatcherService(settings, threadPool),
  716. scriptService,
  717. xContentRegistry,
  718. environment,
  719. nodeEnvironment,
  720. namedWriteableRegistry,
  721. clusterModule.getIndexNameExpressionResolver(),
  722. repositoriesServiceReference::get,
  723. telemetryProvider,
  724. clusterModule.getAllocationService(),
  725. indicesService,
  726. featureService,
  727. systemIndices
  728. );
  729. Collection<?> pluginComponents = pluginsService.flatMap(p -> p.createComponents(pluginServices)).toList();
  730. var terminationHandlers = pluginsService.loadServiceProviders(TerminationHandlerProvider.class)
  731. .stream()
  732. .map(TerminationHandlerProvider::handler);
  733. terminationHandler = getSinglePlugin(terminationHandlers, TerminationHandler.class).orElse(null);
  734. ActionModule actionModule = new ActionModule(
  735. settings,
  736. clusterModule.getIndexNameExpressionResolver(),
  737. namedWriteableRegistry,
  738. settingsModule.getIndexScopedSettings(),
  739. settingsModule.getClusterSettings(),
  740. settingsModule.getSettingsFilter(),
  741. threadPool,
  742. pluginsService.filterPlugins(ActionPlugin.class).toList(),
  743. client,
  744. circuitBreakerService,
  745. createUsageService(),
  746. systemIndices,
  747. telemetryProvider.getTracer(),
  748. clusterService,
  749. rerouteService,
  750. buildReservedStateHandlers(
  751. settingsModule,
  752. clusterService,
  753. indicesService,
  754. systemIndices,
  755. indexSettingProviders,
  756. metadataCreateIndexService
  757. ),
  758. pluginsService.loadSingletonServiceProvider(RestExtension.class, RestExtension::allowAll)
  759. );
  760. modules.add(actionModule);
  761. final NetworkService networkService = new NetworkService(
  762. pluginsService.filterPlugins(DiscoveryPlugin.class)
  763. .map(d -> d.getCustomNameResolver(environment.settings()))
  764. .filter(Objects::nonNull)
  765. .toList()
  766. );
  767. final NetworkModule networkModule = new NetworkModule(
  768. settings,
  769. pluginsService.filterPlugins(NetworkPlugin.class).toList(),
  770. threadPool,
  771. bigArrays,
  772. pageCacheRecycler,
  773. circuitBreakerService,
  774. namedWriteableRegistry,
  775. xContentRegistry,
  776. networkService,
  777. actionModule.getRestController(),
  778. actionModule::copyRequestHeadersToThreadContext,
  779. clusterService.getClusterSettings(),
  780. telemetryProvider.getTracer()
  781. );
  782. var indexTemplateMetadataUpgraders = pluginsService.map(Plugin::getIndexTemplateMetadataUpgrader).toList();
  783. modules.bindToInstance(MetadataUpgrader.class, new MetadataUpgrader(indexTemplateMetadataUpgraders));
  784. final IndexMetadataVerifier indexMetadataVerifier = new IndexMetadataVerifier(
  785. settings,
  786. clusterService,
  787. xContentRegistry,
  788. indicesModule.getMapperRegistry(),
  789. settingsModule.getIndexScopedSettings(),
  790. scriptService
  791. );
  792. if (DiscoveryNode.isMasterNode(settings)) {
  793. clusterService.addListener(new SystemIndexMetadataUpgradeService(systemIndices, clusterService));
  794. clusterService.addListener(new TemplateUpgradeService(client, clusterService, threadPool, indexTemplateMetadataUpgraders));
  795. }
  796. final Transport transport = networkModule.getTransportSupplier().get();
  797. final TransportService transportService = serviceProvider.newTransportService(
  798. pluginsService,
  799. settings,
  800. transport,
  801. threadPool,
  802. networkModule.getTransportInterceptor(),
  803. localNodeFactory,
  804. settingsModule.getClusterSettings(),
  805. taskManager,
  806. telemetryProvider.getTracer()
  807. );
  808. final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
  809. final SearchTransportAPMMetrics searchTransportAPMMetrics = new SearchTransportAPMMetrics(telemetryProvider.getMeterRegistry());
  810. final SearchResponseMetrics searchResponseMetrics = new SearchResponseMetrics(telemetryProvider.getMeterRegistry());
  811. final SearchTransportService searchTransportService = new SearchTransportService(
  812. transportService,
  813. client,
  814. SearchExecutionStatsCollector.makeWrapper(responseCollectorService)
  815. );
  816. final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule);
  817. final IndexingPressure indexingLimits = new IndexingPressure(settings);
  818. final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
  819. RepositoriesModule repositoriesModule = new RepositoriesModule(
  820. environment,
  821. pluginsService.filterPlugins(RepositoryPlugin.class).toList(),
  822. transportService,
  823. clusterService,
  824. bigArrays,
  825. xContentRegistry,
  826. recoverySettings,
  827. telemetryProvider
  828. );
  829. RepositoriesService repositoryService = repositoriesModule.getRepositoryService();
  830. repositoriesServiceReference.set(repositoryService);
  831. SnapshotsService snapshotsService = new SnapshotsService(
  832. settings,
  833. clusterService,
  834. rerouteService,
  835. clusterModule.getIndexNameExpressionResolver(),
  836. repositoryService,
  837. transportService,
  838. actionModule.getActionFilters(),
  839. systemIndices
  840. );
  841. SnapshotShardsService snapshotShardsService = new SnapshotShardsService(
  842. settings,
  843. clusterService,
  844. repositoryService,
  845. transportService,
  846. indicesService
  847. );
  848. actionModule.getReservedClusterStateService().installStateHandler(new ReservedRepositoryAction(repositoryService));
  849. actionModule.getReservedClusterStateService().installStateHandler(new ReservedPipelineAction());
  850. FileSettingsService fileSettingsService = new FileSettingsService(
  851. clusterService,
  852. actionModule.getReservedClusterStateService(),
  853. environment
  854. );
  855. RestoreService restoreService = new RestoreService(
  856. clusterService,
  857. repositoryService,
  858. clusterModule.getAllocationService(),
  859. metadataCreateIndexService,
  860. indexMetadataVerifier,
  861. shardLimitValidator,
  862. systemIndices,
  863. indicesService,
  864. fileSettingsService,
  865. threadPool
  866. );
  867. DiscoveryModule discoveryModule = createDiscoveryModule(
  868. settings,
  869. threadPool,
  870. transportService,
  871. networkService,
  872. clusterService,
  873. clusterModule.getAllocationService(),
  874. rerouteService,
  875. circuitBreakerService,
  876. compatibilityVersions,
  877. featureService
  878. );
  879. nodeService = new NodeService(
  880. settings,
  881. threadPool,
  882. new MonitorService(settings, nodeEnvironment, threadPool),
  883. discoveryModule.getCoordinator(),
  884. transportService,
  885. indicesService,
  886. pluginsService,
  887. circuitBreakerService,
  888. scriptService,
  889. httpServerTransport,
  890. ingestService,
  891. clusterService,
  892. settingsModule.getSettingsFilter(),
  893. responseCollectorService,
  894. searchTransportService,
  895. indexingLimits,
  896. searchModule.getValuesSourceRegistry().getUsageService(),
  897. repositoryService
  898. );
  899. final TimeValue metricsInterval = settings.getAsTime("telemetry.agent.metrics_interval", TimeValue.timeValueSeconds(10));
  900. final NodeMetrics nodeMetrics = new NodeMetrics(telemetryProvider.getMeterRegistry(), nodeService, metricsInterval);
  901. final SearchService searchService = serviceProvider.newSearchService(
  902. pluginsService,
  903. clusterService,
  904. indicesService,
  905. threadPool,
  906. scriptService,
  907. bigArrays,
  908. searchModule.getFetchPhase(),
  909. responseCollectorService,
  910. circuitBreakerService,
  911. systemIndices.getExecutorSelector(),
  912. telemetryProvider.getTracer()
  913. );
  914. modules.add(
  915. loadPersistentTasksService(
  916. settingsModule,
  917. clusterService,
  918. threadPool,
  919. systemIndices,
  920. featureService,
  921. clusterModule.getIndexNameExpressionResolver(),
  922. metadataUpdateSettingsService,
  923. metadataCreateIndexService
  924. )
  925. );
  926. modules.add(
  927. loadPluginShutdownService(clusterService),
  928. loadDiagnosticServices(
  929. settings,
  930. discoveryModule.getCoordinator(),
  931. clusterService,
  932. transportService,
  933. featureService,
  934. threadPool,
  935. telemetryProvider,
  936. repositoryService
  937. )
  938. );
  939. RecoveryPlannerService recoveryPlannerService = getRecoveryPlannerService(threadPool, clusterService, repositoryService);
  940. modules.add(b -> {
  941. serviceProvider.processRecoverySettings(pluginsService, settingsModule.getClusterSettings(), recoverySettings);
  942. SnapshotFilesProvider snapshotFilesProvider = new SnapshotFilesProvider(repositoryService);
  943. var peerRecovery = new PeerRecoverySourceService(
  944. transportService,
  945. indicesService,
  946. clusterService,
  947. recoverySettings,
  948. recoveryPlannerService
  949. );
  950. resourcesToClose.add(peerRecovery);
  951. b.bind(PeerRecoverySourceService.class).toInstance(peerRecovery);
  952. b.bind(PeerRecoveryTargetService.class)
  953. .toInstance(
  954. new PeerRecoveryTargetService(
  955. client,
  956. threadPool,
  957. transportService,
  958. recoverySettings,
  959. clusterService,
  960. snapshotFilesProvider
  961. )
  962. );
  963. });
  964. modules.add(loadPluginComponents(pluginComponents));
  965. modules.add(b -> {
  966. b.bind(NodeService.class).toInstance(nodeService);
  967. b.bind(BigArrays.class).toInstance(bigArrays);
  968. b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
  969. b.bind(IngestService.class).toInstance(ingestService);
  970. b.bind(IndexingPressure.class).toInstance(indexingLimits);
  971. b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
  972. b.bind(MetaStateService.class).toInstance(metaStateService);
  973. b.bind(IndicesService.class).toInstance(indicesService);
  974. b.bind(MetadataCreateIndexService.class).toInstance(metadataCreateIndexService);
  975. b.bind(MetadataUpdateSettingsService.class).toInstance(metadataUpdateSettingsService);
  976. b.bind(SearchService.class).toInstance(searchService);
  977. b.bind(SearchTransportAPMMetrics.class).toInstance(searchTransportAPMMetrics);
  978. b.bind(SearchResponseMetrics.class).toInstance(searchResponseMetrics);
  979. b.bind(SearchTransportService.class).toInstance(searchTransportService);
  980. b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(searchService::aggReduceContextBuilder));
  981. b.bind(Transport.class).toInstance(transport);
  982. b.bind(TransportService.class).toInstance(transportService);
  983. b.bind(NodeMetrics.class).toInstance(nodeMetrics);
  984. b.bind(NetworkService.class).toInstance(networkService);
  985. b.bind(IndexMetadataVerifier.class).toInstance(indexMetadataVerifier);
  986. b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
  987. b.bind(SnapshotsInfoService.class).toInstance(snapshotsInfoService);
  988. b.bind(FeatureService.class).toInstance(featureService);
  989. b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
  990. b.bind(RepositoriesService.class).toInstance(repositoryService);
  991. b.bind(SnapshotsService.class).toInstance(snapshotsService);
  992. b.bind(SnapshotShardsService.class).toInstance(snapshotShardsService);
  993. b.bind(RestoreService.class).toInstance(restoreService);
  994. b.bind(RerouteService.class).toInstance(rerouteService);
  995. b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
  996. b.bind(IndexSettingProviders.class).toInstance(indexSettingProviders);
  997. b.bind(FileSettingsService.class).toInstance(fileSettingsService);
  998. b.bind(CompatibilityVersions.class).toInstance(compatibilityVersions);
  999. });
  1000. if (ReadinessService.enabled(environment)) {
  1001. modules.bindToInstance(
  1002. ReadinessService.class,
  1003. serviceProvider.newReadinessService(pluginsService, clusterService, environment)
  1004. );
  1005. }
  1006. // Register noop versions of inference services if Inference plugin is not available
  1007. Optional<InferenceRegistryPlugin> inferenceRegistryPlugin = getSinglePlugin(InferenceRegistryPlugin.class);
  1008. modules.bindToInstance(
  1009. InferenceServiceRegistry.class,
  1010. inferenceRegistryPlugin.map(InferenceRegistryPlugin::getInferenceServiceRegistry)
  1011. .orElse(new InferenceServiceRegistry.NoopInferenceServiceRegistry())
  1012. );
  1013. modules.bindToInstance(
  1014. ModelRegistry.class,
  1015. inferenceRegistryPlugin.map(InferenceRegistryPlugin::getModelRegistry).orElse(new ModelRegistry.NoopModelRegistry())
  1016. );
  1017. injector = modules.createInjector();
  1018. postInjection(clusterModule, actionModule, clusterService, transportService, featureService);
  1019. }
  1020. private ClusterService createClusterService(SettingsModule settingsModule, ThreadPool threadPool, TaskManager taskManager) {
  1021. ClusterService clusterService = new ClusterService(
  1022. settingsModule.getSettings(),
  1023. settingsModule.getClusterSettings(),
  1024. threadPool,
  1025. taskManager
  1026. );
  1027. resourcesToClose.add(clusterService);
  1028. Set<Setting<?>> consistentSettings = settingsModule.getConsistentSettings();
  1029. if (consistentSettings.isEmpty() == false) {
  1030. clusterService.addLocalNodeMasterListener(
  1031. new ConsistentSettingsService(settingsModule.getSettings(), clusterService, consistentSettings).newHashPublisher()
  1032. );
  1033. }
  1034. return clusterService;
  1035. }
  1036. private UsageService createUsageService() {
  1037. UsageService usageService = new UsageService();
  1038. modules.bindToInstance(UsageService.class, usageService);
  1039. return usageService;
  1040. }
  1041. private SystemIndices createSystemIndices(Settings settings) {
  1042. List<SystemIndices.Feature> features = pluginsService.filterPlugins(SystemIndexPlugin.class).map(plugin -> {
  1043. SystemIndices.validateFeatureName(plugin.getFeatureName(), plugin.getClass().getCanonicalName());
  1044. return SystemIndices.Feature.fromSystemIndexPlugin(plugin, settings);
  1045. }).toList();
  1046. SystemIndices systemIndices = new SystemIndices(features);
  1047. modules.add(b -> {
  1048. b.bind(SystemIndices.class).toInstance(systemIndices);
  1049. b.bind(ExecutorSelector.class).toInstance(systemIndices.getExecutorSelector());
  1050. });
  1051. return systemIndices;
  1052. }
  1053. private ResourceWatcherService createResourceWatcherService(Settings settings, ThreadPool threadPool) {
  1054. ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
  1055. resourcesToClose.add(resourceWatcherService);
  1056. modules.bindToInstance(ResourceWatcherService.class, resourceWatcherService);
  1057. return resourceWatcherService;
  1058. }
  1059. private Module loadPluginShutdownService(ClusterService clusterService) {
  1060. PluginShutdownService pluginShutdownService = new PluginShutdownService(
  1061. pluginsService.filterPlugins(ShutdownAwarePlugin.class).toList()
  1062. );
  1063. clusterService.addListener(pluginShutdownService);
  1064. return b -> b.bind(PluginShutdownService.class).toInstance(pluginShutdownService);
  1065. }
  1066. private Module loadDiagnosticServices(
  1067. Settings settings,
  1068. Coordinator coordinator,
  1069. ClusterService clusterService,
  1070. TransportService transportService,
  1071. FeatureService featureService,
  1072. ThreadPool threadPool,
  1073. TelemetryProvider telemetryProvider,
  1074. RepositoriesService repositoriesService
  1075. ) {
  1076. MasterHistoryService masterHistoryService = new MasterHistoryService(transportService, threadPool, clusterService);
  1077. CoordinationDiagnosticsService coordinationDiagnosticsService = new CoordinationDiagnosticsService(
  1078. clusterService,
  1079. transportService,
  1080. coordinator,
  1081. masterHistoryService
  1082. );
  1083. var serverHealthIndicatorServices = Stream.of(
  1084. new StableMasterHealthIndicatorService(coordinationDiagnosticsService, clusterService),
  1085. new RepositoryIntegrityHealthIndicatorService(clusterService),
  1086. new DiskHealthIndicatorService(clusterService),
  1087. new ShardsCapacityHealthIndicatorService(clusterService)
  1088. );
  1089. var pluginHealthIndicatorServices = pluginsService.filterPlugins(HealthPlugin.class)
  1090. .flatMap(plugin -> plugin.getHealthIndicatorServices().stream());
  1091. HealthService healthService = new HealthService(
  1092. Stream.concat(serverHealthIndicatorServices, pluginHealthIndicatorServices).toList(),
  1093. threadPool
  1094. );
  1095. HealthPeriodicLogger healthPeriodicLogger = HealthPeriodicLogger.create(
  1096. settings,
  1097. clusterService,
  1098. client,
  1099. healthService,
  1100. telemetryProvider
  1101. );
  1102. HealthMetadataService healthMetadataService = HealthMetadataService.create(clusterService, featureService, settings);
  1103. List<HealthTracker<?>> healthTrackers = List.of(
  1104. new DiskHealthTracker(nodeService, clusterService),
  1105. new RepositoriesHealthTracker(repositoriesService)
  1106. );
  1107. LocalHealthMonitor localHealthMonitor = LocalHealthMonitor.create(
  1108. settings,
  1109. clusterService,
  1110. threadPool,
  1111. client,
  1112. featureService,
  1113. healthTrackers
  1114. );
  1115. HealthInfoCache nodeHealthOverview = HealthInfoCache.create(clusterService);
  1116. return b -> {
  1117. b.bind(HealthService.class).toInstance(healthService);
  1118. b.bind(MasterHistoryService.class).toInstance(masterHistoryService);
  1119. b.bind(CoordinationDiagnosticsService.class).toInstance(coordinationDiagnosticsService);
  1120. b.bind(HealthMetadataService.class).toInstance(healthMetadataService);
  1121. b.bind(LocalHealthMonitor.class).toInstance(localHealthMonitor);
  1122. b.bind(HealthInfoCache.class).toInstance(nodeHealthOverview);
  1123. b.bind(HealthApiStats.class).toInstance(new HealthApiStats());
  1124. b.bind(HealthPeriodicLogger.class).toInstance(healthPeriodicLogger);
  1125. };
  1126. }
  1127. private Module loadPluginComponents(Collection<?> pluginComponents) {
  1128. List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream().map(p -> {
  1129. if (p instanceof PluginComponentBinding<?, ?> pcb) {
  1130. return pcb.impl();
  1131. }
  1132. return p;
  1133. }).filter(p -> p instanceof LifecycleComponent).map(p -> (LifecycleComponent) p).toList();
  1134. resourcesToClose.addAll(pluginLifecycleComponents);
  1135. this.pluginLifecycleComponents = pluginLifecycleComponents;
  1136. List<ReloadablePlugin> reloadablePlugins = pluginsService.filterPlugins(ReloadablePlugin.class).toList();
  1137. pluginsService.filterPlugins(ReloadAwarePlugin.class).forEach(p -> p.setReloadCallback(wrapPlugins(reloadablePlugins)));
  1138. return b -> pluginComponents.forEach(p -> {
  1139. if (p instanceof PluginComponentBinding<?, ?> pcb) {
  1140. @SuppressWarnings("unchecked")
  1141. Class<Object> clazz = (Class<Object>) pcb.inter();
  1142. b.bind(clazz).toInstance(pcb.impl());
  1143. } else {
  1144. @SuppressWarnings("unchecked")
  1145. Class<Object> clazz = (Class<Object>) p.getClass();
  1146. b.bind(clazz).toInstance(p);
  1147. }
  1148. });
  1149. }
  1150. private void postInjection(
  1151. ClusterModule clusterModule,
  1152. ActionModule actionModule,
  1153. ClusterService clusterService,
  1154. TransportService transportService,
  1155. FeatureService featureService
  1156. ) {
  1157. // We allocate copies of existing shards by looking for a viable copy of the shard in the cluster and assigning the shard there.
  1158. // The search for viable copies is triggered by an allocation attempt (i.e. a reroute) and is performed asynchronously. When it
  1159. // completes we trigger another reroute to try the allocation again. This means there is a circular dependency: the allocation
  1160. // service needs access to the existing shards allocators (e.g. the GatewayAllocator) which need to be able to trigger a
  1161. // reroute, which needs to call into the allocation service. We close the loop here:
  1162. clusterModule.setExistingShardsAllocators(injector.getInstance(GatewayAllocator.class));
  1163. // Due to Java's type erasure with generics, the injector can't give us exactly what we need, and we have
  1164. // to resort to some evil casting.
  1165. @SuppressWarnings("rawtypes")
  1166. Map<ActionType<? extends ActionResponse>, TransportAction<? extends ActionRequest, ? extends ActionResponse>> actions =
  1167. forciblyCast(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {
  1168. }));
  1169. client.initialize(
  1170. actions,
  1171. transportService.getTaskManager(),
  1172. () -> clusterService.localNode().getId(),
  1173. transportService.getLocalNodeConnection(),
  1174. transportService.getRemoteClusterService()
  1175. );
  1176. logger.debug("initializing HTTP handlers ...");
  1177. actionModule.initRestHandlers(() -> clusterService.state().nodesIfRecovered(), f -> {
  1178. ClusterState state = clusterService.state();
  1179. return state.clusterRecovered() && featureService.clusterHasFeature(state, f);
  1180. });
  1181. logger.info("initialized");
  1182. }
  1183. private DocumentParsingProvider getDocumentParsingSupplier() {
  1184. return getSinglePlugin(DocumentParsingProviderPlugin.class).map(DocumentParsingProviderPlugin::getDocumentParsingSupplier)
  1185. .orElse(DocumentParsingProvider.EMPTY_INSTANCE);
  1186. }
  1187. /**
  1188. * Create and initialize a new {@link CircuitBreakerService} based on the settings provided.
  1189. *
  1190. * @see Node#BREAKER_TYPE_KEY
  1191. */
  1192. private CircuitBreakerService createCircuitBreakerService(
  1193. CircuitBreakerMetrics metrics,
  1194. Settings settings,
  1195. ClusterSettings clusterSettings
  1196. ) {
  1197. var pluginBreakers = pluginsService.filterPlugins(CircuitBreakerPlugin.class)
  1198. .map(p -> Tuple.tuple(p, p.getCircuitBreaker(settings)))
  1199. .toList();
  1200. String type = Node.BREAKER_TYPE_KEY.get(settings);
  1201. CircuitBreakerService circuitBreakerService = switch (type) {
  1202. case "hierarchy" -> new HierarchyCircuitBreakerService(
  1203. metrics,
  1204. settings,
  1205. pluginBreakers.stream().map(Tuple::v2).toList(),
  1206. clusterSettings
  1207. );
  1208. case "none" -> new NoneCircuitBreakerService();
  1209. default -> throw new IllegalArgumentException("Unknown circuit breaker type [" + type + "]");
  1210. };
  1211. resourcesToClose.add(circuitBreakerService);
  1212. modules.bindToInstance(CircuitBreakerService.class, circuitBreakerService);
  1213. pluginBreakers.forEach(t -> {
  1214. final CircuitBreaker circuitBreaker = circuitBreakerService.getBreaker(t.v2().getName());
  1215. t.v1().setCircuitBreaker(circuitBreaker);
  1216. });
  1217. return circuitBreakerService;
  1218. }
  1219. /**
  1220. * Wrap a group of reloadable plugins into a single reloadable plugin interface
  1221. * @param reloadablePlugins A list of reloadable plugins
  1222. * @return A single ReloadablePlugin that, upon reload, reloads the plugins it wraps
  1223. */
  1224. private static ReloadablePlugin wrapPlugins(List<ReloadablePlugin> reloadablePlugins) {
  1225. return settings -> {
  1226. for (ReloadablePlugin plugin : reloadablePlugins) {
  1227. try {
  1228. plugin.reload(settings);
  1229. } catch (IOException e) {
  1230. throw new UncheckedIOException(e);
  1231. }
  1232. }
  1233. };
  1234. }
  1235. private RecoveryPlannerService getRecoveryPlannerService(
  1236. ThreadPool threadPool,
  1237. ClusterService clusterService,
  1238. RepositoriesService repositoryService
  1239. ) {
  1240. var recoveryPlannerServices = pluginsService.filterPlugins(RecoveryPlannerPlugin.class)
  1241. .map(
  1242. plugin -> plugin.createRecoveryPlannerService(
  1243. new ShardSnapshotsService(client, repositoryService, threadPool, clusterService)
  1244. )
  1245. )
  1246. .flatMap(Optional::stream);
  1247. return getSinglePlugin(recoveryPlannerServices, RecoveryPlannerService.class).orElseGet(PeerOnlyRecoveryPlannerService::new);
  1248. }
  1249. private WriteLoadForecaster getWriteLoadForecaster(ThreadPool threadPool, Settings settings, ClusterSettings clusterSettings) {
  1250. var writeLoadForecasters = pluginsService.filterPlugins(ClusterPlugin.class)
  1251. .flatMap(clusterPlugin -> clusterPlugin.createWriteLoadForecasters(threadPool, settings, clusterSettings).stream());
  1252. WriteLoadForecaster forecaster = getSinglePlugin(writeLoadForecasters, WriteLoadForecaster.class).orElse(
  1253. WriteLoadForecaster.DEFAULT
  1254. );
  1255. modules.bindToInstance(WriteLoadForecaster.class, forecaster);
  1256. return forecaster;
  1257. }
  1258. private Module loadPersistedClusterStateService(
  1259. ClusterSettings clusterSettings,
  1260. ThreadPool threadPool,
  1261. CompatibilityVersions compatibilityVersions
  1262. ) {
  1263. var persistedClusterStateServiceFactories = pluginsService.filterPlugins(ClusterCoordinationPlugin.class)
  1264. .map(ClusterCoordinationPlugin::getPersistedClusterStateServiceFactory)
  1265. .flatMap(Optional::stream);
  1266. var service = getSinglePlugin(
  1267. persistedClusterStateServiceFactories,
  1268. ClusterCoordinationPlugin.PersistedClusterStateServiceFactory.class
  1269. ).map(f -> f.newPersistedClusterStateService(nodeEnvironment, xContentRegistry, clusterSettings, threadPool, compatibilityVersions))
  1270. .orElseGet(
  1271. () -> new PersistedClusterStateService(nodeEnvironment, xContentRegistry, clusterSettings, threadPool::relativeTimeInMillis)
  1272. );
  1273. return b -> b.bind(PersistedClusterStateService.class).toInstance(service);
  1274. }
  1275. private List<ReservedClusterStateHandler<?>> buildReservedStateHandlers(
  1276. SettingsModule settingsModule,
  1277. ClusterService clusterService,
  1278. IndicesService indicesService,
  1279. SystemIndices systemIndices,
  1280. IndexSettingProviders indexSettingProviders,
  1281. MetadataCreateIndexService metadataCreateIndexService
  1282. ) {
  1283. List<ReservedClusterStateHandler<?>> reservedStateHandlers = new ArrayList<>();
  1284. // add all reserved state handlers from server
  1285. reservedStateHandlers.add(new ReservedClusterSettingsAction(settingsModule.getClusterSettings()));
  1286. var templateService = new MetadataIndexTemplateService(
  1287. clusterService,
  1288. metadataCreateIndexService,
  1289. indicesService,
  1290. settingsModule.getIndexScopedSettings(),
  1291. xContentRegistry,
  1292. systemIndices,
  1293. indexSettingProviders
  1294. );
  1295. reservedStateHandlers.add(new ReservedComposableIndexTemplateAction(templateService, settingsModule.getIndexScopedSettings()));
  1296. // add all reserved state handlers from plugins
  1297. pluginsService.loadServiceProviders(ReservedClusterStateHandlerProvider.class)
  1298. .forEach(h -> reservedStateHandlers.addAll(h.handlers()));
  1299. return reservedStateHandlers;
  1300. }
  1301. private DiscoveryModule createDiscoveryModule(
  1302. Settings settings,
  1303. ThreadPool threadPool,
  1304. TransportService transportService,
  1305. NetworkService networkService,
  1306. ClusterService clusterService,
  1307. AllocationService allocationService,
  1308. RerouteService rerouteService,
  1309. CircuitBreakerService circuitBreakerService,
  1310. CompatibilityVersions compatibilityVersions,
  1311. FeatureService featureService
  1312. ) {
  1313. GatewayMetaState gatewayMetaState = new GatewayMetaState();
  1314. FsHealthService fsHealthService = new FsHealthService(settings, clusterService.getClusterSettings(), threadPool, nodeEnvironment);
  1315. DiscoveryModule module = new DiscoveryModule(
  1316. settings,
  1317. transportService,
  1318. client,
  1319. namedWriteableRegistry,
  1320. networkService,
  1321. clusterService.getMasterService(),
  1322. clusterService.getClusterApplierService(),
  1323. clusterService.getClusterSettings(),
  1324. pluginsService.filterPlugins(DiscoveryPlugin.class).toList(),
  1325. pluginsService.filterPlugins(ClusterCoordinationPlugin.class).toList(),
  1326. allocationService,
  1327. environment.configFile(),
  1328. gatewayMetaState,
  1329. rerouteService,
  1330. fsHealthService,
  1331. circuitBreakerService,
  1332. compatibilityVersions,
  1333. featureService
  1334. );
  1335. modules.add(module, b -> {
  1336. b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
  1337. b.bind(FsHealthService.class).toInstance(fsHealthService);
  1338. });
  1339. return module;
  1340. }
  1341. private Module loadPersistentTasksService(
  1342. SettingsModule settingsModule,
  1343. ClusterService clusterService,
  1344. ThreadPool threadPool,
  1345. SystemIndices systemIndices,
  1346. FeatureService featureService,
  1347. IndexNameExpressionResolver indexNameExpressionResolver,
  1348. MetadataUpdateSettingsService metadataUpdateSettingsService,
  1349. MetadataCreateIndexService metadataCreateIndexService
  1350. ) {
  1351. PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
  1352. SystemIndexMigrationExecutor systemIndexMigrationExecutor = new SystemIndexMigrationExecutor(
  1353. client,
  1354. clusterService,
  1355. systemIndices,
  1356. metadataUpdateSettingsService,
  1357. metadataCreateIndexService,
  1358. settingsModule.getIndexScopedSettings()
  1359. );
  1360. HealthNodeTaskExecutor healthNodeTaskExecutor = HealthNodeTaskExecutor.create(
  1361. clusterService,
  1362. persistentTasksService,
  1363. featureService,
  1364. settingsModule.getSettings(),
  1365. clusterService.getClusterSettings()
  1366. );
  1367. Stream<PersistentTasksExecutor<?>> builtinTaskExecutors = Stream.of(systemIndexMigrationExecutor, healthNodeTaskExecutor);
  1368. Stream<PersistentTasksExecutor<?>> pluginTaskExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class)
  1369. .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, settingsModule, indexNameExpressionResolver))
  1370. .flatMap(List::stream);
  1371. PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(
  1372. Stream.concat(pluginTaskExecutors, builtinTaskExecutors).toList()
  1373. );
  1374. PersistentTasksClusterService persistentTasksClusterService = new PersistentTasksClusterService(
  1375. settingsModule.getSettings(),
  1376. registry,
  1377. clusterService,
  1378. threadPool
  1379. );
  1380. resourcesToClose.add(persistentTasksClusterService);
  1381. return b -> {
  1382. b.bind(PersistentTasksService.class).toInstance(persistentTasksService);
  1383. b.bind(HealthNodeTaskExecutor.class).toInstance(healthNodeTaskExecutor);
  1384. b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);
  1385. b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService);
  1386. };
  1387. }
  1388. }