Browse Source

Data stream support read and write with custom routing and partition size (#74394)

xiaoping 4 years ago
parent
commit
7e08c6b98a
22 changed files with 348 additions and 53 deletions
  1. 14 4
      client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java
  2. 2 1
      docs/reference/data-streams/change-mappings-and-settings.asciidoc
  3. 4 2
      docs/reference/indices/get-data-stream.asciidoc
  4. 8 2
      server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
  5. 23 5
      server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java
  6. 62 16
      server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
  7. 25 1
      server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java
  8. 11 2
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java
  9. 15 1
      server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java
  10. 2 1
      server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
  11. 3 2
      server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java
  12. 1 1
      server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTemplateTests.java
  13. 3 2
      server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java
  14. 1 1
      server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java
  15. 1 1
      test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java
  16. 2 2
      x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java
  17. 1 1
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java
  18. 11 2
      x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java
  19. 2 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/GetDataStreamAction.java
  20. 2 2
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyUtilsTests.java
  21. 154 3
      x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java
  22. 1 1
      x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/DataStreamsStatsTests.java

+ 14 - 4
client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java

@@ -34,10 +34,11 @@ public final class DataStream {
     String ilmPolicyName;
     @Nullable
     private final Map<String, Object> metadata;
+    private final boolean allowCustomRouting;
 
     public DataStream(String name, String timeStampField, List<String> indices, long generation, ClusterHealthStatus dataStreamStatus,
                       @Nullable String indexTemplate, @Nullable String ilmPolicyName, @Nullable  Map<String, Object> metadata,
-                      boolean hidden, boolean system) {
+                      boolean hidden, boolean system, boolean allowCustomRouting) {
         this.name = name;
         this.timeStampField = timeStampField;
         this.indices = indices;
@@ -48,6 +49,7 @@ public final class DataStream {
         this.metadata = metadata;
         this.hidden = hidden;
         this.system = system;
+        this.allowCustomRouting = allowCustomRouting;
     }
 
     public String getName() {
@@ -90,6 +92,10 @@ public final class DataStream {
         return system;
     }
 
+    public boolean allowsCustomRouting() {
+        return allowCustomRouting;
+    }
+
     public static final ParseField NAME_FIELD = new ParseField("name");
     public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field");
     public static final ParseField INDICES_FIELD = new ParseField("indices");
@@ -100,6 +106,7 @@ public final class DataStream {
     public static final ParseField METADATA_FIELD = new ParseField("_meta");
     public static final ParseField HIDDEN_FIELD = new ParseField("hidden");
     public static final ParseField SYSTEM_FIELD = new ParseField("system");
+    public static final ParseField ALLOW_CUSTOM_ROUTING = new ParseField("allow_custom_routing");
 
     @SuppressWarnings("unchecked")
     private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream",
@@ -116,8 +123,9 @@ public final class DataStream {
             Map<String, Object> metadata = (Map<String, Object>) args[7];
             boolean hidden = args[8] != null && (boolean) args[8];
             boolean system = args[9] != null && (boolean) args[9];
+            boolean allowCustomRouting = args[10] != null && (boolean) args[10];
             return new DataStream(dataStreamName, timeStampField, indices, generation, status, indexTemplate, ilmPolicy, metadata, hidden,
-                system);
+                system, allowCustomRouting);
         });
 
     static {
@@ -131,6 +139,7 @@ public final class DataStream {
         PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD);
         PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), HIDDEN_FIELD);
         PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), SYSTEM_FIELD);
+        PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ALLOW_CUSTOM_ROUTING);
     }
 
     public static DataStream fromXContent(XContentParser parser) throws IOException {
@@ -151,12 +160,13 @@ public final class DataStream {
             system == that.system &&
             Objects.equals(indexTemplate, that.indexTemplate) &&
             Objects.equals(ilmPolicyName, that.ilmPolicyName) &&
-            Objects.equals(metadata, that.metadata);
+            Objects.equals(metadata, that.metadata) &&
+            allowCustomRouting == that.allowCustomRouting;
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(name, timeStampField, indices, generation, dataStreamStatus, indexTemplate, ilmPolicyName, metadata, hidden,
-            system);
+            system, allowCustomRouting);
     }
 }

+ 2 - 1
docs/reference/data-streams/change-mappings-and-settings.asciidoc

@@ -577,7 +577,8 @@ stream's oldest backing index.
       "status": "GREEN",
       "template": "my-data-stream-template",
       "hidden": false,
-      "system": false
+      "system": false,
+      "allow_custom_routing": false
     }
   ]
 }

+ 4 - 2
docs/reference/indices/get-data-stream.asciidoc

@@ -246,7 +246,8 @@ The API returns the following response:
       "template": "my-index-template",
       "ilm_policy": "my-lifecycle-policy",
       "hidden": false,
-      "system": false
+      "system": false,
+      "allow_custom_routing": false
     },
     {
       "name": "my-data-stream-two",
@@ -267,7 +268,8 @@ The API returns the following response:
       "template": "my-index-template",
       "ilm_policy": "my-lifecycle-policy",
       "hidden": false,
-      "system": false
+      "system": false,
+      "allow_custom_routing": false
     }
   ]
 }

+ 8 - 2
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -347,8 +347,14 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
 
         if (writeRequest.routing() != null) {
             IndexAbstraction.DataStream dataStream = (IndexAbstraction.DataStream) indexAbstraction;
-            throw new IllegalArgumentException("index request targeting data stream [" + dataStream.getName() + "] specifies a custom " +
-                "routing. target the backing indices directly or remove the custom routing.");
+            if (dataStream.getDataStream().isAllowCustomRouting() == false) {
+                throw new IllegalArgumentException(
+                    "index request targeting data stream ["
+                        + dataStream.getName()
+                        + "] specifies a custom routing but the [allow_custom_routing] setting was "
+                        + "not enabled in the data stream's template."
+                );
+            }
         }
     }
 

+ 23 - 5
server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.cluster.metadata;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.cluster.AbstractDiffable;
 import org.elasticsearch.cluster.Diff;
 import org.elasticsearch.core.Nullable;
@@ -258,28 +259,37 @@ public class ComposableIndexTemplate extends AbstractDiffable<ComposableIndexTem
     public static class DataStreamTemplate implements Writeable, ToXContentObject {
 
         private static final ParseField HIDDEN = new ParseField("hidden");
+        private static final ParseField ALLOW_CUSTOM_ROUTING = new ParseField("allow_custom_routing");
 
         public static final ConstructingObjectParser<DataStreamTemplate, Void> PARSER = new ConstructingObjectParser<>(
             "data_stream_template",
             false,
-            a -> new DataStreamTemplate(a[0] != null && (boolean) a[0]));
+            a -> new DataStreamTemplate(a[0] != null && (boolean) a[0], a[1] != null && (boolean) a[1]));
 
         static {
             PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), HIDDEN);
+            PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ALLOW_CUSTOM_ROUTING);
         }
 
         private final boolean hidden;
+        private final boolean allowCustomRouting;
 
         public DataStreamTemplate() {
-            this(false);
+            this(false, false);
         }
 
-        public DataStreamTemplate(boolean hidden) {
+        public DataStreamTemplate(boolean hidden, boolean allowCustomRouting) {
             this.hidden = hidden;
+            this.allowCustomRouting = allowCustomRouting;
         }
 
         DataStreamTemplate(StreamInput in) throws IOException {
             hidden = in.readBoolean();
+            if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+                allowCustomRouting = in.readBoolean();
+            } else {
+                allowCustomRouting = false;
+            }
         }
 
         public String getTimestampField() {
@@ -298,15 +308,23 @@ public class ComposableIndexTemplate extends AbstractDiffable<ComposableIndexTem
             return hidden;
         }
 
+        public boolean isAllowCustomRouting() {
+            return allowCustomRouting;
+        }
+
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeBoolean(hidden);
+            if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+                out.writeBoolean(allowCustomRouting);
+            }
         }
 
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.startObject();
             builder.field("hidden", hidden);
+            builder.field(ALLOW_CUSTOM_ROUTING.getPreferredName(), allowCustomRouting);
             builder.endObject();
             return builder;
         }
@@ -316,12 +334,12 @@ public class ComposableIndexTemplate extends AbstractDiffable<ComposableIndexTem
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
             DataStreamTemplate that = (DataStreamTemplate) o;
-            return hidden == that.hidden;
+            return hidden == that.hidden && allowCustomRouting == that.allowCustomRouting;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(hidden);
+            return Objects.hash(hidden, allowCustomRouting);
         }
     }
 

+ 62 - 16
server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

@@ -10,6 +10,7 @@ package org.elasticsearch.cluster.metadata;
 import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.PointValues;
+import org.elasticsearch.Version;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.cluster.AbstractDiffable;
 import org.elasticsearch.cluster.Diff;
@@ -75,24 +76,36 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
     private final boolean hidden;
     private final boolean replicated;
     private final boolean system;
+    private final boolean allowCustomRouting;
 
     public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation, Map<String, Object> metadata) {
-        this(name, timeStampField, indices, generation, metadata, false, false, false);
+        this(name, timeStampField, indices, generation, metadata, false, false, false, false);
     }
 
     public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation, Map<String, Object> metadata,
-                      boolean hidden, boolean replicated) {
-        this(name, timeStampField, indices, generation, metadata, hidden, replicated, false, System::currentTimeMillis);
+                      boolean hidden, boolean replicated, boolean allowCustomRouting) {
+        this(name, timeStampField, indices, generation, metadata, hidden, replicated, false, System::currentTimeMillis, allowCustomRouting);
     }
 
     public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation, Map<String, Object> metadata,
-                      boolean hidden, boolean replicated, boolean system) {
-        this(name, timeStampField, indices, generation, metadata, hidden, replicated, system, System::currentTimeMillis);
+                      boolean hidden, boolean replicated, boolean system, boolean allowCustomRouting) {
+        this(
+            name,
+            timeStampField,
+            indices,
+            generation,
+            metadata,
+            hidden,
+            replicated,
+            system,
+            System::currentTimeMillis,
+            allowCustomRouting
+        );
     }
 
     // visible for testing
     DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation, Map<String, Object> metadata,
-        boolean hidden, boolean replicated, boolean system, LongSupplier timeProvider) {
+        boolean hidden, boolean replicated, boolean system, LongSupplier timeProvider, boolean allowCustomRouting) {
         this.name = name;
         this.timeStampField = timeStampField;
         this.indices = Collections.unmodifiableList(indices);
@@ -102,6 +115,7 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
         this.replicated = replicated;
         this.timeProvider = timeProvider;
         this.system = system;
+        this.allowCustomRouting = allowCustomRouting;
         assert indices.size() > 0;
     }
 
@@ -152,6 +166,10 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
         return system;
     }
 
+    public boolean isAllowCustomRouting() {
+        return allowCustomRouting;
+    }
+
     /**
      * Performs a rollover on a {@code DataStream} instance and returns a new instance containing
      * the updated list of backing indices and incremented generation.
@@ -175,7 +193,7 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
             newWriteIndexName = DataStream.getDefaultBackingIndexName(getName(), ++generation, currentTimeMillis);
         } while (clusterMetadata.getIndicesLookup().containsKey(newWriteIndexName));
         backingIndices.add(new Index(newWriteIndexName, writeIndexUuid));
-        return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated, system);
+        return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated, system, allowCustomRouting);
     }
 
     /**
@@ -208,7 +226,7 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
         List<Index> backingIndices = new ArrayList<>(indices);
         backingIndices.remove(index);
         assert backingIndices.size() == indices.size() - 1;
-        return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated, system);
+        return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated, system, allowCustomRouting);
     }
 
     /**
@@ -233,7 +251,7 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
                 "it is the write index", existingBackingIndex.getName(), name));
         }
         backingIndices.set(backingIndexPosition, newBackingIndex);
-        return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated, system);
+        return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated, system, allowCustomRouting);
     }
 
     /**
@@ -284,7 +302,18 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
     }
 
     public DataStream promoteDataStream() {
-        return new DataStream(name, timeStampField, indices, getGeneration(), metadata, hidden, false, system, timeProvider);
+        return new DataStream(
+            name,
+            timeStampField,
+            indices,
+            getGeneration(),
+            metadata,
+            hidden,
+            false,
+            system,
+            timeProvider,
+            allowCustomRouting
+        );
     }
 
     /**
@@ -315,7 +344,8 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
             metadata == null ? null : new HashMap<>(metadata),
             hidden,
             replicated,
-            system
+            system,
+            allowCustomRouting
         );
     }
 
@@ -346,8 +376,17 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
     }
 
     public DataStream(StreamInput in) throws IOException {
-        this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong(),
-            in.readMap(), in.readBoolean(), in.readBoolean(), in.readBoolean());
+        this(
+            in.readString(),
+            new TimestampField(in),
+            in.readList(Index::new),
+            in.readVLong(),
+            in.readMap(),
+            in.readBoolean(),
+            in.readBoolean(),
+            in.readBoolean(),
+            in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readBoolean() : false
+        );
     }
 
     public static Diff<DataStream> readDiffFrom(StreamInput in) throws IOException {
@@ -364,6 +403,9 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
         out.writeBoolean(hidden);
         out.writeBoolean(replicated);
         out.writeBoolean(system);
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            out.writeBoolean(allowCustomRouting);
+        }
     }
 
     public static final ParseField NAME_FIELD = new ParseField("name");
@@ -374,12 +416,13 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
     public static final ParseField HIDDEN_FIELD = new ParseField("hidden");
     public static final ParseField REPLICATED_FIELD = new ParseField("replicated");
     public static final ParseField SYSTEM_FIELD = new ParseField("system");
+    public static final ParseField ALLOW_CUSTOM_ROUTING = new ParseField("allow_custom_routing");
 
     @SuppressWarnings("unchecked")
     private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream",
         args -> new DataStream((String) args[0], (TimestampField) args[1], (List<Index>) args[2], (Long) args[3],
             (Map<String, Object>) args[4], args[5] != null && (boolean) args[5], args[6] != null && (boolean) args[6],
-            args[7] != null && (boolean) args[7]));
+            args[7] != null && (boolean) args[7], args[8] != null && (boolean) args[8]));
 
     static {
         PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD);
@@ -390,6 +433,7 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
         PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), HIDDEN_FIELD);
         PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), REPLICATED_FIELD);
         PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), SYSTEM_FIELD);
+        PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ALLOW_CUSTOM_ROUTING);
     }
 
     public static DataStream fromXContent(XContentParser parser) throws IOException {
@@ -409,6 +453,7 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
         builder.field(HIDDEN_FIELD.getPreferredName(), hidden);
         builder.field(REPLICATED_FIELD.getPreferredName(), replicated);
         builder.field(SYSTEM_FIELD.getPreferredName(), system);
+        builder.field(ALLOW_CUSTOM_ROUTING.getPreferredName(), allowCustomRouting);
         builder.endObject();
         return builder;
     }
@@ -424,12 +469,13 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
             generation == that.generation &&
             Objects.equals(metadata, that.metadata) &&
             hidden == that.hidden &&
-            replicated == that.replicated;
+            replicated == that.replicated &&
+            allowCustomRouting == that.allowCustomRouting;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, timeStampField, indices, generation, metadata, hidden, replicated);
+        return Objects.hash(name, timeStampField, indices, generation, metadata, hidden, replicated, allowCustomRouting);
     }
 
     public static final class TimestampField implements Writeable, ToXContentObject {

+ 25 - 1
server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java

@@ -671,7 +671,31 @@ public class IndexNameExpressionResolver {
                         }
                     }
                 }
-            } else {
+            } else if (indexAbstraction != null && indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM) {
+                IndexAbstraction.DataStream dataStream = (IndexAbstraction.DataStream) indexAbstraction;
+                if (dataStream.getDataStream().isAllowCustomRouting() == false) {
+                    continue;
+                }
+                if (dataStream.getIndices() != null) {
+                    for (IndexMetadata indexMetadata : dataStream.getIndices()) {
+                        String concreteIndex = indexMetadata.getIndex().getName();
+                        if (norouting.contains(concreteIndex) == false) {
+                            norouting.add(concreteIndex);
+                            if (paramRouting != null) {
+                                Set<String> r = new HashSet<>(paramRouting);
+                                if (routings == null) {
+                                    routings = new HashMap<>();
+                                }
+                                routings.put(concreteIndex, r);
+                            } else {
+                                if (routings != null) {
+                                    routings.remove(concreteIndex);
+                                }
+                            }
+                        }
+                    }
+                }
+            }  else {
                 // Index
                 if (norouting.contains(expression) == false) {
                     norouting.add(expression);

+ 11 - 2
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java

@@ -217,8 +217,17 @@ public class MetadataCreateDataStreamService {
         List<Index> dsBackingIndices = backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList());
         dsBackingIndices.add(writeIndex.getIndex());
         boolean hidden = isSystem ? false : template.getDataStreamTemplate().isHidden();
-        DataStream newDataStream = new DataStream(dataStreamName, timestampField, dsBackingIndices, 1L,
-            template.metadata() != null ? Map.copyOf(template.metadata()) : null, hidden, false, isSystem);
+        DataStream newDataStream = new DataStream(
+            dataStreamName,
+            timestampField,
+            dsBackingIndices,
+            1L,
+            template.metadata() != null ? Map.copyOf(template.metadata()) : null,
+            hidden,
+            false,
+            isSystem,
+            template.getDataStreamTemplate().isAllowCustomRouting()
+        );
         Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream);
 
         List<String> aliases = new ArrayList<>();

+ 15 - 1
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java

@@ -92,6 +92,16 @@ public class MetadataIndexTemplateService {
         "        }\n" +
         "      }\n" +
         "    }";
+    public static final String DEFAULT_TIMESTAMP_MAPPING_WITH_ROUTING = "{\n" +
+        "\"_routing\" : {\n"
+        + "        \"required\" : true\n"
+        + "      },"+
+        "      \"properties\": {\n" +
+        "        \"@timestamp\": {\n" +
+        "          \"type\": \"date\"\n" +
+        "        }\n" +
+        "      }\n" +
+        "    }";
     private static final Logger logger = LogManager.getLogger(MetadataIndexTemplateService.class);
     private final ClusterService clusterService;
     private final AliasValidator aliasValidator;
@@ -1024,7 +1034,11 @@ public class MetadataIndexTemplateService {
         if (template.getDataStreamTemplate() != null && indexName.startsWith(DataStream.BACKING_INDEX_PREFIX)) {
             // add a default mapping for the `@timestamp` field, at the lowest precedence, to make bootstrapping data streams more
             // straightforward as all backing indices are required to have a timestamp field
-            mappings.add(0, new CompressedXContent(wrapMappingsIfNecessary(DEFAULT_TIMESTAMP_MAPPING, xContentRegistry)));
+            if (template.getDataStreamTemplate().isAllowCustomRouting()) {
+                mappings.add(0, new CompressedXContent(wrapMappingsIfNecessary(DEFAULT_TIMESTAMP_MAPPING_WITH_ROUTING, xContentRegistry)));
+            } else {
+                mappings.add(0, new CompressedXContent(wrapMappingsIfNecessary(DEFAULT_TIMESTAMP_MAPPING, xContentRegistry)));
+            }
         }
 
         // Only include _timestamp mapping snippet if creating backing index.

+ 2 - 1
server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

@@ -648,7 +648,8 @@ public class RestoreService implements ClusterStateApplier {
             dataStream.getMetadata(),
             dataStream.isHidden(),
             dataStream.isReplicated(),
-            dataStream.isSystem()
+            dataStream.isSystem(),
+            dataStream.isAllowCustomRouting()
         );
     }
 

+ 3 - 2
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java

@@ -227,8 +227,9 @@ public class TransportBulkActionTests extends ESTestCase {
             .routing("custom");
         IllegalArgumentException exception =
             expectThrows(IllegalArgumentException.class, () -> prohibitCustomRoutingOnDataStream(writeRequestAgainstDataStream, metadata));
-        assertThat(exception.getMessage(), is("index request targeting data stream [logs-foobar] specifies a custom routing. target the " +
-            "backing indices directly or remove the custom routing."));
+        assertThat(exception.getMessage(), is(
+            "index request targeting data stream [logs-foobar] specifies a custom routing "
+                + "but the [allow_custom_routing] setting was not enabled in the data stream's template."));
 
         // test custom routing is allowed when the index request targets the backing index
         DocWriteRequest<?> writeRequestAgainstIndex =

+ 1 - 1
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTemplateTests.java

@@ -32,7 +32,7 @@ public class DataStreamTemplateTests extends AbstractSerializingTestCase<DataStr
     }
 
     public static DataStreamTemplate randomInstance() {
-        return new ComposableIndexTemplate.DataStreamTemplate(randomBoolean());
+        return new ComposableIndexTemplate.DataStreamTemplate(randomBoolean(), randomBoolean());
     }
 
 }

+ 3 - 2
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

@@ -429,7 +429,7 @@ public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
             preSnapshotDataStream.getGeneration() + randomIntBetween(0, 5),
             preSnapshotDataStream.getMetadata() == null ? null : new HashMap<>(preSnapshotDataStream.getMetadata()),
             preSnapshotDataStream.isHidden(),
-            preSnapshotDataStream.isReplicated() && randomBoolean());
+            preSnapshotDataStream.isReplicated() && randomBoolean(), preSnapshotDataStream.isAllowCustomRouting());
 
         var reconciledDataStream =
             postSnapshotDataStream.snapshot(preSnapshotDataStream.getIndices().stream().map(Index::getName).collect(Collectors.toList()));
@@ -467,7 +467,8 @@ public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
             preSnapshotDataStream.getGeneration(),
             preSnapshotDataStream.getMetadata(),
             preSnapshotDataStream.isHidden(),
-            preSnapshotDataStream.isReplicated()
+            preSnapshotDataStream.isReplicated(),
+            preSnapshotDataStream.isAllowCustomRouting()
         );
 
         assertNull(postSnapshotDataStream.snapshot(

+ 1 - 1
server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java

@@ -2373,7 +2373,7 @@ public class IndexNameExpressionResolverTests extends ESTestCase {
                 .put(index2, false)
                 .put(justAnIndex, false)
                 .put(new DataStream(dataStream1, createTimestampField("@timestamp"),
-                    List.of(index1.getIndex(), index2.getIndex()), 2, Collections.emptyMap(), true, false))).build();
+                    List.of(index1.getIndex(), index2.getIndex()), 2, Collections.emptyMap(), true, false, false))).build();
 
         Index[] result = indexNameExpressionResolver.concreteIndices(state, IndicesOptions.strictExpandHidden(), true, "logs-*");
         assertThat(result, arrayContainingInAnyOrder(index1.getIndex(), index2.getIndex(), justAnIndex.getIndex() ));

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

@@ -150,7 +150,7 @@ public final class DataStreamTestHelper {
             metadata = Map.of("key", "value");
         }
         return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation, metadata,
-            randomBoolean(), randomBoolean(), false, timeProvider);
+            randomBoolean(), randomBoolean(), false, timeProvider, false);
     }
 
     public static DataStreamAlias randomAliasInstance() {

+ 2 - 2
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java

@@ -274,7 +274,7 @@ public final class TransportPutFollowAction
             // just copying the data stream is in this case safe.
             return new DataStream(remoteDataStream.getName(), remoteDataStream.getTimeStampField(),
                 List.of(backingIndexToFollow), remoteDataStream.getGeneration(), remoteDataStream.getMetadata(),
-                remoteDataStream.isHidden(), true);
+                remoteDataStream.isHidden(), true, remoteDataStream.isAllowCustomRouting());
         } else {
             if (localDataStream.isReplicated() == false) {
                 throw new IllegalArgumentException("cannot follow backing index [" + backingIndexToFollow.getName() +
@@ -292,7 +292,7 @@ public final class TransportPutFollowAction
 
             return new DataStream(localDataStream.getName(), localDataStream.getTimeStampField(), backingIndices,
                 remoteDataStream.getGeneration(), remoteDataStream.getMetadata(), localDataStream.isHidden(),
-                localDataStream.isReplicated());
+                localDataStream.isReplicated(), localDataStream.isAllowCustomRouting());
         }
     }
 

+ 1 - 1
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java

@@ -2376,7 +2376,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
             .system(system)
             .build();
         DataStream dataStream = new DataStream(dataStreamName, new DataStream.TimestampField("@timestamp"),
-            List.of(indexMetadata.getIndex()), 1, null, false, false, system);
+            List.of(indexMetadata.getIndex()), 1, null, false, false, system, false);
         ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
             .metadata(Metadata.builder()
                 .put(indexMetadata, true)

+ 11 - 2
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java

@@ -78,14 +78,23 @@ public class TransportPutFollowActionTests extends ESTestCase {
             .mapToObj(value -> DataStream.getDefaultBackingIndexName(name, value))
             .map(value -> new Index(value, "uuid"))
             .collect(Collectors.toList());
-        return new DataStream(name, new TimestampField("@timestamp"), backingIndices, backingIndices.size(), Map.of(), false, replicate);
+        return new DataStream(
+            name,
+            new TimestampField("@timestamp"),
+            backingIndices,
+            backingIndices.size(),
+            Map.of(),
+            false,
+            replicate,
+            false
+        );
     }
 
     static DataStream generateDataSteam(String name, int generation, boolean replicate, String... backingIndexNames) {
         List<Index> backingIndices = Arrays.stream(backingIndexNames)
             .map(value -> new Index(value, "uuid"))
             .collect(Collectors.toList());
-        return new DataStream(name, new TimestampField("@timestamp"), backingIndices, generation, Map.of(), false, replicate);
+        return new DataStream(name, new TimestampField("@timestamp"), backingIndices, generation, Map.of(), false, replicate, false);
     }
 
 }

+ 2 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/GetDataStreamAction.java

@@ -120,6 +120,7 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
             public static final ParseField ILM_POLICY_FIELD = new ParseField("ilm_policy");
             public static final ParseField HIDDEN_FIELD = new ParseField("hidden");
             public static final ParseField SYSTEM_FIELD = new ParseField("system");
+            public static final ParseField ALLOW_CUSTOM_ROUTING = new ParseField("allow_custom_routing");
 
             DataStream dataStream;
             ClusterHealthStatus dataStreamStatus;
@@ -183,6 +184,7 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
                 }
                 builder.field(HIDDEN_FIELD.getPreferredName(), dataStream.isHidden());
                 builder.field(SYSTEM_FIELD.getPreferredName(), dataStream.isSystem());
+                builder.field(ALLOW_CUSTOM_ROUTING.getPreferredName(), dataStream.isAllowCustomRouting());
                 builder.endObject();
                 return builder;
             }

+ 2 - 2
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyUtilsTests.java

@@ -92,7 +92,7 @@ public class LifecyclePolicyUtilsTests extends ESTestCase {
                         new ComposableIndexTemplateMetadata(Collections.singletonMap("mytemplate",
                             new ComposableIndexTemplate(Collections.singletonList("myds"),
                                 new Template(Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, "mypolicy").build(), null, null),
-                                null, null, null, null, new ComposableIndexTemplate.DataStreamTemplate(false)))))
+                                null, null, null, null, new ComposableIndexTemplate.DataStreamTemplate(false, false)))))
                     .build())
                 .build();
             assertThat(LifecyclePolicyUtils.calculateUsage(iner, state, "mypolicy"),
@@ -133,7 +133,7 @@ public class LifecyclePolicyUtilsTests extends ESTestCase {
                         new ComposableIndexTemplateMetadata(Collections.singletonMap("mytemplate",
                             new ComposableIndexTemplate(Collections.singletonList("myds"),
                                 new Template(Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, "mypolicy").build(), null, null),
-                                null, null, null, null, new ComposableIndexTemplate.DataStreamTemplate(false)))))
+                                null, null, null, null, new ComposableIndexTemplate.DataStreamTemplate(false, false)))))
                     .build())
                 .build();
             assertThat(LifecyclePolicyUtils.calculateUsage(iner, state, "mypolicy"),

+ 154 - 3
x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java

@@ -6,6 +6,7 @@
  */
 package org.elasticsearch.datastreams;
 
+import org.apache.logging.log4j.core.util.Throwables;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.search.TotalHits;
 import org.elasticsearch.ElasticsearchStatusException;
@@ -84,6 +85,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -1109,8 +1111,8 @@ public class DataStreamIT extends ESIntegTestCase {
         assertThat(
             exception.getMessage(),
             is(
-                "index request targeting data stream [logs-foobar] specifies a custom routing. target the "
-                    + "backing indices directly or remove the custom routing."
+                "index request targeting data stream [logs-foobar] specifies a custom routing "
+                    + "but the [allow_custom_routing] setting was not enabled in the data stream's template."
             )
         );
 
@@ -1131,12 +1133,54 @@ public class DataStreamIT extends ESIntegTestCase {
                 responseItem.getFailureMessage(),
                 is(
                     "java.lang.IllegalArgumentException: index request targeting data stream "
-                        + "[logs-foobar] specifies a custom routing. target the backing indices directly or remove the custom routing."
+                        + "[logs-foobar] specifies a custom routing "
+                        + "but the [allow_custom_routing] setting was not enabled in the data stream's template."
                 )
             );
         }
     }
 
+    public void testIndexDocsWithCustomRoutingAllowed() throws Exception {
+        ComposableIndexTemplate template = new ComposableIndexTemplate(
+            List.of("logs-foobar*"),
+            new Template(null, null, null),
+            null,
+            null,
+            null,
+            null,
+            new ComposableIndexTemplate.DataStreamTemplate(false, true)
+        );
+        client().execute(
+            PutComposableIndexTemplateAction.INSTANCE,
+            new PutComposableIndexTemplateAction.Request("id1").indexTemplate(template)
+        ).actionGet();
+        // Index doc that triggers creation of a data stream
+        String dataStream = "logs-foobar";
+        IndexRequest indexRequest = new IndexRequest(dataStream).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON)
+            .opType(DocWriteRequest.OpType.CREATE)
+            .routing("custom");
+        IndexResponse indexResponse = client().index(indexRequest).actionGet();
+        assertThat(indexResponse.getIndex(), equalTo(DataStream.getDefaultBackingIndexName(dataStream, 1)));
+        // Index doc with custom routing that targets the data stream
+        IndexRequest indexRequestWithRouting = new IndexRequest(dataStream).source("@timestamp", System.currentTimeMillis())
+            .opType(DocWriteRequest.OpType.CREATE)
+            .routing("custom");
+        client().index(indexRequestWithRouting).actionGet();
+        // Bulk indexing with custom routing targeting the data stream
+        BulkRequest bulkRequest = new BulkRequest();
+        for (int i = 0; i < 10; i++) {
+            bulkRequest.add(
+                new IndexRequest(dataStream).opType(DocWriteRequest.OpType.CREATE)
+                    .source("@timestamp", System.currentTimeMillis())
+                    .routing("bulk-request-routing")
+            );
+        }
+        BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
+        for (BulkItemResponse responseItem : bulkResponse.getItems()) {
+            assertThat(responseItem.getFailure(), nullValue());
+        }
+    }
+
     public void testIndexDocsWithCustomRoutingTargetingBackingIndex() throws Exception {
         putComposableIndexTemplate("id1", List.of("logs-foo*"));
 
@@ -1572,6 +1616,113 @@ public class DataStreamIT extends ESIntegTestCase {
         putComposableIndexTemplate(id, null, patterns, null, null);
     }
 
+    public void testPartitionedTemplate() throws IOException {
+        /**
+         * partition size with no routing required
+         */
+        ComposableIndexTemplate template = new ComposableIndexTemplate(
+            List.of("logs"),
+            new Template(
+                Settings.builder().put("index.number_of_shards", "3").put("index.routing_partition_size", "2").build(),
+                null,
+                null
+            ),
+            null,
+            null,
+            null,
+            null,
+            new ComposableIndexTemplate.DataStreamTemplate(false, true)
+        );
+        ComposableIndexTemplate finalTemplate = template;
+        client().execute(
+            PutComposableIndexTemplateAction.INSTANCE,
+            new PutComposableIndexTemplateAction.Request("my-it").indexTemplate(finalTemplate)
+        ).actionGet();
+        /**
+         * partition size with routing required
+         */
+        template = new ComposableIndexTemplate(
+            List.of("logs"),
+            new Template(
+                Settings.builder().put("index.number_of_shards", "3").put("index.routing_partition_size", "2").build(),
+                new CompressedXContent("{\n" + "      \"_routing\": {\n" + "        \"required\": true\n" + "      }\n" + "    }"),
+                null
+            ),
+            null,
+            null,
+            null,
+            null,
+            new ComposableIndexTemplate.DataStreamTemplate(false, true)
+        );
+        client().execute(
+            PutComposableIndexTemplateAction.INSTANCE,
+            new PutComposableIndexTemplateAction.Request("my-it").indexTemplate(template)
+        ).actionGet();
+
+        /**
+         * routing enable with allow custom routing false
+         */
+        template = new ComposableIndexTemplate(
+            List.of("logs"),
+            new Template(
+                Settings.builder().put("index.number_of_shards", "3").put("index.routing_partition_size", "2").build(),
+                new CompressedXContent("{\n" + "      \"_routing\": {\n" + "        \"required\": true\n" + "      }\n" + "    }"),
+                null
+            ),
+            null,
+            null,
+            null,
+            null,
+            new ComposableIndexTemplate.DataStreamTemplate(false, false)
+        );
+        ComposableIndexTemplate finalTemplate1 = template;
+        Exception e = expectThrows(
+            IllegalArgumentException.class,
+            () -> client().execute(
+                PutComposableIndexTemplateAction.INSTANCE,
+                new PutComposableIndexTemplateAction.Request("my-it").indexTemplate(finalTemplate1)
+            ).actionGet()
+        );
+        Exception actualException = (Exception) e.getCause();
+        assertTrue(
+            Throwables.getRootCause(actualException)
+                .getMessage()
+                .contains("mapping type [_doc] must have routing required for partitioned index")
+        );
+    }
+
+    public void testSearchWithRouting() throws IOException, ExecutionException, InterruptedException {
+        /**
+         * partition size with routing required
+         */
+        ComposableIndexTemplate template = new ComposableIndexTemplate(
+            List.of("my-logs"),
+            new Template(
+                Settings.builder()
+                    .put("index.number_of_shards", "10")
+                    .put("index.number_of_routing_shards", "10")
+                    .put("index.routing_partition_size", "4")
+                    .build(),
+                new CompressedXContent("{\n" + "      \"_routing\": {\n" + "        \"required\": true\n" + "      }\n" + "    }"),
+                null
+            ),
+            null,
+            null,
+            null,
+            null,
+            new ComposableIndexTemplate.DataStreamTemplate(false, true)
+        );
+        client().execute(
+            PutComposableIndexTemplateAction.INSTANCE,
+            new PutComposableIndexTemplateAction.Request("my-it").indexTemplate(template)
+        ).actionGet();
+        CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("my-logs");
+        client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
+        SearchRequest searchRequest = new SearchRequest("my-logs").routing("123");
+        SearchResponse searchResponse = client().search(searchRequest).actionGet();
+        assertEquals(searchResponse.getTotalShards(), 4);
+    }
+
     static void putComposableIndexTemplate(
         String id,
         @Nullable String mappings,

+ 1 - 1
x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/DataStreamsStatsTests.java

@@ -242,7 +242,7 @@ public class DataStreamsStatsTests extends ESSingleNodeTestCase {
             null,
             null,
             null,
-            new ComposableIndexTemplate.DataStreamTemplate(hidden),
+            new ComposableIndexTemplate.DataStreamTemplate(hidden, false),
             null
         );
         assertTrue(