Browse Source

Fix DataStreamIT#testWriteLoadAndAvgShardSizeIsStoredInABestEffort (#91655)

Closes #91384
Francisco Fernández Castaño 2 năm trước cách đây
mục cha
commit
05d2bc29de

+ 30 - 24
modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java

@@ -100,6 +100,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -2024,24 +2025,7 @@ public class DataStreamIT extends ESIntegTestCase {
         final var request = new CreateDataStreamAction.Request(dataStreamName);
         assertAcked(client().execute(CreateDataStreamAction.INSTANCE, request).actionGet());
 
-        assertBusy(() -> {
-            for (int i = 0; i < 10; i++) {
-                indexDocs(dataStreamName, randomIntBetween(100, 200));
-            }
-
-            final ClusterState clusterState = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
-            final DataStream dataStream = clusterState.getMetadata().dataStreams().get(dataStreamName);
-            final String writeIndex = dataStream.getWriteIndex().getName();
-            final IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(writeIndex).get();
-            for (IndexShardStats indexShardStats : indicesStatsResponse.getIndex(writeIndex).getIndexShards().values()) {
-                for (ShardStats shard : indexShardStats.getShards()) {
-                    final IndexingStats.Stats shardIndexingStats = shard.getStats().getIndexing().getTotal();
-                    // Ensure that we have enough clock granularity before rolling over to ensure that we capture _some_ write load
-                    assertThat(shardIndexingStats.getTotalActiveTimeInMillis(), is(greaterThan(0L)));
-                    assertThat(shardIndexingStats.getWriteLoad(), is(greaterThan(0.0)));
-                }
-            }
-        });
+        indexDocsAndEnsureThereIsCapturedWriteLoad(dataStreamName);
 
         assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet());
         final ClusterState clusterState = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
@@ -2078,6 +2062,7 @@ public class DataStreamIT extends ESIntegTestCase {
         // - We want to simulate two possible cases here:
         // - All the assigned nodes for shard 0 will fail to respond to the IndicesStatsRequest
         // - Only the shard 1 replica will respond successfully to the IndicesStatsRequest ensuring that we fall back in that case
+        // (only if it's not co-located with some other shard copies)
 
         final List<String> dataOnlyNodes = internalCluster().startDataOnlyNodes(4);
         final String dataStreamName = "logs-es";
@@ -2091,20 +2076,20 @@ public class DataStreamIT extends ESIntegTestCase {
         final var createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
         assertAcked(client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).actionGet());
 
-        for (int i = 0; i < 10; i++) {
-            indexDocs(dataStreamName, randomIntBetween(100, 200));
-        }
+        indexDocsAndEnsureThereIsCapturedWriteLoad(dataStreamName);
 
         final ClusterState clusterStateBeforeRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
         final DataStream dataStreamBeforeRollover = clusterStateBeforeRollover.getMetadata().dataStreams().get(dataStreamName);
         final IndexRoutingTable currentDataStreamWriteIndexRoutingTable = clusterStateBeforeRollover.routingTable()
             .index(dataStreamBeforeRollover.getWriteIndex());
 
-        final List<String> failingIndicesStatsNodeIds = new ArrayList<>();
+        final Set<String> failingIndicesStatsNodeIds = new HashSet<>();
         for (ShardRouting shardRouting : currentDataStreamWriteIndexRoutingTable.shard(0).assignedShards()) {
             failingIndicesStatsNodeIds.add(shardRouting.currentNodeId());
         }
         failingIndicesStatsNodeIds.add(currentDataStreamWriteIndexRoutingTable.shard(1).primaryShard().currentNodeId());
+        final String shard1ReplicaNodeId = currentDataStreamWriteIndexRoutingTable.shard(1).replicaShards().get(0).currentNodeId();
+        final boolean shard1ReplicaIsAllocatedInAReachableNode = failingIndicesStatsNodeIds.contains(shard1ReplicaNodeId) == false;
 
         for (String nodeId : failingIndicesStatsNodeIds) {
             String nodeName = clusterStateBeforeRollover.nodes().resolveNode(nodeId).getName();
@@ -2114,7 +2099,6 @@ public class DataStreamIT extends ESIntegTestCase {
                 (handler, request, channel, task) -> channel.sendResponse(new RuntimeException("Unable to get stats"))
             );
         }
-        assertThat(failingIndicesStatsNodeIds.size(), is(equalTo(3)));
 
         assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet());
         final ClusterState clusterState = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
@@ -2124,7 +2108,8 @@ public class DataStreamIT extends ESIntegTestCase {
             final IndexMetadata indexMetadata = clusterState.metadata().index(index);
             final IndexMetadataStats metadataStats = indexMetadata.getStats();
 
-            if (index.equals(dataStream.getWriteIndex()) == false) {
+            // If all the shards are co-located within the failing nodes, no stats will be stored during rollover
+            if (index.equals(dataStream.getWriteIndex()) == false && shard1ReplicaIsAllocatedInAReachableNode) {
                 assertThat(metadataStats, is(notNullValue()));
 
                 final IndexWriteLoad indexWriteLoad = metadataStats.writeLoad();
@@ -2247,6 +2232,27 @@ public class DataStreamIT extends ESIntegTestCase {
         assertThat(forecastedShardSizeInBytes.getAsLong(), is(equalTo(expectedTotalSizeInBytes / shardCount)));
     }
 
+    private void indexDocsAndEnsureThereIsCapturedWriteLoad(String dataStreamName) throws Exception {
+        assertBusy(() -> {
+            for (int i = 0; i < 10; i++) {
+                indexDocs(dataStreamName, randomIntBetween(100, 200));
+            }
+
+            final ClusterState clusterState = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
+            final DataStream dataStream = clusterState.getMetadata().dataStreams().get(dataStreamName);
+            final String writeIndex = dataStream.getWriteIndex().getName();
+            final IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(writeIndex).get();
+            for (IndexShardStats indexShardStats : indicesStatsResponse.getIndex(writeIndex).getIndexShards().values()) {
+                for (ShardStats shard : indexShardStats.getShards()) {
+                    final IndexingStats.Stats shardIndexingStats = shard.getStats().getIndexing().getTotal();
+                    // Ensure that we have enough clock granularity before rolling over to ensure that we capture _some_ write load
+                    assertThat(shardIndexingStats.getTotalActiveTimeInMillis(), is(greaterThan(0L)));
+                    assertThat(shardIndexingStats.getWriteLoad(), is(greaterThan(0.0)));
+                }
+            }
+        });
+    }
+
     static void putComposableIndexTemplate(
         String id,
         @Nullable String mappings,