IndexLevelReplicationTests.java

/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.index.replication;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineTests;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTests;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.SnapshotMatchers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.Matcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;

public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase {

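    /** Indexes a random number of documents through the group and verifies that the primary and all replicas hold the same documents. */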
    public void testSimpleReplication() throws Exception {
        try (ReplicationGroup shards = createGroup(randomInt(2))) {
            shards.startAll();
            final int docCount = randomInt(50);
            shards.indexDocs(docCount);
            shards.assertAllEqual(docCount);
        }
    }

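    /** Same as {@link #testSimpleReplication()} but appends documents without ids, exercising the append-only path. */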
    public void testSimpleAppendOnlyReplication() throws Exception {
        try (ReplicationGroup shards = createGroup(randomInt(2))) {
            shards.startAll();
            final int docCount = randomInt(50);
            shards.appendDocs(docCount);
            shards.assertAllEqual(docCount);
        }
    }

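    /**
     * Appends documents concurrently with the recovery of a new replica and verifies that all copies end up equal and that the
     * primary never had to fall back to version lookups, i.e. the append-only optimization stayed effective throughout.
     */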
    public void testAppendWhileRecovering() throws Exception {
        try (ReplicationGroup shards = createGroup(0)) {
            shards.startAll();
            CountDownLatch latch = new CountDownLatch(2);
            int numDocs = randomIntBetween(100, 200);
            shards.appendDocs(1); // just append one to the translog so we can assert below
            Thread thread = new Thread() {
                @Override
                public void run() {
                    try {
                        latch.countDown();
                        latch.await();
                        shards.appendDocs(numDocs - 1);
                    } catch (Exception e) {
                        throw new AssertionError(e);
                    }
                }
            };
            thread.start();
            IndexShard replica = shards.addReplica();
            Future<Void> future = shards.asyncRecoverReplica(replica,
                (indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener) {
                    @Override
                    public void cleanFiles(int totalTranslogOps, long globalCheckpoint,
                                           Store.MetadataSnapshot sourceMetaData, ActionListener<Void> listener) {
                        super.cleanFiles(totalTranslogOps, globalCheckpoint, sourceMetaData, ActionListener.runAfter(listener, () -> {
                            latch.countDown();
                            try {
                                latch.await();
                            } catch (InterruptedException e) {
                                throw new AssertionError(e);
                            }
                        }));
                    }
                });
            future.get();
            thread.join();
            shards.assertAllEqual(numDocs);
            Engine engine = IndexShardTests.getEngineFromShard(shards.getPrimary());
            assertEquals(0, InternalEngineTests.getNumIndexVersionsLookups((InternalEngine) engine));
            assertEquals(0, InternalEngineTests.getNumVersionLookups((InternalEngine) engine));
        }
    }

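    /**
     * A retried append-only request is indexed before a new replica recovers; the original request only arrives after recovery
     * has completed. The late original must not create a duplicate document, and the replica must have picked up the request's
     * auto-generated-id timestamp.
     */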
    public void testRetryAppendOnlyAfterRecovering() throws Exception {
        try (ReplicationGroup shards = createGroup(0)) {
            shards.startAll();
            final IndexRequest originalRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON);
            originalRequest.process(Version.CURRENT, null, index.getName());
            final IndexRequest retryRequest = copyIndexRequest(originalRequest);
            retryRequest.onRetry();
            shards.index(retryRequest);
            IndexShard replica = shards.addReplica();
            shards.recoverReplica(replica);
            shards.assertAllEqual(1);
            shards.index(originalRequest); // original append-only arrives after recovery completed
            shards.assertAllEqual(1);
            assertThat(replica.getMaxSeenAutoIdTimestamp(), equalTo(originalRequest.getAutoGeneratedTimestamp()));
        }
    }

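    /**
     * An append-only operation reaches the replica twice: first via recovery and then again via regular replication once the blocked
     * primary call returns. Both copies must still agree on a single document.
     */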
    public void testAppendOnlyRecoveryThenReplication() throws Exception {
        CountDownLatch indexedOnPrimary = new CountDownLatch(1);
        CountDownLatch recoveryDone = new CountDownLatch(1);
        try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(1)) {
            @Override
            protected EngineFactory getEngineFactory(ShardRouting routing) {
                return config -> new InternalEngine(config) {
                    @Override
                    public IndexResult index(Index op) throws IOException {
                        IndexResult result = super.index(op);
                        if (op.origin() == Operation.Origin.PRIMARY) {
                            indexedOnPrimary.countDown();
                            // prevent the indexing on the primary from returning (it was added to Lucene and translog already)
                            // to make sure that this operation is replicated to the replica via recovery, then via replication.
                            try {
                                recoveryDone.await();
                            } catch (InterruptedException e) {
                                throw new AssertionError(e);
                            }
                        }
                        return result;
                    }
                };
            }
        }) {
            shards.startAll();
            Thread thread = new Thread(() -> {
                IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON);
                try {
                    shards.index(indexRequest);
                } catch (Exception e) {
                    throw new AssertionError(e);
                }
            });
            thread.start();
            IndexShard replica = shards.addReplica();
            Future<Void> fut = shards.asyncRecoverReplica(replica,
                (shard, node) -> new RecoveryTarget(shard, node, recoveryListener) {
                    @Override
                    public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps,
                                                             ActionListener<Void> listener) {
                        try {
                            indexedOnPrimary.await();
                        } catch (InterruptedException e) {
                            throw new AssertionError(e);
                        }
                        super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener);
                    }
                });
            fut.get();
            recoveryDone.countDown();
            thread.join();
            shards.assertAllEqual(1);
        }
    }

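    /**
     * Verifies that a recovering replica inherits the primary's max unsafe auto-generated-id timestamp, whether it is carried over
     * via translog replay or via already flushed segments.
     */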
    public void testInheritMaxValidAutoIDTimestampOnRecovery() throws Exception {
        try (ReplicationGroup shards = createGroup(0)) {
            shards.startAll();
            final IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON);
            indexRequest.onRetry(); // force an update of the timestamp
            final BulkItemResponse response = shards.index(indexRequest);
            assertEquals(DocWriteResponse.Result.CREATED, response.getResponse().getResult());
            if (randomBoolean()) { // let's check if that also happens if no translog record is replicated
                shards.flush();
            }
            IndexShard replica = shards.addReplica();
            shards.recoverReplica(replica);
            SegmentsStats segmentsStats = replica.segmentStats(false, false);
            SegmentsStats primarySegmentStats = shards.getPrimary().segmentStats(false, false);
            assertNotEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, primarySegmentStats.getMaxUnsafeAutoIdTimestamp());
            assertEquals(primarySegmentStats.getMaxUnsafeAutoIdTimestamp(), segmentsStats.getMaxUnsafeAutoIdTimestamp());
            assertNotEquals(Long.MAX_VALUE, segmentsStats.getMaxUnsafeAutoIdTimestamp());
        }
    }

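    /**
     * Indexes documents while replicas are started one by one, then checks that local checkpoints, global checkpoints and max
     * sequence numbers advance consistently on every copy, including after an explicit global checkpoint sync.
     */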
    public void testCheckpointsAdvance() throws Exception {
        try (ReplicationGroup shards = createGroup(randomInt(3))) {
            shards.startPrimary();
            int numDocs = 0;
            int startedShards;
            do {
                numDocs += shards.indexDocs(randomInt(20));
                startedShards = shards.startReplicas(randomIntBetween(1, 2));
            } while (startedShards > 0);
            for (IndexShard shard : shards) {
                final SeqNoStats shardStats = shard.seqNoStats();
                final ShardRouting shardRouting = shard.routingEntry();
                assertThat(shardRouting + " local checkpoint mismatch", shardStats.getLocalCheckpoint(), equalTo(numDocs - 1L));
                /*
                 * After the last indexing operation completes, the primary will advance its global checkpoint. Without another indexing
                 * operation, or a background sync, the primary will not have broadcast this global checkpoint to its replicas. However, a
                 * shard could have recovered from the primary in which case its global checkpoint will be in-sync with the primary.
                 * Therefore, we can only assert that the global checkpoint is number of docs minus one (matching the primary, in case of a
                 * recovery), or number of docs minus two (received indexing operations but has not received a global checkpoint sync after
                 * the last operation completed).
                 */
                final Matcher<Long> globalCheckpointMatcher;
                if (shardRouting.primary()) {
                    globalCheckpointMatcher = numDocs == 0 ? equalTo(SequenceNumbers.NO_OPS_PERFORMED) : equalTo(numDocs - 1L);
                } else {
                    globalCheckpointMatcher = numDocs == 0 ? equalTo(SequenceNumbers.NO_OPS_PERFORMED)
                        : anyOf(equalTo(numDocs - 1L), equalTo(numDocs - 2L));
                }
                assertThat(shardRouting + " global checkpoint mismatch", shardStats.getGlobalCheckpoint(), globalCheckpointMatcher);
                assertThat(shardRouting + " max seq no mismatch", shardStats.getMaxSeqNo(), equalTo(numDocs - 1L));
            }
            // simulate a background global checkpoint sync at which point we expect the global checkpoint to advance on the replicas
            shards.syncGlobalCheckpoint();
            final long noOpsPerformed = SequenceNumbers.NO_OPS_PERFORMED;
            for (IndexShard shard : shards) {
                final SeqNoStats shardStats = shard.seqNoStats();
                final ShardRouting shardRouting = shard.routingEntry();
                assertThat(shardRouting + " local checkpoint mismatch", shardStats.getLocalCheckpoint(), equalTo(numDocs - 1L));
                assertThat(
                    shardRouting + " global checkpoint mismatch",
                    shardStats.getGlobalCheckpoint(),
                    numDocs == 0 ? equalTo(noOpsPerformed) : equalTo(numDocs - 1L));
                assertThat(shardRouting + " max seq no mismatch", shardStats.getMaxSeqNo(), equalTo(numDocs - 1L));
            }
        }
    }

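    /**
     * Writes a document while one replica is kept isolated (skipped during replication), promotes that replica to primary and writes
     * a conflicting version of the same document. All copies must end up exposing the newer version.
     */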
    public void testConflictingOpsOnReplica() throws Exception {
        Map<String, String> mappings =
            Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}");
        try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(2, mappings))) {
            shards.startAll();
            List<IndexShard> replicas = shards.getReplicas();
            IndexShard replica1 = replicas.get(0);
            IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "1").source("{ \"f\": \"1\"}", XContentType.JSON);
            logger.info("--> isolated replica " + replica1.routingEntry());
            BulkShardRequest replicationRequest = indexOnPrimary(indexRequest, shards.getPrimary());
            for (int i = 1; i < replicas.size(); i++) {
                indexOnReplica(replicationRequest, shards, replicas.get(i));
            }
            logger.info("--> promoting replica to primary " + replica1.routingEntry());
            shards.promoteReplicaToPrimary(replica1).get();
            indexRequest = new IndexRequest(index.getName(), "type", "1").source("{ \"f\": \"2\"}", XContentType.JSON);
            shards.index(indexRequest);
            shards.refresh("test");
            for (IndexShard shard : shards) {
                try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
                    TopDocs search = searcher.searcher().search(new TermQuery(new Term("f", "2")), 10);
                    assertEquals("shard " + shard.routingEntry() + " misses new version", 1, search.totalHits.value);
                }
            }
        }
    }

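    /**
     * Races the replication of an operation carrying a newer primary term against the promotion of the target replica to primary.
     * The replica must either reject the operation with a clear error or accept it, and its term must end up above the operation's term.
     */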
    public void testReplicaTermIncrementWithConcurrentPrimaryPromotion() throws Exception {
        Map<String, String> mappings =
            Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}");
        try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(2, mappings))) {
            shards.startAll();
            long primaryPrimaryTerm = shards.getPrimary().getPendingPrimaryTerm();
            List<IndexShard> replicas = shards.getReplicas();
            IndexShard replica1 = replicas.get(0);
            IndexShard replica2 = replicas.get(1);
            shards.promoteReplicaToPrimary(replica1, (shard, listener) -> {});
            long newReplica1Term = replica1.getPendingPrimaryTerm();
            assertEquals(primaryPrimaryTerm + 1, newReplica1Term);
            assertEquals(primaryPrimaryTerm, replica2.getPendingPrimaryTerm());
            IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "1").source("{ \"f\": \"1\"}", XContentType.JSON);
            BulkShardRequest replicationRequest = indexOnPrimary(indexRequest, replica1);
            CyclicBarrier barrier = new CyclicBarrier(2);
            Thread t1 = new Thread(() -> {
                try {
                    barrier.await();
                    indexOnReplica(replicationRequest, shards, replica2, newReplica1Term);
                } catch (IllegalStateException ise) {
                    assertThat(ise.getMessage(), either(containsString("is too old"))
                        .or(containsString("cannot be a replication target")).or(containsString("engine is closed")));
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            Thread t2 = new Thread(() -> {
                try {
                    barrier.await();
                    shards.promoteReplicaToPrimary(replica2).get();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            t2.start();
            t1.start();
            t1.join();
            t2.join();
            assertEquals(newReplica1Term + 1, replica2.getPendingPrimaryTerm());
        }
    }

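    /**
     * Races a replica-level indexing operation against the promotion of that same replica to primary. If the operation was applied,
     * it must have been written under the old primary term; either way the promotion must bump the shard's term by one.
     */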
    public void testReplicaOperationWithConcurrentPrimaryPromotion() throws Exception {
        Map<String, String> mappings =
            Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}");
        try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(1, mappings))) {
            shards.startAll();
            long primaryPrimaryTerm = shards.getPrimary().getPendingPrimaryTerm();
            IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "1").source("{ \"f\": \"1\"}", XContentType.JSON);
            BulkShardRequest replicationRequest = indexOnPrimary(indexRequest, shards.getPrimary());
            List<IndexShard> replicas = shards.getReplicas();
            IndexShard replica = replicas.get(0);
            CyclicBarrier barrier = new CyclicBarrier(2);
            AtomicBoolean successFullyIndexed = new AtomicBoolean();
            Thread t1 = new Thread(() -> {
                try {
                    barrier.await();
                    indexOnReplica(replicationRequest, shards, replica, primaryPrimaryTerm);
                    successFullyIndexed.set(true);
                } catch (IllegalStateException ise) {
                    assertThat(ise.getMessage(), either(containsString("is too old"))
                        .or(containsString("cannot be a replication target")).or(containsString("engine is closed")));
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            Thread t2 = new Thread(() -> {
                try {
                    barrier.await();
                    shards.promoteReplicaToPrimary(replica).get();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            t2.start();
            t1.start();
            t1.join();
            t2.join();
            assertEquals(primaryPrimaryTerm + 1, replica.getPendingPrimaryTerm());
            if (successFullyIndexed.get()) {
                try (Translog.Snapshot snapshot = getTranslog(replica).newSnapshot()) {
                    assertThat(snapshot.totalOperations(), equalTo(1));
                    Translog.Operation op = snapshot.next();
                    assertThat(op.primaryTerm(), equalTo(primaryPrimaryTerm));
                }
            }
        }
    }

    /**
     * Tests that document failures (failures after seq_no generation) are added as no-op operations to the translog
     * of both primary and replica shards.
     */
    public void testDocumentFailureReplication() throws Exception {
        final IOException indexException = new IOException("simulated indexing failure");
        final EngineFactory engineFactory = config -> InternalEngineTests.createInternalEngine((dir, iwc) ->
            new IndexWriter(dir, iwc) {
                @Override
                public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
                    boolean isTombstone = false;
                    for (IndexableField field : doc) {
                        if (SeqNoFieldMapper.TOMBSTONE_NAME.equals(field.name())) {
                            isTombstone = true;
                        }
                    }
                    if (isTombstone) {
                        return super.addDocument(doc); // allow the no-op tombstone to be added
                    } else {
                        throw indexException;
                    }
                }
            }, null, null, config);
        try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(0)) {
            @Override
            protected EngineFactory getEngineFactory(ShardRouting routing) {
                return engineFactory;
            }
        }) {
            // start with the primary only so the first two failures are replicated to replicas via recovery from the primary's translog
            shards.startPrimary();
            long primaryTerm = shards.getPrimary().getPendingPrimaryTerm();
            List<Translog.Operation> expectedTranslogOps = new ArrayList<>();
            BulkItemResponse indexResp = shards.index(new IndexRequest(index.getName(), "type", "1").source("{}", XContentType.JSON));
            assertThat(indexResp.isFailed(), equalTo(true));
            assertThat(indexResp.getFailure().getCause(), equalTo(indexException));
            expectedTranslogOps.add(new Translog.NoOp(0, primaryTerm, indexException.toString()));
            try (Translog.Snapshot snapshot = getTranslog(shards.getPrimary()).newSnapshot()) {
                assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
            }
            shards.assertAllEqual(0);
            int nReplica = randomIntBetween(1, 3);
            for (int i = 0; i < nReplica; i++) {
                shards.addReplica();
            }
            shards.startReplicas(nReplica);
            for (IndexShard shard : shards) {
                try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot()) {
                    assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
                }
                try (Translog.Snapshot snapshot = shard.getHistoryOperations("test", 0)) {
                    assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
                }
            }
            // this failure is replicated directly via the replication channel
            indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON));
            assertThat(indexResp.getFailure().getCause(), equalTo(indexException));
            expectedTranslogOps.add(new Translog.NoOp(1, primaryTerm, indexException.toString()));
            for (IndexShard shard : shards) {
                try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot()) {
                    assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
                }
                try (Translog.Snapshot snapshot = shard.getHistoryOperations("test", 0)) {
                    assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
                }
            }
            shards.assertAllEqual(0);
        }
    }

    /**
     * Tests that request failures (failures before seq_no generation) are not added as no-ops to the translog.
     */
    public void testRequestFailureReplication() throws Exception {
        try (ReplicationGroup shards = createGroup(0)) {
            shards.startAll();
            BulkItemResponse response = shards.index(
                new IndexRequest(index.getName(), "type", "1")
                    .source("{}", XContentType.JSON)
                    .version(2)
            );
            assertTrue(response.isFailed());
            assertThat(response.getFailure().getCause(), instanceOf(VersionConflictEngineException.class));
            shards.assertAllEqual(0);
            for (IndexShard indexShard : shards) {
                assertThat(indexShard.routingEntry() + " has the wrong number of ops in the translog",
                    indexShard.translogStats().estimatedNumberOfOperations(), equalTo(0));
            }
            // add some replicas
            int nReplica = randomIntBetween(1, 3);
            for (int i = 0; i < nReplica; i++) {
                shards.addReplica();
            }
            shards.startReplicas(nReplica);
            response = shards.index(
                new IndexRequest(index.getName(), "type", "1")
                    .source("{}", XContentType.JSON)
                    .version(2)
            );
            assertTrue(response.isFailed());
            assertThat(response.getFailure().getCause(), instanceOf(VersionConflictEngineException.class));
            shards.assertAllEqual(0);
            for (IndexShard indexShard : shards) {
                assertThat(indexShard.routingEntry() + " has the wrong number of ops in the translog",
                    indexShard.translogStats().estimatedNumberOfOperations(), equalTo(0));
            }
        }
    }

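    /**
     * Simulates two operations being assigned the same sequence number under different primary terms (a primary fail-over while one
     * replica is isolated) and verifies that the stale operation is trimmed and never shipped by peer recovery.
     */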
    public void testSeqNoCollision() throws Exception {
        try (ReplicationGroup shards = createGroup(2)) {
            shards.startAll();
            int initDocs = shards.indexDocs(randomInt(10));
            List<IndexShard> replicas = shards.getReplicas();
            IndexShard replica1 = replicas.get(0);
            IndexShard replica2 = replicas.get(1);
            shards.syncGlobalCheckpoint();
            logger.info("--> Isolate replica1");
            IndexRequest indexDoc1 = new IndexRequest(index.getName(), "type", "d1").source("{}", XContentType.JSON);
            BulkShardRequest replicationRequest = indexOnPrimary(indexDoc1, shards.getPrimary());
            indexOnReplica(replicationRequest, shards, replica2);
            final Translog.Operation op1;
            final List<Translog.Operation> initOperations = new ArrayList<>(initDocs);
            try (Translog.Snapshot snapshot = getTranslog(replica2).newSnapshot()) {
                assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
                for (int i = 0; i < initDocs; i++) {
                    Translog.Operation op = snapshot.next();
                    assertThat(op, is(notNullValue()));
                    initOperations.add(op);
                }
                op1 = snapshot.next();
                assertThat(op1, notNullValue());
                assertThat(snapshot.next(), nullValue());
                assertThat(snapshot.skippedOperations(), equalTo(0));
            }
            // Make sure that replica2 receives translog ops (e.g. op2) from replica1
            // and does not overwrite its stale operation (op1); the stale operation is trimmed instead.
            logger.info("--> Promote replica1 as the primary");
            shards.promoteReplicaToPrimary(replica1).get(); // wait until resync completed.
            shards.index(new IndexRequest(index.getName(), "type", "d2").source("{}", XContentType.JSON));
            final Translog.Operation op2;
            try (Translog.Snapshot snapshot = getTranslog(replica2).newSnapshot()) {
                assertThat(snapshot.totalOperations(), equalTo(initDocs + 2));
                op2 = snapshot.next();
                assertThat(op2.seqNo(), equalTo(op1.seqNo()));
                assertThat(op2.primaryTerm(), greaterThan(op1.primaryTerm()));
                assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations));
                assertThat(snapshot.skippedOperations(), equalTo(1));
            }
            // Make sure that peer-recovery transfers all but the overridden operation.
            IndexShard replica3 = shards.addReplica();
            logger.info("--> Promote replica2 as the primary");
            shards.promoteReplicaToPrimary(replica2).get();
            logger.info("--> Recover replica3 from replica2");
            recoverReplica(replica3, replica2, true);
            try (Translog.Snapshot snapshot = replica3.getHistoryOperations("test", 0)) {
                assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
                final List<Translog.Operation> expectedOps = new ArrayList<>(initOperations);
                expectedOps.add(op2);
                assertThat(snapshot, containsOperationsInAnyOrder(expectedOps));
                assertThat("Peer-recovery should not send overridden operations", snapshot.skippedOperations(), equalTo(0));
            }
            shards.assertAllEqual(initDocs + 1);
        }
    }

    /**
     * This test ensures the consistency between primary and replica with late and out of order delivery on the replica.
     * An index operation on the primary is followed by a delete operation. The delete operation is delivered first and processed
     * on the replica, while the index operation is delayed by an interval that is even longer than the gc deletes cycle.
     * This makes sure that the replica still remembers the delete operation and correctly ignores the stale index operation.
     */
    public void testLateDeliveryAfterGCTriggeredOnReplica() throws Exception {
        ThreadPool.terminate(this.threadPool, 10, TimeUnit.SECONDS);
        this.threadPool = new TestThreadPool(getClass().getName(),
            Settings.builder().put(threadPoolSettings()).put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0).build());
        try (ReplicationGroup shards = createGroup(1)) {
            shards.startAll();
            final IndexShard primary = shards.getPrimary();
            final IndexShard replica = shards.getReplicas().get(0);
            final TimeValue gcInterval = TimeValue.timeValueMillis(between(1, 10));
            // I think we can just set this to something very small (10ms?) and also set ThreadPool#ESTIMATED_TIME_INTERVAL_SETTING to 0?
            updateGCDeleteCycle(replica, gcInterval);
            final BulkShardRequest indexRequest = indexOnPrimary(
                new IndexRequest(index.getName(), "type", "d1").source("{}", XContentType.JSON), primary);
            final BulkShardRequest deleteRequest = deleteOnPrimary(new DeleteRequest(index.getName(), "type", "d1"), primary);
            deleteOnReplica(deleteRequest, shards, replica); // delete arrives on the replica first.
            final long deleteTimestamp = threadPool.relativeTimeInMillis();
            replica.refresh("test");
            assertBusy(() ->
                assertThat(threadPool.relativeTimeInMillis() - deleteTimestamp, greaterThan(gcInterval.millis()))
            );
            getEngine(replica).maybePruneDeletes();
            indexOnReplica(indexRequest, shards, replica); // index arrives on the replica late.
            shards.assertAllEqual(0);
        }
    }

    private void updateGCDeleteCycle(IndexShard shard, TimeValue interval) {
        IndexMetaData.Builder builder = IndexMetaData.builder(shard.indexSettings().getIndexMetaData());
        builder.settings(Settings.builder()
            .put(shard.indexSettings().getSettings())
            .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), interval.getStringRep())
        );
        shard.indexSettings().updateIndexMetaData(builder.build());
        shard.onSettingsChanged();
    }

    /**
     * This test ensures the consistency between primary and replica when a non-append-only operation (e.g. an index request with an id,
     * or a delete) on the same document is processed before the original append-only request on replicas. The append-only document can
     * be exposed and deleted on the primary before it is added to the replica. Replicas should treat a late append-only request as a
     * regular index request.
     */
    public void testOutOfOrderDeliveryForAppendOnlyOperations() throws Exception {
        try (ReplicationGroup shards = createGroup(1)) {
            shards.startAll();
            final IndexShard primary = shards.getPrimary();
            final IndexShard replica = shards.getReplicas().get(0);
            // Append-only request - without id
            final BulkShardRequest indexRequest = indexOnPrimary(
                new IndexRequest(index.getName(), "type", null).source("{}", XContentType.JSON), primary);
            final String docId = Iterables.get(getShardDocUIDs(primary), 0);
            final BulkShardRequest deleteRequest = deleteOnPrimary(new DeleteRequest(index.getName(), "type", docId), primary);
            deleteOnReplica(deleteRequest, shards, replica);
            indexOnReplica(indexRequest, shards, replica);
            shards.assertAllEqual(0);
        }
    }
}