
ES-10037 Configurable metrics in data stream auto-sharding (#125612)

This adds cluster settings to allow for a choice of write load metrics
in the data stream auto-sharding calculations. There are separate
settings for the increasing and decreasing calculations. Both default
to the existing 'all-time' metric for now.
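
As a rough sketch of how the new settings could be used (the setting keys, enum values, and ALL_TIME default come from the diff below; the surrounding demo class is illustrative and not part of this commit):

    import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
    import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.WriteLoadMetric;
    import org.elasticsearch.common.settings.Settings;

    class AutoShardingMetricSettingsDemo {
        public static void main(String[] args) {
            Settings settings = Settings.builder()
                .put("data_streams.auto_sharding.increase_shards.load_metric", WriteLoadMetric.RECENT.name())
                .put("data_streams.auto_sharding.decrease_shards.load_metric", WriteLoadMetric.PEAK.name())
                .build();
            // Unset keys fall back to the ALL_TIME default, i.e. the pre-existing behaviour.
            WriteLoadMetric increase = DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC.get(settings);
            WriteLoadMetric decrease = DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC.get(settings);
            System.out.println(increase + " / " + decrease); // RECENT / PEAK
        }
    }

Since both settings are registered as Dynamic and NodeScope, they can also be changed at runtime through the cluster settings update API.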

This also refactors `DataStreamAutoShardingServiceTests`. The two main changes are:

 - Split large test methods, which each performed several independent tests in
   bare code blocks, into multiple smaller methods.

 - Fix an unnecessarily complicated pattern where the code would
   create a `Function` in a local variable and then immediately
   `apply` it exactly once, rather than just executing the code
   directly (see the sketch after this list).
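
For illustration, a hedged sketch of the `Function` pattern that the test refactoring removes (all names here are invented for the example, since the test file's diff is omitted below):

    // Before: a Function stored in a local variable and applied exactly once.
    Function<Integer, AutoShardingResult> decide =
        shards -> service.calculate(projectState, dataStreamWithShards(shards), writeIndexStats);
    AutoShardingResult resultViaFunction = decide.apply(3);

    // After: just execute the code directly.
    AutoShardingResult result = service.calculate(projectState, dataStreamWithShards(3), writeIndexStats);

Here `service`, `projectState`, `dataStreamWithShards` and `writeIndexStats` stand in for test fixtures; the point is only that wrapping a single call in a `java.util.function.Function` adds indirection without benefit.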
Pete Gillin, 6 months ago
Parent commit: f91f1323a8

+ 3 - 14
server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java

@@ -278,23 +278,12 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
                 final IndexAbstraction indexAbstraction = projectMetadata.getIndicesLookup().get(resolvedRolloverTarget.resource());
                 if (indexAbstraction.getType().equals(IndexAbstraction.Type.DATA_STREAM)) {
                     DataStream dataStream = (DataStream) indexAbstraction;
-                    final Optional<IndexStats> indexStats = Optional.ofNullable(statsResponse)
-                        .map(stats -> stats.getIndex(dataStream.getWriteIndex().getName()));
-
-                    Double indexWriteLoad = indexStats.map(
-                        stats -> Arrays.stream(stats.getShards())
-                            .filter(shardStats -> shardStats.getStats().indexing != null)
-                            // only take primaries into account as in stateful the replicas also index data
-                            .filter(shardStats -> shardStats.getShardRouting().primary())
-                            .map(shardStats -> shardStats.getStats().indexing.getTotal().getWriteLoad())
-                            .reduce(0.0, Double::sum)
-                    ).orElse(null);
-
-                    rolloverAutoSharding = dataStreamAutoShardingService.calculate(projectState, dataStream, indexWriteLoad);
+                    IndexStats indexStats = statsResponse != null ? statsResponse.getIndex(dataStream.getWriteIndex().getName()) : null;
+                    rolloverAutoSharding = dataStreamAutoShardingService.calculate(projectState, dataStream, indexStats);
                     logger.debug("auto sharding result for data stream [{}] is [{}]", dataStream.getName(), rolloverAutoSharding);
 
                     // if auto sharding recommends increasing the number of shards we want to trigger a rollover even if there are no
-                    // other "regular" conditions matching (we want to aggressively increse the number of shards) so we're adding the
+                    // other "regular" conditions matching (we want to aggressively increase the number of shards) so we're adding the
                     // automatic {@link OptimalShardCountCondition} to the rollover request conditions so it gets evaluated and triggers
                     // the rollover operation (having this condition met will also provide a useful paper trail as it'll get stored in
                     // the {@link org.elasticsearch.action.admin.indices.rollover.RolloverInfo#metConditions} )

+ 173 - 37
server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java

@@ -11,6 +11,7 @@ package org.elasticsearch.action.datastreams.autosharding;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.admin.indices.stats.IndexStats;
 import org.elasticsearch.cluster.ProjectState;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -24,7 +25,9 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.shard.IndexingStats;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 import java.util.OptionalDouble;
@@ -95,6 +98,51 @@ public class DataStreamAutoShardingService {
         Setting.Property.Dynamic,
         Setting.Property.NodeScope
     );
+
+    /**
+     * Enumerates the different ways of measuring write load which we can choose between to use in the auto-sharding calculations.
+     */
+    public enum WriteLoadMetric {
+
+        /**
+         * An unweighted average of the load across the whole time since each shard started (see
+         * {@link IndexingStats.Stats#getWriteLoad()}).
+         */
+        ALL_TIME,
+
+        /**
+         * A weighted average of the load favoring recent load (see {@link IndexingStats.Stats#getRecentWriteLoad()}).
+         */
+        RECENT,
+
+        /**
+         * A measure of the peak value observed for the {@link #RECENT} metric (see {@link IndexingStats.Stats#getPeakWriteLoad()}).
+         */
+        PEAK
+    }
+
+    /**
+     * Represents which write load metric should be used for the calculation when considering increasing shards.
+     */
+    public static final Setting<WriteLoadMetric> DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC = Setting.enumSetting(
+        WriteLoadMetric.class,
+        "data_streams.auto_sharding.increase_shards.load_metric",
+        WriteLoadMetric.ALL_TIME,
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
+    /**
+     * Represents which write load metric should be used for the calculation when considering decreasing shards.
+     */
+    public static final Setting<WriteLoadMetric> DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC = Setting.enumSetting(
+        WriteLoadMetric.class,
+        "data_streams.auto_sharding.decrease_shards.load_metric",
+        WriteLoadMetric.ALL_TIME,
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
     private final ClusterService clusterService;
     private final boolean isAutoShardingEnabled;
     private final LongSupplier nowSupplier;
@@ -103,6 +151,8 @@ public class DataStreamAutoShardingService {
     private volatile int minWriteThreads;
     private volatile int maxWriteThreads;
     private volatile List<String> dataStreamExcludePatterns;
+    private volatile WriteLoadMetric increaseShardsMetric;
+    private volatile WriteLoadMetric decreaseShardsMetric;
 
     public DataStreamAutoShardingService(Settings settings, ClusterService clusterService, LongSupplier nowSupplier) {
         this.clusterService = clusterService;
@@ -112,6 +162,8 @@ public class DataStreamAutoShardingService {
         this.minWriteThreads = CLUSTER_AUTO_SHARDING_MIN_WRITE_THREADS.get(settings);
         this.maxWriteThreads = CLUSTER_AUTO_SHARDING_MAX_WRITE_THREADS.get(settings);
         this.dataStreamExcludePatterns = DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING.get(settings);
+        this.increaseShardsMetric = DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC.get(settings);
+        this.decreaseShardsMetric = DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC.get(settings);
         this.nowSupplier = nowSupplier;
     }
 
@@ -124,74 +176,109 @@ public class DataStreamAutoShardingService {
         clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_AUTO_SHARDING_MAX_WRITE_THREADS, this::updateMaxWriteThreads);
         clusterService.getClusterSettings()
             .addSettingsUpdateConsumer(DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING, this::updateDataStreamExcludePatterns);
+        clusterService.getClusterSettings()
+            .addSettingsUpdateConsumer(DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC, this::updateIncreaseShardsMetric);
+        clusterService.getClusterSettings()
+            .addSettingsUpdateConsumer(DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC, this::updateDecreaseShardsMetric);
     }
 
     /**
      * Computes the optimal number of shards for the provided data stream according to the write index's indexing load (to check if we must
      * increase the number of shards, whilst the heuristics for decreasing the number of shards _might_ use the provided write indexing
      * load).
-     * The result type will indicate the recommendation of the auto sharding service :
-     * - not applicable if the data stream is excluded from auto sharding as configured by
+     *
+     * <p>The result type will indicate the recommendation of the auto sharding service:
+     * <ul>
+     * <li>not applicable if the data stream is excluded from auto sharding as configured by
      * {@link #DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING} or if the auto sharding functionality is disabled according to
-     * {@link #DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING}, or if the cluster doesn't have the feature available
-     * - increase number of shards if the optimal number of shards it deems necessary for the provided data stream is GT the current number
-     * of shards
-     * - decrease the number of shards if the optimal number of shards it deems necessary for the provided data stream is LT the current
+     * {@link #DATA_STREAMS_AUTO_SHARDING_ENABLED}, or if the write index's indexing load is not available
+     * <li>increase number of shards if the optimal number of shards it deems necessary for the provided data stream is GT the current
+     * number of shards
+     * <li>decrease the number of shards if the optimal number of shards it deems necessary for the provided data stream is LT the current
      * number of shards
+     * </ul>
      *
-     * If the recommendation is to INCREASE/DECREASE shards the reported cooldown period will be TimeValue.ZERO.
+     * <p>If the recommendation is to INCREASE/DECREASE shards the reported cooldown period will be TimeValue.ZERO.
      * If the auto sharding service thinks the number of shards must be changed but it can't recommend a change due to the cooldown
      * period not lapsing, the result will be of type {@link AutoShardingType#COOLDOWN_PREVENTED_INCREASE} or
      * {@link AutoShardingType#COOLDOWN_PREVENTED_INCREASE} with the remaining cooldown configured and the number of shards that should
      * be configured for the data stream once the remaining cooldown lapses as the target number of shards.
      *
-     * The NOT_APPLICABLE type result will report a cooldown period of TimeValue.MAX_VALUE.
+     * <p>The NOT_APPLICABLE type result will report a cooldown period of TimeValue.MAX_VALUE.
      *
-     * The NO_CHANGE_REQUIRED type will potentially report the remaining cooldown always report a cool down period of TimeValue.ZERO (as
+     * <p>The NO_CHANGE_REQUIRED type will potentially report the remaining cooldown always report a cool down period of TimeValue.ZERO (as
      * there'll be no new auto sharding event)
      */
-    public AutoShardingResult calculate(ProjectState state, DataStream dataStream, @Nullable Double writeIndexLoad) {
+    public AutoShardingResult calculate(ProjectState state, DataStream dataStream, @Nullable IndexStats writeIndexStats) {
         if (isAutoShardingEnabled == false) {
-            logger.debug("Data stream auto sharding service is not enabled.");
+            logger.debug("Data stream auto-sharding service is not enabled.");
             return NOT_APPLICABLE_RESULT;
         }
 
         if (dataStreamExcludePatterns.stream().anyMatch(pattern -> Regex.simpleMatch(pattern, dataStream.getName()))) {
             logger.debug(
-                "Data stream [{}] is excluded from auto sharding via the [{}] setting",
+                "Data stream [{}] is excluded from auto-sharding via the [{}] setting",
                 dataStream.getName(),
                 DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING.getKey()
             );
             return NOT_APPLICABLE_RESULT;
         }
 
-        if (writeIndexLoad == null) {
+        if (writeIndexStats == null) {
             logger.debug(
-                "Data stream auto sharding service cannot compute the optimal number of shards for data stream [{}] as the write index "
-                    + "load is not available",
+                "Data stream auto-sharding service cannot compute the optimal number of shards for data stream [{}] as the write index "
+                    + "stats are not available",
                 dataStream.getName()
             );
             return NOT_APPLICABLE_RESULT;
         }
-        return innerCalculate(state.metadata(), dataStream, writeIndexLoad, nowSupplier);
+
+        double writeIndexLoad = sumLoadMetrics(writeIndexStats, IndexingStats.Stats::getWriteLoad);
+        double writeIndexRecentLoad = sumLoadMetrics(writeIndexStats, IndexingStats.Stats::getRecentWriteLoad);
+        double writeIndexPeakLoad = sumLoadMetrics(writeIndexStats, IndexingStats.Stats::getPeakWriteLoad);
+        double writeIndexLoadForIncrease = pickMetric(increaseShardsMetric, writeIndexLoad, writeIndexRecentLoad, writeIndexPeakLoad);
+        double writeIndexLoadForDecrease = pickMetric(decreaseShardsMetric, writeIndexLoad, writeIndexRecentLoad, writeIndexPeakLoad);
+
+        logger.trace(
+            "Data stream auto-sharding service calculating recommendation with all-time load {}, recent load {}, peak load {}, "
+                + "using {} for increase and {} for decrease",
+            writeIndexLoad,
+            writeIndexRecentLoad,
+            writeIndexPeakLoad,
+            increaseShardsMetric,
+            decreaseShardsMetric
+        );
+
+        return innerCalculate(state.metadata(), dataStream, writeIndexLoadForIncrease, writeIndexLoadForDecrease, nowSupplier);
+    }
+
+    private static double sumLoadMetrics(IndexStats stats, Function<IndexingStats.Stats, Double> loadMetric) {
+        return Arrays.stream(stats.getShards())
+            .filter(shardStats -> shardStats.getStats().indexing != null)
+            // only take primaries into account as in stateful the replicas also index data
+            .filter(shardStats -> shardStats.getShardRouting().primary())
+            .map(shardStats -> shardStats.getStats().indexing.getTotal())
+            .map(loadMetric)
+            .reduce(0.0, Double::sum);
     }
 
     private AutoShardingResult innerCalculate(
         ProjectMetadata project,
         DataStream dataStream,
-        double writeIndexLoad,
+        double writeIndexLoadForIncrease,
+        double writeIndexLoadForDecrease,
         LongSupplier nowSupplier
     ) {
         // increasing the number of shards is calculated solely based on the index load of the write index
         IndexMetadata writeIndex = project.index(dataStream.getWriteIndex());
         assert writeIndex != null : "the data stream write index must exist in the provided cluster metadata";
-        AutoShardingResult increaseShardsResult = getIncreaseShardsResult(dataStream, writeIndexLoad, nowSupplier, writeIndex);
+        AutoShardingResult increaseShardsResult = getIncreaseShardsResult(dataStream, writeIndexLoadForIncrease, nowSupplier, writeIndex);
         return Objects.requireNonNullElseGet(
             increaseShardsResult,
             () -> getDecreaseShardsResult(
                 project,
                 dataStream,
-                writeIndexLoad,
+                writeIndexLoadForDecrease,
                 nowSupplier,
                 writeIndex,
                 getRemainingDecreaseShardsCooldown(project, dataStream)
@@ -203,12 +290,22 @@ public class DataStreamAutoShardingService {
     @Nullable
     private AutoShardingResult getIncreaseShardsResult(
         DataStream dataStream,
-        double writeIndexLoad,
+        double writeIndexLoadForIncrease,
         LongSupplier nowSupplier,
         IndexMetadata writeIndex
     ) {
         // increasing the number of shards is calculated solely based on the index load of the write index
-        long optimalShardCount = computeOptimalNumberOfShards(minWriteThreads, maxWriteThreads, writeIndexLoad);
+        long optimalShardCount = computeOptimalNumberOfShards(minWriteThreads, maxWriteThreads, writeIndexLoadForIncrease);
+        logger.trace(
+            "Calculated the optimal number of shards for a potential increase in number of shards for data stream [{}] as [{}]"
+                + " with the {} indexing load [{}] for the write index assuming [{}-{}] threads per shard",
+            dataStream.getName(),
+            optimalShardCount,
+            increaseShardsMetric,
+            writeIndexLoadForIncrease,
+            minWriteThreads,
+            maxWriteThreads
+        );
         if (optimalShardCount > writeIndex.getNumberOfShards()) {
             TimeValue timeSinceLastAutoShardingEvent = dataStream.getAutoShardingEvent() != null
                 ? dataStream.getAutoShardingEvent().getTimeSinceLastAutoShardingEvent(nowSupplier)
@@ -218,7 +315,7 @@ public class DataStreamAutoShardingService {
                 Math.max(0L, increaseShardsCooldown.millis() - timeSinceLastAutoShardingEvent.millis())
             );
             logger.debug(
-                "data stream autosharding service recommends increasing the number of shards from [{}] to [{}] after [{}] cooldown for "
+                "Data stream auto-sharding service recommends increasing the number of shards from [{}] to [{}] after [{}] cooldown for "
                     + "data stream [{}]",
                 writeIndex.getNumberOfShards(),
                 optimalShardCount,
@@ -230,7 +327,7 @@ public class DataStreamAutoShardingService {
                 writeIndex.getNumberOfShards(),
                 Math.toIntExact(optimalShardCount),
                 coolDownRemaining,
-                writeIndexLoad
+                writeIndexLoadForIncrease
             );
         }
         return null;
@@ -264,7 +361,7 @@ public class DataStreamAutoShardingService {
     private AutoShardingResult getDecreaseShardsResult(
         ProjectMetadata project,
         DataStream dataStream,
-        double writeIndexLoad,
+        double writeIndexLoadForDecrease,
         LongSupplier nowSupplier,
         IndexMetadata writeIndex,
         TimeValue remainingReduceShardsCooldown
@@ -272,21 +369,27 @@ public class DataStreamAutoShardingService {
         double maxIndexLoadWithinCoolingPeriod = getMaxIndexLoadWithinCoolingPeriod(
             project,
             dataStream,
-            writeIndexLoad,
+            writeIndexLoadForDecrease,
             reduceShardsCooldown,
-            nowSupplier
+            nowSupplier,
+            decreaseShardsMetric
         );
 
+        long optimalShardCount = computeOptimalNumberOfShards(minWriteThreads, maxWriteThreads, maxIndexLoadWithinCoolingPeriod);
         logger.trace(
-            "calculating the optimal number of shards for a potential decrease in number of shards for data stream [{}] with the"
-                + " max indexing load [{}] over the decrease shards cool down period",
+            "Calculated the optimal number of shards for a potential decrease in number of shards for data stream [{}] as [{}]"
+                + " shards, using a max {} indexing load [{}] over the cool down period [{}] assuming [{}-{}] threads per shard",
             dataStream.getName(),
-            maxIndexLoadWithinCoolingPeriod
+            optimalShardCount,
+            decreaseShardsMetric,
+            maxIndexLoadWithinCoolingPeriod,
+            reduceShardsCooldown,
+            minWriteThreads,
+            maxWriteThreads
         );
-        long optimalShardCount = computeOptimalNumberOfShards(minWriteThreads, maxWriteThreads, maxIndexLoadWithinCoolingPeriod);
         if (optimalShardCount < writeIndex.getNumberOfShards()) {
             logger.debug(
-                "data stream autosharding service recommends decreasing the number of shards from [{}] to [{}] after [{}] cooldown for "
+                "data stream auto-sharding service recommends decreasing the number of shards from [{}] to [{}] after [{}] cooldown for "
                     + "data stream [{}]",
                 writeIndex.getNumberOfShards(),
                 optimalShardCount,
@@ -307,7 +410,7 @@ public class DataStreamAutoShardingService {
         }
 
         logger.trace(
-            "data stream autosharding service recommends maintaining the number of shards [{}] for data stream [{}]",
+            "data stream auto-sharding service recommends maintaining the number of shards [{}] for data stream [{}]",
             writeIndex.getNumberOfShards(),
             dataStream.getName()
         );
@@ -355,9 +458,10 @@ public class DataStreamAutoShardingService {
     static double getMaxIndexLoadWithinCoolingPeriod(
         ProjectMetadata project,
         DataStream dataStream,
-        double writeIndexLoad,
+        double writeIndexLoadForDecrease,
         TimeValue coolingPeriod,
-        LongSupplier nowSupplier
+        LongSupplier nowSupplier,
+        WriteLoadMetric decreaseShardsMetric
     ) {
         // for reducing the number of shards we look at more than just the write index
         List<IndexWriteLoad> writeLoadsWithinCoolingPeriod = DataStream.getIndicesWithinMaxAgeRange(
@@ -377,12 +481,19 @@ public class DataStreamAutoShardingService {
             .toList();
 
         // assume the current write index load is the highest observed and look back to find the actual maximum
-        double maxIndexLoadWithinCoolingPeriod = writeIndexLoad;
+        double maxIndexLoadWithinCoolingPeriod = writeIndexLoadForDecrease;
         for (IndexWriteLoad writeLoad : writeLoadsWithinCoolingPeriod) {
             double totalIndexLoad = 0;
             for (int shardId = 0; shardId < writeLoad.numberOfShards(); shardId++) {
-                final OptionalDouble writeLoadForShard = writeLoad.getWriteLoadForShard(shardId);
-                totalIndexLoad += writeLoadForShard.orElse(0);
+                Double writeLoadForShard = pickMetric(
+                    decreaseShardsMetric,
+                    optionalDoubleToNullable(writeLoad.getWriteLoadForShard(shardId)),
+                    optionalDoubleToNullable(writeLoad.getRecentWriteLoadForShard(shardId)),
+                    optionalDoubleToNullable(writeLoad.getPeakWriteLoadForShard(shardId))
+                );
+                if (writeLoadForShard != null) {
+                    totalIndexLoad += writeLoadForShard;
+                }
             }
             if (totalIndexLoad > maxIndexLoadWithinCoolingPeriod) {
                 maxIndexLoadWithinCoolingPeriod = totalIndexLoad;
@@ -410,4 +521,29 @@ public class DataStreamAutoShardingService {
     private void updateDataStreamExcludePatterns(List<String> newExcludePatterns) {
         this.dataStreamExcludePatterns = newExcludePatterns;
     }
+
+    private void updateIncreaseShardsMetric(WriteLoadMetric newMetric) {
+        this.increaseShardsMetric = newMetric;
+    }
+
+    private void updateDecreaseShardsMetric(WriteLoadMetric newMetric) {
+        this.decreaseShardsMetric = newMetric;
+    }
+
+    private static Double pickMetric(
+        WriteLoadMetric metric,
+        Double writeIndexLoad,
+        Double writeIndexRecentLoad,
+        Double writeIndexPeakLoad
+    ) {
+        return switch (metric) {
+            case ALL_TIME -> writeIndexLoad;
+            case RECENT -> writeIndexRecentLoad != null ? writeIndexRecentLoad : writeIndexLoad; // fall-back to all-time metric if null
+            case PEAK -> writeIndexPeakLoad != null ? writeIndexPeakLoad : writeIndexLoad; // fall-back to all-time metric if null
+        };
+    }
+
+    private static Double optionalDoubleToNullable(OptionalDouble optional) {
+        return optional.isPresent() ? optional.getAsDouble() : null;
+    }
 }
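
One subtlety in the new `pickMetric` helper above is its fall-back: if the configured metric is RECENT or PEAK but that figure is unavailable (for example, for write loads recorded before those statistics existed), the all-time value is used instead. A small, self-contained sketch re-expressing that rule with invented names (not part of this commit):

    import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.WriteLoadMetric;

    class WriteLoadMetricFallbackDemo {
        // Mirrors the fall-back rule from pickMetric in the hunk above.
        static Double choose(WriteLoadMetric metric, Double allTime, Double recent, Double peak) {
            return switch (metric) {
                case ALL_TIME -> allTime;
                case RECENT -> recent != null ? recent : allTime;
                case PEAK -> peak != null ? peak : allTime;
            };
        }

        public static void main(String[] args) {
            System.out.println(choose(WriteLoadMetric.RECENT, 2.0, null, null)); // 2.0 (falls back to all-time)
            System.out.println(choose(WriteLoadMetric.PEAK, 2.0, 3.5, 5.0));     // 5.0
        }
    }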

+ 2 - 0
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -230,6 +230,8 @@ public final class ClusterSettings extends AbstractScopedSettings {
         DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING,
         DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MAX_WRITE_THREADS,
         DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MIN_WRITE_THREADS,
+        DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_INCREASE_SHARDS_LOAD_METRIC,
+        DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_DECREASE_SHARDS_LOAD_METRIC,
         DesiredBalanceComputer.PROGRESS_LOG_INTERVAL_SETTING,
         DesiredBalanceComputer.MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING,
         DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING,

Diff for this file omitted because it is too large
+ 668 - 340
server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java


Some files were not shown because too many files changed in this diff