|
@@ -26,10 +26,13 @@ import org.elasticsearch.cluster.metadata.Metadata;
|
|
|
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
|
|
|
import org.elasticsearch.cluster.metadata.Template;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
+import org.elasticsearch.common.Strings;
|
|
|
import org.elasticsearch.common.UUIDs;
|
|
|
import org.elasticsearch.common.compress.CompressedXContent;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
|
|
+import org.elasticsearch.common.xcontent.XContentHelper;
|
|
|
+import org.elasticsearch.core.Nullable;
|
|
|
import org.elasticsearch.features.NodeFeature;
|
|
|
import org.elasticsearch.index.IndexSettingProvider;
|
|
|
import org.elasticsearch.index.IndexSettingProviders;
|
|
@@ -37,6 +40,7 @@ import org.elasticsearch.index.IndexVersion;
|
|
|
import org.elasticsearch.index.IndexingPressure;
|
|
|
import org.elasticsearch.index.VersionType;
|
|
|
import org.elasticsearch.index.engine.Engine;
|
|
|
+import org.elasticsearch.index.mapper.MapperService;
|
|
|
import org.elasticsearch.index.mapper.SourceToParse;
|
|
|
import org.elasticsearch.index.seqno.SequenceNumbers;
|
|
|
import org.elasticsearch.index.shard.IndexShard;
|
|
@@ -50,6 +54,10 @@ import org.elasticsearch.tasks.Task;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
import org.elasticsearch.transport.TransportService;
|
|
|
import org.elasticsearch.xcontent.NamedXContentRegistry;
|
|
|
+import org.elasticsearch.xcontent.XContentFactory;
|
|
|
+import org.elasticsearch.xcontent.XContentParser;
|
|
|
+import org.elasticsearch.xcontent.XContentParserConfiguration;
|
|
|
+import org.elasticsearch.xcontent.XContentType;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.HashMap;
|
|
@@ -75,6 +83,7 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
|
|
|
"simulate.component.template.substitutions"
|
|
|
);
|
|
|
public static final NodeFeature SIMULATE_INDEX_TEMPLATE_SUBSTITUTIONS = new NodeFeature("simulate.index.template.substitutions");
|
|
|
+ public static final NodeFeature SIMULATE_MAPPING_ADDITION = new NodeFeature("simulate.mapping.addition");
|
|
|
private final IndicesService indicesService;
|
|
|
private final NamedXContentRegistry xContentRegistry;
|
|
|
private final Set<IndexSettingProvider> indexSettingProviders;
|
|
@@ -122,11 +131,17 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
|
|
|
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
|
|
|
Map<String, ComponentTemplate> componentTemplateSubstitutions = bulkRequest.getComponentTemplateSubstitutions();
|
|
|
Map<String, ComposableIndexTemplate> indexTemplateSubstitutions = bulkRequest.getIndexTemplateSubstitutions();
|
|
|
+ Map<String, Object> mappingAddition = ((SimulateBulkRequest) bulkRequest).getMappingAddition();
|
|
|
for (int i = 0; i < bulkRequest.requests.size(); i++) {
|
|
|
DocWriteRequest<?> docRequest = bulkRequest.requests.get(i);
|
|
|
assert docRequest instanceof IndexRequest : "TransportSimulateBulkAction should only ever be called with IndexRequests";
|
|
|
IndexRequest request = (IndexRequest) docRequest;
|
|
|
- Exception mappingValidationException = validateMappings(componentTemplateSubstitutions, indexTemplateSubstitutions, request);
|
|
|
+ Exception mappingValidationException = validateMappings(
|
|
|
+ componentTemplateSubstitutions,
|
|
|
+ indexTemplateSubstitutions,
|
|
|
+ mappingAddition,
|
|
|
+ request
|
|
|
+ );
|
|
|
responses.set(
|
|
|
i,
|
|
|
BulkItemResponse.success(
|
|
@@ -159,6 +174,7 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
|
|
|
private Exception validateMappings(
|
|
|
Map<String, ComponentTemplate> componentTemplateSubstitutions,
|
|
|
Map<String, ComposableIndexTemplate> indexTemplateSubstitutions,
|
|
|
+ Map<String, Object> mappingAddition,
|
|
|
IndexRequest request
|
|
|
) {
|
|
|
final SourceToParse sourceToParse = new SourceToParse(
|
|
@@ -174,7 +190,10 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
|
|
|
Exception mappingValidationException = null;
|
|
|
IndexAbstraction indexAbstraction = state.metadata().getIndicesLookup().get(request.index());
|
|
|
try {
|
|
|
- if (indexAbstraction != null && componentTemplateSubstitutions.isEmpty() && indexTemplateSubstitutions.isEmpty()) {
|
|
|
+ if (indexAbstraction != null
|
|
|
+ && componentTemplateSubstitutions.isEmpty()
|
|
|
+ && indexTemplateSubstitutions.isEmpty()
|
|
|
+ && mappingAddition.isEmpty()) {
|
|
|
/*
|
|
|
* In this case the index exists and we don't have any component template overrides. So we can just use withTempIndexService
|
|
|
* to do the mapping validation, using all the existing logic for validation.
|
|
@@ -250,36 +269,8 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
|
|
|
indexSettingProviders
|
|
|
);
|
|
|
CompressedXContent mappings = template.mappings();
|
|
|
- if (mappings != null) {
|
|
|
- MappingMetadata mappingMetadata = new MappingMetadata(mappings);
|
|
|
- Settings dummySettings = Settings.builder()
|
|
|
- .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
|
|
|
- .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
|
|
|
- .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
|
|
|
- .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
|
|
|
- .build();
|
|
|
- final IndexMetadata imd = IndexMetadata.builder(request.index())
|
|
|
- .settings(dummySettings)
|
|
|
- .putMapping(mappingMetadata)
|
|
|
- .build();
|
|
|
- indicesService.withTempIndexService(imd, indexService -> {
|
|
|
- indexService.mapperService().updateMapping(null, imd);
|
|
|
- return IndexShard.prepareIndex(
|
|
|
- indexService.mapperService(),
|
|
|
- sourceToParse,
|
|
|
- SequenceNumbers.UNASSIGNED_SEQ_NO,
|
|
|
- -1,
|
|
|
- -1,
|
|
|
- VersionType.INTERNAL,
|
|
|
- Engine.Operation.Origin.PRIMARY,
|
|
|
- Long.MIN_VALUE,
|
|
|
- false,
|
|
|
- request.ifSeqNo(),
|
|
|
- request.ifPrimaryTerm(),
|
|
|
- 0
|
|
|
- );
|
|
|
- });
|
|
|
- }
|
|
|
+ CompressedXContent mergedMappings = mergeMappings(mappings, mappingAddition);
|
|
|
+ validateUpdatedMappings(mappings, mergedMappings, request, sourceToParse);
|
|
|
} else {
|
|
|
List<IndexTemplateMetadata> matchingTemplates = findV1Templates(simulatedState.metadata(), request.index(), false);
|
|
|
final Map<String, Object> mappingsMap = MetadataCreateIndexService.parseV1Mappings(
|
|
@@ -287,40 +278,8 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
|
|
|
matchingTemplates.stream().map(IndexTemplateMetadata::getMappings).collect(toList()),
|
|
|
xContentRegistry
|
|
|
);
|
|
|
- final CompressedXContent combinedMappings;
|
|
|
- if (mappingsMap.isEmpty()) {
|
|
|
- combinedMappings = null;
|
|
|
- } else {
|
|
|
- combinedMappings = new CompressedXContent(mappingsMap);
|
|
|
- }
|
|
|
- Settings dummySettings = Settings.builder()
|
|
|
- .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
|
|
|
- .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
|
|
|
- .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
|
|
|
- .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
|
|
|
- .build();
|
|
|
- MappingMetadata mappingMetadata = combinedMappings == null ? null : new MappingMetadata(combinedMappings);
|
|
|
- final IndexMetadata imd = IndexMetadata.builder(request.index())
|
|
|
- .putMapping(mappingMetadata)
|
|
|
- .settings(dummySettings)
|
|
|
- .build();
|
|
|
- indicesService.withTempIndexService(imd, indexService -> {
|
|
|
- indexService.mapperService().updateMapping(null, imd);
|
|
|
- return IndexShard.prepareIndex(
|
|
|
- indexService.mapperService(),
|
|
|
- sourceToParse,
|
|
|
- SequenceNumbers.UNASSIGNED_SEQ_NO,
|
|
|
- -1,
|
|
|
- -1,
|
|
|
- VersionType.INTERNAL,
|
|
|
- Engine.Operation.Origin.PRIMARY,
|
|
|
- Long.MIN_VALUE,
|
|
|
- false,
|
|
|
- request.ifSeqNo(),
|
|
|
- request.ifPrimaryTerm(),
|
|
|
- 0
|
|
|
- );
|
|
|
- });
|
|
|
+ final CompressedXContent combinedMappings = mergeMappings(new CompressedXContent(mappingsMap), mappingAddition);
|
|
|
+ validateUpdatedMappings(null, combinedMappings, request, sourceToParse);
|
|
|
}
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
@@ -329,6 +288,66 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
|
|
|
return mappingValidationException;
|
|
|
}
|
|
|
|
|
|
+ /*
|
|
|
+ * Validates that when updatedMappings are applied
|
|
|
+ */
|
|
|
+ private void validateUpdatedMappings(
|
|
|
+ @Nullable CompressedXContent originalMappings,
|
|
|
+ @Nullable CompressedXContent updatedMappings,
|
|
|
+ IndexRequest request,
|
|
|
+ SourceToParse sourceToParse
|
|
|
+ ) throws IOException {
|
|
|
+ if (updatedMappings == null) {
|
|
|
+ return; // no validation to do
|
|
|
+ }
|
|
|
+ Settings dummySettings = Settings.builder()
|
|
|
+ .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
|
|
|
+ .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
|
|
|
+ .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
|
|
|
+ .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
|
|
|
+ .build();
|
|
|
+ IndexMetadata.Builder originalIndexMetadataBuilder = IndexMetadata.builder(request.index()).settings(dummySettings);
|
|
|
+ if (originalMappings != null) {
|
|
|
+ originalIndexMetadataBuilder.putMapping(new MappingMetadata(originalMappings));
|
|
|
+ }
|
|
|
+ final IndexMetadata originalIndexMetadata = originalIndexMetadataBuilder.build();
|
|
|
+ final IndexMetadata updatedIndexMetadata = IndexMetadata.builder(request.index())
|
|
|
+ .settings(dummySettings)
|
|
|
+ .putMapping(new MappingMetadata(updatedMappings))
|
|
|
+ .build();
|
|
|
+ indicesService.withTempIndexService(originalIndexMetadata, indexService -> {
|
|
|
+ indexService.mapperService().merge(updatedIndexMetadata, MapperService.MergeReason.MAPPING_UPDATE);
|
|
|
+ return IndexShard.prepareIndex(
|
|
|
+ indexService.mapperService(),
|
|
|
+ sourceToParse,
|
|
|
+ SequenceNumbers.UNASSIGNED_SEQ_NO,
|
|
|
+ -1,
|
|
|
+ -1,
|
|
|
+ VersionType.INTERNAL,
|
|
|
+ Engine.Operation.Origin.PRIMARY,
|
|
|
+ Long.MIN_VALUE,
|
|
|
+ false,
|
|
|
+ request.ifSeqNo(),
|
|
|
+ request.ifPrimaryTerm(),
|
|
|
+ 0
|
|
|
+ );
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private static CompressedXContent mergeMappings(@Nullable CompressedXContent originalMapping, Map<String, Object> mappingAddition)
|
|
|
+ throws IOException {
|
|
|
+ Map<String, Object> combinedMappingMap = new HashMap<>();
|
|
|
+ if (originalMapping != null) {
|
|
|
+ combinedMappingMap.putAll(XContentHelper.convertToMap(originalMapping.uncompressed(), true, XContentType.JSON).v2());
|
|
|
+ }
|
|
|
+ XContentHelper.update(combinedMappingMap, mappingAddition, true);
|
|
|
+ if (combinedMappingMap.isEmpty()) {
|
|
|
+ return null;
|
|
|
+ } else {
|
|
|
+ return convertMappingMapToXContent(combinedMappingMap);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/*
|
|
|
* This overrides TransportSimulateBulkAction's getIngestService to allow us to provide an IngestService that handles pipeline
|
|
|
* substitutions defined in the request.
|
|
@@ -344,4 +363,25 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
|
|
|
// A simulate bulk request should not change any persistent state in the system, so we never write to the failure store
|
|
|
return null;
|
|
|
}
|
|
|
+
|
|
|
+ private static CompressedXContent convertMappingMapToXContent(Map<String, Object> rawAdditionalMapping) throws IOException {
|
|
|
+ CompressedXContent compressedXContent;
|
|
|
+ if (rawAdditionalMapping == null || rawAdditionalMapping.isEmpty()) {
|
|
|
+ compressedXContent = null;
|
|
|
+ } else {
|
|
|
+ try (var parser = XContentHelper.mapToXContentParser(XContentParserConfiguration.EMPTY, rawAdditionalMapping)) {
|
|
|
+ compressedXContent = mappingFromXContent(parser);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return compressedXContent;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static CompressedXContent mappingFromXContent(XContentParser parser) throws IOException {
|
|
|
+ XContentParser.Token token = parser.nextToken();
|
|
|
+ if (token == XContentParser.Token.START_OBJECT) {
|
|
|
+ return new CompressedXContent(Strings.toString(XContentFactory.jsonBuilder().map(parser.mapOrdered())));
|
|
|
+ } else {
|
|
|
+ throw new IllegalArgumentException("Unexpected token: " + token);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|