AllocationIdIT.java 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. /*
  2. * Licensed to Elasticsearch under one or more contributor
  3. * license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright
  5. * ownership. Elasticsearch licenses this file to you under
  6. * the Apache License, Version 2.0 (the "License"); you may
  7. * not use this file except in compliance with the License.
  8. * You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing,
  13. * software distributed under the License is distributed on an
  14. * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. * KIND, either express or implied. See the License for the
  16. * specific language governing permissions and limitations
  17. * under the License.
  18. */
  19. package org.elasticsearch.cluster.routing;
  20. import org.apache.lucene.store.SimpleFSDirectory;
  21. import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplanation;
  22. import org.elasticsearch.action.admin.indices.stats.ShardStats;
  23. import org.elasticsearch.action.index.IndexRequestBuilder;
  24. import org.elasticsearch.client.Requests;
  25. import org.elasticsearch.cluster.ClusterState;
  26. import org.elasticsearch.cluster.health.ClusterHealthStatus;
  27. import org.elasticsearch.cluster.metadata.IndexMetaData;
  28. import org.elasticsearch.cluster.routing.allocation.AllocationDecision;
  29. import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
  30. import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
  31. import org.elasticsearch.common.settings.Settings;
  32. import org.elasticsearch.index.IndexService;
  33. import org.elasticsearch.index.IndexSettings;
  34. import org.elasticsearch.index.MockEngineFactoryPlugin;
  35. import org.elasticsearch.index.engine.Engine;
  36. import org.elasticsearch.index.shard.RemoveCorruptedShardDataCommandIT;
  37. import org.elasticsearch.index.shard.ShardId;
  38. import org.elasticsearch.index.shard.ShardPath;
  39. import org.elasticsearch.index.store.Store;
  40. import org.elasticsearch.indices.IndicesService;
  41. import org.elasticsearch.plugins.Plugin;
  42. import org.elasticsearch.test.DummyShardLock;
  43. import org.elasticsearch.test.ESIntegTestCase;
  44. import org.elasticsearch.test.InternalSettingsPlugin;
  45. import org.elasticsearch.test.InternalTestCluster;
  46. import org.elasticsearch.test.transport.MockTransportService;
  47. import java.io.IOException;
  48. import java.nio.file.Path;
  49. import java.util.Arrays;
  50. import java.util.Collection;
  51. import java.util.Set;
  52. import java.util.concurrent.ExecutionException;
  53. import java.util.stream.Collectors;
  54. import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
  55. import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
  56. import static org.hamcrest.Matchers.equalTo;
  57. import static org.hamcrest.Matchers.greaterThan;
  58. import static org.hamcrest.Matchers.hasSize;
  59. import static org.hamcrest.Matchers.is;
  60. import static org.hamcrest.Matchers.not;
  61. @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0)
  62. public class AllocationIdIT extends ESIntegTestCase {
  63. @Override
  64. protected Collection<Class<? extends Plugin>> nodePlugins() {
  65. return Arrays.asList(MockTransportService.TestPlugin.class, MockEngineFactoryPlugin.class, InternalSettingsPlugin.class);
  66. }
  67. public void testFailedRecoveryOnAllocateStalePrimaryRequiresAnotherAllocateStalePrimary() throws Exception {
  68. /*
  69. * Allocation id is put on start of shard while historyUUID is adjusted after recovery is done.
  70. *
  71. * If during execution of AllocateStalePrimary a proper allocation id is stored in allocation id set and recovery is failed
  72. * shard restart skips the stage where historyUUID is changed.
  73. *
  74. * That leads to situation where allocated stale primary and its replica belongs to the same historyUUID and
  75. * replica will receive operations after local checkpoint while documents before checkpoints could be significant different.
  76. *
  77. * Therefore, on AllocateStalePrimary we put some fake allocation id (no real one could be generated like that)
  78. * and any failure during recovery requires extra AllocateStalePrimary command to be executed.
  79. */
  80. // initial set up
  81. final String indexName = "index42";
  82. final String master = internalCluster().startMasterOnlyNode();
  83. String node1 = internalCluster().startNode();
  84. createIndex(indexName, Settings.builder()
  85. .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
  86. .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
  87. .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum").build());
  88. final int numDocs = indexDocs(indexName, "foo", "bar");
  89. final IndexSettings indexSettings = getIndexSettings(indexName, node1);
  90. final Set<String> allocationIds = getAllocationIds(indexName);
  91. final ShardId shardId = new ShardId(resolveIndex(indexName), 0);
  92. final Path indexPath = getIndexPath(node1, shardId);
  93. assertThat(allocationIds, hasSize(1));
  94. final String historyUUID = historyUUID(node1, indexName);
  95. String node2 = internalCluster().startNode();
  96. ensureGreen(indexName);
  97. internalCluster().assertSameDocIdsOnShards();
  98. // initial set up is done
  99. internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node1));
  100. // index more docs to node2 that marks node1 as stale
  101. int numExtraDocs = indexDocs(indexName, "foo", "bar2");
  102. assertHitCount(client(node2).prepareSearch(indexName).setQuery(matchAllQuery()).get(), numDocs + numExtraDocs);
  103. internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node2));
  104. // create fake corrupted marker on node1
  105. putFakeCorruptionMarker(indexSettings, shardId, indexPath);
  106. // thanks to master node1 is out of sync
  107. node1 = internalCluster().startNode();
  108. // there is only _stale_ primary
  109. checkNoValidShardCopy(indexName, shardId);
  110. // allocate stale primary
  111. client(node1).admin().cluster().prepareReroute()
  112. .add(new AllocateStalePrimaryAllocationCommand(indexName, 0, node1, true))
  113. .get();
  114. // allocation fails due to corruption marker
  115. assertBusy(() -> {
  116. final ClusterState state = client().admin().cluster().prepareState().get().getState();
  117. final ShardRouting shardRouting = state.routingTable().index(indexName).shard(shardId.id()).primaryShard();
  118. assertThat(shardRouting.state(), equalTo(ShardRoutingState.UNASSIGNED));
  119. assertThat(shardRouting.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED));
  120. });
  121. try(Store store = new Store(shardId, indexSettings, new SimpleFSDirectory(indexPath), new DummyShardLock(shardId))) {
  122. store.removeCorruptionMarker();
  123. }
  124. // index is red: no any shard is allocated (allocation id is a fake id that does not match to anything)
  125. checkHealthStatus(indexName, ClusterHealthStatus.RED);
  126. checkNoValidShardCopy(indexName, shardId);
  127. internalCluster().restartNode(node1, InternalTestCluster.EMPTY_CALLBACK);
  128. // index is still red due to mismatch of allocation id
  129. checkHealthStatus(indexName, ClusterHealthStatus.RED);
  130. checkNoValidShardCopy(indexName, shardId);
  131. // no any valid shard is there; have to invoke AllocateStalePrimary again
  132. client().admin().cluster().prepareReroute()
  133. .add(new AllocateStalePrimaryAllocationCommand(indexName, 0, node1, true))
  134. .get();
  135. ensureYellow(indexName);
  136. // bring node2 back
  137. node2 = internalCluster().startNode();
  138. ensureGreen(indexName);
  139. assertThat(historyUUID(node1, indexName), not(equalTo(historyUUID)));
  140. assertThat(historyUUID(node1, indexName), equalTo(historyUUID(node2, indexName)));
  141. internalCluster().assertSameDocIdsOnShards();
  142. }
  143. public void checkHealthStatus(String indexName, ClusterHealthStatus healthStatus) {
  144. final ClusterHealthStatus indexHealthStatus = client().admin().cluster()
  145. .health(Requests.clusterHealthRequest(indexName)).actionGet().getStatus();
  146. assertThat(indexHealthStatus, is(healthStatus));
  147. }
  148. private int indexDocs(String indexName, Object ... source) throws InterruptedException, ExecutionException {
  149. // index some docs in several segments
  150. int numDocs = 0;
  151. for (int k = 0, attempts = randomIntBetween(5, 10); k < attempts; k++) {
  152. final int numExtraDocs = between(10, 100);
  153. IndexRequestBuilder[] builders = new IndexRequestBuilder[numExtraDocs];
  154. for (int i = 0; i < builders.length; i++) {
  155. builders[i] = client().prepareIndex(indexName, "type").setSource(source);
  156. }
  157. indexRandom(true, false, true, Arrays.asList(builders));
  158. numDocs += numExtraDocs;
  159. }
  160. return numDocs;
  161. }
  162. private Path getIndexPath(String nodeName, ShardId shardId) {
  163. final Set<Path> indexDirs = RemoveCorruptedShardDataCommandIT.getDirs(nodeName, shardId, ShardPath.INDEX_FOLDER_NAME);
  164. assertThat(indexDirs, hasSize(1));
  165. return indexDirs.iterator().next();
  166. }
  167. private Set<String> getAllocationIds(String indexName) {
  168. final ClusterState state = client().admin().cluster().prepareState().get().getState();
  169. final Set<String> allocationIds = state.metaData().index(indexName).inSyncAllocationIds(0);
  170. return allocationIds;
  171. }
  172. private IndexSettings getIndexSettings(String indexName, String nodeName) {
  173. final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
  174. final IndexService indexService = indicesService.indexService(resolveIndex(indexName));
  175. return indexService.getIndexSettings();
  176. }
  177. private String historyUUID(String node, String indexName) {
  178. final ShardStats[] shards = client(node).admin().indices().prepareStats(indexName).clear().get().getShards();
  179. assertThat(shards.length, greaterThan(0));
  180. final Set<String> historyUUIDs = Arrays.stream(shards)
  181. .map(shard -> shard.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY))
  182. .collect(Collectors.toSet());
  183. assertThat(historyUUIDs, hasSize(1));
  184. return historyUUIDs.iterator().next();
  185. }
  186. private void putFakeCorruptionMarker(IndexSettings indexSettings, ShardId shardId, Path indexPath) throws IOException {
  187. try(Store store = new Store(shardId, indexSettings, new SimpleFSDirectory(indexPath), new DummyShardLock(shardId))) {
  188. store.markStoreCorrupted(new IOException("fake ioexception"));
  189. }
  190. }
  191. private void checkNoValidShardCopy(String indexName, ShardId shardId) throws Exception {
  192. assertBusy(() -> {
  193. final ClusterAllocationExplanation explanation =
  194. client().admin().cluster().prepareAllocationExplain()
  195. .setIndex(indexName).setShard(shardId.id()).setPrimary(true)
  196. .get().getExplanation();
  197. final ShardAllocationDecision shardAllocationDecision = explanation.getShardAllocationDecision();
  198. assertThat(shardAllocationDecision.isDecisionTaken(), equalTo(true));
  199. assertThat(shardAllocationDecision.getAllocateDecision().getAllocationDecision(),
  200. equalTo(AllocationDecision.NO_VALID_SHARD_COPY));
  201. });
  202. }
  203. }