1
0
Эх сурвалжийг харах

Increase timeout in testFollowIndexWithConcurrentMappingChanges (#60534)

The test failed because the leader was taking a lot of CPUs to process 
many mapping updates. This commit reduces the mapping updates, increases
timeout, and adds more debug info.

Closes #59832
Nhat Nguyen 5 жил өмнө
parent
commit
c87050c1ca

+ 11 - 28
x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java

@@ -248,38 +248,23 @@ public class IndexFollowingIT extends CcrIntegTestCase {
         final int firstBatchNumDocs = randomIntBetween(2, 64);
         logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
         for (int i = 0; i < firstBatchNumDocs; i++) {
-            final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
-            leaderClient().prepareIndex("index1").setId(Integer.toString(i)).setSource(source, XContentType.JSON).get();
+            leaderClient().prepareIndex("index1").setId(Integer.toString(i)).setSource("f", i).get();
         }
 
         AtomicBoolean isRunning = new AtomicBoolean(true);
 
         // Concurrently index new docs with mapping changes
+        int numFields = between(10, 20);
         Thread thread = new Thread(() -> {
-            int docID = 10000;
-            char[] chars = "abcdeghijklmnopqrstuvwxyz".toCharArray();
-            for (char c : chars) {
+            int numDocs = between(10, 200);
+            for (int i = 0; i < numDocs; i++) {
                 if (isRunning.get() == false) {
                     break;
                 }
-                final String source;
-                long valueToPutInDoc = randomLongBetween(0, 50000);
-                if (randomBoolean()) {
-                    source = String.format(Locale.ROOT, "{\"%c\":%d}", c, valueToPutInDoc);
-                } else {
-                    source = String.format(Locale.ROOT, "{\"%c\":\"%d\"}", c, valueToPutInDoc);
-                }
-                for (int i = 1; i < 10; i++) {
-                    if (isRunning.get() == false) {
-                        break;
-                    }
-                    leaderClient().prepareIndex("index1").setId(Long.toString(docID++)).setSource(source, XContentType.JSON).get();
-                    if (rarely()) {
-                        leaderClient().admin().indices().prepareFlush("index1").setForce(true).get();
-                    }
-                }
-                if (between(0, 100) < 20) {
-                    leaderClient().admin().indices().prepareFlush("index1").setForce(false).setWaitIfOngoing(false).get();
+                final String field = "f-" + between(1, numFields);
+                leaderClient().prepareIndex("index1").setSource(field, between(0, 1000)).get();
+                if (rarely()) {
+                    leaderClient().admin().indices().prepareFlush("index1").setWaitIfOngoing(false).setForce(false).get();
                 }
             }
         });
@@ -297,16 +282,14 @@ public class IndexFollowingIT extends CcrIntegTestCase {
         final int secondBatchNumDocs = randomIntBetween(2, 64);
         logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs);
         for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
-            final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
-            leaderClient().prepareIndex("index1").setId(Integer.toString(i)).setSource(source, XContentType.JSON).get();
+            leaderClient().prepareIndex("index1").setId(Integer.toString(i)).setSource("f", i).get();
         }
-
-        for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
+        for (int i = 0; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
             assertBusy(assertExpectedDocumentRunnable(i), 1, TimeUnit.MINUTES);
         }
-
         isRunning.set(false);
         thread.join();
+        assertIndexFullyReplicatedToFollower("index1", "index2");
     }
 
     public void testFollowIndexWithoutWaitForComplete() throws Exception {

+ 21 - 4
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java

@@ -10,6 +10,7 @@ import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
 import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
 import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
 import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
@@ -355,7 +356,7 @@ public abstract class CcrIntegTestCase extends ESTestCase {
 
     protected final ClusterHealthStatus ensureFollowerGreen(boolean waitForNoInitializingShards, String... indices) {
         logger.info("ensure green follower indices {}", Arrays.toString(indices));
-        return ensureColor(clusterGroup.followerCluster, ClusterHealthStatus.GREEN, TimeValue.timeValueSeconds(30),
+        return ensureColor(clusterGroup.followerCluster, ClusterHealthStatus.GREEN, TimeValue.timeValueSeconds(60),
             waitForNoInitializingShards, indices);
     }
 
@@ -377,10 +378,21 @@ public abstract class CcrIntegTestCase extends ESTestCase {
 
         ClusterHealthResponse actionGet = testCluster.client().admin().cluster().health(healthRequest).actionGet();
         if (actionGet.isTimedOut()) {
-            logger.info("{} timed out, cluster state:\n{}\n{}",
+            logger.info("{} timed out: " +
+                    "\nleader cluster state:\n{}" +
+                    "\nleader cluster hot threads:\n{}" +
+                    "\nleader cluster tasks:\n{}" +
+                    "\nfollower cluster state:\n{}" +
+                    "\nfollower cluster hot threads:\n{}" +
+                    "\nfollower cluster tasks:\n{}",
                 method,
-                testCluster.client().admin().cluster().prepareState().get().getState(),
-                testCluster.client().admin().cluster().preparePendingClusterTasks().get());
+                leaderClient().admin().cluster().prepareState().get().getState(),
+                getHotThreads(leaderClient()),
+                leaderClient().admin().cluster().preparePendingClusterTasks().get(),
+                followerClient().admin().cluster().prepareState().get().getState(),
+                getHotThreads(followerClient()),
+                followerClient().admin().cluster().preparePendingClusterTasks().get()
+            );
             fail("timed out waiting for " + color + " state");
         }
         assertThat("Expected at least " + clusterHealthStatus + " but got " + actionGet.getStatus(),
@@ -389,6 +401,11 @@ public abstract class CcrIntegTestCase extends ESTestCase {
         return actionGet.getStatus();
     }
 
+    static String getHotThreads(Client client) {
+        return client.admin().cluster().prepareNodesHotThreads().setThreads(99999).setIgnoreIdleThreads(false)
+            .get().getNodes().stream().map(NodeHotThreads::getHotThreads).collect(Collectors.joining("\n"));
+    }
+
     protected final Index resolveLeaderIndex(String index) {
         GetIndexResponse getIndexResponse = leaderClient().admin().indices().prepareGetIndex().setIndices(index).get();
         assertTrue("index " + index + " not found", getIndexResponse.getSettings().containsKey(index));