GatewayMetaState.java 29 KB


  1. /*
  2. * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
  3. * or more contributor license agreements. Licensed under the "Elastic License
  4. * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
  5. * Public License v 1"; you may not use this file except in compliance with, at
  6. * your election, the "Elastic License 2.0", the "GNU Affero General Public
  7. * License v3.0 only", or the "Server Side Public License, v 1".
  8. */
  9. package org.elasticsearch.gateway;
  10. import org.apache.logging.log4j.LogManager;
  11. import org.apache.logging.log4j.Logger;
  12. import org.apache.lucene.store.AlreadyClosedException;
  13. import org.apache.lucene.util.SetOnce;
  14. import org.elasticsearch.ElasticsearchException;
  15. import org.elasticsearch.ExceptionsHelper;
  16. import org.elasticsearch.cluster.ClusterName;
  17. import org.elasticsearch.cluster.ClusterState;
  18. import org.elasticsearch.cluster.coordination.CoordinationMetadata;
  19. import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
  20. import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
  21. import org.elasticsearch.cluster.metadata.IndexMetadata;
  22. import org.elasticsearch.cluster.metadata.IndexMetadataVerifier;
  23. import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
  24. import org.elasticsearch.cluster.metadata.Manifest;
  25. import org.elasticsearch.cluster.metadata.Metadata;
  26. import org.elasticsearch.cluster.node.DiscoveryNode;
  27. import org.elasticsearch.cluster.service.ClusterService;
  28. import org.elasticsearch.cluster.version.CompatibilityVersions;
  29. import org.elasticsearch.common.settings.Settings;
  30. import org.elasticsearch.common.util.concurrent.AbstractRunnable;
  31. import org.elasticsearch.common.util.concurrent.EsExecutors;
  32. import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
  33. import org.elasticsearch.core.IOUtils;
  34. import org.elasticsearch.core.Tuple;
  35. import org.elasticsearch.core.UpdateForV9;
  36. import org.elasticsearch.env.BuildVersion;
  37. import org.elasticsearch.env.NodeMetadata;
  38. import org.elasticsearch.index.IndexVersions;
  39. import org.elasticsearch.node.Node;
  40. import org.elasticsearch.plugins.ClusterCoordinationPlugin;
  41. import org.elasticsearch.plugins.MetadataUpgrader;
  42. import org.elasticsearch.threadpool.ThreadPool;
  43. import org.elasticsearch.transport.TransportService;
  44. import java.io.Closeable;
  45. import java.io.IOException;
  46. import java.util.Collections;
  47. import java.util.HashMap;
  48. import java.util.List;
  49. import java.util.Map;
  50. import java.util.Objects;
  51. import java.util.Optional;
  52. import java.util.concurrent.TimeUnit;
  53. import java.util.concurrent.atomic.AtomicReference;
  54. import java.util.function.BiConsumer;
  55. import java.util.function.Consumer;
  56. import java.util.function.Function;
  57. import java.util.function.UnaryOperator;
  58. import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
  59. /**
  60. * Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts.
  61. *
  62. * When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that
  63. * the state being loaded when constructing the instance of this class is not necessarily the state that will be used as {@link
  64. * ClusterState#metadata()} because it might be stale or incomplete. Master-eligible nodes must perform an election to find a complete and
  65. * non-stale state, and master-ineligible nodes receive the real cluster state from the elected master after joining the cluster.
  66. */
  67. public class GatewayMetaState implements Closeable {
  /**
   * Fake node ID for a voting configuration written by a master-ineligible data node to indicate that its on-disk state is potentially
   * stale (since it is written asynchronously after application, rather than before acceptance). This node ID means that if the node is
   * restarted as a master-eligible node then it does not win any elections until it has received a fresh cluster state.
   */
  public static final String STALE_STATE_CONFIG_NODE_ID = "STALE_STATE_CONFIG";

  // Holder for the persisted state of this node. It is empty until start() populates it; start() asserts that it is populated only once.
  private final SetOnce<PersistedState> persistedState = new SetOnce<>();
  76. public PersistedState getPersistedState() {
  77. final PersistedState persistedState = this.persistedState.get();
  78. assert persistedState != null : "not started";
  79. return persistedState;
  80. }
  81. public Metadata getMetadata() {
  82. return getPersistedState().getLastAcceptedState().metadata();
  83. }
  84. public void start(
  85. Settings settings,
  86. TransportService transportService,
  87. ClusterService clusterService,
  88. MetaStateService metaStateService,
  89. IndexMetadataVerifier indexMetadataVerifier,
  90. MetadataUpgrader metadataUpgrader,
  91. PersistedClusterStateService persistedClusterStateService,
  92. List<ClusterCoordinationPlugin> clusterCoordinationPlugins,
  93. CompatibilityVersions compatibilityVersions
  94. ) {
  95. assert persistedState.get() == null : "should only start once, but already have " + persistedState.get();
  96. try {
  97. persistedState.set(
  98. createPersistedState(
  99. settings,
  100. transportService,
  101. clusterService,
  102. metaStateService,
  103. indexMetadataVerifier,
  104. metadataUpgrader,
  105. persistedClusterStateService,
  106. clusterCoordinationPlugins,
  107. compatibilityVersions
  108. )
  109. );
  110. } catch (IOException e) {
  111. throw new ElasticsearchException("failed to load metadata", e);
  112. }
  113. }
  114. private PersistedState createPersistedState(
  115. Settings settings,
  116. TransportService transportService,
  117. ClusterService clusterService,
  118. MetaStateService metaStateService,
  119. IndexMetadataVerifier indexMetadataVerifier,
  120. MetadataUpgrader metadataUpgrader,
  121. PersistedClusterStateService persistedClusterStateService,
  122. List<ClusterCoordinationPlugin> clusterCoordinationPlugins,
  123. CompatibilityVersions compatibilityVersions
  124. ) throws IOException {
  125. final var persistedStateFactories = clusterCoordinationPlugins.stream()
  126. .map(ClusterCoordinationPlugin::getPersistedStateFactory)
  127. .flatMap(Optional::stream)
  128. .toList();
  129. if (persistedStateFactories.size() > 1) {
  130. throw new IllegalStateException("multiple persisted-state factories found: " + persistedStateFactories);
  131. }
  132. if (persistedStateFactories.size() == 1) {
  133. return persistedStateFactories.get(0).createPersistedState(settings, transportService, persistedClusterStateService);
  134. }
  135. if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.canContainData(settings)) {
  136. return createOnDiskPersistedState(
  137. settings,
  138. transportService,
  139. clusterService,
  140. metaStateService,
  141. indexMetadataVerifier,
  142. metadataUpgrader,
  143. persistedClusterStateService,
  144. compatibilityVersions
  145. );
  146. }
  147. return createInMemoryPersistedState(
  148. settings,
  149. transportService,
  150. clusterService,
  151. metaStateService,
  152. persistedClusterStateService,
  153. compatibilityVersions
  154. );
  155. }
  156. private PersistedState createOnDiskPersistedState(
  157. Settings settings,
  158. TransportService transportService,
  159. ClusterService clusterService,
  160. MetaStateService metaStateService,
  161. IndexMetadataVerifier indexMetadataVerifier,
  162. MetadataUpgrader metadataUpgrader,
  163. PersistedClusterStateService persistedClusterStateService,
  164. CompatibilityVersions compatibilityVersions
  165. ) throws IOException {
  166. final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadBestOnDiskState();
  167. Metadata metadata = onDiskState.metadata;
  168. long lastAcceptedVersion = onDiskState.lastAcceptedVersion;
  169. long currentTerm = onDiskState.currentTerm;
  170. if (onDiskState.empty()) {
  171. @UpdateForV9 // legacy metadata loader is not needed anymore from v9 onwards
  172. final Tuple<Manifest, Metadata> legacyState = metaStateService.loadFullState();
  173. if (legacyState.v1().isEmpty() == false) {
  174. metadata = legacyState.v2();
  175. lastAcceptedVersion = legacyState.v1().clusterStateVersion();
  176. currentTerm = legacyState.v1().currentTerm();
  177. }
  178. }
  179. PersistedState persistedState = null;
  180. boolean success = false;
  181. try {
  182. final ClusterState clusterState = prepareInitialClusterState(
  183. transportService,
  184. clusterService,
  185. ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
  186. .version(lastAcceptedVersion)
  187. .metadata(upgradeMetadataForNode(metadata, indexMetadataVerifier, metadataUpgrader))
  188. .build(),
  189. compatibilityVersions
  190. );
  191. if (DiscoveryNode.isMasterNode(settings)) {
  192. persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState);
  193. } else {
  194. persistedState = new AsyncPersistedState(
  195. settings,
  196. transportService.getThreadPool(),
  197. new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState)
  198. );
  199. }
  200. if (DiscoveryNode.canContainData(settings)) {
  201. metaStateService.unreferenceAll(); // unreference legacy files (only keep them for dangling indices functionality)
  202. } else {
  203. metaStateService.deleteAll(); // delete legacy files
  204. }
  205. // write legacy node metadata to prevent accidental downgrades from spawning empty cluster state
  206. NodeMetadata.FORMAT.writeAndCleanup(
  207. new NodeMetadata(
  208. persistedClusterStateService.getNodeId(),
  209. BuildVersion.current(),
  210. clusterState.metadata().oldestIndexVersion()
  211. ),
  212. persistedClusterStateService.getDataPaths()
  213. );
  214. success = true;
  215. } finally {
  216. if (success == false) {
  217. IOUtils.closeWhileHandlingException(persistedState);
  218. }
  219. }
  220. return persistedState;
  221. }
  222. private PersistedState createInMemoryPersistedState(
  223. Settings settings,
  224. TransportService transportService,
  225. ClusterService clusterService,
  226. MetaStateService metaStateService,
  227. PersistedClusterStateService persistedClusterStateService,
  228. CompatibilityVersions compatibilityVersions
  229. ) throws IOException {
  230. final long currentTerm = 0L;
  231. final ClusterState clusterState = prepareInitialClusterState(
  232. transportService,
  233. clusterService,
  234. ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)).build(),
  235. compatibilityVersions
  236. );
  237. if (persistedClusterStateService.getDataPaths().length > 0) {
  238. // write empty cluster state just so that we have a persistent node id. There is no need to write out global metadata with
  239. // cluster uuid as coordinating-only nodes do not snap into a cluster as they carry no state
  240. try (PersistedClusterStateService.Writer persistenceWriter = persistedClusterStateService.createWriter()) {
  241. persistenceWriter.writeFullStateAndCommit(currentTerm, clusterState);
  242. }
  243. // delete legacy cluster state files
  244. metaStateService.deleteAll();
  245. // write legacy node metadata to prevent downgrades from spawning empty cluster state
  246. NodeMetadata.FORMAT.writeAndCleanup(
  247. new NodeMetadata(
  248. persistedClusterStateService.getNodeId(),
  249. BuildVersion.current(),
  250. clusterState.metadata().oldestIndexVersion()
  251. ),
  252. persistedClusterStateService.getDataPaths()
  253. );
  254. }
  255. return new InMemoryPersistedState(currentTerm, clusterState);
  256. }
  257. // exposed so it can be overridden by tests
  258. ClusterState prepareInitialClusterState(
  259. TransportService transportService,
  260. ClusterService clusterService,
  261. ClusterState clusterState,
  262. CompatibilityVersions compatibilityVersions
  263. ) {
  264. assert clusterState.nodes().getLocalNode() == null : "prepareInitialClusterState must only be called once";
  265. assert transportService.getLocalNode() != null : "transport service is not yet started";
  266. return Function.<ClusterState>identity()
  267. .andThen(ClusterStateUpdaters::addStateNotRecoveredBlock)
  268. .andThen(state -> ClusterStateUpdaters.setLocalNode(state, transportService.getLocalNode(), compatibilityVersions))
  269. .andThen(state -> ClusterStateUpdaters.upgradeAndArchiveUnknownOrInvalidSettings(state, clusterService.getClusterSettings()))
  270. .andThen(ClusterStateUpdaters::recoverClusterBlocks)
  271. .apply(clusterState);
  272. }
  273. // exposed so it can be overridden by tests
  274. Metadata upgradeMetadataForNode(Metadata metadata, IndexMetadataVerifier indexMetadataVerifier, MetadataUpgrader metadataUpgrader) {
  275. return upgradeMetadata(metadata, indexMetadataVerifier, metadataUpgrader);
  276. }
  277. /**
  278. * This method uses {@link IndexMetadataVerifier} to ensure that indices are compatible
  279. * with the current version. It also calls into plugins to update their index templates.
  280. *
  281. * @return input <code>metadata</code> if no upgrade is needed or an upgraded metadata
  282. */
  283. static Metadata upgradeMetadata(Metadata metadata, IndexMetadataVerifier indexMetadataVerifier, MetadataUpgrader metadataUpgrader) {
  284. boolean changed = false;
  285. final Metadata.Builder upgradedMetadata = Metadata.builder(metadata);
  286. for (IndexMetadata indexMetadata : metadata) {
  287. IndexMetadata newMetadata = indexMetadataVerifier.verifyIndexMetadata(
  288. indexMetadata,
  289. IndexVersions.MINIMUM_COMPATIBLE,
  290. IndexVersions.MINIMUM_READONLY_COMPATIBLE
  291. );
  292. changed |= indexMetadata != newMetadata;
  293. upgradedMetadata.put(newMetadata, false);
  294. }
  295. // upgrade current templates
  296. if (applyPluginUpgraders(
  297. metadata.getTemplates(),
  298. metadataUpgrader.indexTemplateMetadataUpgraders,
  299. upgradedMetadata::removeTemplate,
  300. (s, indexTemplateMetadata) -> upgradedMetadata.put(indexTemplateMetadata)
  301. )) {
  302. changed = true;
  303. }
  304. return changed ? upgradedMetadata.build() : metadata;
  305. }
  306. private static boolean applyPluginUpgraders(
  307. Map<String, IndexTemplateMetadata> existingData,
  308. UnaryOperator<Map<String, IndexTemplateMetadata>> upgrader,
  309. Consumer<String> removeData,
  310. BiConsumer<String, IndexTemplateMetadata> putData
  311. ) {
  312. // collect current data
  313. Map<String, IndexTemplateMetadata> existingMap = new HashMap<>();
  314. for (Map.Entry<String, IndexTemplateMetadata> customCursor : existingData.entrySet()) {
  315. existingMap.put(customCursor.getKey(), customCursor.getValue());
  316. }
  317. // upgrade global custom meta data
  318. Map<String, IndexTemplateMetadata> upgradedCustoms = upgrader.apply(existingMap);
  319. if (upgradedCustoms.equals(existingMap) == false) {
  320. // remove all data first so a plugin can remove custom metadata or templates if needed
  321. existingMap.keySet().forEach(removeData);
  322. for (Map.Entry<String, IndexTemplateMetadata> upgradedCustomEntry : upgradedCustoms.entrySet()) {
  323. putData.accept(upgradedCustomEntry.getKey(), upgradedCustomEntry.getValue());
  324. }
  325. return true;
  326. }
  327. return false;
  328. }
  329. @Override
  330. public void close() throws IOException {
  331. IOUtils.close(persistedState.get());
  332. }
  333. // visible for testing
  334. public boolean allPendingAsyncStatesWritten() {
  335. final PersistedState ps = persistedState.get();
  336. if (ps instanceof AsyncPersistedState) {
  337. return ((AsyncPersistedState) ps).allPendingAsyncStatesWritten();
  338. } else {
  339. return true;
  340. }
  341. }
  342. static class AsyncPersistedState extends InMemoryPersistedState {
  343. private static final Logger logger = LogManager.getLogger(AsyncPersistedState.class);
  344. static final String THREAD_NAME = "AsyncLucenePersistedState#updateTask";
  345. private final EsThreadPoolExecutor threadPoolExecutor;
  346. private final PersistedState persistedState;
  347. boolean newCurrentTermQueued = false;
  348. boolean newStateQueued = false;
  349. private final Object mutex = new Object();
  350. AsyncPersistedState(Settings settings, ThreadPool threadPool, PersistedState persistedState) {
  351. super(persistedState.getCurrentTerm(), persistedState.getLastAcceptedState());
  352. final String nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));
  353. threadPoolExecutor = EsExecutors.newFixed(
  354. nodeName + "/" + THREAD_NAME,
  355. 1,
  356. 1,
  357. daemonThreadFactory(nodeName, THREAD_NAME),
  358. threadPool.getThreadContext(),
  359. EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
  360. );
  361. this.persistedState = persistedState;
  362. }
  363. @Override
  364. public void setCurrentTerm(long currentTerm) {
  365. synchronized (mutex) {
  366. super.setCurrentTerm(currentTerm);
  367. if (newCurrentTermQueued) {
  368. logger.trace("term update already queued (setting term to {})", currentTerm);
  369. } else {
  370. logger.trace("queuing term update (setting term to {})", currentTerm);
  371. newCurrentTermQueued = true;
  372. if (newStateQueued == false) {
  373. scheduleUpdate();
  374. }
  375. }
  376. }
  377. }
  378. @Override
  379. public void setLastAcceptedState(ClusterState clusterState) {
  380. synchronized (mutex) {
  381. super.setLastAcceptedState(clusterState);
  382. if (newStateQueued) {
  383. logger.trace("cluster state update already queued (setting cluster state to {})", clusterState.version());
  384. } else {
  385. logger.trace("queuing cluster state update (setting cluster state to {})", clusterState.version());
  386. newStateQueued = true;
  387. if (newCurrentTermQueued == false) {
  388. scheduleUpdate();
  389. }
  390. }
  391. }
  392. }
  393. private void scheduleUpdate() {
  394. assert Thread.holdsLock(mutex);
  395. assert threadPoolExecutor.getQueue().isEmpty() : "threadPoolExecutor queue not empty";
  396. threadPoolExecutor.execute(new AbstractRunnable() {
  397. @Override
  398. public void onFailure(Exception e) {
  399. logger.error("Exception occurred when storing new meta data", e);
  400. }
  401. @Override
  402. public void onRejection(Exception e) {
  403. assert threadPoolExecutor.isShutdown() : "only expect rejections when shutting down";
  404. }
  405. @Override
  406. protected void doRun() {
  407. final Long term;
  408. final ClusterState clusterState;
  409. synchronized (mutex) {
  410. if (newCurrentTermQueued) {
  411. term = getCurrentTerm();
  412. logger.trace("resetting newCurrentTermQueued");
  413. newCurrentTermQueued = false;
  414. } else {
  415. term = null;
  416. }
  417. if (newStateQueued) {
  418. clusterState = getLastAcceptedState();
  419. logger.trace("resetting newStateQueued");
  420. newStateQueued = false;
  421. } else {
  422. clusterState = null;
  423. }
  424. }
  425. // write current term before last accepted state so that it is never below term in last accepted state
  426. if (term != null) {
  427. persistedState.setCurrentTerm(term);
  428. }
  429. if (clusterState != null) {
  430. persistedState.setLastAcceptedState(resetVotingConfiguration(clusterState));
  431. }
  432. }
  433. });
  434. }
  435. static final CoordinationMetadata.VotingConfiguration staleStateConfiguration = new CoordinationMetadata.VotingConfiguration(
  436. Collections.singleton(STALE_STATE_CONFIG_NODE_ID)
  437. );
  438. static ClusterState resetVotingConfiguration(ClusterState clusterState) {
  439. CoordinationMetadata newCoordinationMetadata = CoordinationMetadata.builder(clusterState.coordinationMetadata())
  440. .lastAcceptedConfiguration(staleStateConfiguration)
  441. .lastCommittedConfiguration(staleStateConfiguration)
  442. .build();
  443. return ClusterState.builder(clusterState)
  444. .metadata(clusterState.metadata().withCoordinationMetadata(newCoordinationMetadata))
  445. .build();
  446. }
  447. @Override
  448. public void close() throws IOException {
  449. try {
  450. ThreadPool.terminate(threadPoolExecutor, 10, TimeUnit.SECONDS);
  451. } finally {
  452. persistedState.close();
  453. }
  454. }
  455. boolean allPendingAsyncStatesWritten() {
  456. synchronized (mutex) {
  457. if (newCurrentTermQueued || newStateQueued) {
  458. return false;
  459. }
  460. return threadPoolExecutor.getActiveCount() == 0;
  461. }
  462. }
  463. }
  464. /**
  465. * Encapsulates the incremental writing of metadata to a {@link PersistedClusterStateService.Writer}.
  466. */
  467. public static class LucenePersistedState implements PersistedState {
  468. private long currentTerm;
  469. private ClusterState lastAcceptedState;
  470. private final PersistedClusterStateService persistedClusterStateService;
  471. // As the close method can be concurrently called to the other PersistedState methods, this class has extra protection in place.
  472. private final AtomicReference<PersistedClusterStateService.Writer> persistenceWriter = new AtomicReference<>();
  473. private boolean writeNextStateFully;
  474. @SuppressWarnings("this-escape")
  475. public LucenePersistedState(
  476. PersistedClusterStateService persistedClusterStateService,
  477. long currentTerm,
  478. ClusterState lastAcceptedState
  479. ) throws IOException {
  480. this.persistedClusterStateService = persistedClusterStateService;
  481. this.currentTerm = currentTerm;
  482. this.lastAcceptedState = lastAcceptedState;
  483. // Write the whole state out to be sure it's fresh and using the latest format. Called during initialisation, so that
  484. // (1) throwing an IOException is enough to halt the node, and
  485. // (2) the index is currently empty since it was opened with IndexWriterConfig.OpenMode.CREATE
  486. // In the common case it's actually sufficient to commit() the existing state and not do any indexing. For instance,
  487. // this is true if there's only one data path on this master node, and the commit we just loaded was already written out
  488. // by this version of Elasticsearch. TODO TBD should we avoid indexing when possible?
  489. final PersistedClusterStateService.Writer writer = persistedClusterStateService.createWriter();
  490. maybeWriteInitialState(currentTerm, lastAcceptedState, writer);
  491. persistenceWriter.set(writer);
  492. }
  493. protected void maybeWriteInitialState(long currentTerm, ClusterState lastAcceptedState, PersistedClusterStateService.Writer writer)
  494. throws IOException {
  495. try {
  496. writer.writeFullStateAndCommit(currentTerm, lastAcceptedState);
  497. } catch (Exception e) {
  498. try {
  499. writer.close();
  500. } catch (Exception e2) {
  501. e.addSuppressed(e2);
  502. }
  503. throw e;
  504. }
  505. }
  506. @Override
  507. public long getCurrentTerm() {
  508. return currentTerm;
  509. }
  510. @Override
  511. public ClusterState getLastAcceptedState() {
  512. return lastAcceptedState;
  513. }
  514. @Override
  515. public void setCurrentTerm(long currentTerm) {
  516. writeCurrentTermToDisk(currentTerm);
  517. this.currentTerm = currentTerm;
  518. }
  519. protected void writeCurrentTermToDisk(long currentTerm) {
  520. try {
  521. if (writeNextStateFully) {
  522. getWriterSafe().writeFullStateAndCommit(currentTerm, lastAcceptedState);
  523. } else {
  524. writeNextStateFully = true; // in case of failure; this flag is cleared on success
  525. Metadata metadata = lastAcceptedState.metadata();
  526. getWriterSafe().writeIncrementalTermUpdateAndCommit(
  527. currentTerm,
  528. lastAcceptedState.version(),
  529. metadata.oldestIndexVersion(),
  530. metadata.clusterUUID(),
  531. metadata.clusterUUIDCommitted()
  532. );
  533. }
  534. } catch (IOException e) {
  535. throw new ElasticsearchException(e);
  536. }
  537. writeNextStateFully = false;
  538. }
  539. @Override
  540. public void setLastAcceptedState(ClusterState clusterState) {
  541. writeClusterStateToDisk(clusterState);
  542. lastAcceptedState = clusterState;
  543. }
  544. protected void writeClusterStateToDisk(ClusterState clusterState) {
  545. try {
  546. if (writeNextStateFully) {
  547. getWriterSafe().writeFullStateAndCommit(currentTerm, clusterState);
  548. } else {
  549. writeNextStateFully = true; // in case of failure; this flag is cleared on success
  550. if (clusterState.term() != lastAcceptedState.term()) {
  551. assert clusterState.term() > lastAcceptedState.term() : clusterState.term() + " vs " + lastAcceptedState.term();
  552. // In a new currentTerm, we cannot compare the persisted metadata's lastAcceptedVersion to those in the new state,
  553. // so it's simplest to write everything again.
  554. getWriterSafe().writeFullStateAndCommit(currentTerm, clusterState);
  555. } else {
  556. // Within the same currentTerm, we _can_ use metadata versions to skip unnecessary writing.
  557. getWriterSafe().writeIncrementalStateAndCommit(currentTerm, lastAcceptedState, clusterState);
  558. }
  559. }
  560. } catch (IOException e) {
  561. throw new ElasticsearchException(e);
  562. }
  563. writeNextStateFully = false;
  564. }
  565. private PersistedClusterStateService.Writer getWriterSafe() {
  566. final PersistedClusterStateService.Writer writer = persistenceWriter.get();
  567. if (writer == null) {
  568. throw new AlreadyClosedException("persisted state has been closed");
  569. }
  570. if (writer.isOpen()) {
  571. return writer;
  572. } else {
  573. try {
  574. final PersistedClusterStateService.Writer newWriter = persistedClusterStateService.createWriter();
  575. if (persistenceWriter.compareAndSet(writer, newWriter)) {
  576. return newWriter;
  577. } else {
  578. assert persistenceWriter.get() == null : "expected no concurrent calls to getWriterSafe";
  579. newWriter.close();
  580. throw new AlreadyClosedException("persisted state has been closed");
  581. }
  582. } catch (Exception e) {
  583. throw ExceptionsHelper.convertToRuntime(e);
  584. }
  585. }
  586. }
  587. @Override
  588. public void close() throws IOException {
  589. IOUtils.close(persistenceWriter.getAndSet(null));
  590. }
  591. }
  592. }