Browse Source

Avoid unintentionally clearing the `DataStream.rolloverOnWrite` flag (#107122)

A lot of places in the code use a `DataStream` constructor that sets the
`rolloverOnWrite` flag to `false`. For some places, this was
intentional, but for others, this was erroneous (and for most tests, it
didn't matter much).

This PR fixes the erroneous spots and avoids similar unintentional
behavior in the future by removing the constructor in question
altogether. Most use cases just want to copy the flag over and if you
_do_ want to set the flag to false, it makes more sense to do so
explicitly yourself rather than letting the constructor do it for you.

An additional small bonus is that we have one less constructor for the
`DataStream` class :).

Follow up of
[this](https://github.com/elastic/elasticsearch/pull/107035#discussion_r1549299287)
discussion.
Niels Bauman 1 year ago
parent
commit
887d48dfc2
17 changed files with 88 additions and 57 deletions
  1. 5 0
      docs/changelog/107122.yaml
  2. 2 1
      modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java
  3. 2 1
      modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProviderTests.java
  4. 2 1
      modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java
  5. 2 0
      modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java
  6. 2 1
      modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java
  7. 23 35
      server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
  8. 1 0
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java
  9. 1 0
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java
  10. 1 0
      server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
  11. 1 0
      server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java
  12. 32 15
      server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java
  13. 1 0
      server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java
  14. 1 0
      server/src/test/java/org/elasticsearch/cluster/metadata/WildcardExpressionResolverTests.java
  15. 5 2
      test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java
  16. 4 0
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java
  17. 3 1
      x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportActionIT.java

+ 5 - 0
docs/changelog/107122.yaml

@@ -0,0 +1,5 @@
+pr: 107122
+summary: Avoid unintentionally clearing the `DataStream.rolloverOnWrite` flag
+area: Data streams
+type: bug
+issues: []

+ 2 - 1
modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java

@@ -1791,7 +1791,8 @@ public class DataStreamIT extends ESIntegTestCase {
                         original.getLifecycle(),
                         original.isFailureStore(),
                         original.getFailureIndices(),
-                        null
+                        original.rolloverOnWrite(),
+                        original.getAutoShardingEvent()
                     );
                     brokenDataStreamHolder.set(broken);
                     return ClusterState.builder(currentState)

+ 2 - 1
modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProviderTests.java

@@ -315,7 +315,8 @@ public class DataStreamIndexSettingsProviderTests extends ESTestCase {
                 ds.getLifecycle(),
                 ds.isFailureStore(),
                 ds.getFailureIndices(),
-                null
+                ds.rolloverOnWrite(),
+                ds.getAutoShardingEvent()
             )
         );
         Metadata metadata = mb.build();

+ 2 - 1
modules/data-streams/src/test/java/org/elasticsearch/datastreams/UpdateTimeSeriesRangeServiceTests.java

@@ -154,7 +154,8 @@ public class UpdateTimeSeriesRangeServiceTests extends ESTestCase {
                     d.getLifecycle(),
                     d.isFailureStore(),
                     d.getFailureIndices(),
-                    null
+                    false,
+                    d.getAutoShardingEvent()
                 )
             )
             .build();

+ 2 - 0
modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java

@@ -89,6 +89,7 @@ public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase
                 new DataStreamLifecycle(),
                 true,
                 failureStores,
+                false,
                 null
             );
 
@@ -199,6 +200,7 @@ public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase
                 new DataStreamLifecycle(null, null, false),
                 true,
                 failureStores,
+                false,
                 null
             );
 

+ 2 - 1
modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java

@@ -296,7 +296,8 @@ public class DataStreamLifecycleServiceTests extends ESTestCase {
                 DataStreamLifecycle.newBuilder().dataRetention(0L).build(),
                 dataStream.isFailureStore(),
                 dataStream.getFailureIndices(),
-                null
+                dataStream.rolloverOnWrite(),
+                dataStream.getAutoShardingEvent()
             )
         );
         clusterState = ClusterState.builder(clusterState).metadata(builder).build();

+ 23 - 35
server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

@@ -119,40 +119,6 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
     @Nullable
     private final DataStreamAutoShardingEvent autoShardingEvent;
 
-    public DataStream(
-        String name,
-        List<Index> indices,
-        long generation,
-        Map<String, Object> metadata,
-        boolean hidden,
-        boolean replicated,
-        boolean system,
-        boolean allowCustomRouting,
-        IndexMode indexMode,
-        DataStreamLifecycle lifecycle,
-        boolean failureStore,
-        List<Index> failureIndices,
-        @Nullable DataStreamAutoShardingEvent autoShardingEvent
-    ) {
-        this(
-            name,
-            indices,
-            generation,
-            metadata,
-            hidden,
-            replicated,
-            system,
-            System::currentTimeMillis,
-            allowCustomRouting,
-            indexMode,
-            lifecycle,
-            failureStore,
-            failureIndices,
-            false,
-            autoShardingEvent
-        );
-    }
-
     public DataStream(
         String name,
         List<Index> indices,
@@ -222,6 +188,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
         this.failureStore = failureStore;
         this.failureIndices = failureIndices;
         assert assertConsistent(this.indices);
+        assert replicated == false || rolloverOnWrite == false : "replicated data streams cannot be marked for lazy rollover";
         this.rolloverOnWrite = rolloverOnWrite;
         this.autoShardingEvent = autoShardingEvent;
     }
@@ -238,7 +205,22 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
         boolean allowCustomRouting,
         IndexMode indexMode
     ) {
-        this(name, indices, generation, metadata, hidden, replicated, system, allowCustomRouting, indexMode, null, false, List.of(), null);
+        this(
+            name,
+            indices,
+            generation,
+            metadata,
+            hidden,
+            replicated,
+            system,
+            allowCustomRouting,
+            indexMode,
+            null,
+            false,
+            List.of(),
+            false,
+            null
+        );
     }
 
     private static boolean assertConsistent(List<Index> indices) {
@@ -507,6 +489,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             lifecycle,
             failureStore,
             failureIndices,
+            false,
             autoShardingEvent
         );
     }
@@ -544,6 +527,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             lifecycle,
             failureStore,
             failureIndices,
+            false,
             autoShardingEvent
         );
     }
@@ -646,6 +630,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             lifecycle,
             failureStore,
             failureIndices,
+            rolloverOnWrite,
             autoShardingEvent
         );
     }
@@ -692,6 +677,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             lifecycle,
             failureStore,
             failureIndices,
+            rolloverOnWrite,
             autoShardingEvent
         );
     }
@@ -753,6 +739,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             lifecycle,
             failureStore,
             failureIndices,
+            rolloverOnWrite,
             autoShardingEvent
         );
     }
@@ -810,6 +797,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             lifecycle,
             failureStore,
             failureIndices,
+            rolloverOnWrite,
             autoShardingEvent
         );
     }

+ 1 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java

@@ -339,6 +339,7 @@ public class MetadataCreateDataStreamService {
             lifecycle == null && isDslOnlyMode ? DataStreamLifecycle.DEFAULT : lifecycle,
             template.getDataStreamTemplate().hasFailureStore(),
             failureIndices,
+            false,
             null
         );
         Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream);

+ 1 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java

@@ -213,6 +213,7 @@ public class MetadataDataStreamsService {
                     lifecycle,
                     dataStream.isFailureStore(),
                     dataStream.getFailureIndices(),
+                    dataStream.rolloverOnWrite(),
                     dataStream.getAutoShardingEvent()
                 )
             );

+ 1 - 0
server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

@@ -717,6 +717,7 @@ public final class RestoreService implements ClusterStateApplier {
             dataStream.getLifecycle(),
             dataStream.isFailureStore(),
             dataStream.getFailureIndices(),
+            dataStream.rolloverOnWrite(),
             dataStream.getAutoShardingEvent()
         );
     }

+ 1 - 0
server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java

@@ -794,6 +794,7 @@ public class DataStreamAutoShardingServiceTests extends ESTestCase {
             null,
             false,
             List.of(),
+            false,
             autoShardingEvent
         );
     }

+ 32 - 15
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

@@ -96,8 +96,9 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         var lifecycle = instance.getLifecycle();
         var failureStore = instance.isFailureStore();
         var failureIndices = instance.getFailureIndices();
+        var rolloverOnWrite = instance.rolloverOnWrite();
         var autoShardingEvent = instance.getAutoShardingEvent();
-        switch (between(0, 11)) {
+        switch (between(0, 12)) {
             case 0 -> name = randomAlphaOfLength(10);
             case 1 -> indices = randomNonEmptyIndexInstances();
             case 2 -> generation = instance.getGeneration() + randomIntBetween(1, 10);
@@ -110,7 +111,11 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
                     isHidden = true;
                 }
             }
-            case 5 -> isReplicated = isReplicated == false;
+            case 5 -> {
+                isReplicated = isReplicated == false;
+                // Replicated data streams cannot be marked for lazy rollover.
+                rolloverOnWrite = isReplicated == false && rolloverOnWrite;
+            }
             case 6 -> {
                 if (isSystem == false) {
                     isSystem = true;
@@ -131,6 +136,10 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
                 failureStore = failureIndices.isEmpty() == false;
             }
             case 11 -> {
+                rolloverOnWrite = rolloverOnWrite == false;
+                isReplicated = rolloverOnWrite == false && isReplicated;
+            }
+            case 12 -> {
                 autoShardingEvent = randomBoolean() && autoShardingEvent != null
                     ? null
                     : new DataStreamAutoShardingEvent(
@@ -154,6 +163,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             lifecycle,
             failureStore,
             failureIndices,
+            rolloverOnWrite,
             autoShardingEvent
         );
     }
@@ -212,6 +222,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             ds.getLifecycle(),
             ds.isFailureStore(),
             ds.getFailureIndices(),
+            ds.rolloverOnWrite(),
             ds.getAutoShardingEvent()
         );
         var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA);
@@ -240,6 +251,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             ds.getLifecycle(),
             ds.isFailureStore(),
             ds.getFailureIndices(),
+            ds.rolloverOnWrite(),
             ds.getAutoShardingEvent()
         );
         var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA);
@@ -616,19 +628,21 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         postSnapshotIndices.removeAll(indicesToRemove);
         postSnapshotIndices.addAll(indicesToAdd);
 
+        var replicated = preSnapshotDataStream.isReplicated() && randomBoolean();
         var postSnapshotDataStream = new DataStream(
             preSnapshotDataStream.getName(),
             postSnapshotIndices,
             preSnapshotDataStream.getGeneration() + randomIntBetween(0, 5),
             preSnapshotDataStream.getMetadata() == null ? null : new HashMap<>(preSnapshotDataStream.getMetadata()),
             preSnapshotDataStream.isHidden(),
-            preSnapshotDataStream.isReplicated() && randomBoolean(),
+            replicated,
             preSnapshotDataStream.isSystem(),
             preSnapshotDataStream.isAllowCustomRouting(),
             preSnapshotDataStream.getIndexMode(),
             preSnapshotDataStream.getLifecycle(),
             preSnapshotDataStream.isFailureStore(),
             preSnapshotDataStream.getFailureIndices(),
+            replicated == false && preSnapshotDataStream.rolloverOnWrite(),
             preSnapshotDataStream.getAutoShardingEvent()
         );
 
@@ -670,6 +684,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             preSnapshotDataStream.getLifecycle(),
             preSnapshotDataStream.isFailureStore(),
             preSnapshotDataStream.getFailureIndices(),
+            preSnapshotDataStream.rolloverOnWrite(),
             preSnapshotDataStream.getAutoShardingEvent()
         );
 
@@ -1896,13 +1911,14 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
     public void testWriteFailureIndex() {
         boolean hidden = randomBoolean();
         boolean system = hidden && randomBoolean();
+        boolean replicated = randomBoolean();
         DataStream noFailureStoreDataStream = new DataStream(
             randomAlphaOfLength(10),
             randomNonEmptyIndexInstances(),
             randomNonNegativeInt(),
             null,
             hidden,
-            randomBoolean(),
+            replicated,
             system,
             System::currentTimeMillis,
             randomBoolean(),
@@ -1910,7 +1926,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             DataStreamLifecycleTests.randomLifecycle(),
             false,
             null,
-            randomBoolean(),
+            replicated == false && randomBoolean(),
             null
         );
         assertThat(noFailureStoreDataStream.getFailureStoreWriteIndex(), nullValue());
@@ -1921,7 +1937,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             randomNonNegativeInt(),
             null,
             hidden,
-            randomBoolean(),
+            replicated,
             system,
             System::currentTimeMillis,
             randomBoolean(),
@@ -1929,7 +1945,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             DataStreamLifecycleTests.randomLifecycle(),
             true,
             List.of(),
-            randomBoolean(),
+            replicated == false && randomBoolean(),
             null
         );
         assertThat(failureStoreDataStreamWithEmptyFailureIndices.getFailureStoreWriteIndex(), nullValue());
@@ -1947,7 +1963,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             randomNonNegativeInt(),
             null,
             hidden,
-            randomBoolean(),
+            replicated,
             system,
             System::currentTimeMillis,
             randomBoolean(),
@@ -1955,7 +1971,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             DataStreamLifecycleTests.randomLifecycle(),
             true,
             failureIndices,
-            randomBoolean(),
+            replicated == false && randomBoolean(),
             null
         );
         assertThat(failureStoreDataStream.getFailureStoreWriteIndex(), is(writeFailureIndex));
@@ -1965,13 +1981,14 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
         boolean hidden = randomBoolean();
         boolean system = hidden && randomBoolean();
         List<Index> backingIndices = randomNonEmptyIndexInstances();
+        boolean replicated = randomBoolean();
         DataStream noFailureStoreDataStream = new DataStream(
             randomAlphaOfLength(10),
             backingIndices,
             randomNonNegativeInt(),
             null,
             hidden,
-            randomBoolean(),
+            replicated,
             system,
             System::currentTimeMillis,
             randomBoolean(),
@@ -1979,7 +1996,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             DataStreamLifecycleTests.randomLifecycle(),
             false,
             null,
-            randomBoolean(),
+            replicated == false && randomBoolean(),
             null
         );
         assertThat(
@@ -1994,7 +2011,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             randomNonNegativeInt(),
             null,
             hidden,
-            randomBoolean(),
+            replicated,
             system,
             System::currentTimeMillis,
             randomBoolean(),
@@ -2002,7 +2019,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             DataStreamLifecycleTests.randomLifecycle(),
             true,
             List.of(),
-            randomBoolean(),
+            replicated == false && randomBoolean(),
             null
         );
         assertThat(
@@ -2026,7 +2043,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             randomNonNegativeInt(),
             null,
             hidden,
-            randomBoolean(),
+            replicated,
             system,
             System::currentTimeMillis,
             randomBoolean(),
@@ -2034,7 +2051,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
             DataStreamLifecycleTests.randomLifecycle(),
             true,
             failureIndices,
-            randomBoolean(),
+            replicated == false && randomBoolean(),
             null
         );
         assertThat(failureStoreDataStream.isFailureStoreIndex(writeFailureIndex.getName()), is(true));

+ 1 - 0
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java

@@ -357,6 +357,7 @@ public class MetadataDataStreamsServiceTests extends MapperServiceTestCase {
             original.getLifecycle(),
             original.isFailureStore(),
             original.getFailureIndices(),
+            original.rolloverOnWrite(),
             original.getAutoShardingEvent()
         );
         var brokenState = ClusterState.builder(state).metadata(Metadata.builder(state.getMetadata()).put(broken).build()).build();

+ 1 - 0
server/src/test/java/org/elasticsearch/cluster/metadata/WildcardExpressionResolverTests.java

@@ -381,6 +381,7 @@ public class WildcardExpressionResolverTests extends ESTestCase {
                 null,
                 false,
                 List.of(),
+                false,
                 null
             );
 

+ 5 - 2
test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

@@ -143,6 +143,7 @@ public final class DataStreamTestHelper {
             lifecycle,
             false,
             List.of(),
+            false,
             autoShardingEvent
         );
     }
@@ -169,6 +170,7 @@ public final class DataStreamTestHelper {
             lifecycle,
             failureStores.size() > 0,
             failureStores,
+            false,
             null
         );
     }
@@ -352,13 +354,14 @@ public final class DataStreamTestHelper {
             );
         }
 
+        boolean replicated = randomBoolean();
         return new DataStream(
             dataStreamName,
             indices,
             generation,
             metadata,
             randomBoolean(),
-            randomBoolean(),
+            replicated,
             false, // Some tests don't work well with system data streams, since these data streams require special handling
             timeProvider,
             randomBoolean(),
@@ -366,7 +369,7 @@ public final class DataStreamTestHelper {
             randomBoolean() ? DataStreamLifecycle.newBuilder().dataRetention(randomMillisUpToYear9999()).build() : null,
             failureStore,
             failureIndices,
-            randomBoolean(),
+            replicated == false && randomBoolean(),
             randomBoolean()
                 ? new DataStreamAutoShardingEvent(
                     indices.get(indices.size() - 1).getName(),

+ 4 - 0
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java

@@ -341,6 +341,9 @@ public final class TransportPutFollowAction extends TransportMasterNodeAction<Pu
                 remoteDataStream.getLifecycle(),
                 remoteDataStream.isFailureStore(),
                 remoteDataStream.getFailureIndices(),
+                // Replicated data streams can't be rolled over, so having the `rolloverOnWrite` flag set to `true` wouldn't make sense
+                // (and potentially even break things).
+                false,
                 remoteDataStream.getAutoShardingEvent()
             );
         } else {
@@ -395,6 +398,7 @@ public final class TransportPutFollowAction extends TransportMasterNodeAction<Pu
                 localDataStream.getLifecycle(),
                 localDataStream.isFailureStore(),
                 localDataStream.getFailureIndices(),
+                localDataStream.rolloverOnWrite(),
                 localDataStream.getAutoShardingEvent()
             );
         }

+ 3 - 1
x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportActionIT.java

@@ -122,19 +122,21 @@ public class DataStreamLifecycleUsageTransportActionIT extends ESIntegTestCase {
                     indices.add(index);
                 }
                 boolean systemDataStream = randomBoolean();
+                boolean replicated = randomBoolean();
                 DataStream dataStream = new DataStream(
                     randomAlphaOfLength(50),
                     indices,
                     randomLongBetween(0, 1000),
                     Map.of(),
                     systemDataStream || randomBoolean(),
-                    randomBoolean(),
+                    replicated,
                     systemDataStream,
                     randomBoolean(),
                     IndexMode.STANDARD,
                     lifecycle,
                     false,
                     List.of(),
+                    replicated == false && randomBoolean(),
                     null
                 );
                 dataStreamMap.put(dataStream.getName(), dataStream);