소스 검색

Identify backing indices for data streams (#55410)

Dan Hermann 5 년 전
부모
커밋
8ef209f119

+ 30 - 1
server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java

@@ -67,6 +67,12 @@ public interface IndexAbstraction {
     @Nullable
     IndexMetadata getWriteIndex();
 
+    /**
+     * @return the data stream to which this index belongs or <code>null</code> if this is not a concrete index or
+     * if it is a concrete index that does not belong to a data stream.
+     */
+    @Nullable DataStream getParentDataStream();
+
     /**
      * @return whether this index abstraction is hidden or not
      */
@@ -114,9 +120,15 @@ public interface IndexAbstraction {
     class Index implements IndexAbstraction {
 
         private final IndexMetadata concreteIndex;
+        private final DataStream dataStream;
 
-        public Index(IndexMetadata indexMetadata) {
+        public Index(IndexMetadata indexMetadata, DataStream dataStream) {
             this.concreteIndex = indexMetadata;
+            this.dataStream = dataStream;
+        }
+
+        public Index(IndexMetadata indexMetadata) {
+            this(indexMetadata, null);
         }
 
         @Override
@@ -139,6 +151,11 @@ public interface IndexAbstraction {
             return concreteIndex;
         }
 
+        @Override
+        public DataStream getParentDataStream() {
+            return dataStream;
+        }
+
         @Override
         public boolean isHidden() {
             return INDEX_HIDDEN_SETTING.get(concreteIndex.getSettings());
@@ -182,6 +199,12 @@ public interface IndexAbstraction {
             return writeIndex.get();
         }
 
+        @Override
+        public DataStream getParentDataStream() {
+            // aliases may not be part of a data stream
+            return null;
+        }
+
         @Override
         public boolean isHidden() {
             return isHidden;
@@ -291,6 +314,12 @@ public interface IndexAbstraction {
             return writeIndex;
         }
 
+        @Override
+        public DataStream getParentDataStream() {
+            // a data stream cannot have a parent data stream
+            return null;
+        }
+
         @Override
         public boolean isHidden() {
             return false;

+ 74 - 34
server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java

@@ -1287,7 +1287,7 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
             final List<String> visibleOpenIndices = new ArrayList<>();
             final List<String> allClosedIndices = new ArrayList<>();
             final List<String> visibleClosedIndices = new ArrayList<>();
-            final Set<String> duplicateAliasesIndices = new HashSet<>();
+            final Set<String> allAliases = new HashSet<>();
             for (ObjectCursor<IndexMetadata> cursor : indices.values()) {
                 final IndexMetadata indexMetadata = cursor.value;
                 final String name = indexMetadata.getIndex().getName();
@@ -1308,23 +1308,55 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
                         visibleClosedIndices.add(name);
                     }
                 }
-                indexMetadata.getAliases().keysIt().forEachRemaining(duplicateAliasesIndices::add);
+                indexMetadata.getAliases().keysIt().forEachRemaining(allAliases::add);
             }
-            duplicateAliasesIndices.retainAll(allIndices);
-            if (duplicateAliasesIndices.isEmpty() == false) {
+
+            final Set<String> allDataStreams = new HashSet<>();
+            DataStreamMetadata dataStreamMetadata = (DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE);
+            if (dataStreamMetadata != null) {
+                for (DataStream dataStream : dataStreamMetadata.dataStreams().values()) {
+                    allDataStreams.add(dataStream.getName());
+                }
+            }
+
+            final Set<String> aliasDuplicatesWithIndices = new HashSet<>(allAliases);
+            aliasDuplicatesWithIndices.retainAll(allIndices);
+            ArrayList<String> duplicates = new ArrayList<>();
+            if (aliasDuplicatesWithIndices.isEmpty() == false) {
                 // iterate again and constructs a helpful message
-                ArrayList<String> duplicates = new ArrayList<>();
                 for (ObjectCursor<IndexMetadata> cursor : indices.values()) {
-                    for (String alias : duplicateAliasesIndices) {
+                    for (String alias : aliasDuplicatesWithIndices) {
                         if (cursor.value.getAliases().containsKey(alias)) {
-                            duplicates.add(alias + " (alias of " + cursor.value.getIndex() + ")");
+                            duplicates.add(alias + " (alias of " + cursor.value.getIndex() + ") conflicts with index");
                         }
                     }
                 }
-                assert duplicates.size() > 0;
-                throw new IllegalStateException("index and alias names need to be unique, but the following duplicates were found ["
-                    + Strings.collectionToCommaDelimitedString(duplicates) + "]");
+            }
 
+            final Set<String> aliasDuplicatesWithDataStreams = new HashSet<>(allAliases);
+            aliasDuplicatesWithDataStreams.retainAll(allDataStreams);
+            if (aliasDuplicatesWithDataStreams.isEmpty() == false) {
+                // iterate again and constructs a helpful message
+                for (ObjectCursor<IndexMetadata> cursor : indices.values()) {
+                    for (String alias : aliasDuplicatesWithDataStreams) {
+                        if (cursor.value.getAliases().containsKey(alias)) {
+                            duplicates.add(alias + " (alias of " + cursor.value.getIndex() + ") conflicts with data stream");
+                        }
+                    }
+                }
+            }
+
+            final Set<String> dataStreamDuplicatesWithIndices = new HashSet<>(allDataStreams);
+            dataStreamDuplicatesWithIndices.retainAll(allIndices);
+            if (dataStreamDuplicatesWithIndices.isEmpty() == false) {
+                for (String dataStream : dataStreamDuplicatesWithIndices) {
+                    duplicates.add("data stream [" + dataStream + "] conflicts with index");
+                }
+            }
+
+            if (duplicates.size() > 0) {
+                throw new IllegalStateException("index, alias, and data stream names need to be unique, but the following duplicates " +
+                    "were found [" + Strings.collectionToCommaDelimitedString(duplicates) + "]");
             }
 
             SortedMap<String, IndexAbstraction> indicesLookup = Collections.unmodifiableSortedMap(buildIndicesLookup());
@@ -1349,10 +1381,40 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
 
         private SortedMap<String, IndexAbstraction> buildIndicesLookup() {
             SortedMap<String, IndexAbstraction> indicesLookup = new TreeMap<>();
+            Map<String, DataStream> indexToDataStreamLookup = new HashMap<>();
+            DataStreamMetadata dataStreamMetadata = (DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE);
+            // If there are no indices, then skip data streams. This happens only when metadata is read from disk
+            if (dataStreamMetadata != null && indices.size() > 0) {
+                for (DataStream dataStream : dataStreamMetadata.dataStreams().values()) {
+                    List<IndexMetadata> backingIndices = dataStream.getIndices().stream()
+                        .map(index -> indices.get(index.getName()))
+                        .collect(Collectors.toList());
+                    assert backingIndices.isEmpty() == false;
+                    assert backingIndices.contains(null) == false;
+
+                    IndexAbstraction existing = indicesLookup.put(dataStream.getName(),
+                        new IndexAbstraction.DataStream(dataStream, backingIndices));
+                    assert existing == null : "duplicate data stream for " + dataStream.getName();
+
+                    for (Index i : dataStream.getIndices()) {
+                        indexToDataStreamLookup.put(i.getName(), dataStream);
+                    }
+                }
+            }
+
             for (ObjectCursor<IndexMetadata> cursor : indices.values()) {
                 IndexMetadata indexMetadata = cursor.value;
-                IndexAbstraction existing =
-                    indicesLookup.put(indexMetadata.getIndex().getName(), new IndexAbstraction.Index(indexMetadata));
+
+                IndexAbstraction.Index index;
+                DataStream parent = indexToDataStreamLookup.get(indexMetadata.getIndex().getName());
+                if (parent != null) {
+                    assert parent.getIndices().contains(indexMetadata.getIndex());
+                    index = new IndexAbstraction.Index(indexMetadata, (IndexAbstraction.DataStream) indicesLookup.get(parent.getName()));
+                } else {
+                    index = new IndexAbstraction.Index(indexMetadata);
+                }
+
+                IndexAbstraction existing = indicesLookup.put(indexMetadata.getIndex().getName(), index);
                 assert existing == null : "duplicate for " + indexMetadata.getIndex();
 
                 for (ObjectObjectCursor<String, AliasMetadata> aliasCursor : indexMetadata.getAliases()) {
@@ -1369,28 +1431,6 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
                 }
             }
 
-            DataStreamMetadata dataStreamMetadata = (DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE);
-            // If there are no indices then it doesn't make sense to to add data streams to indicesLookup,
-            // since there no concrete indices that a data stream can point to.
-            // (This occurs when only Metadata is read from disk.)
-            if (dataStreamMetadata != null && indices.size() > 0) {
-                for (Map.Entry<String, DataStream> entry : dataStreamMetadata.dataStreams().entrySet()) {
-                    DataStream dataStream = entry.getValue();
-                    List<IndexMetadata> backingIndices = dataStream.getIndices().stream()
-                        .map(index -> indices.get(index.getName()))
-                        .collect(Collectors.toList());
-                    assert backingIndices.isEmpty() == false;
-                    assert backingIndices.contains(null) == false;
-
-                    IndexAbstraction existing = indicesLookup.put(dataStream.getName(),
-                        new IndexAbstraction.DataStream(dataStream, backingIndices));
-                    if (existing != null) {
-                        throw new IllegalStateException("data stream [" + dataStream.getName() +
-                            "] conflicts with existing " + existing.getType().getDisplayName() + " [" + existing.getName() + "]");
-                    }
-                }
-            }
-
             indicesLookup.values().stream()
                 .filter(aliasOrIndex -> aliasOrIndex.getType() == IndexAbstraction.Type.ALIAS)
                 .forEach(alias -> ((IndexAbstraction.Alias) alias).computeAndValidateAliasProperties());

+ 61 - 12
server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java

@@ -22,6 +22,7 @@ package org.elasticsearch.cluster.metadata;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.cluster.ClusterModule;
+import org.elasticsearch.cluster.DataStreamTestHelper;
 import org.elasticsearch.cluster.coordination.CoordinationMetadata;
 import org.elasticsearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion;
 import org.elasticsearch.common.Strings;
@@ -50,6 +51,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
 
 import static org.elasticsearch.cluster.DataStreamTestHelper.createBackingIndex;
 import static org.elasticsearch.cluster.DataStreamTestHelper.createFirstBackingIndex;
@@ -163,7 +165,8 @@ public class MetadataTests extends ESTestCase {
             fail("exception should have been thrown");
         } catch (IllegalStateException e) {
             assertThat(e.getMessage(),
-                equalTo("index and alias names need to be unique, but the following duplicates were found [index (alias of [index])]"));
+                equalTo("index, alias, and data stream names need to be unique, but the following duplicates were found [index (alias " +
+                    "of [index]) conflicts with index]"));
         }
     }
 
@@ -198,7 +201,7 @@ public class MetadataTests extends ESTestCase {
             metadataBuilder.build();
             fail("exception should have been thrown");
         } catch (IllegalStateException e) {
-            assertThat(e.getMessage(), startsWith("index and alias names need to be unique"));
+            assertThat(e.getMessage(), startsWith("index, alias, and data stream names need to be unique"));
         }
     }
 
@@ -925,7 +928,8 @@ public class MetadataTests extends ESTestCase {
 
         IllegalStateException e = expectThrows(IllegalStateException.class, b::build);
         assertThat(e.getMessage(),
-            containsString("data stream [" + dataStreamName + "] conflicts with existing concrete index [" + dataStreamName + "]"));
+            containsString("index, alias, and data stream names need to be unique, but the following duplicates were found [data " +
+                "stream [" + dataStreamName + "] conflicts with index]"));
     }
 
     public void testBuilderRejectsDataStreamThatConflictsWithAlias() {
@@ -939,7 +943,8 @@ public class MetadataTests extends ESTestCase {
 
         IllegalStateException e = expectThrows(IllegalStateException.class, b::build);
         assertThat(e.getMessage(),
-            containsString("data stream [" + dataStreamName + "] conflicts with existing alias [" + dataStreamName + "]"));
+            containsString("index, alias, and data stream names need to be unique, but the following duplicates were found [" +
+                dataStreamName + " (alias of [" + DataStream.getBackingIndexName(dataStreamName, 1) + "]) conflicts with data stream]"));
     }
 
     public void testBuilderRejectsDataStreamWithConflictingBackingIndices() {
@@ -1026,6 +1031,54 @@ public class MetadataTests extends ESTestCase {
         }
     }
 
+    public void testIndicesLookupRecordsDataStreamForBackingIndices() {
+        // create some indices that do not back a data stream
+        final List<Index> indices = new ArrayList<>();
+        final int numIndices = randomIntBetween(2, 5);
+        int lastIndexNum = randomIntBetween(9, 50);
+        Metadata.Builder b = Metadata.builder();
+        for (int k = 1; k <= numIndices; k++) {
+            IndexMetadata im = IndexMetadata.builder(DataStream.getBackingIndexName("index", lastIndexNum))
+                .settings(settings(Version.CURRENT))
+                .numberOfShards(1)
+                .numberOfReplicas(1)
+                .build();
+            b.put(im, false);
+            indices.add(im.getIndex());
+            lastIndexNum = randomIntBetween(lastIndexNum + 1, lastIndexNum + 50);
+        }
+
+        // create some backing indices for a data stream
+        final String dataStreamName = "my-data-stream";
+        final List<Index> backingIndices = new ArrayList<>();
+        final int numBackingIndices = randomIntBetween(2, 5);
+        int lastBackingIndexNum = 0;
+        for (int k = 1; k <= numBackingIndices; k++) {
+            lastBackingIndexNum = randomIntBetween(lastBackingIndexNum + 1, lastBackingIndexNum + 50);
+            IndexMetadata im = IndexMetadata.builder(DataStream.getBackingIndexName(dataStreamName, lastBackingIndexNum))
+                .settings(settings(Version.CURRENT))
+                .numberOfShards(1)
+                .numberOfReplicas(1)
+                .build();
+            b.put(im, false);
+            backingIndices.add(im.getIndex());
+        }
+        b.put(new DataStream(dataStreamName, "ts", backingIndices, lastBackingIndexNum));
+        Metadata metadata = b.build();
+
+        SortedMap<String, IndexAbstraction> indicesLookup = metadata.getIndicesLookup();
+        assertThat(indicesLookup.size(), equalTo(indices.size() + backingIndices.size() + 1));
+        for (Index index : indices) {
+            assertTrue(indicesLookup.containsKey(index.getName()));
+            assertNull(indicesLookup.get(index.getName()).getParentDataStream());
+        }
+        for (Index index : backingIndices) {
+            assertTrue(indicesLookup.containsKey(index.getName()));
+            assertNotNull(indicesLookup.get(index.getName()).getParentDataStream());
+            assertThat(indicesLookup.get(index.getName()).getParentDataStream().getName(), equalTo(dataStreamName));
+        }
+    }
+
     public void testSerialization() throws IOException {
         final Metadata orig = randomMetadata();
         final BytesStreamOutput out = new BytesStreamOutput();
@@ -1037,8 +1090,6 @@ public class MetadataTests extends ESTestCase {
     }
 
     public static Metadata randomMetadata() {
-        DataStream randomDataStream = DataStreamTests.randomInstance();
-
         Metadata.Builder md = Metadata.builder()
             .put(buildIndexMetadata("index", "alias", randomBoolean() ? null : randomBoolean()).build(), randomBoolean())
             .put(IndexTemplateMetadata.builder("template" + randomAlphaOfLength(3))
@@ -1058,15 +1109,13 @@ public class MetadataTests extends ESTestCase {
             .indexGraveyard(IndexGraveyardTests.createRandom())
             .version(randomNonNegativeLong())
             .put("component_template_" + randomAlphaOfLength(3), ComponentTemplateTests.randomInstance())
-            .put("index_template_v2_" + randomAlphaOfLength(3), IndexTemplateV2Tests.randomInstance())
-            .put(randomDataStream);
+            .put("index_template_v2_" + randomAlphaOfLength(3), IndexTemplateV2Tests.randomInstance());
 
+        DataStream randomDataStream = DataStreamTests.randomInstance();
         for (Index index : randomDataStream.getIndices()) {
-            md.put(IndexMetadata.builder(index.getName())
-                .settings(ESTestCase.settings(Version.CURRENT).put("index.hidden", true))
-                .numberOfShards(1)
-                .numberOfReplicas(1));
+            md.put(DataStreamTestHelper.getIndexMetadataBuilderForIndex(index));
         }
+        md.put(randomDataStream);
 
         return md.build();
     }

+ 18 - 3
test/framework/src/main/java/org/elasticsearch/cluster/DataStreamTestHelper.java

@@ -22,18 +22,33 @@ package org.elasticsearch.cluster;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.test.ESTestCase;
 
+import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID;
+
 public final class DataStreamTestHelper {
 
+    private static final Settings.Builder SETTINGS = ESTestCase.settings(Version.CURRENT).put("index.hidden", true);
+    private static final int NUMBER_OF_SHARDS = 1;
+    private static final int NUMBER_OF_REPLICAS = 1;
+
     public static IndexMetadata.Builder createFirstBackingIndex(String dataStreamName) {
         return createBackingIndex(dataStreamName, 1);
     }
 
     public static IndexMetadata.Builder createBackingIndex(String dataStreamName, int generation) {
         return IndexMetadata.builder(DataStream.getBackingIndexName(dataStreamName, generation))
-            .settings(ESTestCase.settings(Version.CURRENT).put("index.hidden", true))
-            .numberOfShards(1)
-            .numberOfReplicas(1);
+            .settings(SETTINGS)
+            .numberOfShards(NUMBER_OF_SHARDS)
+            .numberOfReplicas(NUMBER_OF_REPLICAS);
+    }
+
+    public static IndexMetadata.Builder getIndexMetadataBuilderForIndex(Index index) {
+        return IndexMetadata.builder(index.getName())
+            .settings(Settings.builder().put(SETTINGS.build()).put(SETTING_INDEX_UUID, index.getUUID()))
+            .numberOfShards(NUMBER_OF_SHARDS)
+            .numberOfReplicas(NUMBER_OF_REPLICAS);
     }
 }