Node.java 67 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219
  1. /*
  2. * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
  3. * or more contributor license agreements. Licensed under the Elastic License
  4. * 2.0 and the Server Side Public License, v 1; you may not use this file except
  5. * in compliance with, at your election, the Elastic License 2.0 or the Server
  6. * Side Public License, v 1.
  7. */
  8. package org.elasticsearch.node;
  9. import org.apache.logging.log4j.LogManager;
  10. import org.apache.logging.log4j.Logger;
  11. import org.apache.lucene.util.Constants;
  12. import org.apache.lucene.util.SetOnce;
  13. import org.elasticsearch.Assertions;
  14. import org.elasticsearch.Build;
  15. import org.elasticsearch.ElasticsearchException;
  16. import org.elasticsearch.ElasticsearchTimeoutException;
  17. import org.elasticsearch.Version;
  18. import org.elasticsearch.action.ActionModule;
  19. import org.elasticsearch.action.ActionType;
  20. import org.elasticsearch.action.search.SearchExecutionStatsCollector;
  21. import org.elasticsearch.action.search.SearchPhaseController;
  22. import org.elasticsearch.action.search.SearchTransportService;
  23. import org.elasticsearch.action.support.TransportAction;
  24. import org.elasticsearch.action.update.UpdateHelper;
  25. import org.elasticsearch.bootstrap.BootstrapCheck;
  26. import org.elasticsearch.bootstrap.BootstrapContext;
  27. import org.elasticsearch.client.Client;
  28. import org.elasticsearch.client.node.NodeClient;
  29. import org.elasticsearch.cluster.ClusterInfoService;
  30. import org.elasticsearch.cluster.ClusterModule;
  31. import org.elasticsearch.cluster.ClusterName;
  32. import org.elasticsearch.cluster.ClusterState;
  33. import org.elasticsearch.cluster.ClusterStateObserver;
  34. import org.elasticsearch.cluster.InternalClusterInfoService;
  35. import org.elasticsearch.cluster.NodeConnectionsService;
  36. import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
  37. import org.elasticsearch.cluster.metadata.AliasValidator;
  38. import org.elasticsearch.cluster.metadata.IndexMetadataVerifier;
  39. import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
  40. import org.elasticsearch.cluster.metadata.Metadata;
  41. import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService;
  42. import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
  43. import org.elasticsearch.cluster.metadata.SystemIndexMetadataUpgradeService;
  44. import org.elasticsearch.cluster.metadata.TemplateUpgradeService;
  45. import org.elasticsearch.cluster.node.DiscoveryNode;
  46. import org.elasticsearch.cluster.node.DiscoveryNodeRole;
  47. import org.elasticsearch.cluster.routing.BatchedRerouteService;
  48. import org.elasticsearch.cluster.routing.RerouteService;
  49. import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
  50. import org.elasticsearch.cluster.service.ClusterService;
  51. import org.elasticsearch.common.StopWatch;
  52. import org.elasticsearch.common.breaker.CircuitBreaker;
  53. import org.elasticsearch.common.component.Lifecycle;
  54. import org.elasticsearch.common.component.LifecycleComponent;
  55. import org.elasticsearch.common.inject.Injector;
  56. import org.elasticsearch.common.inject.Key;
  57. import org.elasticsearch.common.inject.ModulesBuilder;
  58. import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
  59. import org.elasticsearch.common.lease.Releasables;
  60. import org.elasticsearch.common.logging.DeprecationCategory;
  61. import org.elasticsearch.common.logging.DeprecationLogger;
  62. import org.elasticsearch.common.logging.HeaderWarning;
  63. import org.elasticsearch.common.logging.NodeAndClusterIdStateListener;
  64. import org.elasticsearch.common.network.NetworkAddress;
  65. import org.elasticsearch.common.network.NetworkModule;
  66. import org.elasticsearch.common.network.NetworkService;
  67. import org.elasticsearch.common.settings.ClusterSettings;
  68. import org.elasticsearch.common.settings.ConsistentSettingsService;
  69. import org.elasticsearch.common.settings.Setting;
  70. import org.elasticsearch.common.settings.Setting.Property;
  71. import org.elasticsearch.common.settings.SettingUpgrader;
  72. import org.elasticsearch.common.settings.Settings;
  73. import org.elasticsearch.common.settings.SettingsModule;
  74. import org.elasticsearch.common.transport.BoundTransportAddress;
  75. import org.elasticsearch.common.transport.TransportAddress;
  76. import org.elasticsearch.common.unit.TimeValue;
  77. import org.elasticsearch.common.util.BigArrays;
  78. import org.elasticsearch.common.util.PageCacheRecycler;
  79. import org.elasticsearch.common.xcontent.NamedXContentRegistry;
  80. import org.elasticsearch.core.internal.io.IOUtils;
  81. import org.elasticsearch.discovery.Discovery;
  82. import org.elasticsearch.discovery.DiscoveryModule;
  83. import org.elasticsearch.env.Environment;
  84. import org.elasticsearch.env.NodeEnvironment;
  85. import org.elasticsearch.env.NodeMetadata;
  86. import org.elasticsearch.gateway.GatewayAllocator;
  87. import org.elasticsearch.gateway.GatewayMetaState;
  88. import org.elasticsearch.gateway.GatewayModule;
  89. import org.elasticsearch.gateway.GatewayService;
  90. import org.elasticsearch.gateway.MetaStateService;
  91. import org.elasticsearch.gateway.PersistedClusterStateService;
  92. import org.elasticsearch.http.HttpServerTransport;
  93. import org.elasticsearch.index.IndexSettings;
  94. import org.elasticsearch.index.IndexingPressure;
  95. import org.elasticsearch.index.analysis.AnalysisRegistry;
  96. import org.elasticsearch.index.engine.EngineFactory;
  97. import org.elasticsearch.indices.ExecutorSelector;
  98. import org.elasticsearch.indices.IndicesModule;
  99. import org.elasticsearch.indices.IndicesService;
  100. import org.elasticsearch.indices.ShardLimitValidator;
  101. import org.elasticsearch.indices.SystemIndexManager;
  102. import org.elasticsearch.indices.SystemIndices;
  103. import org.elasticsearch.indices.analysis.AnalysisModule;
  104. import org.elasticsearch.indices.breaker.BreakerSettings;
  105. import org.elasticsearch.indices.breaker.CircuitBreakerService;
  106. import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
  107. import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
  108. import org.elasticsearch.indices.cluster.IndicesClusterStateService;
  109. import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
  110. import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
  111. import org.elasticsearch.indices.recovery.RecoverySettings;
  112. import org.elasticsearch.indices.store.IndicesStore;
  113. import org.elasticsearch.ingest.IngestService;
  114. import org.elasticsearch.monitor.MonitorService;
  115. import org.elasticsearch.monitor.fs.FsHealthService;
  116. import org.elasticsearch.monitor.jvm.JvmInfo;
  117. import org.elasticsearch.persistent.PersistentTasksClusterService;
  118. import org.elasticsearch.persistent.PersistentTasksExecutor;
  119. import org.elasticsearch.persistent.PersistentTasksExecutorRegistry;
  120. import org.elasticsearch.persistent.PersistentTasksService;
  121. import org.elasticsearch.plugins.ActionPlugin;
  122. import org.elasticsearch.plugins.AnalysisPlugin;
  123. import org.elasticsearch.plugins.CircuitBreakerPlugin;
  124. import org.elasticsearch.plugins.ClusterPlugin;
  125. import org.elasticsearch.plugins.DiscoveryPlugin;
  126. import org.elasticsearch.plugins.EnginePlugin;
  127. import org.elasticsearch.plugins.IndexStorePlugin;
  128. import org.elasticsearch.plugins.IngestPlugin;
  129. import org.elasticsearch.plugins.MapperPlugin;
  130. import org.elasticsearch.plugins.MetadataUpgrader;
  131. import org.elasticsearch.plugins.NetworkPlugin;
  132. import org.elasticsearch.plugins.PersistentTaskPlugin;
  133. import org.elasticsearch.plugins.Plugin;
  134. import org.elasticsearch.plugins.PluginsService;
  135. import org.elasticsearch.plugins.RepositoryPlugin;
  136. import org.elasticsearch.plugins.ScriptPlugin;
  137. import org.elasticsearch.plugins.SearchPlugin;
  138. import org.elasticsearch.plugins.SystemIndexPlugin;
  139. import org.elasticsearch.repositories.RepositoriesModule;
  140. import org.elasticsearch.repositories.RepositoriesService;
  141. import org.elasticsearch.rest.RestController;
  142. import org.elasticsearch.script.ScriptContext;
  143. import org.elasticsearch.script.ScriptEngine;
  144. import org.elasticsearch.script.ScriptModule;
  145. import org.elasticsearch.script.ScriptService;
  146. import org.elasticsearch.search.SearchModule;
  147. import org.elasticsearch.search.SearchService;
  148. import org.elasticsearch.search.aggregations.support.AggregationUsageService;
  149. import org.elasticsearch.search.fetch.FetchPhase;
  150. import org.elasticsearch.snapshots.InternalSnapshotsInfoService;
  151. import org.elasticsearch.snapshots.RestoreService;
  152. import org.elasticsearch.snapshots.SnapshotShardsService;
  153. import org.elasticsearch.snapshots.SnapshotsInfoService;
  154. import org.elasticsearch.snapshots.SnapshotsService;
  155. import org.elasticsearch.tasks.Task;
  156. import org.elasticsearch.tasks.TaskCancellationService;
  157. import org.elasticsearch.tasks.TaskResultsService;
  158. import org.elasticsearch.threadpool.ExecutorBuilder;
  159. import org.elasticsearch.threadpool.ThreadPool;
  160. import org.elasticsearch.transport.Transport;
  161. import org.elasticsearch.transport.TransportInterceptor;
  162. import org.elasticsearch.transport.TransportService;
  163. import org.elasticsearch.usage.UsageService;
  164. import org.elasticsearch.watcher.ResourceWatcherService;
  165. import javax.net.ssl.SNIHostName;
  166. import java.io.BufferedWriter;
  167. import java.io.Closeable;
  168. import java.io.IOException;
  169. import java.net.InetAddress;
  170. import java.net.InetSocketAddress;
  171. import java.nio.charset.Charset;
  172. import java.nio.file.Files;
  173. import java.nio.file.Path;
  174. import java.nio.file.StandardCopyOption;
  175. import java.util.ArrayList;
  176. import java.util.Arrays;
  177. import java.util.Collection;
  178. import java.util.Collections;
  179. import java.util.LinkedHashSet;
  180. import java.util.List;
  181. import java.util.Map;
  182. import java.util.Optional;
  183. import java.util.Set;
  184. import java.util.concurrent.CountDownLatch;
  185. import java.util.concurrent.TimeUnit;
  186. import java.util.function.Function;
  187. import java.util.function.UnaryOperator;
  188. import java.util.stream.Collectors;
  189. import java.util.stream.Stream;
  190. import static java.util.stream.Collectors.toList;
  191. /**
  192. * A node represents a node within a cluster ({@code cluster.name}). The {@link #client()} can be used
  193. * in order to use a {@link Client} to perform actions/operations against the cluster.
  194. */
  195. public class Node implements Closeable {
  196. public static final Setting<Boolean> WRITE_PORTS_FILE_SETTING =
  197. Setting.boolSetting("node.portsfile", false, Property.NodeScope);
  198. public static final Setting<String> NODE_NAME_SETTING = Setting.simpleString("node.name", Property.NodeScope);
  199. public static final Setting.AffixSetting<String> NODE_ATTRIBUTES = Setting.prefixKeySetting("node.attr.", (key) ->
  200. new Setting<>(key, "", (value) -> {
  201. if (value.length() > 0
  202. && (Character.isWhitespace(value.charAt(0)) || Character.isWhitespace(value.charAt(value.length() - 1)))) {
  203. throw new IllegalArgumentException(key + " cannot have leading or trailing whitespace " +
  204. "[" + value + "]");
  205. }
  206. if (value.length() > 0 && "node.attr.server_name".equals(key)) {
  207. try {
  208. new SNIHostName(value);
  209. } catch (IllegalArgumentException e) {
  210. throw new IllegalArgumentException("invalid node.attr.server_name [" + value + "]", e);
  211. }
  212. }
  213. return value;
  214. }, Property.NodeScope));
  215. public static final Setting<String> BREAKER_TYPE_KEY = new Setting<>("indices.breaker.type", "hierarchy", (s) -> {
  216. switch (s) {
  217. case "hierarchy":
  218. case "none":
  219. return s;
  220. default:
  221. throw new IllegalArgumentException("indices.breaker.type must be one of [hierarchy, none] but was: " + s);
  222. }
  223. }, Setting.Property.NodeScope);
  224. public static final Setting<TimeValue> INITIAL_STATE_TIMEOUT_SETTING =
  225. Setting.positiveTimeSetting("discovery.initial_state_timeout", TimeValue.timeValueSeconds(30), Property.NodeScope);
  226. private static final String CLIENT_TYPE = "node";
  227. private final Lifecycle lifecycle = new Lifecycle();
  228. /**
  229. * This logger instance is an instance field as opposed to a static field. This ensures that the field is not
  230. * initialized until an instance of Node is constructed, which is sure to happen after the logging infrastructure
  231. * has been initialized to include the hostname. If this field were static, then it would be initialized when the
  232. * class initializer runs. Alas, this happens too early, before logging is initialized as this class is referred to
  233. * in InternalSettingsPreparer#finalizeSettings, which runs when creating the Environment, before logging is
  234. * initialized.
  235. */
  236. private final Logger logger = LogManager.getLogger(Node.class);
  237. private final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(Node.class);
  238. private final Injector injector;
  239. private final Environment environment;
  240. private final NodeEnvironment nodeEnvironment;
  241. private final PluginsService pluginsService;
  242. private final NodeClient client;
  243. private final Collection<LifecycleComponent> pluginLifecycleComponents;
  244. private final LocalNodeFactory localNodeFactory;
  245. private final NodeService nodeService;
  246. final NamedWriteableRegistry namedWriteableRegistry;
  /**
   * Constructs a node from the given environment, loading no plugins from the classpath and forbidding
   * private index settings.
   *
   * @param environment the initial environment for this node
   */
  public Node(Environment environment) {
      this(
          environment,
          Collections.emptyList(), // classpathPlugins: none
          true                     // forbidPrivateIndexSettings
      );
  }
  250. /**
  251. * Constructs a node
  252. *
  253. * @param initialEnvironment the initial environment for this node, which will be added to by plugins
  254. * @param classpathPlugins the plugins to be loaded from the classpath
  255. * @param forbidPrivateIndexSettings whether or not private index settings are forbidden when creating an index; this is used in the
  256. * test framework for tests that rely on being able to set private settings
  257. */
  258. protected Node(final Environment initialEnvironment,
  259. Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
  260. final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
  261. boolean success = false;
  262. try {
  263. Settings tmpSettings = Settings.builder().put(initialEnvironment.settings())
  264. .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();
  265. final JvmInfo jvmInfo = JvmInfo.jvmInfo();
  266. logger.info(
  267. "version[{}], pid[{}], build[{}/{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
  268. Build.CURRENT.getQualifiedVersion(),
  269. jvmInfo.pid(),
  270. Build.CURRENT.flavor().displayName(),
  271. Build.CURRENT.type().displayName(),
  272. Build.CURRENT.hash(),
  273. Build.CURRENT.date(),
  274. Constants.OS_NAME,
  275. Constants.OS_VERSION,
  276. Constants.OS_ARCH,
  277. Constants.JVM_VENDOR,
  278. Constants.JVM_NAME,
  279. Constants.JAVA_VERSION,
  280. Constants.JVM_VERSION);
  281. if (jvmInfo.getBundledJdk()) {
  282. logger.info("JVM home [{}], using bundled JDK [{}]", System.getProperty("java.home"), jvmInfo.getUsingBundledJdk());
  283. } else {
  284. logger.info("JVM home [{}]", System.getProperty("java.home"));
  285. deprecationLogger.deprecate(
  286. DeprecationCategory.OTHER,
  287. "no-jdk",
  288. "no-jdk distributions that do not bundle a JDK are deprecated and will be removed in a future release");
  289. }
  290. logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));
  291. if (Build.CURRENT.isProductionRelease() == false) {
  292. logger.warn(
  293. "version [{}] is a pre-release version of Elasticsearch and is not suitable for production",
  294. Build.CURRENT.getQualifiedVersion());
  295. }
  296. if (Environment.PATH_SHARED_DATA_SETTING.exists(tmpSettings)) {
  297. // NOTE: this must be done with an explicit check here because the deprecation property on a path setting will
  298. // cause ES to fail to start since logging is not yet initialized on first read of the setting
  299. deprecationLogger.deprecate(
  300. DeprecationCategory.SETTINGS,
  301. "shared-data-path",
  302. "setting [path.shared_data] is deprecated and will be removed in a future release"
  303. );
  304. }
  305. if (logger.isDebugEnabled()) {
  306. logger.debug("using config [{}], data [{}], logs [{}], plugins [{}]",
  307. initialEnvironment.configFile(), initialEnvironment.dataFile(),
  308. initialEnvironment.logsFile(), initialEnvironment.pluginsFile());
  309. }
  310. this.pluginsService = new PluginsService(tmpSettings, initialEnvironment.configFile(), initialEnvironment.modulesFile(),
  311. initialEnvironment.pluginsFile(), classpathPlugins);
  312. final Settings settings = pluginsService.updatedSettings();
  313. /*
  314. * Create the environment based on the finalized view of the settings. This is to ensure that components get the same setting
  315. * values, no matter where they ask for them from.
  316. */
  317. this.environment = new Environment(settings, initialEnvironment.configFile());
  318. Environment.assertEquivalent(initialEnvironment, this.environment);
  319. nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
  320. logger.info(
  321. "node name [{}], node ID [{}], cluster name [{}], roles {}",
  322. NODE_NAME_SETTING.get(tmpSettings),
  323. nodeEnvironment.nodeId(),
  324. ClusterName.CLUSTER_NAME_SETTING.get(tmpSettings).value(),
  325. DiscoveryNode.getRolesFromSettings(settings).stream()
  326. .map(DiscoveryNodeRole::roleName)
  327. .collect(Collectors.toCollection(LinkedHashSet::new))
  328. );
  329. resourcesToClose.add(nodeEnvironment);
  330. localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());
  331. final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);
  332. final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
  333. resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
  334. final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
  335. resourcesToClose.add(resourceWatcherService);
  336. // adds the context to the DeprecationLogger so that it does not need to be injected everywhere
  337. HeaderWarning.setThreadContext(threadPool.getThreadContext());
  338. resourcesToClose.add(() -> HeaderWarning.removeThreadContext(threadPool.getThreadContext()));
  339. final List<Setting<?>> additionalSettings = new ArrayList<>();
  340. // register the node.data, node.ingest, node.master, node.remote_cluster_client settings here so we can mark them private
  341. additionalSettings.addAll(pluginsService.getPluginSettings());
  342. final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
  343. for (final ExecutorBuilder<?> builder : threadPool.builders()) {
  344. additionalSettings.addAll(builder.getRegisteredSettings());
  345. }
  346. client = new NodeClient(settings, threadPool);
  347. final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
  348. final ScriptService scriptService = newScriptService(settings, scriptModule.engines, scriptModule.contexts);
  349. AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
  350. // this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool
  351. // so we might be late here already
  352. final Set<SettingUpgrader<?>> settingsUpgraders = pluginsService.filterPlugins(Plugin.class)
  353. .stream()
  354. .map(Plugin::getSettingUpgraders)
  355. .flatMap(List::stream)
  356. .collect(Collectors.toSet());
  357. final SettingsModule settingsModule =
  358. new SettingsModule(settings, additionalSettings, additionalSettingsFilter, settingsUpgraders);
  359. scriptModule.registerClusterSettingsListeners(scriptService, settingsModule.getClusterSettings());
  360. final NetworkService networkService = new NetworkService(
  361. getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
  362. List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
  363. final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
  364. clusterService.addStateApplier(scriptService);
  365. resourcesToClose.add(clusterService);
  366. final Set<Setting<?>> consistentSettings = settingsModule.getConsistentSettings();
  367. if (consistentSettings.isEmpty() == false) {
  368. clusterService.addLocalNodeMasterListener(
  369. new ConsistentSettingsService(settings, clusterService, consistentSettings).newHashPublisher());
  370. }
  371. final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
  372. scriptService, analysisModule.getAnalysisRegistry(),
  373. pluginsService.filterPlugins(IngestPlugin.class), client);
  374. final SetOnce<RepositoriesService> repositoriesServiceReference = new SetOnce<>();
  375. final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
  376. final UsageService usageService = new UsageService();
  377. SearchModule searchModule = new SearchModule(settings, pluginsService.filterPlugins(SearchPlugin.class));
  378. List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
  379. NetworkModule.getNamedWriteables().stream(),
  380. IndicesModule.getNamedWriteables().stream(),
  381. searchModule.getNamedWriteables().stream(),
  382. pluginsService.filterPlugins(Plugin.class).stream()
  383. .flatMap(p -> p.getNamedWriteables().stream()),
  384. ClusterModule.getNamedWriteables().stream())
  385. .flatMap(Function.identity()).collect(Collectors.toList());
  386. final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);
  387. NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of(
  388. NetworkModule.getNamedXContents().stream(),
  389. IndicesModule.getNamedXContents().stream(),
  390. searchModule.getNamedXContents().stream(),
  391. pluginsService.filterPlugins(Plugin.class).stream()
  392. .flatMap(p -> p.getNamedXContent().stream()),
  393. ClusterModule.getNamedXWriteables().stream())
  394. .flatMap(Function.identity()).collect(toList()),
  395. getCompatibleNamedXContents()
  396. );
  397. final Map<String, SystemIndices.Feature> featuresMap = pluginsService
  398. .filterPlugins(SystemIndexPlugin.class)
  399. .stream()
  400. .peek(plugin -> SystemIndices.validateFeatureName(plugin.getFeatureName(), plugin.getClass().getCanonicalName()))
  401. .collect(Collectors.toUnmodifiableMap(
  402. SystemIndexPlugin::getFeatureName,
  403. plugin -> SystemIndices.pluginToFeature(plugin, settings)
  404. ));
  405. final SystemIndices systemIndices = new SystemIndices(featuresMap);
  406. final ExecutorSelector executorSelector = systemIndices.getExecutorSelector();
  407. ModulesBuilder modules = new ModulesBuilder();
  408. final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool);
  409. final FsHealthService fsHealthService = new FsHealthService(settings, clusterService.getClusterSettings(), threadPool,
  410. nodeEnvironment);
  411. final SetOnce<RerouteService> rerouteServiceReference = new SetOnce<>();
  412. final InternalSnapshotsInfoService snapshotsInfoService = new InternalSnapshotsInfoService(settings, clusterService,
  413. repositoriesServiceReference::get, rerouteServiceReference::get);
  414. final ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService,
  415. snapshotsInfoService, threadPool.getThreadContext(), systemIndices);
  416. modules.add(clusterModule);
  417. IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
  418. modules.add(indicesModule);
  419. List<BreakerSettings> pluginCircuitBreakers = pluginsService.filterPlugins(CircuitBreakerPlugin.class)
  420. .stream()
  421. .map(plugin -> plugin.getCircuitBreaker(settings))
  422. .collect(Collectors.toList());
  423. final CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
  424. pluginCircuitBreakers,
  425. settingsModule.getClusterSettings());
  426. pluginsService.filterPlugins(CircuitBreakerPlugin.class)
  427. .forEach(plugin -> {
  428. CircuitBreaker breaker = circuitBreakerService.getBreaker(plugin.getCircuitBreaker(settings).getName());
  429. plugin.setCircuitBreaker(breaker);
  430. });
  431. resourcesToClose.add(circuitBreakerService);
  432. modules.add(new GatewayModule());
  433. PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
  434. BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
  435. modules.add(settingsModule);
  436. final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry);
  437. final PersistedClusterStateService lucenePersistedStateFactory
  438. = new PersistedClusterStateService(nodeEnvironment, xContentRegistry, bigArrays, clusterService.getClusterSettings(),
  439. threadPool::relativeTimeInMillis);
  440. // collect engine factory providers from plugins
  441. final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(EnginePlugin.class);
  442. final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders =
  443. enginePlugins.stream().map(plugin -> (Function<IndexSettings, Optional<EngineFactory>>) plugin::getEngineFactory)
  444. .collect(Collectors.toList());
  445. final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories =
  446. pluginsService.filterPlugins(IndexStorePlugin.class)
  447. .stream()
  448. .map(IndexStorePlugin::getDirectoryFactories)
  449. .flatMap(m -> m.entrySet().stream())
  450. .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  451. final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories =
  452. pluginsService.filterPlugins(IndexStorePlugin.class)
  453. .stream()
  454. .map(IndexStorePlugin::getRecoveryStateFactories)
  455. .flatMap(m -> m.entrySet().stream())
  456. .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  457. final List<IndexStorePlugin.IndexFoldersDeletionListener> indexFoldersDeletionListeners =
  458. pluginsService.filterPlugins(IndexStorePlugin.class)
  459. .stream()
  460. .map(IndexStorePlugin::getIndexFoldersDeletionListeners)
  461. .flatMap(List::stream)
  462. .collect(Collectors.toList());
  463. final Map<String, IndexStorePlugin.SnapshotCommitSupplier> snapshotCommitSuppliers =
  464. pluginsService.filterPlugins(IndexStorePlugin.class)
  465. .stream()
  466. .map(IndexStorePlugin::getSnapshotCommitSuppliers)
  467. .flatMap(m -> m.entrySet().stream())
  468. .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  469. if (DiscoveryNode.isMasterNode(settings)) {
  470. clusterService.addListener(new SystemIndexManager(systemIndices, client));
  471. }
  472. final RerouteService rerouteService
  473. = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
  474. rerouteServiceReference.set(rerouteService);
  475. clusterService.setRerouteService(rerouteService);
  476. final IndicesService indicesService =
  477. new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, analysisModule.getAnalysisRegistry(),
  478. clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
  479. threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptService,
  480. clusterService, client, metaStateService, engineFactoryProviders, indexStoreFactories,
  481. searchModule.getValuesSourceRegistry(), recoveryStateFactories, indexFoldersDeletionListeners,
  482. snapshotCommitSuppliers);
  483. final AliasValidator aliasValidator = new AliasValidator();
  484. final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService);
  485. final MetadataCreateIndexService metadataCreateIndexService = new MetadataCreateIndexService(
  486. settings,
  487. clusterService,
  488. indicesService,
  489. clusterModule.getAllocationService(),
  490. aliasValidator,
  491. shardLimitValidator,
  492. environment,
  493. settingsModule.getIndexScopedSettings(),
  494. threadPool,
  495. xContentRegistry,
  496. systemIndices,
  497. forbidPrivateIndexSettings
  498. );
  499. pluginsService.filterPlugins(Plugin.class)
  500. .forEach(p -> p.getAdditionalIndexSettingProviders()
  501. .forEach(metadataCreateIndexService::addAdditionalIndexSettingProvider));
  502. final MetadataCreateDataStreamService metadataCreateDataStreamService =
  503. new MetadataCreateDataStreamService(threadPool, clusterService, metadataCreateIndexService);
  504. Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
  505. .flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,
  506. scriptService, xContentRegistry, environment, nodeEnvironment,
  507. namedWriteableRegistry, clusterModule.getIndexNameExpressionResolver(),
  508. repositoriesServiceReference::get).stream())
  509. .collect(Collectors.toList());
  510. ActionModule actionModule = new ActionModule(settings, clusterModule.getIndexNameExpressionResolver(),
  511. settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
  512. threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService, systemIndices);
  513. modules.add(actionModule);
  514. final RestController restController = actionModule.getRestController();
  515. final NetworkModule networkModule = new NetworkModule(settings, pluginsService.filterPlugins(NetworkPlugin.class),
  516. threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
  517. networkService, restController, clusterService.getClusterSettings());
  518. Collection<UnaryOperator<Map<String, IndexTemplateMetadata>>> indexTemplateMetadataUpgraders =
  519. pluginsService.filterPlugins(Plugin.class).stream()
  520. .map(Plugin::getIndexTemplateMetadataUpgrader)
  521. .collect(Collectors.toList());
  522. final MetadataUpgrader metadataUpgrader = new MetadataUpgrader(indexTemplateMetadataUpgraders);
  523. final IndexMetadataVerifier indexMetadataVerifier = new IndexMetadataVerifier(settings, xContentRegistry,
  524. indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings(), scriptService);
  525. if (DiscoveryNode.isMasterNode(settings)) {
  526. clusterService.addListener(new SystemIndexMetadataUpgradeService(systemIndices, clusterService));
  527. }
  528. new TemplateUpgradeService(client, clusterService, threadPool, indexTemplateMetadataUpgraders);
  529. final Transport transport = networkModule.getTransportSupplier().get();
  530. Set<String> taskHeaders = Stream.concat(
  531. pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
  532. Stream.of(Task.X_OPAQUE_ID)
  533. ).collect(Collectors.toSet());
  534. final TransportService transportService = newTransportService(settings, transport, threadPool,
  535. networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
  536. final GatewayMetaState gatewayMetaState = new GatewayMetaState();
  537. final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
  538. final SearchTransportService searchTransportService = new SearchTransportService(transportService, client,
  539. SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
  540. final HttpServerTransport httpServerTransport = newHttpTransport(networkModule);
  541. final IndexingPressure indexingLimits = new IndexingPressure(settings);
  542. final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
  543. RepositoriesModule repositoriesModule = new RepositoriesModule(this.environment,
  544. pluginsService.filterPlugins(RepositoryPlugin.class), transportService, clusterService, bigArrays, xContentRegistry,
  545. recoverySettings);
  546. RepositoriesService repositoryService = repositoriesModule.getRepositoryService();
  547. repositoriesServiceReference.set(repositoryService);
  548. SnapshotsService snapshotsService = new SnapshotsService(settings, clusterService,
  549. clusterModule.getIndexNameExpressionResolver(), repositoryService, transportService, actionModule.getActionFilters(),
  550. systemIndices.getFeatures());
  551. SnapshotShardsService snapshotShardsService = new SnapshotShardsService(settings, clusterService, repositoryService,
  552. transportService, indicesService);
  553. RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(),
  554. metadataCreateIndexService, clusterModule.getMetadataDeleteIndexService(), indexMetadataVerifier,
  555. shardLimitValidator, systemIndices);
  556. final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,
  557. clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis, rerouteService);
  558. clusterInfoService.addListener(diskThresholdMonitor::onNewInfo);
  559. final DiscoveryModule discoveryModule = new DiscoveryModule(settings, transportService, namedWriteableRegistry,
  560. networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
  561. clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
  562. clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, rerouteService,
  563. fsHealthService);
  564. this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
  565. transportService, indicesService, pluginsService, circuitBreakerService, scriptService,
  566. httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
  567. searchTransportService, indexingLimits, searchModule.getValuesSourceRegistry().getUsageService());
  568. final SearchService searchService = newSearchService(clusterService, indicesService,
  569. threadPool, scriptService, bigArrays, searchModule.getFetchPhase(),
  570. responseCollectorService, circuitBreakerService, executorSelector);
  571. final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
  572. .filterPlugins(PersistentTaskPlugin.class).stream()
  573. .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client, settingsModule,
  574. clusterModule.getIndexNameExpressionResolver()))
  575. .flatMap(List::stream)
  576. .collect(toList());
  577. final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(tasksExecutors);
  578. final PersistentTasksClusterService persistentTasksClusterService =
  579. new PersistentTasksClusterService(settings, registry, clusterService, threadPool);
  580. resourcesToClose.add(persistentTasksClusterService);
  581. final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
  582. modules.add(b -> {
  583. b.bind(Node.class).toInstance(this);
  584. b.bind(NodeService.class).toInstance(nodeService);
  585. b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
  586. b.bind(PluginsService.class).toInstance(pluginsService);
  587. b.bind(Client.class).toInstance(client);
  588. b.bind(NodeClient.class).toInstance(client);
  589. b.bind(Environment.class).toInstance(this.environment);
  590. b.bind(ThreadPool.class).toInstance(threadPool);
  591. b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
  592. b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
  593. b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
  594. b.bind(BigArrays.class).toInstance(bigArrays);
  595. b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
  596. b.bind(ScriptService.class).toInstance(scriptService);
  597. b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
  598. b.bind(IngestService.class).toInstance(ingestService);
  599. b.bind(IndexingPressure.class).toInstance(indexingLimits);
  600. b.bind(UsageService.class).toInstance(usageService);
  601. b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
  602. b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
  603. b.bind(MetadataUpgrader.class).toInstance(metadataUpgrader);
  604. b.bind(MetaStateService.class).toInstance(metaStateService);
  605. b.bind(PersistedClusterStateService.class).toInstance(lucenePersistedStateFactory);
  606. b.bind(IndicesService.class).toInstance(indicesService);
  607. b.bind(AliasValidator.class).toInstance(aliasValidator);
  608. b.bind(MetadataCreateIndexService.class).toInstance(metadataCreateIndexService);
  609. b.bind(MetadataCreateDataStreamService.class).toInstance(metadataCreateDataStreamService);
  610. b.bind(SearchService.class).toInstance(searchService);
  611. b.bind(SearchTransportService.class).toInstance(searchTransportService);
  612. b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(
  613. namedWriteableRegistry, searchService::aggReduceContextBuilder));
  614. b.bind(Transport.class).toInstance(transport);
  615. b.bind(TransportService.class).toInstance(transportService);
  616. b.bind(NetworkService.class).toInstance(networkService);
  617. b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptService));
  618. b.bind(IndexMetadataVerifier.class).toInstance(indexMetadataVerifier);
  619. b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
  620. b.bind(SnapshotsInfoService.class).toInstance(snapshotsInfoService);
  621. b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
  622. b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
  623. {
  624. processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
  625. b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(transportService,
  626. indicesService, recoverySettings));
  627. b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(threadPool,
  628. transportService, recoverySettings, clusterService));
  629. }
  630. b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
  631. pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
  632. b.bind(PersistentTasksService.class).toInstance(persistentTasksService);
  633. b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService);
  634. b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);
  635. b.bind(RepositoriesService.class).toInstance(repositoryService);
  636. b.bind(SnapshotsService.class).toInstance(snapshotsService);
  637. b.bind(SnapshotShardsService.class).toInstance(snapshotShardsService);
  638. b.bind(RestoreService.class).toInstance(restoreService);
  639. b.bind(RerouteService.class).toInstance(rerouteService);
  640. b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
  641. b.bind(FsHealthService.class).toInstance(fsHealthService);
  642. b.bind(SystemIndices.class).toInstance(systemIndices);
  643. b.bind(ExecutorSelector.class).toInstance(executorSelector);
  644. }
  645. );
  646. injector = modules.createInjector();
  647. // We allocate copies of existing shards by looking for a viable copy of the shard in the cluster and assigning the shard there.
  648. // The search for viable copies is triggered by an allocation attempt (i.e. a reroute) and is performed asynchronously. When it
  649. // completes we trigger another reroute to try the allocation again. This means there is a circular dependency: the allocation
  650. // service needs access to the existing shards allocators (e.g. the GatewayAllocator) which need to be able to trigger a
  651. // reroute, which needs to call into the allocation service. We close the loop here:
  652. clusterModule.setExistingShardsAllocators(injector.getInstance(GatewayAllocator.class));
  653. List<LifecycleComponent> pluginLifecycleComponents = pluginComponents.stream()
  654. .filter(p -> p instanceof LifecycleComponent)
  655. .map(p -> (LifecycleComponent) p).collect(Collectors.toList());
  656. resourcesToClose.addAll(pluginLifecycleComponents);
  657. resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class));
  658. this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
  659. client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {
  660. }),
  661. transportService.getTaskManager(),
  662. () -> clusterService.localNode().getId(),
  663. transportService.getLocalNodeConnection(),
  664. transportService.getRemoteClusterService(),
  665. namedWriteableRegistry);
  666. this.namedWriteableRegistry = namedWriteableRegistry;
  667. logger.debug("initializing HTTP handlers ...");
  668. actionModule.initRestHandlers(() -> clusterService.state().nodes());
  669. logger.info("initialized");
  670. success = true;
  671. } catch (IOException ex) {
  672. throw new ElasticsearchException("failed to bind service", ex);
  673. } finally {
  674. if (success == false) {
  675. IOUtils.closeWhileHandlingException(resourcesToClose);
  676. }
  677. }
  678. }
  679. // package scope for testing
  680. List<NamedXContentRegistry.Entry> getCompatibleNamedXContents() {
  681. return pluginsService.filterPlugins(Plugin.class).stream()
  682. .flatMap(p -> p.getNamedXContentForCompatibility().stream()).collect(toList());
  683. }
  684. protected TransportService newTransportService(Settings settings, Transport transport, ThreadPool threadPool,
  685. TransportInterceptor interceptor,
  686. Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
  687. ClusterSettings clusterSettings, Set<String> taskHeaders) {
  688. return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders);
  689. }
  690. protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
  691. // Noop in production, overridden by tests
  692. }
  693. /**
  694. * The settings that are used by this node. Contains original settings as well as additional settings provided by plugins.
  695. */
  696. public Settings settings() {
  697. return this.environment.settings();
  698. }
  699. /**
  700. * A client that can be used to execute actions (operations) against the cluster.
  701. */
  702. public Client client() {
  703. return client;
  704. }
  705. /**
  706. * Returns the environment of the node
  707. */
  708. public Environment getEnvironment() {
  709. return environment;
  710. }
  711. /**
  712. * Returns the {@link NodeEnvironment} instance of this node
  713. */
  714. public NodeEnvironment getNodeEnvironment() {
  715. return nodeEnvironment;
  716. }
  717. /**
  718. * Start the node. If the node is already started, this method is no-op.
  719. */
  720. public Node start() throws NodeValidationException {
  721. if (lifecycle.moveToStarted() == false) {
  722. return this;
  723. }
  724. logger.info("starting ...");
  725. pluginLifecycleComponents.forEach(LifecycleComponent::start);
  726. injector.getInstance(MappingUpdatedAction.class).setClient(client);
  727. injector.getInstance(IndicesService.class).start();
  728. injector.getInstance(IndicesClusterStateService.class).start();
  729. injector.getInstance(SnapshotsService.class).start();
  730. injector.getInstance(SnapshotShardsService.class).start();
  731. injector.getInstance(RepositoriesService.class).start();
  732. injector.getInstance(SearchService.class).start();
  733. injector.getInstance(FsHealthService.class).start();
  734. nodeService.getMonitorService().start();
  735. final ClusterService clusterService = injector.getInstance(ClusterService.class);
  736. final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
  737. nodeConnectionsService.start();
  738. clusterService.setNodeConnectionsService(nodeConnectionsService);
  739. injector.getInstance(GatewayService.class).start();
  740. Discovery discovery = injector.getInstance(Discovery.class);
  741. clusterService.getMasterService().setClusterStatePublisher(discovery::publish);
  742. // Start the transport service now so the publish address will be added to the local disco node in ClusterService
  743. TransportService transportService = injector.getInstance(TransportService.class);
  744. transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
  745. transportService.getTaskManager().setTaskCancellationService(new TaskCancellationService(transportService));
  746. transportService.start();
  747. assert localNodeFactory.getNode() != null;
  748. assert transportService.getLocalNode().equals(localNodeFactory.getNode())
  749. : "transportService has a different local node than the factory provided";
  750. injector.getInstance(PeerRecoverySourceService.class).start();
  751. // Load (and maybe upgrade) the metadata stored on disk
  752. final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
  753. gatewayMetaState.start(settings(), transportService, clusterService, injector.getInstance(MetaStateService.class),
  754. injector.getInstance(IndexMetadataVerifier.class), injector.getInstance(MetadataUpgrader.class),
  755. injector.getInstance(PersistedClusterStateService.class));
  756. if (Assertions.ENABLED) {
  757. try {
  758. assert injector.getInstance(MetaStateService.class).loadFullState().v1().isEmpty();
  759. final NodeMetadata nodeMetadata = NodeMetadata.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY,
  760. nodeEnvironment.nodeDataPath());
  761. assert nodeMetadata != null;
  762. assert nodeMetadata.nodeVersion().equals(Version.CURRENT);
  763. assert nodeMetadata.nodeId().equals(localNodeFactory.getNode().getId());
  764. } catch (IOException e) {
  765. assert false : e;
  766. }
  767. }
  768. // we load the global state here (the persistent part of the cluster state stored on disk) to
  769. // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
  770. final Metadata onDiskMetadata = gatewayMetaState.getPersistedState().getLastAcceptedState().metadata();
  771. assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null
  772. validateNodeBeforeAcceptingRequests(new BootstrapContext(environment, onDiskMetadata), transportService.boundAddress(),
  773. pluginsService.filterPlugins(Plugin.class).stream()
  774. .flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));
  775. clusterService.addStateApplier(transportService.getTaskManager());
  776. // start after transport service so the local disco is known
  777. discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService
  778. clusterService.start();
  779. assert clusterService.localNode().equals(localNodeFactory.getNode())
  780. : "clusterService has a different local node than the factory provided";
  781. transportService.acceptIncomingRequests();
  782. discovery.startInitialJoin();
  783. final TimeValue initialStateTimeout = INITIAL_STATE_TIMEOUT_SETTING.get(settings());
  784. configureNodeAndClusterIdStateListener(clusterService);
  785. if (initialStateTimeout.millis() > 0) {
  786. final ThreadPool thread = injector.getInstance(ThreadPool.class);
  787. ClusterState clusterState = clusterService.state();
  788. ClusterStateObserver observer =
  789. new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());
  790. if (clusterState.nodes().getMasterNodeId() == null) {
  791. logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
  792. final CountDownLatch latch = new CountDownLatch(1);
  793. observer.waitForNextChange(new ClusterStateObserver.Listener() {
  794. @Override
  795. public void onNewClusterState(ClusterState state) {
  796. latch.countDown();
  797. }
  798. @Override
  799. public void onClusterServiceClose() {
  800. latch.countDown();
  801. }
  802. @Override
  803. public void onTimeout(TimeValue timeout) {
  804. logger.warn("timed out while waiting for initial discovery state - timeout: {}",
  805. initialStateTimeout);
  806. latch.countDown();
  807. }
  808. }, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);
  809. try {
  810. latch.await();
  811. } catch (InterruptedException e) {
  812. throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
  813. }
  814. }
  815. }
  816. injector.getInstance(HttpServerTransport.class).start();
  817. if (WRITE_PORTS_FILE_SETTING.get(settings())) {
  818. TransportService transport = injector.getInstance(TransportService.class);
  819. writePortsFile("transport", transport.boundAddress());
  820. HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
  821. writePortsFile("http", http.boundAddress());
  822. }
  823. logger.info("started");
  824. pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);
  825. return this;
  826. }
  827. protected void configureNodeAndClusterIdStateListener(ClusterService clusterService) {
  828. NodeAndClusterIdStateListener.getAndSetNodeIdAndClusterId(clusterService,
  829. injector.getInstance(ThreadPool.class).getThreadContext());
  830. }
  831. private Node stop() {
  832. if (lifecycle.moveToStopped() == false) {
  833. return this;
  834. }
  835. logger.info("stopping ...");
  836. injector.getInstance(ResourceWatcherService.class).close();
  837. injector.getInstance(HttpServerTransport.class).stop();
  838. injector.getInstance(SnapshotsService.class).stop();
  839. injector.getInstance(SnapshotShardsService.class).stop();
  840. injector.getInstance(RepositoriesService.class).stop();
  841. // stop any changes happening as a result of cluster state changes
  842. injector.getInstance(IndicesClusterStateService.class).stop();
  843. // close discovery early to not react to pings anymore.
  844. // This can confuse other nodes and delay things - mostly if we're the master and we're running tests.
  845. injector.getInstance(Discovery.class).stop();
  846. // we close indices first, so operations won't be allowed on it
  847. injector.getInstance(ClusterService.class).stop();
  848. injector.getInstance(NodeConnectionsService.class).stop();
  849. injector.getInstance(FsHealthService.class).stop();
  850. nodeService.getMonitorService().stop();
  851. injector.getInstance(GatewayService.class).stop();
  852. injector.getInstance(SearchService.class).stop();
  853. injector.getInstance(TransportService.class).stop();
  854. pluginLifecycleComponents.forEach(LifecycleComponent::stop);
  855. // we should stop this last since it waits for resources to get released
  856. // if we had scroll searchers etc or recovery going on we wait for to finish.
  857. injector.getInstance(IndicesService.class).stop();
  858. logger.info("stopped");
  859. return this;
  860. }
  861. // During concurrent close() calls we want to make sure that all of them return after the node has completed it's shutdown cycle.
  862. // If not, the hook that is added in Bootstrap#setup() will be useless:
  863. // close() might not be executed, in case another (for example api) call to close() has already set some lifecycles to stopped.
  864. // In this case the process will be terminated even if the first call to close() has not finished yet.
  865. @Override
  866. public synchronized void close() throws IOException {
  867. synchronized (lifecycle) {
  868. if (lifecycle.started()) {
  869. stop();
  870. }
  871. if (lifecycle.moveToClosed() == false) {
  872. return;
  873. }
  874. }
  875. logger.info("closing ...");
  876. List<Closeable> toClose = new ArrayList<>();
  877. StopWatch stopWatch = new StopWatch("node_close");
  878. toClose.add(() -> stopWatch.start("node_service"));
  879. toClose.add(nodeService);
  880. toClose.add(() -> stopWatch.stop().start("http"));
  881. toClose.add(injector.getInstance(HttpServerTransport.class));
  882. toClose.add(() -> stopWatch.stop().start("snapshot_service"));
  883. toClose.add(injector.getInstance(SnapshotsService.class));
  884. toClose.add(injector.getInstance(SnapshotShardsService.class));
  885. toClose.add(injector.getInstance(RepositoriesService.class));
  886. toClose.add(() -> stopWatch.stop().start("client"));
  887. Releasables.close(injector.getInstance(Client.class));
  888. toClose.add(() -> stopWatch.stop().start("indices_cluster"));
  889. toClose.add(injector.getInstance(IndicesClusterStateService.class));
  890. toClose.add(() -> stopWatch.stop().start("indices"));
  891. toClose.add(injector.getInstance(IndicesService.class));
  892. // close filter/fielddata caches after indices
  893. toClose.add(injector.getInstance(IndicesStore.class));
  894. toClose.add(injector.getInstance(PeerRecoverySourceService.class));
  895. toClose.add(() -> stopWatch.stop().start("cluster"));
  896. toClose.add(injector.getInstance(ClusterService.class));
  897. toClose.add(() -> stopWatch.stop().start("node_connections_service"));
  898. toClose.add(injector.getInstance(NodeConnectionsService.class));
  899. toClose.add(() -> stopWatch.stop().start("discovery"));
  900. toClose.add(injector.getInstance(Discovery.class));
  901. toClose.add(() -> stopWatch.stop().start("monitor"));
  902. toClose.add(nodeService.getMonitorService());
  903. toClose.add(() -> stopWatch.stop().start("fsHealth"));
  904. toClose.add(injector.getInstance(FsHealthService.class));
  905. toClose.add(() -> stopWatch.stop().start("gateway"));
  906. toClose.add(injector.getInstance(GatewayService.class));
  907. toClose.add(() -> stopWatch.stop().start("search"));
  908. toClose.add(injector.getInstance(SearchService.class));
  909. toClose.add(() -> stopWatch.stop().start("transport"));
  910. toClose.add(injector.getInstance(TransportService.class));
  911. for (LifecycleComponent plugin : pluginLifecycleComponents) {
  912. toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")"));
  913. toClose.add(plugin);
  914. }
  915. toClose.addAll(pluginsService.filterPlugins(Plugin.class));
  916. toClose.add(() -> stopWatch.stop().start("script"));
  917. toClose.add(injector.getInstance(ScriptService.class));
  918. toClose.add(() -> stopWatch.stop().start("thread_pool"));
  919. toClose.add(() -> injector.getInstance(ThreadPool.class).shutdown());
  920. // Don't call shutdownNow here, it might break ongoing operations on Lucene indices.
  921. // See https://issues.apache.org/jira/browse/LUCENE-7248. We call shutdownNow in
  922. // awaitClose if the node doesn't finish closing within the specified time.
  923. toClose.add(() -> stopWatch.stop().start("gateway_meta_state"));
  924. toClose.add(injector.getInstance(GatewayMetaState.class));
  925. toClose.add(() -> stopWatch.stop().start("node_environment"));
  926. toClose.add(injector.getInstance(NodeEnvironment.class));
  927. toClose.add(stopWatch::stop);
  928. if (logger.isTraceEnabled()) {
  929. toClose.add(() -> logger.trace("Close times for each service:\n{}", stopWatch.prettyPrint()));
  930. }
  931. IOUtils.close(toClose);
  932. logger.info("closed");
  933. }
  934. /**
  935. * Wait for this node to be effectively closed.
  936. */
  937. // synchronized to prevent running concurrently with close()
  938. public synchronized boolean awaitClose(long timeout, TimeUnit timeUnit) throws InterruptedException {
  939. if (lifecycle.closed() == false) {
  940. // We don't want to shutdown the threadpool or interrupt threads on a node that is not
  941. // closed yet.
  942. throw new IllegalStateException("Call close() first");
  943. }
  944. ThreadPool threadPool = injector.getInstance(ThreadPool.class);
  945. final boolean terminated = ThreadPool.terminate(threadPool, timeout, timeUnit);
  946. if (terminated) {
  947. // All threads terminated successfully. Because search, recovery and all other operations
  948. // that run on shards run in the threadpool, indices should be effectively closed by now.
  949. if (nodeService.awaitClose(0, TimeUnit.MILLISECONDS) == false) {
  950. throw new IllegalStateException("Some shards are still open after the threadpool terminated. " +
  951. "Something is leaking index readers or store references.");
  952. }
  953. }
  954. return terminated;
  955. }
  956. /**
  957. * Returns {@code true} if the node is closed.
  958. */
  959. public boolean isClosed() {
  960. return lifecycle.closed();
  961. }
  962. public Injector injector() {
  963. return this.injector;
  964. }
  965. /**
  966. * Hook for validating the node after network
  967. * services are started but before the cluster service is started
  968. * and before the network service starts accepting incoming network
  969. * requests.
  970. *
  971. * @param context the bootstrap context for this node
  972. * @param boundTransportAddress the network addresses the node is
  973. * bound and publishing to
  974. */
  975. @SuppressWarnings("unused")
  976. protected void validateNodeBeforeAcceptingRequests(
  977. final BootstrapContext context,
  978. final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> bootstrapChecks) throws NodeValidationException {
  979. }
  980. /**
  981. * Writes a file to the logs dir containing the ports for the given transport type
  982. */
  983. private void writePortsFile(String type, BoundTransportAddress boundAddress) {
  984. Path tmpPortsFile = environment.logsFile().resolve(type + ".ports.tmp");
  985. try (BufferedWriter writer = Files.newBufferedWriter(tmpPortsFile, Charset.forName("UTF-8"))) {
  986. for (TransportAddress address : boundAddress.boundAddresses()) {
  987. InetAddress inetAddress = InetAddress.getByName(address.getAddress());
  988. writer.write(NetworkAddress.format(new InetSocketAddress(inetAddress, address.getPort())) + "\n");
  989. }
  990. } catch (IOException e) {
  991. throw new RuntimeException("Failed to write ports file", e);
  992. }
  993. Path portsFile = environment.logsFile().resolve(type + ".ports");
  994. try {
  995. Files.move(tmpPortsFile, portsFile, StandardCopyOption.ATOMIC_MOVE);
  996. } catch (IOException e) {
  997. throw new RuntimeException("Failed to rename ports file", e);
  998. }
  999. }
  1000. /**
  1001. * The {@link PluginsService} used to build this node's components.
  1002. */
  1003. protected PluginsService getPluginsService() {
  1004. return pluginsService;
  1005. }
  1006. /**
  1007. * Creates a new {@link CircuitBreakerService} based on the settings provided.
  1008. *
  1009. * @see #BREAKER_TYPE_KEY
  1010. */
  1011. private static CircuitBreakerService createCircuitBreakerService(Settings settings,
  1012. List<BreakerSettings> breakerSettings,
  1013. ClusterSettings clusterSettings) {
  1014. String type = BREAKER_TYPE_KEY.get(settings);
  1015. if (type.equals("hierarchy")) {
  1016. return new HierarchyCircuitBreakerService(settings, breakerSettings, clusterSettings);
  1017. } else if (type.equals("none")) {
  1018. return new NoneCircuitBreakerService();
  1019. } else {
  1020. throw new IllegalArgumentException("Unknown circuit breaker type [" + type + "]");
  1021. }
  1022. }
  1023. /**
  1024. * Creates a new {@link BigArrays} instance used for this node.
  1025. * This method can be overwritten by subclasses to change their {@link BigArrays} implementation for instance for testing
  1026. */
  1027. BigArrays createBigArrays(PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService) {
  1028. return new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST);
  1029. }
  1030. /**
  1031. * Creates a new {@link BigArrays} instance used for this node.
  1032. * This method can be overwritten by subclasses to change their {@link BigArrays} implementation for instance for testing
  1033. */
  1034. PageCacheRecycler createPageCacheRecycler(Settings settings) {
  1035. return new PageCacheRecycler(settings);
  1036. }
  1037. /**
  1038. * Creates a new the SearchService. This method can be overwritten by tests to inject mock implementations.
  1039. */
  1040. protected SearchService newSearchService(ClusterService clusterService, IndicesService indicesService,
  1041. ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays,
  1042. FetchPhase fetchPhase, ResponseCollectorService responseCollectorService,
  1043. CircuitBreakerService circuitBreakerService, ExecutorSelector executorSelector) {
  1044. return new SearchService(clusterService, indicesService, threadPool,
  1045. scriptService, bigArrays, fetchPhase, responseCollectorService, circuitBreakerService,
  1046. executorSelector);
  1047. }
  1048. /**
  1049. * Creates a new the ScriptService. This method can be overwritten by tests to inject mock implementations.
  1050. */
  1051. protected ScriptService newScriptService(Settings settings, Map<String, ScriptEngine> engines, Map<String, ScriptContext<?>> contexts) {
  1052. return new ScriptService(settings, engines, contexts);
  1053. }
  1054. /**
  1055. * Get Custom Name Resolvers list based on a Discovery Plugins list
  1056. *
  1057. * @param discoveryPlugins Discovery plugins list
  1058. */
  1059. private List<NetworkService.CustomNameResolver> getCustomNameResolvers(List<DiscoveryPlugin> discoveryPlugins) {
  1060. List<NetworkService.CustomNameResolver> customNameResolvers = new ArrayList<>();
  1061. for (DiscoveryPlugin discoveryPlugin : discoveryPlugins) {
  1062. NetworkService.CustomNameResolver customNameResolver = discoveryPlugin.getCustomNameResolver(settings());
  1063. if (customNameResolver != null) {
  1064. customNameResolvers.add(customNameResolver);
  1065. }
  1066. }
  1067. return customNameResolvers;
  1068. }
  1069. /**
  1070. * Constructs a ClusterInfoService which may be mocked for tests.
  1071. */
  1072. protected ClusterInfoService newClusterInfoService(Settings settings, ClusterService clusterService,
  1073. ThreadPool threadPool, NodeClient client) {
  1074. final InternalClusterInfoService service = new InternalClusterInfoService(settings, clusterService, threadPool, client);
  1075. if (DiscoveryNode.isMasterNode(settings)) {
  1076. // listen for state changes (this node starts/stops being the elected master, or new nodes are added)
  1077. clusterService.addListener(service);
  1078. }
  1079. return service;
  1080. }
  1081. /**
  1082. * Constructs a {@link org.elasticsearch.http.HttpServerTransport} which may be mocked for tests.
  1083. */
  1084. protected HttpServerTransport newHttpTransport(NetworkModule networkModule) {
  1085. return networkModule.getHttpServerTransportSupplier().get();
  1086. }
  1087. private static class LocalNodeFactory implements Function<BoundTransportAddress, DiscoveryNode> {
  1088. private final SetOnce<DiscoveryNode> localNode = new SetOnce<>();
  1089. private final String persistentNodeId;
  1090. private final Settings settings;
  1091. private LocalNodeFactory(Settings settings, String persistentNodeId) {
  1092. this.persistentNodeId = persistentNodeId;
  1093. this.settings = settings;
  1094. }
  1095. @Override
  1096. public DiscoveryNode apply(BoundTransportAddress boundTransportAddress) {
  1097. localNode.set(DiscoveryNode.createLocal(settings, boundTransportAddress.publishAddress(), persistentNodeId));
  1098. return localNode.get();
  1099. }
  1100. DiscoveryNode getNode() {
  1101. assert localNode.get() != null;
  1102. return localNode.get();
  1103. }
  1104. }
  1105. }