Node.java 99 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031
  1. /*
  2. * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
  3. * or more contributor license agreements. Licensed under the Elastic License
  4. * 2.0 and the Server Side Public License, v 1; you may not use this file except
  5. * in compliance with, at your election, the Elastic License 2.0 or the Server
  6. * Side Public License, v 1.
  7. */
  8. package org.elasticsearch.node;
  9. import org.apache.logging.log4j.LogManager;
  10. import org.apache.logging.log4j.Logger;
  11. import org.apache.lucene.search.IndexSearcher;
  12. import org.apache.lucene.util.Constants;
  13. import org.apache.lucene.util.SetOnce;
  14. import org.elasticsearch.Build;
  15. import org.elasticsearch.ElasticsearchException;
  16. import org.elasticsearch.ElasticsearchTimeoutException;
  17. import org.elasticsearch.TransportVersion;
  18. import org.elasticsearch.Version;
  19. import org.elasticsearch.action.ActionModule;
  20. import org.elasticsearch.action.ActionRequest;
  21. import org.elasticsearch.action.ActionResponse;
  22. import org.elasticsearch.action.ActionType;
  23. import org.elasticsearch.action.admin.cluster.repositories.reservedstate.ReservedRepositoryAction;
  24. import org.elasticsearch.action.admin.indices.template.reservedstate.ReservedComposableIndexTemplateAction;
  25. import org.elasticsearch.action.ingest.ReservedPipelineAction;
  26. import org.elasticsearch.action.search.SearchExecutionStatsCollector;
  27. import org.elasticsearch.action.search.SearchPhaseController;
  28. import org.elasticsearch.action.search.SearchTransportService;
  29. import org.elasticsearch.action.support.TransportAction;
  30. import org.elasticsearch.action.update.UpdateHelper;
  31. import org.elasticsearch.bootstrap.BootstrapCheck;
  32. import org.elasticsearch.bootstrap.BootstrapContext;
  33. import org.elasticsearch.client.internal.Client;
  34. import org.elasticsearch.client.internal.node.NodeClient;
  35. import org.elasticsearch.cluster.ClusterInfoService;
  36. import org.elasticsearch.cluster.ClusterModule;
  37. import org.elasticsearch.cluster.ClusterName;
  38. import org.elasticsearch.cluster.ClusterState;
  39. import org.elasticsearch.cluster.ClusterStateObserver;
  40. import org.elasticsearch.cluster.InternalClusterInfoService;
  41. import org.elasticsearch.cluster.NodeConnectionsService;
  42. import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
  43. import org.elasticsearch.cluster.coordination.CoordinationDiagnosticsService;
  44. import org.elasticsearch.cluster.coordination.Coordinator;
  45. import org.elasticsearch.cluster.coordination.MasterHistoryService;
  46. import org.elasticsearch.cluster.coordination.Reconfigurator;
  47. import org.elasticsearch.cluster.coordination.StableMasterHealthIndicatorService;
  48. import org.elasticsearch.cluster.desirednodes.DesiredNodesSettingsValidator;
  49. import org.elasticsearch.cluster.metadata.IndexMetadataVerifier;
  50. import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
  51. import org.elasticsearch.cluster.metadata.Metadata;
  52. import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService;
  53. import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
  54. import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
  55. import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
  56. import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService;
  57. import org.elasticsearch.cluster.metadata.SystemIndexMetadataUpgradeService;
  58. import org.elasticsearch.cluster.metadata.TemplateUpgradeService;
  59. import org.elasticsearch.cluster.node.DiscoveryNode;
  60. import org.elasticsearch.cluster.node.DiscoveryNodeRole;
  61. import org.elasticsearch.cluster.routing.BatchedRerouteService;
  62. import org.elasticsearch.cluster.routing.RerouteService;
  63. import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
  64. import org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorService;
  65. import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
  66. import org.elasticsearch.cluster.service.ClusterService;
  67. import org.elasticsearch.cluster.service.TransportVersionsFixupListener;
  68. import org.elasticsearch.cluster.version.CompatibilityVersions;
  69. import org.elasticsearch.common.StopWatch;
  70. import org.elasticsearch.common.breaker.CircuitBreaker;
  71. import org.elasticsearch.common.component.Lifecycle;
  72. import org.elasticsearch.common.component.LifecycleComponent;
  73. import org.elasticsearch.common.inject.Injector;
  74. import org.elasticsearch.common.inject.Key;
  75. import org.elasticsearch.common.inject.ModulesBuilder;
  76. import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
  77. import org.elasticsearch.common.logging.DeprecationCategory;
  78. import org.elasticsearch.common.logging.DeprecationLogger;
  79. import org.elasticsearch.common.logging.HeaderWarning;
  80. import org.elasticsearch.common.logging.NodeAndClusterIdStateListener;
  81. import org.elasticsearch.common.network.NetworkAddress;
  82. import org.elasticsearch.common.network.NetworkModule;
  83. import org.elasticsearch.common.network.NetworkService;
  84. import org.elasticsearch.common.settings.ClusterSettings;
  85. import org.elasticsearch.common.settings.ConsistentSettingsService;
  86. import org.elasticsearch.common.settings.Setting;
  87. import org.elasticsearch.common.settings.Setting.Property;
  88. import org.elasticsearch.common.settings.Settings;
  89. import org.elasticsearch.common.settings.SettingsModule;
  90. import org.elasticsearch.common.transport.BoundTransportAddress;
  91. import org.elasticsearch.common.transport.TransportAddress;
  92. import org.elasticsearch.common.util.BigArrays;
  93. import org.elasticsearch.common.util.PageCacheRecycler;
  94. import org.elasticsearch.core.Assertions;
  95. import org.elasticsearch.core.IOUtils;
  96. import org.elasticsearch.core.PathUtils;
  97. import org.elasticsearch.core.Releasables;
  98. import org.elasticsearch.core.Strings;
  99. import org.elasticsearch.core.SuppressForbidden;
  100. import org.elasticsearch.core.TimeValue;
  101. import org.elasticsearch.discovery.DiscoveryModule;
  102. import org.elasticsearch.env.Environment;
  103. import org.elasticsearch.env.NodeEnvironment;
  104. import org.elasticsearch.env.NodeMetadata;
  105. import org.elasticsearch.gateway.GatewayAllocator;
  106. import org.elasticsearch.gateway.GatewayMetaState;
  107. import org.elasticsearch.gateway.GatewayModule;
  108. import org.elasticsearch.gateway.GatewayService;
  109. import org.elasticsearch.gateway.MetaStateService;
  110. import org.elasticsearch.gateway.PersistedClusterStateService;
  111. import org.elasticsearch.health.HealthPeriodicLogger;
  112. import org.elasticsearch.health.HealthService;
  113. import org.elasticsearch.health.metadata.HealthMetadataService;
  114. import org.elasticsearch.health.node.DiskHealthIndicatorService;
  115. import org.elasticsearch.health.node.HealthInfoCache;
  116. import org.elasticsearch.health.node.LocalHealthMonitor;
  117. import org.elasticsearch.health.node.ShardsCapacityHealthIndicatorService;
  118. import org.elasticsearch.health.node.selection.HealthNodeTaskExecutor;
  119. import org.elasticsearch.health.stats.HealthApiStats;
  120. import org.elasticsearch.http.HttpServerTransport;
  121. import org.elasticsearch.index.IndexSettingProvider;
  122. import org.elasticsearch.index.IndexSettingProviders;
  123. import org.elasticsearch.index.IndexSettings;
  124. import org.elasticsearch.index.IndexingPressure;
  125. import org.elasticsearch.index.analysis.AnalysisRegistry;
  126. import org.elasticsearch.index.engine.EngineFactory;
  127. import org.elasticsearch.indices.ExecutorSelector;
  128. import org.elasticsearch.indices.IndicesModule;
  129. import org.elasticsearch.indices.IndicesService;
  130. import org.elasticsearch.indices.ShardLimitValidator;
  131. import org.elasticsearch.indices.SystemIndexMappingUpdateService;
  132. import org.elasticsearch.indices.SystemIndices;
  133. import org.elasticsearch.indices.analysis.AnalysisModule;
  134. import org.elasticsearch.indices.breaker.BreakerSettings;
  135. import org.elasticsearch.indices.breaker.CircuitBreakerService;
  136. import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
  137. import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
  138. import org.elasticsearch.indices.cluster.IndicesClusterStateService;
  139. import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
  140. import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
  141. import org.elasticsearch.indices.recovery.RecoverySettings;
  142. import org.elasticsearch.indices.recovery.SnapshotFilesProvider;
  143. import org.elasticsearch.indices.recovery.plan.PeerOnlyRecoveryPlannerService;
  144. import org.elasticsearch.indices.recovery.plan.RecoveryPlannerService;
  145. import org.elasticsearch.indices.recovery.plan.ShardSnapshotsService;
  146. import org.elasticsearch.indices.store.IndicesStore;
  147. import org.elasticsearch.ingest.IngestService;
  148. import org.elasticsearch.monitor.MonitorService;
  149. import org.elasticsearch.monitor.fs.FsHealthService;
  150. import org.elasticsearch.monitor.jvm.JvmInfo;
  151. import org.elasticsearch.node.internal.TerminationHandler;
  152. import org.elasticsearch.node.internal.TerminationHandlerProvider;
  153. import org.elasticsearch.persistent.PersistentTasksClusterService;
  154. import org.elasticsearch.persistent.PersistentTasksExecutor;
  155. import org.elasticsearch.persistent.PersistentTasksExecutorRegistry;
  156. import org.elasticsearch.persistent.PersistentTasksService;
  157. import org.elasticsearch.plugins.ActionPlugin;
  158. import org.elasticsearch.plugins.AnalysisPlugin;
  159. import org.elasticsearch.plugins.CircuitBreakerPlugin;
  160. import org.elasticsearch.plugins.ClusterCoordinationPlugin;
  161. import org.elasticsearch.plugins.ClusterPlugin;
  162. import org.elasticsearch.plugins.DiscoveryPlugin;
  163. import org.elasticsearch.plugins.EnginePlugin;
  164. import org.elasticsearch.plugins.HealthPlugin;
  165. import org.elasticsearch.plugins.IndexStorePlugin;
  166. import org.elasticsearch.plugins.IngestPlugin;
  167. import org.elasticsearch.plugins.MapperPlugin;
  168. import org.elasticsearch.plugins.MetadataUpgrader;
  169. import org.elasticsearch.plugins.NetworkPlugin;
  170. import org.elasticsearch.plugins.PersistentTaskPlugin;
  171. import org.elasticsearch.plugins.Plugin;
  172. import org.elasticsearch.plugins.PluginsService;
  173. import org.elasticsearch.plugins.RecoveryPlannerPlugin;
  174. import org.elasticsearch.plugins.ReloadablePlugin;
  175. import org.elasticsearch.plugins.RepositoryPlugin;
  176. import org.elasticsearch.plugins.ScriptPlugin;
  177. import org.elasticsearch.plugins.SearchPlugin;
  178. import org.elasticsearch.plugins.ShutdownAwarePlugin;
  179. import org.elasticsearch.plugins.SystemIndexPlugin;
  180. import org.elasticsearch.plugins.TracerPlugin;
  181. import org.elasticsearch.plugins.internal.DocumentParsingObserver;
  182. import org.elasticsearch.plugins.internal.DocumentParsingObserverPlugin;
  183. import org.elasticsearch.plugins.internal.ReloadAwarePlugin;
  184. import org.elasticsearch.plugins.internal.RestExtension;
  185. import org.elasticsearch.plugins.internal.SettingsExtension;
  186. import org.elasticsearch.readiness.ReadinessService;
  187. import org.elasticsearch.repositories.RepositoriesModule;
  188. import org.elasticsearch.repositories.RepositoriesService;
  189. import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
  190. import org.elasticsearch.reservedstate.ReservedClusterStateHandlerProvider;
  191. import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
  192. import org.elasticsearch.reservedstate.service.FileSettingsService;
  193. import org.elasticsearch.rest.RestController;
  194. import org.elasticsearch.script.ScriptContext;
  195. import org.elasticsearch.script.ScriptEngine;
  196. import org.elasticsearch.script.ScriptModule;
  197. import org.elasticsearch.script.ScriptService;
  198. import org.elasticsearch.search.SearchModule;
  199. import org.elasticsearch.search.SearchService;
  200. import org.elasticsearch.search.SearchUtils;
  201. import org.elasticsearch.search.aggregations.support.AggregationUsageService;
  202. import org.elasticsearch.search.fetch.FetchPhase;
  203. import org.elasticsearch.shutdown.PluginShutdownService;
  204. import org.elasticsearch.snapshots.InternalSnapshotsInfoService;
  205. import org.elasticsearch.snapshots.RepositoryIntegrityHealthIndicatorService;
  206. import org.elasticsearch.snapshots.RestoreService;
  207. import org.elasticsearch.snapshots.SnapshotShardsService;
  208. import org.elasticsearch.snapshots.SnapshotsInfoService;
  209. import org.elasticsearch.snapshots.SnapshotsService;
  210. import org.elasticsearch.tasks.Task;
  211. import org.elasticsearch.tasks.TaskCancellationService;
  212. import org.elasticsearch.tasks.TaskManager;
  213. import org.elasticsearch.tasks.TaskResultsService;
  214. import org.elasticsearch.telemetry.tracing.Tracer;
  215. import org.elasticsearch.threadpool.ExecutorBuilder;
  216. import org.elasticsearch.threadpool.ThreadPool;
  217. import org.elasticsearch.transport.RemoteClusterPortSettings;
  218. import org.elasticsearch.transport.Transport;
  219. import org.elasticsearch.transport.TransportInterceptor;
  220. import org.elasticsearch.transport.TransportService;
  221. import org.elasticsearch.upgrades.SystemIndexMigrationExecutor;
  222. import org.elasticsearch.usage.UsageService;
  223. import org.elasticsearch.watcher.ResourceWatcherService;
  224. import org.elasticsearch.xcontent.NamedXContentRegistry;
  225. import java.io.BufferedWriter;
  226. import java.io.Closeable;
  227. import java.io.File;
  228. import java.io.IOException;
  229. import java.net.InetAddress;
  230. import java.net.InetSocketAddress;
  231. import java.nio.charset.Charset;
  232. import java.nio.file.Files;
  233. import java.nio.file.Path;
  234. import java.nio.file.StandardCopyOption;
  235. import java.util.ArrayList;
  236. import java.util.Arrays;
  237. import java.util.Collection;
  238. import java.util.Collections;
  239. import java.util.HashMap;
  240. import java.util.LinkedHashSet;
  241. import java.util.List;
  242. import java.util.Map;
  243. import java.util.Optional;
  244. import java.util.Set;
  245. import java.util.concurrent.CountDownLatch;
  246. import java.util.concurrent.FutureTask;
  247. import java.util.concurrent.TimeUnit;
  248. import java.util.function.BiConsumer;
  249. import java.util.function.Function;
  250. import java.util.function.LongSupplier;
  251. import java.util.function.Supplier;
  252. import java.util.function.UnaryOperator;
  253. import java.util.stream.Collectors;
  254. import java.util.stream.Stream;
  255. import javax.net.ssl.SNIHostName;
  256. import static java.util.stream.Collectors.toList;
  257. import static org.elasticsearch.common.util.CollectionUtils.concatLists;
  258. import static org.elasticsearch.core.Types.forciblyCast;
  259. /**
  260. * A node represents a node within a cluster ({@code cluster.name}). The {@link #client()} can be used
  261. * in order to use a {@link Client} to perform actions/operations against the cluster.
  262. */
  263. public class Node implements Closeable {
  // Node-scoped boolean setting under the key "node.portsfile"; the literal false is passed as its second argument.
  // NOTE(review): judging by the name this toggles writing a ports file -- confirm at the usage site.
  264. public static final Setting<Boolean> WRITE_PORTS_FILE_SETTING = Setting.boolSetting("node.portsfile", false, Property.NodeScope);
  // Node-scoped string setting "node.name"; the constructor logs its value as the node name.
  265. public static final Setting<String> NODE_NAME_SETTING = Setting.simpleString("node.name", Property.NodeScope);
  // Node-scoped string setting "node.external_id". NODE_NAME_SETTING is passed as the second argument,
  // presumably as the fallback used when this key is unset -- TODO confirm against Setting.simpleString.
  266. public static final Setting<String> NODE_EXTERNAL_ID_SETTING = Setting.simpleString(
  267. "node.external_id",
  268. NODE_NAME_SETTING,
  269. Property.NodeScope
  270. );
  271. public static final Setting.AffixSetting<String> NODE_ATTRIBUTES = Setting.prefixKeySetting(
  272. "node.attr.",
  273. (key) -> new Setting<>(key, "", (value) -> {
  274. if (value.length() > 0
  275. && (Character.isWhitespace(value.charAt(0)) || Character.isWhitespace(value.charAt(value.length() - 1)))) {
  276. throw new IllegalArgumentException(key + " cannot have leading or trailing whitespace [" + value + "]");
  277. }
  278. if (value.length() > 0 && "node.attr.server_name".equals(key)) {
  279. try {
  280. new SNIHostName(value);
  281. } catch (IllegalArgumentException e) {
  282. throw new IllegalArgumentException("invalid node.attr.server_name [" + value + "]", e);
  283. }
  284. }
  285. return value;
  286. }, Property.NodeScope)
  287. );
  288. public static final Setting<String> BREAKER_TYPE_KEY = new Setting<>("indices.breaker.type", "hierarchy", (s) -> {
  289. return switch (s) {
  290. case "hierarchy", "none" -> s;
  291. default -> throw new IllegalArgumentException("indices.breaker.type must be one of [hierarchy, none] but was: " + s);
  292. };
  293. }, Setting.Property.NodeScope);
  // Node-scoped positive time setting under the key "discovery.initial_state_timeout"; a 30 second TimeValue is
  // passed as the second argument (presumably the default -- confirm against Setting.positiveTimeSetting).
  294. public static final Setting<TimeValue> INITIAL_STATE_TIMEOUT_SETTING = Setting.positiveTimeSetting(
  295. "discovery.initial_state_timeout",
  296. TimeValue.timeValueSeconds(30),
  297. Property.NodeScope
  298. );
  // Value the constructor stores under Client.CLIENT_TYPE_SETTING_S when it builds the node's settings.
  299. private static final String CLIENT_TYPE = "node";
  300. private final Lifecycle lifecycle = new Lifecycle();
  301. /**
  302. * This logger instance is an instance field as opposed to a static field. This ensures that the field is not
  303. * initialized until an instance of Node is constructed, which is sure to happen after the logging infrastructure
  304. * has been initialized to include the hostname. If this field were static, then it would be initialized when the
  305. * class initializer runs. Alas, this happens too early, before logging is initialized as this class is referred to
  306. * in InternalSettingsPreparer#finalizeSettings, which runs when creating the Environment, before logging is
  307. * initialized.
  308. */
  309. private final Logger logger = LogManager.getLogger(Node.class);
  // Also an instance field; the constructor uses it for the path.shared_data and path.data deprecation warnings.
  310. private final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(Node.class);
  311. private final Injector injector;
  // Rebuilt in the constructor from the plugin-merged settings and the initial environment's config file.
  312. private final Environment environment;
  // Created in the constructor only after the SettingsModule has been built (see the ordering note there).
  313. private final NodeEnvironment nodeEnvironment;
  // Produced by the plugin-service factory function handed to the constructor.
  314. private final PluginsService pluginsService;
  // Constructed from the final settings and the node's thread pool.
  315. private final NodeClient client;
  316. private final Collection<LifecycleComponent> pluginLifecycleComponents;
  317. private final LocalNodeFactory localNodeFactory;
  318. private final NodeService nodeService;
  319. private final SetOnce<TerminationHandler> terminationHandler = new SetOnce<>();
  320. // for testing
  321. final NamedWriteableRegistry namedWriteableRegistry;
  322. final NamedXContentRegistry namedXContentRegistry;
  323. /**
  324. * Constructs a node
  325. *
  326. * @param environment the initial environment for this node, which will be added to by plugins
  327. */
  328. public Node(Environment environment) {
  // Delegates to the three-argument constructor, obtaining the plugin-service factory from
  // PluginsService.getPluginsServiceCtor and passing true for forbidPrivateIndexSettings.
  329. this(environment, PluginsService.getPluginsServiceCtor(environment), true);
  330. }
  331. /**
  332. * Constructs a node
  333. *
  334. * @param initialEnvironment the initial environment for this node, which will be added to by plugins
  335. * @param pluginServiceCtor a function that takes a {@link Settings} object and returns a {@link PluginsService}
  336. * @param forbidPrivateIndexSettings whether or not private index settings are forbidden when creating an index; this is used in the
  337. * test framework for tests that rely on being able to set private settings
  338. */
  339. protected Node(
  340. final Environment initialEnvironment,
  341. final Function<Settings, PluginsService> pluginServiceCtor,
  342. boolean forbidPrivateIndexSettings
  343. ) {
  344. final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
  345. boolean success = false;
  346. try {
  347. // Pass the node settings to the DeprecationLogger class so that it can have the deprecation.skip_deprecated_settings setting:
  348. DeprecationLogger.initialize(initialEnvironment.settings());
  349. Settings tmpSettings = Settings.builder()
  350. .put(initialEnvironment.settings())
  351. .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE)
  352. .build();
  353. final JvmInfo jvmInfo = JvmInfo.jvmInfo();
  354. logger.info(
  355. "version[{}], pid[{}], build[{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
  356. Build.current().qualifiedVersion(),
  357. jvmInfo.pid(),
  358. Build.current().type().displayName(),
  359. Build.current().hash(),
  360. Build.current().date(),
  361. Constants.OS_NAME,
  362. Constants.OS_VERSION,
  363. Constants.OS_ARCH,
  364. Constants.JVM_VENDOR,
  365. Constants.JVM_NAME,
  366. Constants.JAVA_VERSION,
  367. Constants.JVM_VERSION
  368. );
  369. logger.info("JVM home [{}], using bundled JDK [{}]", System.getProperty("java.home"), jvmInfo.getUsingBundledJdk());
  370. logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));
  371. if (Build.current().isProductionRelease() == false) {
  372. logger.warn(
  373. "version [{}] is a pre-release version of Elasticsearch and is not suitable for production",
  374. Build.current().qualifiedVersion()
  375. );
  376. }
  377. if (Environment.PATH_SHARED_DATA_SETTING.exists(tmpSettings)) {
  378. // NOTE: this must be done with an explicit check here because the deprecation property on a path setting will
  379. // cause ES to fail to start since logging is not yet initialized on first read of the setting
  380. deprecationLogger.warn(
  381. DeprecationCategory.SETTINGS,
  382. "shared-data-path",
  383. "setting [path.shared_data] is deprecated and will be removed in a future release"
  384. );
  385. }
  386. if (initialEnvironment.dataFiles().length > 1) {
  387. // NOTE: we use initialEnvironment here, but assertEquivalent below ensures the data paths do not change
  388. deprecationLogger.warn(
  389. DeprecationCategory.SETTINGS,
  390. "multiple-data-paths",
  391. "Configuring multiple [path.data] paths is deprecated. Use RAID or other system level features for utilizing "
  392. + "multiple disks. This feature will be removed in a future release."
  393. );
  394. }
  395. if (Environment.dataPathUsesList(tmpSettings)) {
  396. // already checked for multiple values above, so if this is a list it is a single valued list
  397. deprecationLogger.warn(
  398. DeprecationCategory.SETTINGS,
  399. "multiple-data-paths-list",
  400. "Configuring [path.data] with a list is deprecated. Instead specify as a string value."
  401. );
  402. }
  403. if (logger.isDebugEnabled()) {
  404. logger.debug(
  405. "using config [{}], data [{}], logs [{}], plugins [{}]",
  406. initialEnvironment.configFile(),
  407. Arrays.toString(initialEnvironment.dataFiles()),
  408. initialEnvironment.logsFile(),
  409. initialEnvironment.pluginsFile()
  410. );
  411. }
  412. deleteTemporaryApmConfig(
  413. jvmInfo,
  414. (e, apmConfig) -> logger.error("failed to delete temporary APM config file [{}], reason: [{}]", apmConfig, e.getMessage())
  415. );
  416. this.pluginsService = pluginServiceCtor.apply(tmpSettings);
  417. final Settings settings = mergePluginSettings(pluginsService.pluginMap(), tmpSettings);
  418. /*
  419. * Create the environment based on the finalized view of the settings. This is to ensure that components get the same setting
  420. * values, no matter where they ask for them from.
  421. */
  422. this.environment = new Environment(settings, initialEnvironment.configFile());
  423. Environment.assertEquivalent(initialEnvironment, this.environment);
  424. final List<ExecutorBuilder<?>> executorBuilders = pluginsService.flatMap(p -> p.getExecutorBuilders(settings)).toList();
  425. final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder<?>[0]));
  426. resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
  427. final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
  428. resourcesToClose.add(resourceWatcherService);
  429. // adds the context to the DeprecationLogger so that it does not need to be injected everywhere
  430. HeaderWarning.setThreadContext(threadPool.getThreadContext());
  431. resourcesToClose.add(() -> HeaderWarning.removeThreadContext(threadPool.getThreadContext()));
  432. final Set<String> taskHeaders = Stream.concat(
  433. pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
  434. Task.HEADERS_TO_COPY.stream()
  435. ).collect(Collectors.toSet());
  436. final Tracer tracer = getTracer(pluginsService, settings);
  437. final TaskManager taskManager = new TaskManager(settings, threadPool, taskHeaders, tracer);
  438. // register the node.data, node.ingest, node.master, node.remote_cluster_client settings here so we can mark them private
  439. final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.flatMap(Plugin::getSettings).toList());
  440. for (final ExecutorBuilder<?> builder : threadPool.builders()) {
  441. additionalSettings.addAll(builder.getRegisteredSettings());
  442. }
  443. SettingsExtension.load().forEach(e -> additionalSettings.addAll(e.getSettings()));
  444. client = new NodeClient(settings, threadPool);
  445. final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
  446. final ScriptService scriptService = newScriptService(
  447. settings,
  448. scriptModule.engines,
  449. scriptModule.contexts,
  450. threadPool::absoluteTimeInMillis
  451. );
  452. AnalysisModule analysisModule = new AnalysisModule(
  453. this.environment,
  454. pluginsService.filterPlugins(AnalysisPlugin.class),
  455. pluginsService.getStablePluginRegistry()
  456. );
  457. // this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool
  458. // so we might be late here already
  459. final SettingsModule settingsModule = new SettingsModule(
  460. settings,
  461. additionalSettings,
  462. pluginsService.flatMap(Plugin::getSettingsFilter).toList()
  463. );
  464. // creating `NodeEnvironment` breaks the ability to roll back to 7.x on an 8.0 upgrade (`upgradeLegacyNodeFolders`) so do this
  465. // after settings validation.
  466. nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
  467. logger.info(
  468. "node name [{}], node ID [{}], cluster name [{}], roles {}",
  469. NODE_NAME_SETTING.get(tmpSettings),
  470. nodeEnvironment.nodeId(),
  471. ClusterName.CLUSTER_NAME_SETTING.get(tmpSettings).value(),
  472. DiscoveryNode.getRolesFromSettings(settings)
  473. .stream()
  474. .map(DiscoveryNodeRole::roleName)
  475. .collect(Collectors.toCollection(LinkedHashSet::new))
  476. );
  477. resourcesToClose.add(nodeEnvironment);
  478. localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());
  479. ScriptModule.registerClusterSettingsListeners(scriptService, settingsModule.getClusterSettings());
  480. final NetworkService networkService = new NetworkService(
  481. getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class))
  482. );
  483. List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
  484. final ClusterService clusterService = new ClusterService(
  485. settings,
  486. settingsModule.getClusterSettings(),
  487. threadPool,
  488. taskManager
  489. );
  490. clusterService.addStateApplier(scriptService);
  491. resourcesToClose.add(clusterService);
  492. final Set<Setting<?>> consistentSettings = settingsModule.getConsistentSettings();
  493. if (consistentSettings.isEmpty() == false) {
  494. clusterService.addLocalNodeMasterListener(
  495. new ConsistentSettingsService(settings, clusterService, consistentSettings).newHashPublisher()
  496. );
  497. }
  498. Supplier<DocumentParsingObserver> documentParsingObserverSupplier = getDocumentParsingObserverSupplier();
  499. final IngestService ingestService = new IngestService(
  500. clusterService,
  501. threadPool,
  502. this.environment,
  503. scriptService,
  504. analysisModule.getAnalysisRegistry(),
  505. pluginsService.filterPlugins(IngestPlugin.class),
  506. client,
  507. IngestService.createGrokThreadWatchdog(this.environment, threadPool),
  508. documentParsingObserverSupplier
  509. );
  510. final SetOnce<RepositoriesService> repositoriesServiceReference = new SetOnce<>();
  511. final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
  512. final UsageService usageService = new UsageService();
  513. SearchModule searchModule = new SearchModule(settings, pluginsService.filterPlugins(SearchPlugin.class));
  514. IndexSearcher.setMaxClauseCount(SearchUtils.calculateMaxClauseValue(threadPool));
  515. List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
  516. NetworkModule.getNamedWriteables().stream(),
  517. IndicesModule.getNamedWriteables().stream(),
  518. searchModule.getNamedWriteables().stream(),
  519. pluginsService.flatMap(Plugin::getNamedWriteables),
  520. ClusterModule.getNamedWriteables().stream(),
  521. SystemIndexMigrationExecutor.getNamedWriteables().stream()
  522. ).flatMap(Function.identity()).toList();
  523. final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);
  524. NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(
  525. Stream.of(
  526. NetworkModule.getNamedXContents().stream(),
  527. IndicesModule.getNamedXContents().stream(),
  528. searchModule.getNamedXContents().stream(),
  529. pluginsService.flatMap(Plugin::getNamedXContent),
  530. ClusterModule.getNamedXWriteables().stream(),
  531. SystemIndexMigrationExecutor.getNamedXContentParsers().stream(),
  532. HealthNodeTaskExecutor.getNamedXContentParsers().stream()
  533. ).flatMap(Function.identity()).collect(toList())
  534. );
  535. final List<SystemIndices.Feature> features = pluginsService.filterPlugins(SystemIndexPlugin.class).stream().map(plugin -> {
  536. SystemIndices.validateFeatureName(plugin.getFeatureName(), plugin.getClass().getCanonicalName());
  537. return SystemIndices.Feature.fromSystemIndexPlugin(plugin, settings);
  538. }).toList();
  539. final SystemIndices systemIndices = new SystemIndices(features);
  540. final ExecutorSelector executorSelector = systemIndices.getExecutorSelector();
  541. ModulesBuilder modules = new ModulesBuilder();
  542. final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool);
  543. final FsHealthService fsHealthService = new FsHealthService(
  544. settings,
  545. clusterService.getClusterSettings(),
  546. threadPool,
  547. nodeEnvironment
  548. );
  549. final SetOnce<RerouteService> rerouteServiceReference = new SetOnce<>();
  550. final InternalSnapshotsInfoService snapshotsInfoService = new InternalSnapshotsInfoService(
  551. settings,
  552. clusterService,
  553. repositoriesServiceReference::get,
  554. rerouteServiceReference::get
  555. );
  556. final WriteLoadForecaster writeLoadForecaster = getWriteLoadForecaster(
  557. threadPool,
  558. settings,
  559. clusterService.getClusterSettings()
  560. );
  561. final ClusterModule clusterModule = new ClusterModule(
  562. settings,
  563. clusterService,
  564. clusterPlugins,
  565. clusterInfoService,
  566. snapshotsInfoService,
  567. threadPool,
  568. systemIndices,
  569. writeLoadForecaster
  570. );
  571. modules.add(clusterModule);
  572. IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
  573. modules.add(indicesModule);
  574. List<BreakerSettings> pluginCircuitBreakers = pluginsService.filterPlugins(CircuitBreakerPlugin.class)
  575. .stream()
  576. .map(plugin -> plugin.getCircuitBreaker(settings))
  577. .toList();
  578. final CircuitBreakerService circuitBreakerService = createCircuitBreakerService(
  579. settingsModule.getSettings(),
  580. pluginCircuitBreakers,
  581. settingsModule.getClusterSettings()
  582. );
  583. pluginsService.filterPlugins(CircuitBreakerPlugin.class).forEach(plugin -> {
  584. CircuitBreaker breaker = circuitBreakerService.getBreaker(plugin.getCircuitBreaker(settings).getName());
  585. plugin.setCircuitBreaker(breaker);
  586. });
  587. resourcesToClose.add(circuitBreakerService);
  588. modules.add(new GatewayModule());
  589. CompatibilityVersions compatibilityVersions = new CompatibilityVersions(
  590. TransportVersion.current(),
  591. systemIndices.getMappingsVersions()
  592. );
  593. PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
  594. BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
  595. modules.add(settingsModule);
  596. final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry);
  597. final PersistedClusterStateService persistedClusterStateService = newPersistedClusterStateService(
  598. xContentRegistry,
  599. clusterService.getClusterSettings(),
  600. threadPool,
  601. compatibilityVersions
  602. );
  603. // collect engine factory providers from plugins
  604. final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(EnginePlugin.class);
  605. final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders = enginePlugins.stream()
  606. .map(plugin -> (Function<IndexSettings, Optional<EngineFactory>>) plugin::getEngineFactory)
  607. .toList();
  608. final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories = pluginsService.filterPlugins(IndexStorePlugin.class)
  609. .stream()
  610. .map(IndexStorePlugin::getDirectoryFactories)
  611. .flatMap(m -> m.entrySet().stream())
  612. .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  613. final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories = pluginsService.filterPlugins(
  614. IndexStorePlugin.class
  615. )
  616. .stream()
  617. .map(IndexStorePlugin::getRecoveryStateFactories)
  618. .flatMap(m -> m.entrySet().stream())
  619. .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  620. final List<IndexStorePlugin.IndexFoldersDeletionListener> indexFoldersDeletionListeners = pluginsService.filterPlugins(
  621. IndexStorePlugin.class
  622. ).stream().map(IndexStorePlugin::getIndexFoldersDeletionListeners).flatMap(List::stream).toList();
  623. final Map<String, IndexStorePlugin.SnapshotCommitSupplier> snapshotCommitSuppliers = pluginsService.filterPlugins(
  624. IndexStorePlugin.class
  625. )
  626. .stream()
  627. .map(IndexStorePlugin::getSnapshotCommitSuppliers)
  628. .flatMap(m -> m.entrySet().stream())
  629. .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  630. if (DiscoveryNode.isMasterNode(settings)) {
  631. clusterService.addListener(new SystemIndexMappingUpdateService(systemIndices, client));
  632. clusterService.addListener(new TransportVersionsFixupListener(clusterService, client.admin().cluster(), threadPool));
  633. }
  634. final RerouteService rerouteService = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
  635. rerouteServiceReference.set(rerouteService);
  636. clusterService.setRerouteService(rerouteService);
  637. final IndicesService indicesService = new IndicesService(
  638. settings,
  639. pluginsService,
  640. nodeEnvironment,
  641. xContentRegistry,
  642. analysisModule.getAnalysisRegistry(),
  643. clusterModule.getIndexNameExpressionResolver(),
  644. indicesModule.getMapperRegistry(),
  645. namedWriteableRegistry,
  646. threadPool,
  647. settingsModule.getIndexScopedSettings(),
  648. circuitBreakerService,
  649. bigArrays,
  650. scriptService,
  651. clusterService,
  652. client,
  653. metaStateService,
  654. engineFactoryProviders,
  655. indexStoreFactories,
  656. searchModule.getValuesSourceRegistry(),
  657. recoveryStateFactories,
  658. indexFoldersDeletionListeners,
  659. snapshotCommitSuppliers,
  660. searchModule.getRequestCacheKeyDifferentiator(),
  661. documentParsingObserverSupplier
  662. );
  663. final var parameters = new IndexSettingProvider.Parameters(indicesService::createIndexMapperServiceForValidation);
  664. IndexSettingProviders indexSettingProviders = new IndexSettingProviders(
  665. pluginsService.flatMap(p -> p.getAdditionalIndexSettingProviders(parameters)).collect(Collectors.toSet())
  666. );
  667. final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService);
  668. final MetadataCreateIndexService metadataCreateIndexService = new MetadataCreateIndexService(
  669. settings,
  670. clusterService,
  671. indicesService,
  672. clusterModule.getAllocationService(),
  673. shardLimitValidator,
  674. environment,
  675. settingsModule.getIndexScopedSettings(),
  676. threadPool,
  677. xContentRegistry,
  678. systemIndices,
  679. forbidPrivateIndexSettings,
  680. indexSettingProviders
  681. );
  682. final MetadataCreateDataStreamService metadataCreateDataStreamService = new MetadataCreateDataStreamService(
  683. threadPool,
  684. clusterService,
  685. metadataCreateIndexService
  686. );
  687. final MetadataDataStreamsService metadataDataStreamsService = new MetadataDataStreamsService(clusterService, indicesService);
  688. final MetadataUpdateSettingsService metadataUpdateSettingsService = new MetadataUpdateSettingsService(
  689. clusterService,
  690. clusterModule.getAllocationService(),
  691. settingsModule.getIndexScopedSettings(),
  692. indicesService,
  693. shardLimitValidator,
  694. threadPool
  695. );
  696. Collection<Object> pluginComponents = pluginsService.flatMap(
  697. p -> p.createComponents(
  698. client,
  699. clusterService,
  700. threadPool,
  701. resourceWatcherService,
  702. scriptService,
  703. xContentRegistry,
  704. environment,
  705. nodeEnvironment,
  706. namedWriteableRegistry,
  707. clusterModule.getIndexNameExpressionResolver(),
  708. repositoriesServiceReference::get,
  709. tracer,
  710. clusterModule.getAllocationService(),
  711. indicesService
  712. )
  713. ).toList();
  714. List<ReservedClusterStateHandler<?>> reservedStateHandlers = new ArrayList<>();
  715. // add all reserved state handlers from server
  716. reservedStateHandlers.add(new ReservedClusterSettingsAction(settingsModule.getClusterSettings()));
  717. var templateService = new MetadataIndexTemplateService(
  718. clusterService,
  719. metadataCreateIndexService,
  720. indicesService,
  721. settingsModule.getIndexScopedSettings(),
  722. xContentRegistry,
  723. systemIndices,
  724. indexSettingProviders
  725. );
  726. reservedStateHandlers.add(new ReservedComposableIndexTemplateAction(templateService, settingsModule.getIndexScopedSettings()));
  727. // add all reserved state handlers from plugins
  728. List<? extends ReservedClusterStateHandlerProvider> pluginHandlers = pluginsService.loadServiceProviders(
  729. ReservedClusterStateHandlerProvider.class
  730. );
  731. pluginHandlers.forEach(h -> reservedStateHandlers.addAll(h.handlers()));
  732. List<TerminationHandler> terminationHandlers = pluginsService.loadServiceProviders(TerminationHandlerProvider.class)
  733. .stream()
  734. .map(prov -> prov.handler())
  735. .toList();
  736. if (terminationHandlers.size() == 1) {
  737. this.terminationHandler.set(terminationHandlers.get(0));
  738. } else if (terminationHandlers.size() > 1) {
  739. throw new IllegalStateException(
  740. Strings.format(
  741. "expected at most one termination handler, but found %s: [%s]",
  742. terminationHandlers.size(),
  743. terminationHandlers.stream().map(it -> it.getClass().getCanonicalName())
  744. )
  745. );
  746. }
  747. ActionModule actionModule = new ActionModule(
  748. settings,
  749. clusterModule.getIndexNameExpressionResolver(),
  750. settingsModule.getIndexScopedSettings(),
  751. settingsModule.getClusterSettings(),
  752. settingsModule.getSettingsFilter(),
  753. threadPool,
  754. pluginsService.filterPlugins(ActionPlugin.class),
  755. client,
  756. circuitBreakerService,
  757. usageService,
  758. systemIndices,
  759. tracer,
  760. clusterService,
  761. reservedStateHandlers,
  762. pluginsService.loadSingletonServiceProvider(RestExtension.class, RestExtension::allowAll)
  763. );
  764. modules.add(actionModule);
  765. final RestController restController = actionModule.getRestController();
  766. final NetworkModule networkModule = new NetworkModule(
  767. settings,
  768. pluginsService.filterPlugins(NetworkPlugin.class),
  769. threadPool,
  770. bigArrays,
  771. pageCacheRecycler,
  772. circuitBreakerService,
  773. namedWriteableRegistry,
  774. xContentRegistry,
  775. networkService,
  776. restController,
  777. actionModule::copyRequestHeadersToThreadContext,
  778. clusterService.getClusterSettings(),
  779. tracer
  780. );
  781. Collection<UnaryOperator<Map<String, IndexTemplateMetadata>>> indexTemplateMetadataUpgraders = pluginsService.map(
  782. Plugin::getIndexTemplateMetadataUpgrader
  783. ).toList();
  784. final MetadataUpgrader metadataUpgrader = new MetadataUpgrader(indexTemplateMetadataUpgraders);
  785. final IndexMetadataVerifier indexMetadataVerifier = new IndexMetadataVerifier(
  786. settings,
  787. clusterService,
  788. xContentRegistry,
  789. indicesModule.getMapperRegistry(),
  790. settingsModule.getIndexScopedSettings(),
  791. scriptService
  792. );
  793. if (DiscoveryNode.isMasterNode(settings)) {
  794. clusterService.addListener(new SystemIndexMetadataUpgradeService(systemIndices, clusterService));
  795. }
  796. new TemplateUpgradeService(client, clusterService, threadPool, indexTemplateMetadataUpgraders);
  797. final Transport transport = networkModule.getTransportSupplier().get();
  798. final TransportService transportService = newTransportService(
  799. settings,
  800. transport,
  801. threadPool,
  802. networkModule.getTransportInterceptor(),
  803. localNodeFactory,
  804. settingsModule.getClusterSettings(),
  805. taskManager,
  806. tracer
  807. );
  808. final GatewayMetaState gatewayMetaState = new GatewayMetaState();
  809. final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
  810. final SearchTransportService searchTransportService = new SearchTransportService(
  811. transportService,
  812. client,
  813. SearchExecutionStatsCollector.makeWrapper(responseCollectorService)
  814. );
  815. final HttpServerTransport httpServerTransport = newHttpTransport(networkModule);
  816. final IndexingPressure indexingLimits = new IndexingPressure(settings);
  817. final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
  818. RepositoriesModule repositoriesModule = new RepositoriesModule(
  819. this.environment,
  820. pluginsService.filterPlugins(RepositoryPlugin.class),
  821. transportService,
  822. clusterService,
  823. bigArrays,
  824. xContentRegistry,
  825. recoverySettings
  826. );
  827. RepositoriesService repositoryService = repositoriesModule.getRepositoryService();
  828. repositoriesServiceReference.set(repositoryService);
  829. SnapshotsService snapshotsService = new SnapshotsService(
  830. settings,
  831. clusterService,
  832. clusterModule.getIndexNameExpressionResolver(),
  833. repositoryService,
  834. transportService,
  835. actionModule.getActionFilters(),
  836. systemIndices
  837. );
  838. SnapshotShardsService snapshotShardsService = new SnapshotShardsService(
  839. settings,
  840. clusterService,
  841. repositoryService,
  842. transportService,
  843. indicesService
  844. );
  845. actionModule.getReservedClusterStateService().installStateHandler(new ReservedRepositoryAction(repositoryService));
  846. actionModule.getReservedClusterStateService().installStateHandler(new ReservedPipelineAction());
  847. FileSettingsService fileSettingsService = new FileSettingsService(
  848. clusterService,
  849. actionModule.getReservedClusterStateService(),
  850. environment
  851. );
  852. RestoreService restoreService = new RestoreService(
  853. clusterService,
  854. repositoryService,
  855. clusterModule.getAllocationService(),
  856. metadataCreateIndexService,
  857. indexMetadataVerifier,
  858. shardLimitValidator,
  859. systemIndices,
  860. indicesService,
  861. fileSettingsService,
  862. threadPool
  863. );
  864. final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(
  865. settings,
  866. clusterService::state,
  867. clusterService.getClusterSettings(),
  868. client,
  869. threadPool::relativeTimeInMillis,
  870. rerouteService
  871. );
  872. clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);
  873. final DiscoveryModule discoveryModule = new DiscoveryModule(
  874. settings,
  875. transportService,
  876. client,
  877. namedWriteableRegistry,
  878. networkService,
  879. clusterService.getMasterService(),
  880. clusterService.getClusterApplierService(),
  881. clusterService.getClusterSettings(),
  882. pluginsService.filterPlugins(DiscoveryPlugin.class),
  883. pluginsService.filterPlugins(ClusterCoordinationPlugin.class),
  884. clusterModule.getAllocationService(),
  885. environment.configFile(),
  886. gatewayMetaState,
  887. rerouteService,
  888. fsHealthService,
  889. circuitBreakerService,
  890. compatibilityVersions
  891. );
  892. this.nodeService = new NodeService(
  893. settings,
  894. threadPool,
  895. monitorService,
  896. discoveryModule.getCoordinator(),
  897. transportService,
  898. indicesService,
  899. pluginsService,
  900. circuitBreakerService,
  901. scriptService,
  902. httpServerTransport,
  903. ingestService,
  904. clusterService,
  905. settingsModule.getSettingsFilter(),
  906. responseCollectorService,
  907. searchTransportService,
  908. indexingLimits,
  909. searchModule.getValuesSourceRegistry().getUsageService(),
  910. repositoryService
  911. );
  912. final SearchService searchService = newSearchService(
  913. clusterService,
  914. indicesService,
  915. threadPool,
  916. scriptService,
  917. bigArrays,
  918. searchModule.getFetchPhase(),
  919. responseCollectorService,
  920. circuitBreakerService,
  921. executorSelector,
  922. tracer
  923. );
  924. final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
  925. final SystemIndexMigrationExecutor systemIndexMigrationExecutor = new SystemIndexMigrationExecutor(
  926. client,
  927. clusterService,
  928. systemIndices,
  929. metadataUpdateSettingsService,
  930. metadataCreateIndexService,
  931. settingsModule.getIndexScopedSettings()
  932. );
  933. final HealthNodeTaskExecutor healthNodeTaskExecutor = HealthNodeTaskExecutor.create(
  934. clusterService,
  935. persistentTasksService,
  936. settings,
  937. clusterService.getClusterSettings()
  938. );
  939. final List<PersistentTasksExecutor<?>> builtinTaskExecutors = List.of(systemIndexMigrationExecutor, healthNodeTaskExecutor);
  940. final List<PersistentTasksExecutor<?>> pluginTaskExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class)
  941. .stream()
  942. .map(
  943. p -> p.getPersistentTasksExecutor(
  944. clusterService,
  945. threadPool,
  946. client,
  947. settingsModule,
  948. clusterModule.getIndexNameExpressionResolver()
  949. )
  950. )
  951. .flatMap(List::stream)
  952. .collect(toList());
  953. final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(
  954. concatLists(pluginTaskExecutors, builtinTaskExecutors)
  955. );
  956. final PersistentTasksClusterService persistentTasksClusterService = new PersistentTasksClusterService(
  957. settings,
  958. registry,
  959. clusterService,
  960. threadPool
  961. );
  962. resourcesToClose.add(persistentTasksClusterService);
  963. final List<ShutdownAwarePlugin> shutdownAwarePlugins = pluginsService.filterPlugins(ShutdownAwarePlugin.class);
  964. final PluginShutdownService pluginShutdownService = new PluginShutdownService(shutdownAwarePlugins);
  965. clusterService.addListener(pluginShutdownService);
  966. final RecoveryPlannerService recoveryPlannerService = getRecoveryPlannerService(threadPool, clusterService, repositoryService);
  967. final DesiredNodesSettingsValidator desiredNodesSettingsValidator = new DesiredNodesSettingsValidator(
  968. clusterService.getClusterSettings()
  969. );
  970. final MasterHistoryService masterHistoryService = new MasterHistoryService(transportService, threadPool, clusterService);
  971. final CoordinationDiagnosticsService coordinationDiagnosticsService = new CoordinationDiagnosticsService(
  972. clusterService,
  973. transportService,
  974. discoveryModule.getCoordinator(),
  975. masterHistoryService
  976. );
  977. final HealthService healthService = createHealthService(
  978. clusterService,
  979. clusterModule,
  980. coordinationDiagnosticsService,
  981. threadPool,
  982. systemIndices
  983. );
  984. HealthPeriodicLogger healthPeriodicLogger = createHealthPeriodicLogger(clusterService, settings, client, healthService);
  985. healthPeriodicLogger.init();
  986. HealthMetadataService healthMetadataService = HealthMetadataService.create(clusterService, settings);
  987. LocalHealthMonitor localHealthMonitor = LocalHealthMonitor.create(settings, clusterService, nodeService, threadPool, client);
  988. HealthInfoCache nodeHealthOverview = HealthInfoCache.create(clusterService);
  989. HealthApiStats healthApiStats = new HealthApiStats();
  990. List<ReloadablePlugin> reloadablePlugins = pluginsService.filterPlugins(ReloadablePlugin.class);
  991. pluginsService.filterPlugins(ReloadAwarePlugin.class).forEach(p -> p.setReloadCallback(wrapPlugins(reloadablePlugins)));
  992. modules.add(b -> {
  993. b.bind(Node.class).toInstance(this);
  994. b.bind(NodeService.class).toInstance(nodeService);
  995. b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
  996. b.bind(PluginsService.class).toInstance(pluginsService);
  997. b.bind(Client.class).toInstance(client);
  998. b.bind(NodeClient.class).toInstance(client);
  999. b.bind(Environment.class).toInstance(this.environment);
  1000. b.bind(ThreadPool.class).toInstance(threadPool);
  1001. b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
  1002. b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
  1003. b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
  1004. b.bind(BigArrays.class).toInstance(bigArrays);
  1005. b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
  1006. b.bind(ScriptService.class).toInstance(scriptService);
  1007. b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
  1008. b.bind(IngestService.class).toInstance(ingestService);
  1009. b.bind(IndexingPressure.class).toInstance(indexingLimits);
  1010. b.bind(UsageService.class).toInstance(usageService);
  1011. b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
  1012. b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
  1013. b.bind(MetadataUpgrader.class).toInstance(metadataUpgrader);
  1014. b.bind(MetaStateService.class).toInstance(metaStateService);
  1015. b.bind(PersistedClusterStateService.class).toInstance(persistedClusterStateService);
  1016. b.bind(IndicesService.class).toInstance(indicesService);
  1017. b.bind(MetadataCreateIndexService.class).toInstance(metadataCreateIndexService);
  1018. b.bind(MetadataCreateDataStreamService.class).toInstance(metadataCreateDataStreamService);
  1019. b.bind(MetadataDataStreamsService.class).toInstance(metadataDataStreamsService);
  1020. b.bind(MetadataUpdateSettingsService.class).toInstance(metadataUpdateSettingsService);
  1021. b.bind(SearchService.class).toInstance(searchService);
  1022. b.bind(SearchTransportService.class).toInstance(searchTransportService);
  1023. b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(searchService::aggReduceContextBuilder));
  1024. b.bind(Transport.class).toInstance(transport);
  1025. b.bind(TransportService.class).toInstance(transportService);
  1026. b.bind(NetworkService.class).toInstance(networkService);
  1027. b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptService));
  1028. b.bind(IndexMetadataVerifier.class).toInstance(indexMetadataVerifier);
  1029. b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
  1030. b.bind(SnapshotsInfoService.class).toInstance(snapshotsInfoService);
  1031. b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
  1032. b.bind(Coordinator.class).toInstance(discoveryModule.getCoordinator());
  1033. b.bind(Reconfigurator.class).toInstance(discoveryModule.getReconfigurator());
  1034. {
  1035. processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
  1036. final SnapshotFilesProvider snapshotFilesProvider = new SnapshotFilesProvider(repositoryService);
  1037. b.bind(PeerRecoverySourceService.class)
  1038. .toInstance(
  1039. new PeerRecoverySourceService(
  1040. transportService,
  1041. indicesService,
  1042. clusterService,
  1043. recoverySettings,
  1044. recoveryPlannerService
  1045. )
  1046. );
  1047. b.bind(PeerRecoveryTargetService.class)
  1048. .toInstance(
  1049. new PeerRecoveryTargetService(
  1050. client,
  1051. threadPool,
  1052. transportService,
  1053. recoverySettings,
  1054. clusterService,
  1055. snapshotFilesProvider
  1056. )
  1057. );
  1058. }
  1059. b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
  1060. pluginComponents.forEach(p -> {
  1061. if (p instanceof PluginComponentBinding<?, ?> pcb) {
  1062. @SuppressWarnings("unchecked")
  1063. Class<Object> clazz = (Class<Object>) pcb.inter();
  1064. b.bind(clazz).toInstance(pcb.impl());
  1065. } else {
  1066. @SuppressWarnings("unchecked")
  1067. Class<Object> clazz = (Class<Object>) p.getClass();
  1068. b.bind(clazz).toInstance(p);
  1069. }
  1070. });
  1071. b.bind(PersistentTasksService.class).toInstance(persistentTasksService);
  1072. b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService);
  1073. b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);
  1074. b.bind(RepositoriesService.class).toInstance(repositoryService);
  1075. b.bind(SnapshotsService.class).toInstance(snapshotsService);
  1076. b.bind(SnapshotShardsService.class).toInstance(snapshotShardsService);
  1077. b.bind(RestoreService.class).toInstance(restoreService);
  1078. b.bind(RerouteService.class).toInstance(rerouteService);
  1079. b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
  1080. b.bind(FsHealthService.class).toInstance(fsHealthService);
  1081. b.bind(SystemIndices.class).toInstance(systemIndices);
  1082. b.bind(PluginShutdownService.class).toInstance(pluginShutdownService);
  1083. b.bind(ExecutorSelector.class).toInstance(executorSelector);
  1084. b.bind(IndexSettingProviders.class).toInstance(indexSettingProviders);
  1085. b.bind(DesiredNodesSettingsValidator.class).toInstance(desiredNodesSettingsValidator);
  1086. b.bind(HealthService.class).toInstance(healthService);
  1087. b.bind(MasterHistoryService.class).toInstance(masterHistoryService);
  1088. b.bind(CoordinationDiagnosticsService.class).toInstance(coordinationDiagnosticsService);
  1089. b.bind(HealthNodeTaskExecutor.class).toInstance(healthNodeTaskExecutor);
  1090. b.bind(HealthMetadataService.class).toInstance(healthMetadataService);
  1091. b.bind(LocalHealthMonitor.class).toInstance(localHealthMonitor);
  1092. b.bind(HealthInfoCache.class).toInstance(nodeHealthOverview);
  1093. b.bind(HealthApiStats.class).toInstance(healthApiStats);
  1094. b.bind(Tracer.class).toInstance(tracer);
  1095. b.bind(FileSettingsService.class).toInstance(fileSettingsService);
  1096. b.bind(WriteLoadForecaster.class).toInstance(writeLoadForecaster);
  1097. b.bind(HealthPeriodicLogger.class).toInstance(healthPeriodicLogger);
  1098. b.bind(CompatibilityVersions.class).toInstance(compatibilityVersions);
  1099. });
  1100. if (ReadinessService.enabled(environment)) {
  1101. modules.add(b -> b.bind(ReadinessService.class).toInstance(newReadinessService(clusterService, environment)));
  1102. }
  1103. injector = modules.createInjector();
  1104. // We allocate copies of existing shards by looking for a viable copy of the shard in the cluster and assigning the shard there.
  1105. // The search for viable copies is triggered by an allocation attempt (i.e. a reroute) and is performed asynchronously. When it
  1106. // completes we trigger another reroute to try the allocation again. This means there is a circular dependency: the allocation
  1107. // service needs access to the existing shards allocators (e.g. the GatewayAllocator) which need to be able to trigger a
  1108. // reroute, which needs to call into the allocation service. We close the loop here:
  1109. clusterModule.setExistingShardsAllocators(injector.getInstance(GatewayAllocator.class));
  1110. List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream().map(p -> {
  1111. if (p instanceof PluginComponentBinding<?, ?> pcb) {
  1112. return pcb.impl();
  1113. }
  1114. return p;
  1115. }).filter(p -> p instanceof LifecycleComponent).map(p -> (LifecycleComponent) p).toList();
  1116. resourcesToClose.addAll(pluginLifecycleComponents);
  1117. resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class));
  1118. this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
  1119. // Due to Java's type erasure with generics, the injector can't give us exactly what we need, and we have
  1120. // to resort to some evil casting.
  1121. @SuppressWarnings("rawtypes")
  1122. Map<ActionType<? extends ActionResponse>, TransportAction<? extends ActionRequest, ? extends ActionResponse>> actions =
  1123. forciblyCast(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {
  1124. }));
  1125. client.initialize(
  1126. actions,
  1127. transportService.getTaskManager(),
  1128. () -> clusterService.localNode().getId(),
  1129. transportService.getLocalNodeConnection(),
  1130. transportService.getRemoteClusterService(),
  1131. namedWriteableRegistry
  1132. );
  1133. this.namedWriteableRegistry = namedWriteableRegistry;
  1134. this.namedXContentRegistry = xContentRegistry;
  1135. logger.debug("initializing HTTP handlers ...");
  1136. actionModule.initRestHandlers(() -> clusterService.state().nodesIfRecovered());
  1137. logger.info("initialized");
  1138. success = true;
  1139. } catch (IOException ex) {
  1140. throw new ElasticsearchException("failed to bind service", ex);
  1141. } finally {
  1142. if (success == false) {
  1143. IOUtils.closeWhileHandlingException(resourcesToClose);
  1144. }
  1145. }
  1146. }
  1147. private Supplier<DocumentParsingObserver> getDocumentParsingObserverSupplier() {
  1148. List<DocumentParsingObserverPlugin> plugins = pluginsService.filterPlugins(DocumentParsingObserverPlugin.class);
  1149. if (plugins.size() == 1) {
  1150. return plugins.get(0).getDocumentParsingObserverSupplier();
  1151. } else if (plugins.size() == 0) {
  1152. return () -> DocumentParsingObserver.EMPTY_INSTANCE;
  1153. }
  1154. throw new IllegalStateException("too many DocumentParsingObserverPlugin instances");
  1155. }
  1156. /**
  1157. * If the JVM was started with the Elastic APM agent and a config file argument was specified, then
  1158. * delete the config file. The agent only reads it once, when supplied in this fashion, and it
  1159. * may contain a secret token.
  1160. * <p>
  1161. * Public for testing only
  1162. */
  1163. @SuppressForbidden(reason = "Cannot guarantee that the temp config path is relative to the environment")
  1164. public static void deleteTemporaryApmConfig(JvmInfo jvmInfo, BiConsumer<Exception, Path> errorHandler) {
  1165. for (String inputArgument : jvmInfo.getInputArguments()) {
  1166. if (inputArgument.startsWith("-javaagent:")) {
  1167. final String agentArg = inputArgument.substring(11);
  1168. final String[] parts = agentArg.split("=", 2);
  1169. String APM_AGENT_CONFIG_FILE_REGEX = String.join(
  1170. "\\" + File.separator,
  1171. ".*modules",
  1172. "apm",
  1173. "elastic-apm-agent-\\d+\\.\\d+\\.\\d+\\.jar"
  1174. );
  1175. if (parts[0].matches(APM_AGENT_CONFIG_FILE_REGEX)) {
  1176. if (parts.length == 2 && parts[1].startsWith("c=")) {
  1177. final Path apmConfig = PathUtils.get(parts[1].substring(2));
  1178. if (apmConfig.getFileName().toString().matches("^\\.elstcapm\\..*\\.tmp")) {
  1179. try {
  1180. Files.deleteIfExists(apmConfig);
  1181. } catch (IOException e) {
  1182. errorHandler.accept(e, apmConfig);
  1183. }
  1184. }
  1185. }
  1186. return;
  1187. }
  1188. }
  1189. }
  1190. }
  1191. /**
  1192. * Wrap a group of reloadable plugins into a single reloadable plugin interface
  1193. * @param reloadablePlugins A list of reloadable plugins
  1194. * @return A single ReloadablePlugin that, upon reload, reloads the plugins it wraps
  1195. */
  1196. private static ReloadablePlugin wrapPlugins(List<ReloadablePlugin> reloadablePlugins) {
  1197. return settings -> {
  1198. for (ReloadablePlugin plugin : reloadablePlugins) {
  1199. try {
  1200. plugin.reload(settings);
  1201. } catch (IOException e) {
  1202. throw new RuntimeException(e);
  1203. }
  1204. }
  1205. };
  1206. }
  1207. private Tracer getTracer(PluginsService pluginsService, Settings settings) {
  1208. final List<TracerPlugin> tracerPlugins = pluginsService.filterPlugins(TracerPlugin.class);
  1209. if (tracerPlugins.size() > 1) {
  1210. throw new IllegalStateException("A single TracerPlugin was expected but got: " + tracerPlugins);
  1211. }
  1212. return tracerPlugins.isEmpty() ? Tracer.NOOP : tracerPlugins.get(0).getTracer(settings);
  1213. }
  1214. private HealthService createHealthService(
  1215. ClusterService clusterService,
  1216. ClusterModule clusterModule,
  1217. CoordinationDiagnosticsService coordinationDiagnosticsService,
  1218. ThreadPool threadPool,
  1219. SystemIndices systemIndices
  1220. ) {
  1221. var serverHealthIndicatorServices = List.of(
  1222. new StableMasterHealthIndicatorService(coordinationDiagnosticsService, clusterService),
  1223. new RepositoryIntegrityHealthIndicatorService(clusterService),
  1224. new ShardsAvailabilityHealthIndicatorService(clusterService, clusterModule.getAllocationService(), systemIndices),
  1225. new DiskHealthIndicatorService(clusterService),
  1226. new ShardsCapacityHealthIndicatorService(clusterService)
  1227. );
  1228. var pluginHealthIndicatorServices = pluginsService.filterPlugins(HealthPlugin.class)
  1229. .stream()
  1230. .flatMap(plugin -> plugin.getHealthIndicatorServices().stream())
  1231. .toList();
  1232. return new HealthService(concatLists(serverHealthIndicatorServices, pluginHealthIndicatorServices), threadPool);
  1233. }
  1234. private HealthPeriodicLogger createHealthPeriodicLogger(
  1235. ClusterService clusterService,
  1236. Settings settings,
  1237. NodeClient client,
  1238. HealthService healthService
  1239. ) {
  1240. return new HealthPeriodicLogger(settings, clusterService, client, healthService);
  1241. }
  1242. private RecoveryPlannerService getRecoveryPlannerService(
  1243. ThreadPool threadPool,
  1244. ClusterService clusterService,
  1245. RepositoriesService repositoryService
  1246. ) {
  1247. final List<RecoveryPlannerService> recoveryPlannerServices = pluginsService.filterPlugins(RecoveryPlannerPlugin.class)
  1248. .stream()
  1249. .map(
  1250. plugin -> plugin.createRecoveryPlannerService(
  1251. new ShardSnapshotsService(client, repositoryService, threadPool, clusterService)
  1252. )
  1253. )
  1254. .filter(Optional::isPresent)
  1255. .map(Optional::get)
  1256. .toList();
  1257. if (recoveryPlannerServices.isEmpty()) {
  1258. return new PeerOnlyRecoveryPlannerService();
  1259. } else if (recoveryPlannerServices.size() > 1) {
  1260. throw new IllegalStateException("Expected a single RecoveryPlannerService but got: " + recoveryPlannerServices.size());
  1261. }
  1262. return recoveryPlannerServices.get(0);
  1263. }
  1264. private WriteLoadForecaster getWriteLoadForecaster(ThreadPool threadPool, Settings settings, ClusterSettings clusterSettings) {
  1265. final List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
  1266. final List<WriteLoadForecaster> writeLoadForecasters = clusterPlugins.stream()
  1267. .flatMap(clusterPlugin -> clusterPlugin.createWriteLoadForecasters(threadPool, settings, clusterSettings).stream())
  1268. .toList();
  1269. if (writeLoadForecasters.isEmpty()) {
  1270. return WriteLoadForecaster.DEFAULT;
  1271. }
  1272. if (writeLoadForecasters.size() > 1) {
  1273. throw new IllegalStateException("A single WriteLoadForecaster was expected but got: " + writeLoadForecasters);
  1274. }
  1275. return writeLoadForecasters.get(0);
  1276. }
  1277. private PersistedClusterStateService newPersistedClusterStateService(
  1278. NamedXContentRegistry xContentRegistry,
  1279. ClusterSettings clusterSettings,
  1280. ThreadPool threadPool,
  1281. CompatibilityVersions compatibilityVersions
  1282. ) {
  1283. final List<ClusterCoordinationPlugin.PersistedClusterStateServiceFactory> persistedClusterStateServiceFactories = pluginsService
  1284. .filterPlugins(ClusterCoordinationPlugin.class)
  1285. .stream()
  1286. .map(ClusterCoordinationPlugin::getPersistedClusterStateServiceFactory)
  1287. .flatMap(Optional::stream)
  1288. .toList();
  1289. if (persistedClusterStateServiceFactories.size() > 1) {
  1290. throw new IllegalStateException("multiple persisted-state-service factories found: " + persistedClusterStateServiceFactories);
  1291. }
  1292. if (persistedClusterStateServiceFactories.size() == 1) {
  1293. return persistedClusterStateServiceFactories.get(0)
  1294. .newPersistedClusterStateService(nodeEnvironment, xContentRegistry, clusterSettings, threadPool, compatibilityVersions);
  1295. }
  1296. return new PersistedClusterStateService(nodeEnvironment, xContentRegistry, clusterSettings, threadPool::relativeTimeInMillis);
  1297. }
  1298. protected TransportService newTransportService(
  1299. Settings settings,
  1300. Transport transport,
  1301. ThreadPool threadPool,
  1302. TransportInterceptor interceptor,
  1303. Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
  1304. ClusterSettings clusterSettings,
  1305. TaskManager taskManager,
  1306. Tracer tracer
  1307. ) {
  1308. return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskManager, tracer);
  1309. }
  1310. protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
  1311. // Noop in production, overridden by tests
  1312. }
  1313. /**
  1314. * The settings that are used by this node. Contains original settings as well as additional settings provided by plugins.
  1315. */
  1316. public Settings settings() {
  1317. return this.environment.settings();
  1318. }
  1319. /**
  1320. * A client that can be used to execute actions (operations) against the cluster.
  1321. */
  1322. public Client client() {
  1323. return client;
  1324. }
  1325. /**
  1326. * Returns the environment of the node
  1327. */
  1328. public Environment getEnvironment() {
  1329. return environment;
  1330. }
  1331. /**
  1332. * Returns the {@link NodeEnvironment} instance of this node
  1333. */
  1334. public NodeEnvironment getNodeEnvironment() {
  1335. return nodeEnvironment;
  1336. }
  1337. /**
  1338. * Start the node. If the node is already started, this method is no-op.
  1339. */
  1340. public Node start() throws NodeValidationException {
  1341. if (lifecycle.moveToStarted() == false) {
  1342. return this;
  1343. }
  1344. logger.info("starting ...");
  1345. pluginLifecycleComponents.forEach(LifecycleComponent::start);
  1346. if (ReadinessService.enabled(environment)) {
  1347. injector.getInstance(ReadinessService.class).start();
  1348. }
  1349. injector.getInstance(MappingUpdatedAction.class).setClient(client);
  1350. injector.getInstance(IndicesService.class).start();
  1351. injector.getInstance(IndicesClusterStateService.class).start();
  1352. injector.getInstance(SnapshotsService.class).start();
  1353. injector.getInstance(SnapshotShardsService.class).start();
  1354. injector.getInstance(RepositoriesService.class).start();
  1355. injector.getInstance(SearchService.class).start();
  1356. injector.getInstance(FsHealthService.class).start();
  1357. nodeService.getMonitorService().start();
  1358. final ClusterService clusterService = injector.getInstance(ClusterService.class);
  1359. final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
  1360. nodeConnectionsService.start();
  1361. clusterService.setNodeConnectionsService(nodeConnectionsService);
  1362. injector.getInstance(GatewayService.class).start();
  1363. final Coordinator coordinator = injector.getInstance(Coordinator.class);
  1364. clusterService.getMasterService().setClusterStatePublisher(coordinator);
  1365. // Start the transport service now so the publish address will be added to the local disco node in ClusterService
  1366. TransportService transportService = injector.getInstance(TransportService.class);
  1367. transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
  1368. transportService.getTaskManager().setTaskCancellationService(new TaskCancellationService(transportService));
  1369. transportService.start();
  1370. assert localNodeFactory.getNode() != null;
  1371. assert transportService.getLocalNode().equals(localNodeFactory.getNode())
  1372. : "transportService has a different local node than the factory provided";
  1373. injector.getInstance(PeerRecoverySourceService.class).start();
  1374. // Load (and maybe upgrade) the metadata stored on disk
  1375. final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
  1376. gatewayMetaState.start(
  1377. settings(),
  1378. transportService,
  1379. clusterService,
  1380. injector.getInstance(MetaStateService.class),
  1381. injector.getInstance(IndexMetadataVerifier.class),
  1382. injector.getInstance(MetadataUpgrader.class),
  1383. injector.getInstance(PersistedClusterStateService.class),
  1384. pluginsService.filterPlugins(ClusterCoordinationPlugin.class),
  1385. injector.getInstance(CompatibilityVersions.class)
  1386. );
  1387. // TODO: Do not expect that the legacy metadata file is always present https://github.com/elastic/elasticsearch/issues/95211
  1388. if (Assertions.ENABLED && DiscoveryNode.isStateless(settings()) == false) {
  1389. try {
  1390. assert injector.getInstance(MetaStateService.class).loadFullState().v1().isEmpty();
  1391. final NodeMetadata nodeMetadata = NodeMetadata.FORMAT.loadLatestState(
  1392. logger,
  1393. NamedXContentRegistry.EMPTY,
  1394. nodeEnvironment.nodeDataPaths()
  1395. );
  1396. assert nodeMetadata != null;
  1397. assert nodeMetadata.nodeVersion().equals(Version.CURRENT);
  1398. assert nodeMetadata.nodeId().equals(localNodeFactory.getNode().getId());
  1399. } catch (IOException e) {
  1400. assert false : e;
  1401. }
  1402. }
  1403. // we load the global state here (the persistent part of the cluster state stored on disk) to
  1404. // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
  1405. final Metadata onDiskMetadata = gatewayMetaState.getPersistedState().getLastAcceptedState().metadata();
  1406. assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null
  1407. validateNodeBeforeAcceptingRequests(
  1408. new BootstrapContext(environment, onDiskMetadata),
  1409. transportService.boundAddress(),
  1410. pluginsService.flatMap(Plugin::getBootstrapChecks).toList()
  1411. );
  1412. final FileSettingsService fileSettingsService = injector.getInstance(FileSettingsService.class);
  1413. fileSettingsService.start();
  1414. // if we are using the readiness service, listen for the file settings being applied
  1415. if (ReadinessService.enabled(environment)) {
  1416. fileSettingsService.addFileChangedListener(injector.getInstance(ReadinessService.class));
  1417. }
  1418. clusterService.addStateApplier(transportService.getTaskManager());
  1419. // start after transport service so the local disco is known
  1420. coordinator.start(); // start before cluster service so that it can set initial state on ClusterApplierService
  1421. clusterService.start();
  1422. assert clusterService.localNode().equals(localNodeFactory.getNode())
  1423. : "clusterService has a different local node than the factory provided";
  1424. transportService.acceptIncomingRequests();
  1425. /*
  1426. * CoordinationDiagnosticsService expects to be able to send transport requests and use the cluster state, so it is important to
  1427. * start it here after the clusterService and transportService have been started.
  1428. */
  1429. injector.getInstance(CoordinationDiagnosticsService.class).start();
  1430. coordinator.startInitialJoin();
  1431. final TimeValue initialStateTimeout = INITIAL_STATE_TIMEOUT_SETTING.get(settings());
  1432. configureNodeAndClusterIdStateListener(clusterService);
  1433. if (initialStateTimeout.millis() > 0) {
  1434. final ThreadPool thread = injector.getInstance(ThreadPool.class);
  1435. ClusterState clusterState = clusterService.state();
  1436. ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());
  1437. if (clusterState.nodes().getMasterNodeId() == null) {
  1438. logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
  1439. final CountDownLatch latch = new CountDownLatch(1);
  1440. observer.waitForNextChange(new ClusterStateObserver.Listener() {
  1441. @Override
  1442. public void onNewClusterState(ClusterState state) {
  1443. latch.countDown();
  1444. }
  1445. @Override
  1446. public void onClusterServiceClose() {
  1447. latch.countDown();
  1448. }
  1449. @Override
  1450. public void onTimeout(TimeValue timeout) {
  1451. logger.warn("timed out while waiting for initial discovery state - timeout: {}", initialStateTimeout);
  1452. latch.countDown();
  1453. }
  1454. }, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);
  1455. try {
  1456. latch.await();
  1457. } catch (InterruptedException e) {
  1458. throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
  1459. }
  1460. }
  1461. }
  1462. injector.getInstance(HttpServerTransport.class).start();
  1463. if (WRITE_PORTS_FILE_SETTING.get(settings())) {
  1464. TransportService transport = injector.getInstance(TransportService.class);
  1465. writePortsFile("transport", transport.boundAddress());
  1466. HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
  1467. writePortsFile("http", http.boundAddress());
  1468. if (ReadinessService.enabled(environment)) {
  1469. ReadinessService readiness = injector.getInstance(ReadinessService.class);
  1470. readiness.addBoundAddressListener(address -> writePortsFile("readiness", address));
  1471. }
  1472. if (RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED.get(environment.settings())) {
  1473. writePortsFile("remote_cluster", transport.boundRemoteAccessAddress());
  1474. }
  1475. }
  1476. logger.info("started {}", transportService.getLocalNode());
  1477. pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);
  1478. return this;
  1479. }
  1480. protected void configureNodeAndClusterIdStateListener(ClusterService clusterService) {
  1481. NodeAndClusterIdStateListener.getAndSetNodeIdAndClusterId(
  1482. clusterService,
  1483. injector.getInstance(ThreadPool.class).getThreadContext()
  1484. );
  1485. }
  1486. private void stop() {
  1487. if (lifecycle.moveToStopped() == false) {
  1488. return;
  1489. }
  1490. logger.info("stopping ...");
  1491. if (ReadinessService.enabled(environment)) {
  1492. stopIfStarted(ReadinessService.class);
  1493. }
  1494. stopIfStarted(FileSettingsService.class);
  1495. injector.getInstance(ResourceWatcherService.class).close();
  1496. stopIfStarted(HttpServerTransport.class);
  1497. stopIfStarted(SnapshotsService.class);
  1498. stopIfStarted(SnapshotShardsService.class);
  1499. stopIfStarted(RepositoriesService.class);
  1500. // stop any changes happening as a result of cluster state changes
  1501. stopIfStarted(IndicesClusterStateService.class);
  1502. // close cluster coordinator early to not react to pings anymore.
  1503. // This can confuse other nodes and delay things - mostly if we're the master and we're running tests.
  1504. stopIfStarted(Coordinator.class);
  1505. // we close indices first, so operations won't be allowed on it
  1506. stopIfStarted(ClusterService.class);
  1507. stopIfStarted(NodeConnectionsService.class);
  1508. stopIfStarted(FsHealthService.class);
  1509. stopIfStarted(nodeService.getMonitorService());
  1510. stopIfStarted(GatewayService.class);
  1511. stopIfStarted(SearchService.class);
  1512. stopIfStarted(TransportService.class);
  1513. pluginLifecycleComponents.forEach(Node::stopIfStarted);
  1514. // we should stop this last since it waits for resources to get released
  1515. // if we had scroll searchers etc or recovery going on we wait for to finish.
  1516. stopIfStarted(IndicesService.class);
  1517. logger.info("stopped");
  1518. }
  1519. private <T extends LifecycleComponent> void stopIfStarted(Class<T> componentClass) {
  1520. stopIfStarted(injector.getInstance(componentClass));
  1521. }
  1522. private static void stopIfStarted(LifecycleComponent component) {
  1523. // if we failed during startup then some of our components might not have started yet
  1524. if (component.lifecycleState() == Lifecycle.State.STARTED) {
  1525. component.stop();
  1526. }
  1527. }
  1528. // During concurrent close() calls we want to make sure that all of them return after the node has completed it's shutdown cycle.
  1529. // If not, the hook that is added in Bootstrap#setup() will be useless:
  1530. // close() might not be executed, in case another (for example api) call to close() has already set some lifecycles to stopped.
  1531. // In this case the process will be terminated even if the first call to close() has not finished yet.
  1532. @Override
  1533. public synchronized void close() throws IOException {
  1534. synchronized (lifecycle) {
  1535. if (lifecycle.started()) {
  1536. stop();
  1537. }
  1538. if (lifecycle.moveToClosed() == false) {
  1539. return;
  1540. }
  1541. }
  1542. logger.info("closing ...");
  1543. List<Closeable> toClose = new ArrayList<>();
  1544. StopWatch stopWatch = new StopWatch("node_close");
  1545. toClose.add(() -> stopWatch.start("node_service"));
  1546. toClose.add(nodeService);
  1547. toClose.add(() -> stopWatch.stop().start("http"));
  1548. toClose.add(injector.getInstance(HttpServerTransport.class));
  1549. toClose.add(() -> stopWatch.stop().start("snapshot_service"));
  1550. toClose.add(injector.getInstance(SnapshotsService.class));
  1551. toClose.add(injector.getInstance(SnapshotShardsService.class));
  1552. toClose.add(injector.getInstance(RepositoriesService.class));
  1553. toClose.add(() -> stopWatch.stop().start("client"));
  1554. Releasables.close(injector.getInstance(Client.class));
  1555. toClose.add(() -> stopWatch.stop().start("indices_cluster"));
  1556. toClose.add(injector.getInstance(IndicesClusterStateService.class));
  1557. toClose.add(() -> stopWatch.stop().start("indices"));
  1558. toClose.add(injector.getInstance(IndicesService.class));
  1559. // close filter/fielddata caches after indices
  1560. toClose.add(injector.getInstance(IndicesStore.class));
  1561. toClose.add(injector.getInstance(PeerRecoverySourceService.class));
  1562. toClose.add(() -> stopWatch.stop().start("cluster"));
  1563. toClose.add(injector.getInstance(ClusterService.class));
  1564. toClose.add(() -> stopWatch.stop().start("node_connections_service"));
  1565. toClose.add(injector.getInstance(NodeConnectionsService.class));
  1566. toClose.add(() -> stopWatch.stop().start("cluster_coordinator"));
  1567. toClose.add(injector.getInstance(Coordinator.class));
  1568. toClose.add(() -> stopWatch.stop().start("monitor"));
  1569. toClose.add(nodeService.getMonitorService());
  1570. toClose.add(() -> stopWatch.stop().start("fsHealth"));
  1571. toClose.add(injector.getInstance(FsHealthService.class));
  1572. toClose.add(() -> stopWatch.stop().start("gateway"));
  1573. toClose.add(injector.getInstance(GatewayService.class));
  1574. toClose.add(() -> stopWatch.stop().start("search"));
  1575. toClose.add(injector.getInstance(SearchService.class));
  1576. toClose.add(() -> stopWatch.stop().start("transport"));
  1577. toClose.add(injector.getInstance(TransportService.class));
  1578. if (ReadinessService.enabled(environment)) {
  1579. toClose.add(injector.getInstance(ReadinessService.class));
  1580. }
  1581. toClose.add(injector.getInstance(FileSettingsService.class));
  1582. toClose.add(injector.getInstance(HealthPeriodicLogger.class));
  1583. for (LifecycleComponent plugin : pluginLifecycleComponents) {
  1584. toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")"));
  1585. toClose.add(plugin);
  1586. }
  1587. toClose.addAll(pluginsService.filterPlugins(Plugin.class));
  1588. toClose.add(() -> stopWatch.stop().start("script"));
  1589. toClose.add(injector.getInstance(ScriptService.class));
  1590. toClose.add(() -> stopWatch.stop().start("thread_pool"));
  1591. toClose.add(() -> injector.getInstance(ThreadPool.class).shutdown());
  1592. // Don't call shutdownNow here, it might break ongoing operations on Lucene indices.
  1593. // See https://issues.apache.org/jira/browse/LUCENE-7248. We call shutdownNow in
  1594. // awaitClose if the node doesn't finish closing within the specified time.
  1595. toClose.add(() -> stopWatch.stop().start("gateway_meta_state"));
  1596. toClose.add(injector.getInstance(GatewayMetaState.class));
  1597. toClose.add(() -> stopWatch.stop().start("node_environment"));
  1598. toClose.add(injector.getInstance(NodeEnvironment.class));
  1599. toClose.add(stopWatch::stop);
  1600. if (logger.isTraceEnabled()) {
  1601. toClose.add(() -> logger.trace("Close times for each service:\n{}", stopWatch.prettyPrint()));
  1602. }
  1603. IOUtils.close(toClose);
  1604. logger.info("closed");
  1605. }
  1606. /**
  1607. * Invokes hooks to prepare this node to be closed. This should be called when Elasticsearch receives a request to shut down
  1608. * gracefully from the underlying operating system, before system resources are closed. This method will block
  1609. * until the node is ready to shut down.
  1610. *
  1611. * Note that this class is part of infrastructure to react to signals from the operating system - most graceful shutdown
  1612. * logic should use Node Shutdown, see {@link org.elasticsearch.cluster.metadata.NodesShutdownMetadata}.
  1613. */
  1614. public void prepareForClose() {
  1615. HttpServerTransport httpServerTransport = injector.getInstance(HttpServerTransport.class);
  1616. FutureTask<Void> stopper = new FutureTask<>(() -> {
  1617. httpServerTransport.stop();
  1618. return null;
  1619. });
  1620. new Thread(stopper, "http-server-transport-stop").start();
  1621. Optional.ofNullable(terminationHandler.get()).ifPresent(TerminationHandler::handleTermination);
  1622. try {
  1623. stopper.get();
  1624. } catch (Exception e) {
  1625. logger.warn("unexpected exception while waiting for http server to close", e);
  1626. }
  1627. }
  1628. /**
  1629. * Wait for this node to be effectively closed.
  1630. */
  1631. // synchronized to prevent running concurrently with close()
  1632. public synchronized boolean awaitClose(long timeout, TimeUnit timeUnit) throws InterruptedException {
  1633. if (lifecycle.closed() == false) {
  1634. // We don't want to shutdown the threadpool or interrupt threads on a node that is not
  1635. // closed yet.
  1636. throw new IllegalStateException("Call close() first");
  1637. }
  1638. ThreadPool threadPool = injector.getInstance(ThreadPool.class);
  1639. final boolean terminated = ThreadPool.terminate(threadPool, timeout, timeUnit);
  1640. if (terminated) {
  1641. // All threads terminated successfully. Because search, recovery and all other operations
  1642. // that run on shards run in the threadpool, indices should be effectively closed by now.
  1643. if (nodeService.awaitClose(0, TimeUnit.MILLISECONDS) == false) {
  1644. throw new IllegalStateException(
  1645. "Some shards are still open after the threadpool terminated. "
  1646. + "Something is leaking index readers or store references."
  1647. );
  1648. }
  1649. }
  1650. return terminated;
  1651. }
  1652. /**
  1653. * Returns {@code true} if the node is closed.
  1654. */
  1655. public boolean isClosed() {
  1656. return lifecycle.closed();
  1657. }
  1658. public Injector injector() {
  1659. return this.injector;
  1660. }
  1661. /**
  1662. * Hook for validating the node after network
  1663. * services are started but before the cluster service is started
  1664. * and before the network service starts accepting incoming network
  1665. * requests.
  1666. *
  1667. * @param context the bootstrap context for this node
  1668. * @param boundTransportAddress the network addresses the node is
  1669. * bound and publishing to
  1670. */
  1671. @SuppressWarnings("unused")
  1672. protected void validateNodeBeforeAcceptingRequests(
  1673. final BootstrapContext context,
  1674. final BoundTransportAddress boundTransportAddress,
  1675. List<BootstrapCheck> bootstrapChecks
  1676. ) throws NodeValidationException {}
  1677. /**
  1678. * Writes a file to the logs dir containing the ports for the given transport type
  1679. */
  1680. private void writePortsFile(String type, BoundTransportAddress boundAddress) {
  1681. Path tmpPortsFile = environment.logsFile().resolve(type + ".ports.tmp");
  1682. try (BufferedWriter writer = Files.newBufferedWriter(tmpPortsFile, Charset.forName("UTF-8"))) {
  1683. for (TransportAddress address : boundAddress.boundAddresses()) {
  1684. InetAddress inetAddress = InetAddress.getByName(address.getAddress());
  1685. writer.write(NetworkAddress.format(new InetSocketAddress(inetAddress, address.getPort())) + "\n");
  1686. }
  1687. } catch (IOException e) {
  1688. throw new RuntimeException("Failed to write ports file", e);
  1689. }
  1690. Path portsFile = environment.logsFile().resolve(type + ".ports");
  1691. try {
  1692. Files.move(tmpPortsFile, portsFile, StandardCopyOption.ATOMIC_MOVE);
  1693. } catch (IOException e) {
  1694. throw new RuntimeException("Failed to rename ports file", e);
  1695. }
  1696. }
  1697. /**
  1698. * The {@link PluginsService} used to build this node's components.
  1699. */
  1700. protected PluginsService getPluginsService() {
  1701. return pluginsService;
  1702. }
  1703. /**
  1704. * Plugins can provide additional settings for the node, but two plugins
  1705. * cannot provide the same setting.
  1706. * @param pluginMap A map of plugin names to plugin instances
  1707. * @param originalSettings The node's original settings, which silently override any setting provided by the plugins.
  1708. * @return A {@link Settings} with the merged node and plugin settings
  1709. * @throws IllegalArgumentException if two plugins provide the same additional setting key
  1710. */
  1711. static Settings mergePluginSettings(Map<String, Plugin> pluginMap, Settings originalSettings) {
  1712. Map<String, String> foundSettings = new HashMap<>();
  1713. final Settings.Builder builder = Settings.builder();
  1714. for (Map.Entry<String, Plugin> entry : pluginMap.entrySet()) {
  1715. Settings settings = entry.getValue().additionalSettings();
  1716. for (String setting : settings.keySet()) {
  1717. String oldPlugin = foundSettings.put(setting, entry.getKey());
  1718. if (oldPlugin != null) {
  1719. throw new IllegalArgumentException(
  1720. "Cannot have additional setting ["
  1721. + setting
  1722. + "] "
  1723. + "in plugin ["
  1724. + entry.getKey()
  1725. + "], already added in plugin ["
  1726. + oldPlugin
  1727. + "]"
  1728. );
  1729. }
  1730. }
  1731. builder.put(settings);
  1732. }
  1733. return builder.put(originalSettings).build();
  1734. }
  1735. /**
  1736. * Creates a new {@link CircuitBreakerService} based on the settings provided.
  1737. *
  1738. * @see #BREAKER_TYPE_KEY
  1739. */
  1740. private static CircuitBreakerService createCircuitBreakerService(
  1741. Settings settings,
  1742. List<BreakerSettings> breakerSettings,
  1743. ClusterSettings clusterSettings
  1744. ) {
  1745. String type = BREAKER_TYPE_KEY.get(settings);
  1746. if (type.equals("hierarchy")) {
  1747. return new HierarchyCircuitBreakerService(settings, breakerSettings, clusterSettings);
  1748. } else if (type.equals("none")) {
  1749. return new NoneCircuitBreakerService();
  1750. } else {
  1751. throw new IllegalArgumentException("Unknown circuit breaker type [" + type + "]");
  1752. }
  1753. }
  1754. /**
  1755. * Creates a new {@link BigArrays} instance used for this node.
  1756. * This method can be overwritten by subclasses to change their {@link BigArrays} implementation for instance for testing
  1757. */
  1758. BigArrays createBigArrays(PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService) {
  1759. return new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST);
  1760. }
  1761. /**
  1762. * Creates a new {@link BigArrays} instance used for this node.
  1763. * This method can be overwritten by subclasses to change their {@link BigArrays} implementation for instance for testing
  1764. */
  1765. PageCacheRecycler createPageCacheRecycler(Settings settings) {
  1766. return new PageCacheRecycler(settings);
  1767. }
  1768. /**
  1769. * Creates a new SearchService. This method can be overwritten by tests to inject mock implementations.
  1770. */
  1771. protected SearchService newSearchService(
  1772. ClusterService clusterService,
  1773. IndicesService indicesService,
  1774. ThreadPool threadPool,
  1775. ScriptService scriptService,
  1776. BigArrays bigArrays,
  1777. FetchPhase fetchPhase,
  1778. ResponseCollectorService responseCollectorService,
  1779. CircuitBreakerService circuitBreakerService,
  1780. ExecutorSelector executorSelector,
  1781. Tracer tracer
  1782. ) {
  1783. return new SearchService(
  1784. clusterService,
  1785. indicesService,
  1786. threadPool,
  1787. scriptService,
  1788. bigArrays,
  1789. fetchPhase,
  1790. responseCollectorService,
  1791. circuitBreakerService,
  1792. executorSelector,
  1793. tracer
  1794. );
  1795. }
  1796. /**
  1797. * Creates a new ScriptService. This method can be overwritten by tests to inject mock implementations.
  1798. */
  1799. protected ScriptService newScriptService(
  1800. Settings settings,
  1801. Map<String, ScriptEngine> engines,
  1802. Map<String, ScriptContext<?>> contexts,
  1803. LongSupplier timeProvider
  1804. ) {
  1805. return new ScriptService(settings, engines, contexts, timeProvider);
  1806. }
  1807. /**
  1808. * Creates a new ReadinessService. This method can be overwritten by tests to inject mock implementations.
  1809. */
  1810. protected ReadinessService newReadinessService(ClusterService clusterService, Environment environment) {
  1811. return new ReadinessService(clusterService, environment);
  1812. }
  1813. /**
  1814. * Get Custom Name Resolvers list based on a Discovery Plugins list
  1815. *
  1816. * @param discoveryPlugins Discovery plugins list
  1817. */
  1818. private List<NetworkService.CustomNameResolver> getCustomNameResolvers(List<DiscoveryPlugin> discoveryPlugins) {
  1819. List<NetworkService.CustomNameResolver> customNameResolvers = new ArrayList<>();
  1820. for (DiscoveryPlugin discoveryPlugin : discoveryPlugins) {
  1821. NetworkService.CustomNameResolver customNameResolver = discoveryPlugin.getCustomNameResolver(settings());
  1822. if (customNameResolver != null) {
  1823. customNameResolvers.add(customNameResolver);
  1824. }
  1825. }
  1826. return customNameResolvers;
  1827. }
  1828. /**
  1829. * Constructs a ClusterInfoService which may be mocked for tests.
  1830. */
  1831. protected ClusterInfoService newClusterInfoService(
  1832. Settings settings,
  1833. ClusterService clusterService,
  1834. ThreadPool threadPool,
  1835. NodeClient client
  1836. ) {
  1837. final InternalClusterInfoService service = new InternalClusterInfoService(settings, clusterService, threadPool, client);
  1838. if (DiscoveryNode.isMasterNode(settings)) {
  1839. // listen for state changes (this node starts/stops being the elected master, or new nodes are added)
  1840. clusterService.addListener(service);
  1841. }
  1842. return service;
  1843. }
  1844. /**
  1845. * Constructs a {@link org.elasticsearch.http.HttpServerTransport} which may be mocked for tests.
  1846. */
  1847. protected HttpServerTransport newHttpTransport(NetworkModule networkModule) {
  1848. return networkModule.getHttpServerTransportSupplier().get();
  1849. }
  1850. private static class LocalNodeFactory implements Function<BoundTransportAddress, DiscoveryNode> {
  1851. private final SetOnce<DiscoveryNode> localNode = new SetOnce<>();
  1852. private final String persistentNodeId;
  1853. private final Settings settings;
  1854. private LocalNodeFactory(Settings settings, String persistentNodeId) {
  1855. this.persistentNodeId = persistentNodeId;
  1856. this.settings = settings;
  1857. }
  1858. @Override
  1859. public DiscoveryNode apply(BoundTransportAddress boundTransportAddress) {
  1860. localNode.set(DiscoveryNode.createLocal(settings, boundTransportAddress.publishAddress(), persistentNodeId));
  1861. return localNode.get();
  1862. }
  1863. DiscoveryNode getNode() {
  1864. assert localNode.get() != null;
  1865. return localNode.get();
  1866. }
  1867. }
  1868. }