瀏覽代碼

Adjust log and unmute testFailOverOnFollower (#38762)

There were two documents (seq=2 and seq=103) missing on the follower in
one of the failures of `testFailOverOnFollower`. I spent several hours
on that failure but could not figure out the reason. I adjust log and
unmute this test so we can collect more information.

Relates #38633
Nhat Nguyen 6 年之前
父節點
當前提交
d0035300d6

+ 2 - 1
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -779,7 +779,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         try {
             if (logger.isTraceEnabled()) {
                 // don't use index.source().utf8ToString() here source might not be valid UTF-8
-                logger.trace("index [{}][{}] (seq# [{}])",  index.type(), index.id(), index.seqNo());
+                logger.trace("index [{}][{}] seq# [{}] allocation-id {}",
+                    index.type(), index.id(), index.seqNo(), routingEntry().allocationId());
             }
             result = engine.index(index);
         } catch (Exception e) {

+ 3 - 2
test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

@@ -997,8 +997,9 @@ public abstract class EngineTestCase extends ESTestCase {
                     }
                 }
             }
-            docs.sort(Comparator.comparing(DocIdSeqNoAndTerm::getId)
-                .thenComparingLong(DocIdSeqNoAndTerm::getSeqNo).thenComparingLong(DocIdSeqNoAndTerm::getPrimaryTerm));
+            docs.sort(Comparator.comparingLong(DocIdSeqNoAndTerm::getSeqNo)
+                .thenComparingLong(DocIdSeqNoAndTerm::getPrimaryTerm)
+                .thenComparing((DocIdSeqNoAndTerm::getId)));
             return docs;
         }
     }

+ 17 - 4
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java

@@ -38,6 +38,7 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.NodeEnvironment;
@@ -451,8 +452,18 @@ public abstract class CcrIntegTestCase extends ESTestCase {
         logger.info("--> asserting <<docId,seqNo>> between {} and {}", leaderIndex, followerIndex);
         assertBusy(() -> {
             Map<Integer, List<DocIdSeqNoAndTerm>> docsOnFollower = getDocIdAndSeqNos(clusterGroup.followerCluster, followerIndex);
-            logger.info("--> docs on the follower {}", docsOnFollower);
-            assertThat(docsOnFollower, equalTo(getDocIdAndSeqNos(clusterGroup.leaderCluster, leaderIndex)));
+            Map<Integer, List<DocIdSeqNoAndTerm>> docsOnLeader = getDocIdAndSeqNos(clusterGroup.leaderCluster, leaderIndex);
+            Map<Integer, Set<DocIdSeqNoAndTerm>> mismatchedDocs = new HashMap<>();
+            for (Map.Entry<Integer, List<DocIdSeqNoAndTerm>> fe : docsOnFollower.entrySet()) {
+                Set<DocIdSeqNoAndTerm> d1 = Sets.difference(
+                    Sets.newHashSet(fe.getValue()), Sets.newHashSet(docsOnLeader.getOrDefault(fe.getKey(), Collections.emptyList())));
+                Set<DocIdSeqNoAndTerm> d2 = Sets.difference(
+                    Sets.newHashSet(docsOnLeader.getOrDefault(fe.getKey(), Collections.emptyList())), Sets.newHashSet(fe.getValue()));
+                if (d1.isEmpty() == false || d2.isEmpty() == false) {
+                    mismatchedDocs.put(fe.getKey(), Sets.union(d1, d2));
+                }
+            }
+            assertThat("mismatched documents [" + mismatchedDocs + "]", docsOnFollower, equalTo(docsOnLeader));
         }, 120, TimeUnit.SECONDS);
 
         logger.info("--> asserting seq_no_stats between {} and {}", leaderIndex, followerIndex);
@@ -481,13 +492,15 @@ public abstract class CcrIntegTestCase extends ESTestCase {
         Randomness.shuffle(shardRoutings);
         final Map<Integer, List<DocIdSeqNoAndTerm>> docs = new HashMap<>();
         for (ShardRouting shardRouting : shardRoutings) {
-            if (shardRouting == null || shardRouting.assignedToNode() == false || docs.containsKey(shardRouting.shardId().id())) {
+            if (shardRouting == null || shardRouting.assignedToNode() == false) {
                 continue;
             }
             IndexShard indexShard = cluster.getInstance(IndicesService.class, state.nodes().get(shardRouting.currentNodeId()).getName())
                 .indexServiceSafe(shardRouting.index()).getShard(shardRouting.id());
             try {
-                docs.put(shardRouting.shardId().id(), IndexShardTestCase.getDocIdAndSeqNos(indexShard).stream()
+                final List<DocIdSeqNoAndTerm> docsOnShard = IndexShardTestCase.getDocIdAndSeqNos(indexShard);
+                logger.info("--> shard {} docs {} seq_no_stats {}", shardRouting, docsOnShard, indexShard.seqNoStats());
+                docs.put(shardRouting.shardId().id(), docsOnShard.stream()
                     // normalize primary term as the follower use its own term
                     .map(d -> new DocIdSeqNoAndTerm(d.getId(), d.getSeqNo(), 1L))
                     .collect(Collectors.toList()));

+ 18 - 16
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java

@@ -45,7 +45,8 @@ import static java.util.Collections.singletonMap;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.equalTo;
 
-@TestLogging("org.elasticsearch.xpack.ccr:TRACE,org.elasticsearch.index.shard:DEBUG")
+@TestLogging("org.elasticsearch.xpack.ccr:TRACE,org.elasticsearch.xpack.ccr.action.ShardChangesAction:DEBUG,"
+    + "org.elasticsearch.index.shard:TRACE")
 public class FollowerFailOverIT extends CcrIntegTestCase {
 
     @Override
@@ -53,14 +54,15 @@ public class FollowerFailOverIT extends CcrIntegTestCase {
         return false;
     }
 
-    @AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/38633")
     public void testFailOverOnFollower() throws Exception {
+        final String leaderIndex = "leader_test_failover";
+        final String followerIndex = "follower_test_failover";
         int numberOfReplicas = between(1, 2);
         getFollowerCluster().startMasterOnlyNode();
         getFollowerCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(1, 2));
         String leaderIndexSettings = getIndexSettings(1, numberOfReplicas,
             singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
-        assertAcked(leaderClient().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON));
+        assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON));
         AtomicBoolean stopped = new AtomicBoolean();
         Thread[] threads = new Thread[between(1, 8)];
         AtomicInteger docID = new AtomicInteger();
@@ -77,20 +79,20 @@ public class FollowerFailOverIT extends CcrIntegTestCase {
                     }
                     if (frequently()) {
                         String id = Integer.toString(frequently() ? docID.incrementAndGet() : between(0, 10)); // sometimes update
-                        IndexResponse indexResponse = leaderClient().prepareIndex("leader-index", "doc", id)
+                        IndexResponse indexResponse = leaderClient().prepareIndex(leaderIndex, "doc", id)
                             .setSource("{\"f\":" + id + "}", XContentType.JSON).get();
-                        logger.info("--> index id={} seq_no={}", indexResponse.getId(), indexResponse.getSeqNo());
+                        logger.info("--> index {} id={} seq_no={}", leaderIndex, indexResponse.getId(), indexResponse.getSeqNo());
                     } else {
                         String id = Integer.toString(between(0, docID.get()));
-                        DeleteResponse deleteResponse = leaderClient().prepareDelete("leader-index", "doc", id).get();
-                        logger.info("--> delete id={} seq_no={}", deleteResponse.getId(), deleteResponse.getSeqNo());
+                        DeleteResponse deleteResponse = leaderClient().prepareDelete(leaderIndex, "doc", id).get();
+                        logger.info("--> delete {} id={} seq_no={}", leaderIndex, deleteResponse.getId(), deleteResponse.getSeqNo());
                     }
                 }
             });
             threads[i].start();
         }
         availableDocs.release(between(100, 200));
-        PutFollowAction.Request follow = putFollow("leader-index", "follower-index");
+        PutFollowAction.Request follow = putFollow(leaderIndex, followerIndex);
         follow.getParameters().setMaxReadRequestOperationCount(randomIntBetween(32, 2048));
         follow.getParameters().setMaxReadRequestSize(new ByteSizeValue(randomIntBetween(1, 4096), ByteSizeUnit.KB));
         follow.getParameters().setMaxOutstandingReadRequests(randomIntBetween(1, 10));
@@ -99,11 +101,11 @@ public class FollowerFailOverIT extends CcrIntegTestCase {
         follow.getParameters().setMaxOutstandingWriteRequests(randomIntBetween(1, 10));
         logger.info("--> follow request {}", Strings.toString(follow));
         followerClient().execute(PutFollowAction.INSTANCE, follow).get();
-        disableDelayedAllocation("follower-index");
-        ensureFollowerGreen("follower-index");
-        awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex("follower-index"), 0), between(30, 80));
+        disableDelayedAllocation(followerIndex);
+        ensureFollowerGreen(followerIndex);
+        awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex(followerIndex), 0), between(30, 80));
         final ClusterState clusterState = getFollowerCluster().clusterService().state();
-        for (ShardRouting shardRouting : clusterState.routingTable().allShards("follower-index")) {
+        for (ShardRouting shardRouting : clusterState.routingTable().allShards(followerIndex)) {
             if (shardRouting.primary()) {
                 DiscoveryNode assignedNode = clusterState.nodes().get(shardRouting.currentNodeId());
                 getFollowerCluster().restartNode(assignedNode.getName(), new InternalTestCluster.RestartCallback());
@@ -111,15 +113,15 @@ public class FollowerFailOverIT extends CcrIntegTestCase {
             }
         }
         availableDocs.release(between(50, 200));
-        ensureFollowerGreen("follower-index");
+        ensureFollowerGreen(followerIndex);
         availableDocs.release(between(50, 200));
-        awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex("follower-index"), 0), between(100, 150));
+        awaitGlobalCheckpointAtLeast(followerClient(), new ShardId(resolveFollowerIndex(followerIndex), 0), between(100, 150));
         stopped.set(true);
         for (Thread thread : threads) {
             thread.join();
         }
-        assertIndexFullyReplicatedToFollower("leader-index", "follower-index");
-        pauseFollow("follower-index");
+        assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex);
+        pauseFollow(followerIndex);
     }
 
     @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33337")