
Simulate impact of shard movement using shard-level write load (#131406)

Co-authored-by: Dianna Hohensee <artemisapple@gmail.com>
Nick Tindall 2 months ago
parent
commit
ab2e65456b

+ 5 - 3
server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java

@@ -10,6 +10,7 @@
 package org.elasticsearch.cluster;
 
 import org.elasticsearch.cluster.ClusterInfo.NodeAndShard;
+import org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
 import org.elasticsearch.common.util.CopyOnFirstWriteMap;
@@ -34,7 +35,7 @@ public class ClusterInfoSimulator {
     private final Map<ShardId, Long> shardDataSetSizes;
     private final Map<NodeAndShard, String> dataPath;
     private final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
-    private final Map<String, NodeUsageStatsForThreadPools> nodeThreadPoolUsageStats;
+    private final ShardMovementWriteLoadSimulator shardMovementWriteLoadSimulator;
 
     public ClusterInfoSimulator(RoutingAllocation allocation) {
         this.allocation = allocation;
@@ -44,7 +45,7 @@ public class ClusterInfoSimulator {
         this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes);
         this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath);
         this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages();
-        this.nodeThreadPoolUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools();
+        this.shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation);
     }
 
     /**
@@ -115,6 +116,7 @@ public class ClusterInfoSimulator {
                 shardSizes.put(shardIdentifierFromRouting(shard), project.getIndexSafe(shard.index()).ignoreDiskWatermarks() ? 0 : size);
             }
         }
+        shardMovementWriteLoadSimulator.simulateShardStarted(shard);
     }
 
     private void modifyDiskUsage(String nodeId, long freeDelta) {
@@ -159,7 +161,7 @@ public class ClusterInfoSimulator {
             dataPath,
             Map.of(),
             estimatedHeapUsages,
-            nodeThreadPoolUsageStats,
+            shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(),
             allocation.clusterInfo().getShardWriteLoads()
         );
     }
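
With this wiring, every shard start that ClusterInfoSimulator simulates is also forwarded to the new ShardMovementWriteLoadSimulator, and the ClusterInfo it builds reports the adjusted per-node write-pool stats rather than the raw ones. A minimal usage sketch, assuming a caller that already holds a RoutingAllocation (the loop and variable names are illustrative, not part of this change):

    // Sketch only: drive the simulator for a set of shard starts and read back
    // the adjusted thread-pool stats. Variable names here are assumptions.
    ClusterInfoSimulator simulator = new ClusterInfoSimulator(allocation);
    for (ShardRouting started : shardsStartedDuringSimulation) {
        // updates disk usage and shard sizes as before, and now also the per-node write-load deltas
        simulator.simulateShardStarted(started);
    }
    ClusterInfo simulated = simulator.getClusterInfo();
    // getNodeUsageStatsForThreadPools() now reflects the simulated shard movements
    simulated.getNodeUsageStatsForThreadPools();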

+ 98 - 0
server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java

@@ -0,0 +1,98 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.cluster.routing;
+
+import com.carrotsearch.hppc.ObjectDoubleHashMap;
+import com.carrotsearch.hppc.ObjectDoubleMap;
+
+import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.common.util.Maps;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Simulates the impact to each node's write-load in response to the movement of individual
+ * shards around the cluster.
+ */
+public class ShardMovementWriteLoadSimulator {
+
+    private final Map<String, NodeUsageStatsForThreadPools> originalNodeUsageStatsForThreadPools;
+    private final ObjectDoubleMap<String> simulatedNodeWriteLoadDeltas;
+    private final Map<ShardId, Double> writeLoadsPerShard;
+
+    public ShardMovementWriteLoadSimulator(RoutingAllocation routingAllocation) {
+        this.originalNodeUsageStatsForThreadPools = routingAllocation.clusterInfo().getNodeUsageStatsForThreadPools();
+        this.writeLoadsPerShard = routingAllocation.clusterInfo().getShardWriteLoads();
+        this.simulatedNodeWriteLoadDeltas = new ObjectDoubleHashMap<>();
+    }
+
+    public void simulateShardStarted(ShardRouting shardRouting) {
+        final Double writeLoadForShard = writeLoadsPerShard.get(shardRouting.shardId());
+        if (writeLoadForShard != null) {
+            if (shardRouting.relocatingNodeId() != null) {
+                assert shardRouting.state() == ShardRoutingState.INITIALIZING
+                    : "This should only be happening on the destination node (the source node will have status RELOCATING)";
+                // This is a shard being relocated
+                simulatedNodeWriteLoadDeltas.addTo(shardRouting.relocatingNodeId(), -1 * writeLoadForShard);
+                simulatedNodeWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard);
+            } else {
+                // This is a new shard starting. It's unlikely we'll have a write-load value for a new
+                // shard, but we may be able to estimate one if the new shard is created as part of a data
+                // stream rollover. See https://elasticco.atlassian.net/browse/ES-12469
+                simulatedNodeWriteLoadDeltas.addTo(shardRouting.currentNodeId(), writeLoadForShard);
+            }
+        }
+    }
+
+    /**
+     * Apply the simulated shard movements to the original thread pool usage stats for each node.
+     */
+    public Map<String, NodeUsageStatsForThreadPools> simulatedNodeUsageStatsForThreadPools() {
+        final Map<String, NodeUsageStatsForThreadPools> adjustedNodeUsageStatsForThreadPools = Maps.newMapWithExpectedSize(
+            originalNodeUsageStatsForThreadPools.size()
+        );
+        for (Map.Entry<String, NodeUsageStatsForThreadPools> entry : originalNodeUsageStatsForThreadPools.entrySet()) {
+            if (simulatedNodeWriteLoadDeltas.containsKey(entry.getKey())) {
+                var adjustedValue = new NodeUsageStatsForThreadPools(
+                    entry.getKey(),
+                    Maps.copyMapWithAddedOrReplacedEntry(
+                        entry.getValue().threadPoolUsageStatsMap(),
+                        ThreadPool.Names.WRITE,
+                        replaceWritePoolStats(entry.getValue(), simulatedNodeWriteLoadDeltas.get(entry.getKey()))
+                    )
+                );
+                adjustedNodeUsageStatsForThreadPools.put(entry.getKey(), adjustedValue);
+            } else {
+                adjustedNodeUsageStatsForThreadPools.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return Collections.unmodifiableMap(adjustedNodeUsageStatsForThreadPools);
+    }
+
+    private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoolStats(
+        NodeUsageStatsForThreadPools value,
+        double writeLoadDelta
+    ) {
+        final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = value.threadPoolUsageStatsMap()
+            .get(ThreadPool.Names.WRITE);
+        return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
+            writeThreadPoolStats.totalThreadPoolThreads(),
+            (float) Math.max(
+                (writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats.totalThreadPoolThreads())),
+                0.0
+            ),
+            writeThreadPoolStats.averageThreadPoolQueueLatencyMillis()
+        );
+    }
+}
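
The adjustment in replaceWritePoolStats spreads a shard's write load evenly across the node's write threads: the simulated utilization is max(utilization + delta / totalThreadPoolThreads, 0). A small arithmetic sketch with made-up numbers (not taken from the change) shows the effect of moving one shard:

    // Illustrative values only: a shard with write load 2.0 moves off a node that
    // has 8 write threads at 50% average write-pool utilization.
    double shardWriteLoad = 2.0;
    float sourceUtilization = 0.50f;
    int sourceWriteThreads = 8;
    // Source node: delta is -2.0, so utilization drops by 2.0 / 8 = 0.25.
    float simulatedSource = (float) Math.max(sourceUtilization + (-shardWriteLoad / sourceWriteThreads), 0.0);
    // simulatedSource == 0.25f

    // Destination node: 4 write threads at 10% utilization gains 2.0 / 4 = 0.5.
    float simulatedDestination = (float) Math.max(0.10f + (shardWriteLoad / 4), 0.0);
    // simulatedDestination is roughly 0.60f; the result is clamped at 0.0 but not capped at 1.0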

+ 207 - 0
server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java

@@ -0,0 +1,207 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.cluster.routing;
+
+import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
+import org.elasticsearch.cluster.ClusterInfo;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
+import org.elasticsearch.cluster.metadata.ProjectId;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
+import org.elasticsearch.test.ESTestCase;
+import org.hamcrest.Matchers;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.sameInstance;
+
+public class ShardMovementWriteLoadSimulatorTests extends ESTestCase {
+
+    private static final RoutingChangesObserver NOOP = new RoutingChangesObserver() {
+    };
+    private static final String[] INDICES = { "indexOne", "indexTwo", "indexThree" };
+
+    /**
+     * We should not adjust the values if there's no movement
+     */
+    public void testNoShardMovement() {
+        final var originalNode0ThreadPoolStats = randomThreadPoolUsageStats();
+        final var originalNode1ThreadPoolStats = randomThreadPoolUsageStats();
+        final var allocation = createRoutingAllocationWithRandomisedWriteLoads(
+            originalNode0ThreadPoolStats,
+            originalNode1ThreadPoolStats,
+            Set.of()
+        );
+
+        final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation);
+        final var calculatedNodeUsageStates = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools();
+        assertThat(calculatedNodeUsageStates, Matchers.aMapWithSize(2));
+        assertThat(
+            calculatedNodeUsageStates.get("node_0").threadPoolUsageStatsMap().get("write"),
+            sameInstance(originalNode0ThreadPoolStats)
+        );
+        assertThat(
+            calculatedNodeUsageStates.get("node_1").threadPoolUsageStatsMap().get("write"),
+            sameInstance(originalNode1ThreadPoolStats)
+        );
+    }
+
+    public void testMovementOfAShardWillMoveThreadPoolUtilisation() {
+        final var originalNode0ThreadPoolStats = randomThreadPoolUsageStats();
+        final var originalNode1ThreadPoolStats = randomThreadPoolUsageStats();
+        final var allocation = createRoutingAllocationWithRandomisedWriteLoads(
+            originalNode0ThreadPoolStats,
+            originalNode1ThreadPoolStats,
+            Set.of()
+        );
+        final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation);
+
+        // Relocate a random shard from node_0 to node_1
+        final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList());
+        final var expectedShardSize = randomNonNegativeLong();
+        final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP);
+        shardMovementWriteLoadSimulator.simulateShardStarted(moveShardTuple.v2());
+        final ShardRouting movedAndStartedShard = allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize);
+
+        final var calculatedNodeUsageStats = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools();
+        assertThat(calculatedNodeUsageStats, Matchers.aMapWithSize(2));
+
+        final var shardWriteLoad = allocation.clusterInfo().getShardWriteLoads().get(randomShard.shardId());
+        final var expectedUtilisationReductionAtSource = shardWriteLoad / originalNode0ThreadPoolStats.totalThreadPoolThreads();
+        final var expectedUtilisationIncreaseAtDestination = shardWriteLoad / originalNode1ThreadPoolStats.totalThreadPoolThreads();
+
+        // Some node_0 utilization should have been moved to node_1
+        if (expectedUtilisationReductionAtSource > originalNode0ThreadPoolStats.averageThreadPoolUtilization()) {
+            // We don't return utilization less than zero because that makes no sense
+            assertThat(getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_0"), equalTo(0.0f));
+        } else {
+            assertThat(
+                (double) originalNode0ThreadPoolStats.averageThreadPoolUtilization() - getAverageWritePoolUtilization(
+                    shardMovementWriteLoadSimulator,
+                    "node_0"
+                ),
+                closeTo(expectedUtilisationReductionAtSource, 0.001f)
+            );
+        }
+        assertThat(
+            (double) getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_1") - originalNode1ThreadPoolStats
+                .averageThreadPoolUtilization(),
+            closeTo(expectedUtilisationIncreaseAtDestination, 0.001f)
+        );
+
+        // Then move it back
+        final var moveBackTuple = allocation.routingNodes()
+            .relocateShard(movedAndStartedShard, "node_0", expectedShardSize, "testing", NOOP);
+        shardMovementWriteLoadSimulator.simulateShardStarted(moveBackTuple.v2());
+
+        // The utilization numbers should return to their original values
+        assertThat(
+            getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_0"),
+            equalTo(originalNode0ThreadPoolStats.averageThreadPoolUtilization())
+        );
+        assertThat(
+            getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_1"),
+            equalTo(originalNode1ThreadPoolStats.averageThreadPoolUtilization())
+        );
+    }
+
+    public void testMovementBetweenNodesWithNoThreadPoolAndWriteLoadStats() {
+        final var originalNode0ThreadPoolStats = randomBoolean() ? randomThreadPoolUsageStats() : null;
+        final var originalNode1ThreadPoolStats = randomBoolean() ? randomThreadPoolUsageStats() : null;
+        final var allocation = createRoutingAllocationWithRandomisedWriteLoads(
+            originalNode0ThreadPoolStats,
+            originalNode1ThreadPoolStats,
+            new HashSet<>(randomSubsetOf(Arrays.asList(INDICES)))
+        );
+        final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation);
+
+        // Relocate a random shard from node_0 to node_1
+        final var expectedShardSize = randomNonNegativeLong();
+        final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList());
+        final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP);
+        shardMovementWriteLoadSimulator.simulateShardStarted(moveShardTuple.v2());
+        allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize);
+
+        final var simulated = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools();
+        assertThat(simulated.containsKey("node_0"), equalTo(originalNode0ThreadPoolStats != null));
+        assertThat(simulated.containsKey("node_1"), equalTo(originalNode1ThreadPoolStats != null));
+    }
+
+    private float getAverageWritePoolUtilization(ShardMovementWriteLoadSimulator shardMovementWriteLoadSimulator, String nodeId) {
+        final var generatedNodeUsageStates = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools();
+        final var node0WritePoolStats = generatedNodeUsageStates.get(nodeId).threadPoolUsageStatsMap().get("write");
+        return node0WritePoolStats.averageThreadPoolUtilization();
+    }
+
+    private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomThreadPoolUsageStats() {
+        return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
+            randomIntBetween(4, 16),
+            randomBoolean() ? 0.0f : randomFloatBetween(0.1f, 1.0f, true),
+            randomLongBetween(0, 60_000)
+        );
+    }
+
+    private RoutingAllocation createRoutingAllocationWithRandomisedWriteLoads(
+        NodeUsageStatsForThreadPools.ThreadPoolUsageStats node0ThreadPoolStats,
+        NodeUsageStatsForThreadPools.ThreadPoolUsageStats node1ThreadPoolStats,
+        Set<String> indicesWithNoWriteLoad
+    ) {
+        final Map<String, NodeUsageStatsForThreadPools> nodeUsageStats = new HashMap<>();
+        if (node0ThreadPoolStats != null) {
+            nodeUsageStats.put("node_0", new NodeUsageStatsForThreadPools("node_0", Map.of("write", node0ThreadPoolStats)));
+        }
+        if (node1ThreadPoolStats != null) {
+            nodeUsageStats.put("node_1", new NodeUsageStatsForThreadPools("node_1", Map.of("write", node1ThreadPoolStats)));
+        }
+
+        final ClusterState clusterState = createClusterState();
+        final ClusterInfo clusterInfo = ClusterInfo.builder()
+            .nodeUsageStatsForThreadPools(nodeUsageStats)
+            .shardWriteLoads(
+                clusterState.metadata()
+                    .getProject(ProjectId.DEFAULT)
+                    .stream()
+                    .filter(index -> indicesWithNoWriteLoad.contains(index.getIndex().getName()) == false)
+                    .flatMap(index -> IntStream.range(0, 3).mapToObj(shardNum -> new ShardId(index.getIndex(), shardNum)))
+                    .collect(
+                        Collectors.toUnmodifiableMap(
+                            shardId -> shardId,
+                            shardId -> randomBoolean() ? 0.0f : randomDoubleBetween(0.1, 5.0, true)
+                        )
+                    )
+            )
+            .build();
+
+        return new RoutingAllocation(
+            new AllocationDeciders(List.of()),
+            clusterState,
+            clusterInfo,
+            SnapshotShardSizeInfo.EMPTY,
+            System.nanoTime()
+        ).mutableCloneForSimulation();
+    }
+
+    private ClusterState createClusterState() {
+        return ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(INDICES, 3, 0);
+    }
+}