|
@@ -49,7 +49,6 @@ import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
|
|
|
import org.elasticsearch.common.Priority;
|
|
|
import org.elasticsearch.common.TriConsumer;
|
|
|
-import org.elasticsearch.common.bytes.BytesReference;
|
|
|
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
|
|
import org.elasticsearch.common.logging.DeprecationCategory;
|
|
|
import org.elasticsearch.common.logging.DeprecationLogger;
|
|
@@ -78,9 +77,9 @@ import org.elasticsearch.plugins.internal.XContentParserDecorator;
|
|
|
import org.elasticsearch.script.ScriptService;
|
|
|
import org.elasticsearch.threadpool.Scheduler;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
-import org.elasticsearch.xcontent.XContentBuilder;
|
|
|
|
|
|
-import java.io.IOException;
|
|
|
+import java.time.Instant;
|
|
|
+import java.time.InstantSource;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
@@ -569,16 +568,36 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
|
|
|
validatePipeline(ingestInfos, projectId, request.getId(), config);
|
|
|
}
|
|
|
|
|
|
+ public static void validateNoSystemPropertiesInPipelineConfig(final Map<String, Object> pipelineConfig) {
|
|
|
+ if (pipelineConfig.containsKey(Pipeline.CREATED_DATE_MILLIS)) {
|
|
|
+ throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system: created_date_millis.");
|
|
|
+ } else if (pipelineConfig.containsKey(Pipeline.CREATED_DATE)) {
|
|
|
+ throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system: created_date.");
|
|
|
+ } else if (pipelineConfig.containsKey(Pipeline.MODIFIED_DATE_MILLIS)) {
|
|
|
+ throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system: modified_date_millis.");
|
|
|
+ } else if (pipelineConfig.containsKey(Pipeline.MODIFIED_DATE)) {
|
|
|
+ throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system: modified_date.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Check whether updating a potentially existing pipeline will be a NOP.
|
|
|
+ * Will return <code>false</code> if request contains system-properties like created or modified_date,
|
|
|
+ * these should be rejected later.*/
|
|
|
public static boolean isNoOpPipelineUpdate(ProjectMetadata metadata, PutPipelineRequest request) {
|
|
|
IngestMetadata currentIngestMetadata = metadata.custom(IngestMetadata.TYPE);
|
|
|
if (request.getVersion() == null
|
|
|
&& currentIngestMetadata != null
|
|
|
&& currentIngestMetadata.getPipelines().containsKey(request.getId())) {
|
|
|
- var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
|
|
|
- var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId());
|
|
|
- if (currentPipeline.getConfig().equals(pipelineConfig)) {
|
|
|
- return true;
|
|
|
- }
|
|
|
+
|
|
|
+ var newPipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
|
|
|
+
|
|
|
+ Map<String, Object> currentConfigWithoutSystemProps = new HashMap<>(
|
|
|
+ currentIngestMetadata.getPipelines().get(request.getId()).getConfig()
|
|
|
+ );
|
|
|
+ currentConfigWithoutSystemProps.remove(Pipeline.CREATED_DATE_MILLIS);
|
|
|
+ currentConfigWithoutSystemProps.remove(Pipeline.MODIFIED_DATE_MILLIS);
|
|
|
+
|
|
|
+ return newPipelineConfig.equals(currentConfigWithoutSystemProps);
|
|
|
}
|
|
|
|
|
|
return false;
|
|
@@ -676,10 +695,26 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
|
|
|
*/
|
|
|
public static class PutPipelineClusterStateUpdateTask extends PipelineClusterStateUpdateTask {
|
|
|
private final PutPipelineRequest request;
|
|
|
-
|
|
|
- PutPipelineClusterStateUpdateTask(ProjectId projectId, ActionListener<AcknowledgedResponse> listener, PutPipelineRequest request) {
|
|
|
+ private final InstantSource instantSource;
|
|
|
+
|
|
|
+ // constructor allowing for injection of InstantSource/time for testing
|
|
|
+ PutPipelineClusterStateUpdateTask(
|
|
|
+ final ProjectId projectId,
|
|
|
+ final ActionListener<AcknowledgedResponse> listener,
|
|
|
+ final PutPipelineRequest request,
|
|
|
+ final InstantSource instantSource
|
|
|
+ ) {
|
|
|
super(projectId, listener);
|
|
|
this.request = request;
|
|
|
+ this.instantSource = instantSource;
|
|
|
+ }
|
|
|
+
|
|
|
+ PutPipelineClusterStateUpdateTask(
|
|
|
+ final ProjectId projectId,
|
|
|
+ final ActionListener<AcknowledgedResponse> listener,
|
|
|
+ final PutPipelineRequest request
|
|
|
+ ) {
|
|
|
+ this(projectId, listener, request, Instant::now);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -691,10 +726,15 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
|
|
|
|
|
|
@Override
|
|
|
public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<IndexMetadata> allIndexMetadata) {
|
|
|
- BytesReference pipelineSource = request.getSource();
|
|
|
+ final Map<String, PipelineConfiguration> pipelines = currentIngestMetadata == null
|
|
|
+ ? new HashMap<>(1)
|
|
|
+ : new HashMap<>(currentIngestMetadata.getPipelines());
|
|
|
+ final PipelineConfiguration existingPipeline = pipelines.get(request.getId());
|
|
|
+ final Map<String, Object> newPipelineConfig = XContentHelper.convertToMap(request.getSource(), true, request.getXContentType())
|
|
|
+ .v2();
|
|
|
+
|
|
|
if (request.getVersion() != null) {
|
|
|
- var currentPipeline = currentIngestMetadata != null ? currentIngestMetadata.getPipelines().get(request.getId()) : null;
|
|
|
- if (currentPipeline == null) {
|
|
|
+ if (existingPipeline == null) {
|
|
|
throw new IllegalArgumentException(
|
|
|
String.format(
|
|
|
Locale.ROOT,
|
|
@@ -705,7 +745,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- final Integer currentVersion = currentPipeline.getVersion();
|
|
|
+ final Integer currentVersion = existingPipeline.getVersion();
|
|
|
if (Objects.equals(request.getVersion(), currentVersion) == false) {
|
|
|
throw new IllegalArgumentException(
|
|
|
String.format(
|
|
@@ -718,9 +758,8 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
|
|
|
- final Integer specifiedVersion = (Integer) pipelineConfig.get("version");
|
|
|
- if (pipelineConfig.containsKey("version") && Objects.equals(specifiedVersion, currentVersion)) {
|
|
|
+ final Integer specifiedVersion = (Integer) newPipelineConfig.get("version");
|
|
|
+ if (newPipelineConfig.containsKey("version") && Objects.equals(specifiedVersion, currentVersion)) {
|
|
|
throw new IllegalArgumentException(
|
|
|
String.format(
|
|
|
Locale.ROOT,
|
|
@@ -733,24 +772,24 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
|
|
|
|
|
|
// if no version specified in the pipeline definition, inject a version of [request.getVersion() + 1]
|
|
|
if (specifiedVersion == null) {
|
|
|
- pipelineConfig.put("version", request.getVersion() == null ? 1 : request.getVersion() + 1);
|
|
|
- try {
|
|
|
- var builder = XContentBuilder.builder(request.getXContentType().xContent()).map(pipelineConfig);
|
|
|
- pipelineSource = BytesReference.bytes(builder);
|
|
|
- } catch (IOException e) {
|
|
|
- throw new IllegalStateException(e);
|
|
|
- }
|
|
|
+ newPipelineConfig.put("version", request.getVersion() == null ? 1 : request.getVersion() + 1);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- Map<String, PipelineConfiguration> pipelines;
|
|
|
- if (currentIngestMetadata != null) {
|
|
|
- pipelines = new HashMap<>(currentIngestMetadata.getPipelines());
|
|
|
+ final long nowMillis = instantSource.millis();
|
|
|
+ if (existingPipeline == null) {
|
|
|
+ newPipelineConfig.put(Pipeline.CREATED_DATE_MILLIS, nowMillis);
|
|
|
} else {
|
|
|
- pipelines = new HashMap<>();
|
|
|
+ Object existingCreatedAt = existingPipeline.getConfig().get(Pipeline.CREATED_DATE_MILLIS);
|
|
|
+ // only set/carry over `created_date` if existing pipeline already has it.
|
|
|
+ // would be confusing if existing pipelines were all updated to have `created_date` set to now.
|
|
|
+ if (existingCreatedAt != null) {
|
|
|
+ newPipelineConfig.put(Pipeline.CREATED_DATE_MILLIS, existingCreatedAt);
|
|
|
+ }
|
|
|
}
|
|
|
+ newPipelineConfig.put(Pipeline.MODIFIED_DATE_MILLIS, nowMillis);
|
|
|
|
|
|
- pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), pipelineSource, request.getXContentType()));
|
|
|
+ pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), newPipelineConfig));
|
|
|
return new IngestMetadata(pipelines);
|
|
|
}
|
|
|
}
|
|
@@ -762,6 +801,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
|
|
|
String pipelineId,
|
|
|
Map<String, Object> pipelineConfig
|
|
|
) throws Exception {
|
|
|
+ validateNoSystemPropertiesInPipelineConfig(pipelineConfig);
|
|
|
if (ingestInfos.isEmpty()) {
|
|
|
throw new IllegalStateException("Ingest info is empty");
|
|
|
}
|