
Remove concurrency in testFailLeaderReplicaShard (#36607)

testFailLeaderReplicaShard periodically fails because we concurrently
index to the leader group and close one of its replicas. If a
replication request hits a closing shard, we will fail that shard;
however, failing a shard is not supported by the test framework, which
makes the test fail.
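
To make the race concrete, here is a hypothetical, self-contained illustration in plain JDK Java (not Elasticsearch code, and not part of this change): a background writer keeps submitting work while the main thread closes the target, so late writes land on an already-closing resource and fail, mirroring a replication request hitting a closing replica.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseRaceDemo {

    // Stands in for a replica shard: rejects writes once close() has started.
    static final class Resource {
        private final AtomicBoolean closed = new AtomicBoolean();

        void write(int doc) {
            if (closed.get()) {
                // Analogous to a replication request hitting a closing shard,
                // which the replication test framework turns into a shard failure.
                throw new IllegalStateException("write [" + doc + "] hit a closing resource");
            }
        }

        void close() {
            closed.set(true);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Resource replica = new Resource();

        // Background "indexing" thread, like the Runnable removed in this change.
        Thread indexer = new Thread(() -> {
            for (int doc = 0; doc < 256; doc++) {
                try {
                    replica.write(doc);
                } catch (IllegalStateException e) {
                    System.out.println("failed: " + e.getMessage());
                    return;
                }
            }
        });
        indexer.start();

        // Main thread closes the resource concurrently, as the old test closed a replica.
        Thread.sleep(1);
        replica.close();
        indexer.join();
    }
}
```

The new test below avoids this entirely by interleaving indexing with replica add/remove/promote steps in a single thread.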
Nhat Nguyen 6 years ago
parent
commit
a4b32f1143

+ 20 - 23
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

@@ -106,7 +106,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
         }
     }
 
-    public void testFailLeaderReplicaShard() throws Exception {
+    public void testAddRemoveShardOnLeader() throws Exception {
         try (ReplicationGroup leaderGroup = createGroup(1 + randomInt(1));
              ReplicationGroup followerGroup = createFollowGroup(randomInt(2))) {
             leaderGroup.startAll();
@@ -120,31 +120,28 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
                     leaderSeqNoStats.getMaxSeqNo(),
                     followerSeqNoStats.getGlobalCheckpoint(),
                     followerSeqNoStats.getMaxSeqNo());
-            int docCount = 256;
-            leaderGroup.appendDocs(1);
-            Runnable task = () -> {
-                try {
-                    leaderGroup.appendDocs(docCount - 1);
-                    leaderGroup.syncGlobalCheckpoint();
-                } catch (Exception e) {
-                    throw new AssertionError(e);
+            int batches = between(0, 10);
+            int docCount = 0;
+            for (int i = 0; i < batches; i++) {
+                docCount += leaderGroup.indexDocs(between(1, 5));
+                if (leaderGroup.getReplicas().isEmpty() == false && randomInt(100) < 5) {
+                    IndexShard closingReplica = randomFrom(leaderGroup.getReplicas());
+                    leaderGroup.removeReplica(closingReplica);
+                    closingReplica.close("test", false);
+                    closingReplica.store().close();
+                } else if (leaderGroup.getReplicas().isEmpty() == false && rarely()) {
+                    IndexShard newPrimary = randomFrom(leaderGroup.getReplicas());
+                    leaderGroup.promoteReplicaToPrimary(newPrimary).get();
+                } else if (randomInt(100) < 5) {
+                    leaderGroup.addReplica();
+                    leaderGroup.startReplicas(1);
                 }
-            };
-            Thread thread = new Thread(task);
-            thread.start();
-
-            // Remove and add a new replica
-            IndexShard luckyReplica = randomFrom(leaderGroup.getReplicas());
-            leaderGroup.removeReplica(luckyReplica);
-            luckyReplica.close("stop replica", false);
-            luckyReplica.store().close();
-            leaderGroup.addReplica();
-            leaderGroup.startReplicas(1);
-            thread.join();
-
+                leaderGroup.syncGlobalCheckpoint();
+            }
             leaderGroup.assertAllEqual(docCount);
             assertThat(shardFollowTask.getFailure(), nullValue());
-            assertBusy(() -> followerGroup.assertAllEqual(docCount));
+            int expectedDoc = docCount;
+            assertBusy(() -> followerGroup.assertAllEqual(expectedDoc));
             shardFollowTask.markAsCompleted();
             assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup);
         }