فهرست منبع

Fix NPE in DataStream TSDB validation when getting cluster state (#86112)

The NPE occurred because the data stream was added to cluster state copy (used in the cluster state api response)
before adding the `IndexMetadata` instances of the backing indices of the data stream.

This fix in this change is to first add the `IndexMetadata` instances of the backing indices to cluster state copy and then the `DataStream` instance.

Closes #86111
weizijun 3 سال پیش
والد
کامیت
1e3ad0d354

+ 1 - 1
modules/data-streams/build.gradle

@@ -15,7 +15,7 @@ esplugin {
 restResources {
   restApi {
     include 'bulk', 'count', 'search', '_common', 'indices', 'index', 'cluster', 'rank_eval', 'reindex', 'update_by_query', 'delete_by_query',
-      'eql', 'data_stream', 'ingest'
+      'eql', 'data_stream', 'ingest', 'cat'
   }
 }
 

+ 9 - 0
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/150_tsdb.yml

@@ -114,6 +114,15 @@ created the data stream:
   - match: { $body.$backing_index.settings.index.time_series.start_time: '2021-04-28T00:00:00Z' }
   - match: { $body.$backing_index.settings.index.time_series.end_time: '2021-04-29T00:00:00Z' }
 
+  - do:
+      cat.indices:
+        index: k8s
+        h: index
+
+  - match:
+      $body: |
+        /^\.ds-k8s-\d{4}\.\d{2}\.\d{2}-000001\n$/
+
 ---
 fetch the tsid:
   - skip:

+ 1 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java

@@ -175,12 +175,12 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction<C
                     IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(filteredIndex);
                     if (indexAbstraction.getParentDataStream() != null) {
                         DataStream dataStream = indexAbstraction.getParentDataStream().getDataStream();
-                        mdBuilder.put(dataStream);
                         // Also the IMD of other backing indices need to be included, otherwise the cluster state api
                         // can't create a valid cluster state instance:
                         for (Index backingIndex : dataStream.getIndices()) {
                             mdBuilder.put(currentState.metadata().index(backingIndex), false);
                         }
+                        mdBuilder.put(dataStream);
                     } else {
                         IndexMetadata indexMetadata = currentState.metadata().index(filteredIndex);
                         if (indexMetadata != null) {

+ 7 - 2
server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

@@ -183,8 +183,13 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
     public void validate(Function<String, IndexMetadata> imSupplier) {
         if (indexMode == IndexMode.TIME_SERIES) {
             // Get a sorted overview of each backing index with there start and end time range:
-            var startAndEndTimes = indices.stream()
-                .map(index -> imSupplier.apply(index.getName()))
+            var startAndEndTimes = indices.stream().map(index -> {
+                IndexMetadata im = imSupplier.apply(index.getName());
+                if (im == null) {
+                    throw new IllegalStateException("index [" + index.getName() + "] is not found in the index metadata supplier");
+                }
+                return im;
+            })
                 .filter(
                     // Migrated tsdb data streams have non tsdb backing indices:
                     im -> IndexSettings.TIME_SERIES_START_TIME.exists(im.getSettings())

+ 86 - 0
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

@@ -12,6 +12,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexMode;
@@ -654,6 +655,91 @@ public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
                 )
             );
         }
+        {
+            Instant currentTime = Instant.now().truncatedTo(ChronoUnit.MILLIS);
+
+            // These ranges are on the edge of each other temporal boundaries.
+            Instant start1 = currentTime.minus(6, ChronoUnit.HOURS);
+            Instant end1 = currentTime.minus(2, ChronoUnit.HOURS);
+            Instant start2 = currentTime.minus(2, ChronoUnit.HOURS);
+            Instant end2 = currentTime.plus(2, ChronoUnit.HOURS);
+
+            String dataStreamName = "logs_my-app_prod";
+            var clusterState = DataStreamTestHelper.getClusterStateWithDataStream(
+                dataStreamName,
+                List.of(Tuple.tuple(start1, end1), Tuple.tuple(start2, end2))
+            );
+            DataStream dataStream = clusterState.getMetadata().dataStreams().get(dataStreamName);
+
+            {
+                // IndexMetadata not found case:
+                var e = expectThrows(IllegalStateException.class, () -> dataStream.validate((index) -> null));
+                assertThat(
+                    e.getMessage(),
+                    equalTo(
+                        "index ["
+                            + DataStream.getDefaultBackingIndexName(dataStreamName, 1, start1.toEpochMilli())
+                            + "] is not found in the index metadata supplier"
+                    )
+                );
+            }
+
+            {
+                // index is not time_series index:
+                dataStream.validate(
+                    (index) -> IndexMetadata.builder(index)
+                        .settings(
+                            Settings.builder()
+                                .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                                .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
+                                .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT)
+                                .build()
+                        )
+                        .build()
+                );
+            }
+
+            {
+                // invalid IndexMetadata result
+                Instant start3 = currentTime.minus(6, ChronoUnit.HOURS);
+                Instant end3 = currentTime.plus(2, ChronoUnit.HOURS);
+                var e = expectThrows(
+                    IllegalArgumentException.class,
+                    () -> dataStream.validate(
+                        (index) -> IndexMetadata.builder(index)
+                            .settings(
+                                Settings.builder()
+                                    .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
+                                    .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
+                                    .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT)
+                                    .put(IndexSettings.TIME_SERIES_START_TIME.getKey(), start3.toEpochMilli())
+                                    .put(IndexSettings.TIME_SERIES_END_TIME.getKey(), end3.toEpochMilli())
+                                    .build()
+                            )
+                            .build()
+                    )
+                );
+                var formatter = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
+                assertThat(
+                    e.getMessage(),
+                    equalTo(
+                        "backing index ["
+                            + DataStream.getDefaultBackingIndexName(dataStreamName, 1, start1.toEpochMilli())
+                            + "] with range ["
+                            + formatter.format(start3)
+                            + " TO "
+                            + formatter.format(end3)
+                            + "] is overlapping with backing index ["
+                            + DataStream.getDefaultBackingIndexName(dataStreamName, 2, start2.toEpochMilli())
+                            + "] with range ["
+                            + formatter.format(start3)
+                            + " TO "
+                            + formatter.format(end3)
+                            + "]"
+                    )
+                );
+            }
+        }
     }
 
 }