|
@@ -20,6 +20,7 @@
|
|
|
package org.elasticsearch.ingest;
|
|
|
|
|
|
import org.elasticsearch.ElasticsearchParseException;
|
|
|
+import org.elasticsearch.ExceptionsHelper;
|
|
|
import org.elasticsearch.ResourceNotFoundException;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.ingest.DeletePipelineRequest;
|
|
@@ -31,12 +32,15 @@ import org.elasticsearch.cluster.ClusterService;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.ClusterStateListener;
|
|
|
import org.elasticsearch.cluster.metadata.MetaData;
|
|
|
+import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
import org.elasticsearch.common.component.AbstractComponent;
|
|
|
import org.elasticsearch.common.regex.Regex;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.common.xcontent.XContentHelper;
|
|
|
+import org.elasticsearch.ingest.core.IngestInfo;
|
|
|
import org.elasticsearch.ingest.core.Pipeline;
|
|
|
import org.elasticsearch.ingest.core.Processor;
|
|
|
+import org.elasticsearch.ingest.core.ProcessorInfo;
|
|
|
import org.elasticsearch.ingest.core.TemplateService;
|
|
|
import org.elasticsearch.script.ScriptService;
|
|
|
|
|
@@ -47,6 +51,7 @@ import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Objects;
|
|
|
|
|
|
public class PipelineStore extends AbstractComponent implements Closeable, ClusterStateListener {
|
|
|
|
|
@@ -130,8 +135,8 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
|
|
|
pipelines.remove(request.getId());
|
|
|
ClusterState.Builder newState = ClusterState.builder(currentState);
|
|
|
newState.metaData(MetaData.builder(currentState.getMetaData())
|
|
|
- .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines))
|
|
|
- .build());
|
|
|
+ .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines))
|
|
|
+ .build());
|
|
|
return newState.build();
|
|
|
}
|
|
|
}
|
|
@@ -139,15 +144,9 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
|
|
|
/**
|
|
|
* Stores the specified pipeline definition in the request.
|
|
|
*/
|
|
|
- public void put(ClusterService clusterService, PutPipelineRequest request, ActionListener<WritePipelineResponse> listener) {
|
|
|
+ public void put(ClusterService clusterService, Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineRequest request, ActionListener<WritePipelineResponse> listener) throws Exception {
|
|
|
// validates the pipeline and processor configuration before submitting a cluster update task:
|
|
|
- Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false).v2();
|
|
|
- try {
|
|
|
- factory.create(request.getId(), pipelineConfig, processorRegistry);
|
|
|
- } catch(Exception e) {
|
|
|
- listener.onFailure(e);
|
|
|
- return;
|
|
|
- }
|
|
|
+ validatePipeline(ingestInfos, request);
|
|
|
clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(), new AckedClusterStateUpdateTask<WritePipelineResponse>(request, listener) {
|
|
|
|
|
|
@Override
|
|
@@ -162,6 +161,25 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineRequest request) throws Exception {
|
|
|
+ if (ingestInfos.isEmpty()) {
|
|
|
+ throw new IllegalStateException("Ingest info is empty");
|
|
|
+ }
|
|
|
+
|
|
|
+ Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false).v2();
|
|
|
+ Pipeline pipeline = factory.create(request.getId(), pipelineConfig, processorRegistry);
|
|
|
+ List<IllegalArgumentException> exceptions = new ArrayList<>();
|
|
|
+ for (Processor processor : pipeline.flattenAllProcessors()) {
|
|
|
+ for (Map.Entry<DiscoveryNode, IngestInfo> entry : ingestInfos.entrySet()) {
|
|
|
+ if (entry.getValue().containsProcessor(processor.getType()) == false) {
|
|
|
+ String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]";
|
|
|
+ exceptions.add(new IllegalArgumentException(message));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ExceptionsHelper.rethrowAndSuppress(exceptions);
|
|
|
+ }
|
|
|
+
|
|
|
ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) {
|
|
|
IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE);
|
|
|
Map<String, PipelineConfiguration> pipelines;
|