浏览代码

Fix autoscaling of follower data streams (#83302)

The presence of follower data streams would cause autoscaling capacity
calculation to fail.

Closes #82857
Henning Andersen 3 年之前
父节点
当前提交
c731fefb96

+ 6 - 0
docs/changelog/83302.yaml

@@ -0,0 +1,6 @@
+pr: 83302
+summary: Fix autoscaling of follower data streams
+area: Autoscaling
+type: bug
+issues:
+ - 82857

+ 14 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

@@ -273,6 +273,13 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
     public DataStream rollover(Index writeIndex, long generation) {
     public DataStream rollover(Index writeIndex, long generation) {
         ensureNotReplicated();
         ensureNotReplicated();
 
 
+        return unsafeRollover(writeIndex, generation);
+    }
+
+    /**
+     * Like {@link #rollover(Index, long)}, but does no validation, use with care only.
+     */
+    public DataStream unsafeRollover(Index writeIndex, long generation) {
         List<Index> backingIndices = new ArrayList<>(indices);
         List<Index> backingIndices = new ArrayList<>(indices);
         backingIndices.add(writeIndex);
         backingIndices.add(writeIndex);
         return new DataStream(
         return new DataStream(
@@ -299,6 +306,13 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
      */
      */
     public Tuple<String, Long> nextWriteIndexAndGeneration(Metadata clusterMetadata) {
     public Tuple<String, Long> nextWriteIndexAndGeneration(Metadata clusterMetadata) {
         ensureNotReplicated();
         ensureNotReplicated();
+        return unsafeNextWriteIndexAndGeneration(clusterMetadata);
+    }
+
+    /**
+     * Like {@link #nextWriteIndexAndGeneration(Metadata)}, but does no validation, use with care only.
+     */
+    public Tuple<String, Long> unsafeNextWriteIndexAndGeneration(Metadata clusterMetadata) {
         String newWriteIndexName;
         String newWriteIndexName;
         long generation = this.generation;
         long generation = this.generation;
         long currentTimeMillis = timeProvider.getAsLong();
         long currentTimeMillis = timeProvider.getAsLong();

+ 25 - 2
test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

@@ -90,7 +90,18 @@ public final class DataStreamTestHelper {
         long generation,
         long generation,
         Map<String, Object> metadata
         Map<String, Object> metadata
     ) {
     ) {
-        return new DataStream(name, timeStampField, indices, generation, metadata, false, false, false, false, null);
+        return newInstance(name, timeStampField, indices, generation, metadata, false);
+    }
+
+    public static DataStream newInstance(
+        String name,
+        DataStream.TimestampField timeStampField,
+        List<Index> indices,
+        long generation,
+        Map<String, Object> metadata,
+        boolean replicated
+    ) {
+        return new DataStream(name, timeStampField, indices, generation, metadata, false, replicated, false, false, null);
     }
     }
 
 
     public static String getLegacyDefaultBackingIndexName(
     public static String getLegacyDefaultBackingIndexName(
@@ -265,6 +276,17 @@ public final class DataStreamTestHelper {
         long currentTime,
         long currentTime,
         Settings settings,
         Settings settings,
         int replicas
         int replicas
+    ) {
+        return getClusterStateWithDataStreams(dataStreams, indexNames, currentTime, settings, replicas, false);
+    }
+
+    public static ClusterState getClusterStateWithDataStreams(
+        List<Tuple<String, Integer>> dataStreams,
+        List<String> indexNames,
+        long currentTime,
+        Settings settings,
+        int replicas,
+        boolean replicated
     ) {
     ) {
         Metadata.Builder builder = Metadata.builder();
         Metadata.Builder builder = Metadata.builder();
 
 
@@ -283,7 +305,8 @@ public final class DataStreamTestHelper {
                 createTimestampField("@timestamp"),
                 createTimestampField("@timestamp"),
                 backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()),
                 backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()),
                 dsTuple.v2(),
                 dsTuple.v2(),
-                null
+                null,
+                replicated
             );
             );
             builder.put(ds);
             builder.put(ds);
         }
         }

+ 2 - 2
x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java

@@ -568,8 +568,8 @@ public class ReactiveStorageDeciderService implements AutoscalingDeciderService
             DataStream dataStream = stream.getDataStream();
             DataStream dataStream = stream.getDataStream();
             for (int i = 0; i < numberNewIndices; ++i) {
             for (int i = 0; i < numberNewIndices; ++i) {
                 final String uuid = UUIDs.randomBase64UUID();
                 final String uuid = UUIDs.randomBase64UUID();
-                final Tuple<String, Long> dummyRolledDatastream = dataStream.nextWriteIndexAndGeneration(state.metadata());
-                dataStream = dataStream.rollover(new Index(dummyRolledDatastream.v1(), uuid), dummyRolledDatastream.v2());
+                final Tuple<String, Long> rolledDataStreamInfo = dataStream.unsafeNextWriteIndexAndGeneration(state.metadata());
+                dataStream = dataStream.unsafeRollover(new Index(rolledDataStreamInfo.v1(), uuid), rolledDataStreamInfo.v2());
 
 
                 // this unintentionally copies the in-sync allocation ids too. This has the fortunate effect of these indices
                 // this unintentionally copies the in-sync allocation ids too. This has the fortunate effect of these indices
                 // not being regarded new by the disk threshold decider, thereby respecting the low watermark threshold even for primaries.
                 // not being regarded new by the disk threshold decider, thereby respecting the low watermark threshold even for primaries.

+ 12 - 3
x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ProactiveStorageDeciderServiceTests.java

@@ -65,7 +65,10 @@ public class ProactiveStorageDeciderServiceTests extends AutoscalingTestCase {
         ClusterState originalState = DataStreamTestHelper.getClusterStateWithDataStreams(
         ClusterState originalState = DataStreamTestHelper.getClusterStateWithDataStreams(
             List.of(Tuple.tuple("test", between(1, 10))),
             List.of(Tuple.tuple("test", between(1, 10))),
             List.of(),
             List.of(),
-            0
+            System.currentTimeMillis(),
+            Settings.EMPTY,
+            0,
+            randomBoolean()
         );
         );
         ClusterState.Builder stateBuilder = ClusterState.builder(originalState);
         ClusterState.Builder stateBuilder = ClusterState.builder(originalState);
         IntStream.range(0, between(1, 10)).forEach(i -> ReactiveStorageDeciderServiceTests.addNode(stateBuilder));
         IntStream.range(0, between(1, 10)).forEach(i -> ReactiveStorageDeciderServiceTests.addNode(stateBuilder));
@@ -161,7 +164,10 @@ public class ProactiveStorageDeciderServiceTests extends AutoscalingTestCase {
         ClusterState originalState = DataStreamTestHelper.getClusterStateWithDataStreams(
         ClusterState originalState = DataStreamTestHelper.getClusterStateWithDataStreams(
             List.of(Tuple.tuple("test", between(1, 10))),
             List.of(Tuple.tuple("test", between(1, 10))),
             List.of(),
             List.of(),
-            between(0, 4)
+            System.currentTimeMillis(),
+            Settings.EMPTY,
+            between(0, 4),
+            randomBoolean()
         );
         );
         ClusterState.Builder stateBuilder = ClusterState.builder(originalState);
         ClusterState.Builder stateBuilder = ClusterState.builder(originalState);
         stateBuilder.routingTable(addRouting(originalState.metadata(), RoutingTable.builder()).build());
         stateBuilder.routingTable(addRouting(originalState.metadata(), RoutingTable.builder()).build());
@@ -220,7 +226,10 @@ public class ProactiveStorageDeciderServiceTests extends AutoscalingTestCase {
         ClusterState originalState = DataStreamTestHelper.getClusterStateWithDataStreams(
         ClusterState originalState = DataStreamTestHelper.getClusterStateWithDataStreams(
             List.of(Tuple.tuple("test", indices)),
             List.of(Tuple.tuple("test", indices)),
             List.of(),
             List.of(),
-            shardCopies - 1
+            System.currentTimeMillis(),
+            Settings.EMPTY,
+            shardCopies - 1,
+            randomBoolean()
         );
         );
         ClusterState.Builder stateBuilder = ClusterState.builder(originalState);
         ClusterState.Builder stateBuilder = ClusterState.builder(originalState);
         stateBuilder.routingTable(addRouting(originalState.metadata(), RoutingTable.builder()).build());
         stateBuilder.routingTable(addRouting(originalState.metadata(), RoutingTable.builder()).build());