Browse Source

INGEST: Simplify IngestService (#33008)

* INGEST: Simplify IngestService

* Follow up to #32617
* Flatten redundant inner classes of `IngestService`
Armin Braun 7 years ago
parent
commit
200078734c

+ 223 - 283
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -71,26 +71,31 @@ public class IngestService implements ClusterStateApplier {
     public static final String NOOP_PIPELINE_NAME = "_none";
 
     private final ClusterService clusterService;
-    private final PipelineStore pipelineStore;
-    private final PipelineExecutionService pipelineExecutionService;
+    private final Map<String, Processor.Factory> processorFactories;
+    // Ideally this should be in IngestMetadata class, but we don't have the processor factories around there.
+    // We know of all the processor factories when a node with all its plugin have been initialized. Also some
+    // processor factories rely on other node services. Custom metadata is statically registered when classes
+    // are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
+    private volatile Map<String, Pipeline> pipelines = new HashMap<>();
+    private final ThreadPool threadPool;
+    private final StatsHolder totalStats = new StatsHolder();
+    private volatile Map<String, StatsHolder> statsHolderPerPipeline = Collections.emptyMap();
 
     public IngestService(ClusterService clusterService, ThreadPool threadPool,
                          Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
                          List<IngestPlugin> ingestPlugins) {
         this.clusterService = clusterService;
-        this.pipelineStore = new PipelineStore(
-            processorFactories(
-                ingestPlugins,
-                new Processor.Parameters(
-                    env, scriptService, analysisRegistry,
-                    threadPool.getThreadContext(), threadPool::relativeTimeInMillis,
-                    (delay, command) -> threadPool.schedule(
-                        TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC, command
-                    )
+        this.processorFactories = processorFactories(
+            ingestPlugins,
+            new Processor.Parameters(
+                env, scriptService, analysisRegistry,
+                threadPool.getThreadContext(), threadPool::relativeTimeInMillis,
+                (delay, command) -> threadPool.schedule(
+                    TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC, command
                 )
             )
         );
-        this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool);
+        this.threadPool = threadPool;
     }
 
     private static Map<String, Processor.Factory> processorFactories(List<IngestPlugin> ingestPlugins,
@@ -114,8 +119,7 @@ public class IngestService implements ClusterStateApplier {
     /**
      * Deletes the pipeline specified by id in the request.
      */
-    public void delete(DeletePipelineRequest request,
-        ActionListener<AcknowledgedResponse> listener) {
+    public void delete(DeletePipelineRequest request, ActionListener<AcknowledgedResponse> listener) {
         clusterService.submitStateUpdateTask("delete-pipeline-" + request.getId(),
                 new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
 
@@ -198,32 +202,23 @@ public class IngestService implements ClusterStateApplier {
         return result;
     }
 
-    public void executeBulkRequest(Iterable<DocWriteRequest<?>> actionRequests, BiConsumer<IndexRequest, Exception> itemFailureHandler,
-        Consumer<Exception> completionHandler) {
-        pipelineExecutionService.executeBulkRequest(actionRequests, itemFailureHandler, completionHandler);
-    }
-
-    public IngestStats stats() {
-        return pipelineExecutionService.stats();
-    }
-
     /**
      * Stores the specified pipeline definition in the request.
      */
     public void putPipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineRequest request,
         ActionListener<AcknowledgedResponse> listener) throws Exception {
-        pipelineStore.put(clusterService, ingestInfos, request, listener);
+        put(clusterService, ingestInfos, request, listener);
     }
 
     /**
      * Returns the pipeline by the specified id
      */
     public Pipeline getPipeline(String id) {
-        return pipelineStore.get(id);
+        return pipelines.get(id);
     }
 
     public Map<String, Processor.Factory> getProcessorFactories() {
-        return pipelineStore.getProcessorFactories();
+        return processorFactories;
     }
 
     public IngestInfo info() {
@@ -236,99 +231,64 @@ public class IngestService implements ClusterStateApplier {
     }
 
     Map<String, Pipeline> pipelines() {
-        return pipelineStore.pipelines;
-    }
-
-    void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineRequest request) throws Exception {
-        pipelineStore.validatePipeline(ingestInfos, request);
-    }
-
-    void updatePipelineStats(IngestMetadata ingestMetadata) {
-        pipelineExecutionService.updatePipelineStats(ingestMetadata);
+        return pipelines;
     }
 
     @Override
     public void applyClusterState(final ClusterChangedEvent event) {
         ClusterState state = event.state();
-        pipelineStore.innerUpdatePipelines(event.previousState(), state);
+        innerUpdatePipelines(event.previousState(), state);
         IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE);
         if (ingestMetadata != null) {
-            pipelineExecutionService.updatePipelineStats(ingestMetadata);
+            updatePipelineStats(ingestMetadata);
         }
     }
 
-    public static final class PipelineStore {
-
-        private final Map<String, Processor.Factory> processorFactories;
-
-        // Ideally this should be in IngestMetadata class, but we don't have the processor factories around there.
-        // We know of all the processor factories when a node with all its plugin have been initialized. Also some
-        // processor factories rely on other node services. Custom metadata is statically registered when classes
-        // are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
-        volatile Map<String, Pipeline> pipelines = new HashMap<>();
-
-        private PipelineStore(Map<String, Processor.Factory> processorFactories) {
-            this.processorFactories = processorFactories;
-        }
-
-        void innerUpdatePipelines(ClusterState previousState, ClusterState state) {
-            if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
-                return;
+    private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) {
+        String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null;
+        String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown";
+        String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]";
+        Processor failureProcessor = new AbstractProcessor(tag) {
+            @Override
+            public void execute(IngestDocument ingestDocument) {
+                throw new IllegalStateException(errorMessage);
             }
 
-            IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE);
-            IngestMetadata previousIngestMetadata = previousState.getMetaData().custom(IngestMetadata.TYPE);
-            if (Objects.equals(ingestMetadata, previousIngestMetadata)) {
-                return;
+            @Override
+            public String getType() {
+                return type;
             }
+        };
+        String description = "this is a place holder pipeline, because pipeline with id [" +  id + "] could not be loaded";
+        return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor));
+    }
 
-            Map<String, Pipeline> pipelines = new HashMap<>();
-            List<ElasticsearchParseException> exceptions = new ArrayList<>();
-            for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) {
-                try {
-                    pipelines.put(pipeline.getId(), Pipeline.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories));
-                } catch (ElasticsearchParseException e) {
-                    pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), e));
-                    exceptions.add(e);
-                } catch (Exception e) {
-                    ElasticsearchParseException parseException = new ElasticsearchParseException(
-                        "Error updating pipeline with id [" + pipeline.getId() + "]", e);
-                    pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), parseException));
-                    exceptions.add(parseException);
-                }
-            }
-            this.pipelines = Collections.unmodifiableMap(pipelines);
-            ExceptionsHelper.rethrowAndSuppress(exceptions);
+    static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) {
+        IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE);
+        Map<String, PipelineConfiguration> pipelines;
+        if (currentIngestMetadata != null) {
+            pipelines = new HashMap<>(currentIngestMetadata.getPipelines());
+        } else {
+            pipelines = new HashMap<>();
         }
 
-        private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) {
-            String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null;
-            String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown";
-            String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]";
-            Processor failureProcessor = new AbstractProcessor(tag) {
-                @Override
-                public void execute(IngestDocument ingestDocument) {
-                    throw new IllegalStateException(errorMessage);
-                }
-
-                @Override
-                public String getType() {
-                    return type;
-                }
-            };
-            String description = "this is a place holder pipeline, because pipeline with id [" +  id + "] could not be loaded";
-            return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor));
-        }
+        pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource(), request.getXContentType()));
+        ClusterState.Builder newState = ClusterState.builder(currentState);
+        newState.metaData(MetaData.builder(currentState.getMetaData())
+            .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines))
+            .build());
+        return newState.build();
+    }
 
-        /**
-         * Stores the specified pipeline definition in the request.
-         */
-        public void put(ClusterService clusterService, Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineRequest request,
-                        ActionListener<AcknowledgedResponse> listener) throws Exception {
-            // validates the pipeline and processor configuration before submitting a cluster update task:
-            validatePipeline(ingestInfos, request);
-            clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(),
-                    new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
+    /**
+     * Stores the specified pipeline definition in the request.
+     */
+    public void put(ClusterService clusterService, Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineRequest request,
+        ActionListener<AcknowledgedResponse> listener) throws Exception {
+        // validates the pipeline and processor configuration before submitting a cluster update task:
+        validatePipeline(ingestInfos, request);
+        clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(),
+            new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
 
                 @Override
                 protected AcknowledgedResponse newResponse(boolean acknowledged) {
@@ -340,222 +300,202 @@ public class IngestService implements ClusterStateApplier {
                     return innerPut(request, currentState);
                 }
             });
-        }
-
-        void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineRequest request) throws Exception {
-            if (ingestInfos.isEmpty()) {
-                throw new IllegalStateException("Ingest info is empty");
-            }
+    }
 
-            Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
-            Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories);
-            List<Exception> exceptions = new ArrayList<>();
-            for (Processor processor : pipeline.flattenAllProcessors()) {
-                for (Map.Entry<DiscoveryNode, IngestInfo> entry : ingestInfos.entrySet()) {
-                    if (entry.getValue().containsProcessor(processor.getType()) == false) {
-                        String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]";
-                        exceptions.add(
-                            ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message)
-                        );
-                    }
-                }
-            }
-            ExceptionsHelper.rethrowAndSuppress(exceptions);
+    void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineRequest request) throws Exception {
+        if (ingestInfos.isEmpty()) {
+            throw new IllegalStateException("Ingest info is empty");
         }
 
-        static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) {
-            IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE);
-            Map<String, PipelineConfiguration> pipelines;
-            if (currentIngestMetadata != null) {
-                pipelines = new HashMap<>(currentIngestMetadata.getPipelines());
-            } else {
-                pipelines = new HashMap<>();
+        Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
+        Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories);
+        List<Exception> exceptions = new ArrayList<>();
+        for (Processor processor : pipeline.flattenAllProcessors()) {
+            for (Map.Entry<DiscoveryNode, IngestInfo> entry : ingestInfos.entrySet()) {
+                if (entry.getValue().containsProcessor(processor.getType()) == false) {
+                    String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]";
+                    exceptions.add(
+                        ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message)
+                    );
+                }
             }
-
-            pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource(), request.getXContentType()));
-            ClusterState.Builder newState = ClusterState.builder(currentState);
-            newState.metaData(MetaData.builder(currentState.getMetaData())
-                .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines))
-                .build());
-            return newState.build();
-        }
-
-        /**
-         * Returns the pipeline by the specified id
-         */
-        public Pipeline get(String id) {
-            return pipelines.get(id);
-        }
-
-        public Map<String, Processor.Factory> getProcessorFactories() {
-            return processorFactories;
         }
+        ExceptionsHelper.rethrowAndSuppress(exceptions);
     }
 
-    private static final class PipelineExecutionService {
-
-        private final PipelineStore store;
-        private final ThreadPool threadPool;
+    public void executeBulkRequest(Iterable<DocWriteRequest<?>> actionRequests,
+        BiConsumer<IndexRequest, Exception> itemFailureHandler, Consumer<Exception> completionHandler) {
+        threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
 
-        private final StatsHolder totalStats = new StatsHolder();
-        private volatile Map<String, StatsHolder> statsHolderPerPipeline = Collections.emptyMap();
-
-        PipelineExecutionService(PipelineStore store, ThreadPool threadPool) {
-            this.store = store;
-            this.threadPool = threadPool;
-        }
-
-        void executeBulkRequest(Iterable<DocWriteRequest<?>> actionRequests,
-                                       BiConsumer<IndexRequest, Exception> itemFailureHandler,
-                                       Consumer<Exception> completionHandler) {
-            threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
-
-                @Override
-                public void onFailure(Exception e) {
-                    completionHandler.accept(e);
-                }
+            @Override
+            public void onFailure(Exception e) {
+                completionHandler.accept(e);
+            }
 
-                @Override
-                protected void doRun() {
-                    for (DocWriteRequest<?> actionRequest : actionRequests) {
-                        IndexRequest indexRequest = null;
-                        if (actionRequest instanceof IndexRequest) {
-                            indexRequest = (IndexRequest) actionRequest;
-                        } else if (actionRequest instanceof UpdateRequest) {
-                            UpdateRequest updateRequest = (UpdateRequest) actionRequest;
-                            indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();
-                        }
-                        if (indexRequest == null) {
-                            continue;
-                        }
-                        String pipeline = indexRequest.getPipeline();
-                        if (NOOP_PIPELINE_NAME.equals(pipeline) == false) {
-                            try {
-                                innerExecute(indexRequest, getPipeline(indexRequest.getPipeline()));
-                                //this shouldn't be needed here but we do it for consistency with index api
-                                // which requires it to prevent double execution
-                                indexRequest.setPipeline(NOOP_PIPELINE_NAME);
-                            } catch (Exception e) {
-                                itemFailureHandler.accept(indexRequest, e);
+            @Override
+            protected void doRun() {
+                for (DocWriteRequest<?> actionRequest : actionRequests) {
+                    IndexRequest indexRequest = null;
+                    if (actionRequest instanceof IndexRequest) {
+                        indexRequest = (IndexRequest) actionRequest;
+                    } else if (actionRequest instanceof UpdateRequest) {
+                        UpdateRequest updateRequest = (UpdateRequest) actionRequest;
+                        indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();
+                    }
+                    if (indexRequest == null) {
+                        continue;
+                    }
+                    String pipelineId = indexRequest.getPipeline();
+                    if (NOOP_PIPELINE_NAME.equals(pipelineId) == false) {
+                        try {
+                            Pipeline pipeline = pipelines.get(pipelineId);
+                            if (pipeline == null) {
+                                throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
                             }
+                            innerExecute(indexRequest, pipeline);
+                            //this shouldn't be needed here but we do it for consistency with index api
+                            // which requires it to prevent double execution
+                            indexRequest.setPipeline(NOOP_PIPELINE_NAME);
+                        } catch (Exception e) {
+                            itemFailureHandler.accept(indexRequest, e);
                         }
                     }
-                    completionHandler.accept(null);
                 }
-            });
-        }
-
-        IngestStats stats() {
-            Map<String, StatsHolder> statsHolderPerPipeline = this.statsHolderPerPipeline;
-
-            Map<String, IngestStats.Stats> statsPerPipeline = new HashMap<>(statsHolderPerPipeline.size());
-            for (Map.Entry<String, StatsHolder> entry : statsHolderPerPipeline.entrySet()) {
-                statsPerPipeline.put(entry.getKey(), entry.getValue().createStats());
+                completionHandler.accept(null);
             }
+        });
+    }
 
-            return new IngestStats(totalStats.createStats(), statsPerPipeline);
+    public IngestStats stats() {
+        Map<String, StatsHolder> statsHolderPerPipeline = this.statsHolderPerPipeline;
+
+        Map<String, IngestStats.Stats> statsPerPipeline = new HashMap<>(statsHolderPerPipeline.size());
+        for (Map.Entry<String, StatsHolder> entry : statsHolderPerPipeline.entrySet()) {
+            statsPerPipeline.put(entry.getKey(), entry.getValue().createStats());
         }
 
-        void updatePipelineStats(IngestMetadata ingestMetadata) {
-            boolean changed = false;
-            Map<String, StatsHolder> newStatsPerPipeline = new HashMap<>(statsHolderPerPipeline);
-            Iterator<String> iterator = newStatsPerPipeline.keySet().iterator();
-            while (iterator.hasNext()) {
-                String pipeline = iterator.next();
-                if (ingestMetadata.getPipelines().containsKey(pipeline) == false) {
-                    iterator.remove();
-                    changed = true;
-                }
-            }
-            for (String pipeline : ingestMetadata.getPipelines().keySet()) {
-                if (newStatsPerPipeline.containsKey(pipeline) == false) {
-                    newStatsPerPipeline.put(pipeline, new StatsHolder());
-                    changed = true;
-                }
-            }
+        return new IngestStats(totalStats.createStats(), statsPerPipeline);
+    }
 
-            if (changed) {
-                statsHolderPerPipeline = Collections.unmodifiableMap(newStatsPerPipeline);
+    void updatePipelineStats(IngestMetadata ingestMetadata) {
+        boolean changed = false;
+        Map<String, StatsHolder> newStatsPerPipeline = new HashMap<>(statsHolderPerPipeline);
+        Iterator<String> iterator = newStatsPerPipeline.keySet().iterator();
+        while (iterator.hasNext()) {
+            String pipeline = iterator.next();
+            if (ingestMetadata.getPipelines().containsKey(pipeline) == false) {
+                iterator.remove();
+                changed = true;
             }
         }
-
-        private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws Exception {
-            if (pipeline.getProcessors().isEmpty()) {
-                return;
+        for (String pipeline : ingestMetadata.getPipelines().keySet()) {
+            if (newStatsPerPipeline.containsKey(pipeline) == false) {
+                newStatsPerPipeline.put(pipeline, new StatsHolder());
+                changed = true;
             }
+        }
 
-            long startTimeInNanos = System.nanoTime();
-            // the pipeline specific stat holder may not exist and that is fine:
-            // (e.g. the pipeline may have been removed while we're ingesting a document
-            Optional<StatsHolder> pipelineStats = Optional.ofNullable(statsHolderPerPipeline.get(pipeline.getId()));
-            try {
-                totalStats.preIngest();
-                pipelineStats.ifPresent(StatsHolder::preIngest);
-                String index = indexRequest.index();
-                String type = indexRequest.type();
-                String id = indexRequest.id();
-                String routing = indexRequest.routing();
-                Long version = indexRequest.version();
-                VersionType versionType = indexRequest.versionType();
-                Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
-                IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap);
-                pipeline.execute(ingestDocument);
-
-                Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata();
-                //it's fine to set all metadata fields all the time, as ingest document holds their starting values
-                //before ingestion, which might also get modified during ingestion.
-                indexRequest.index((String) metadataMap.get(IngestDocument.MetaData.INDEX));
-                indexRequest.type((String) metadataMap.get(IngestDocument.MetaData.TYPE));
-                indexRequest.id((String) metadataMap.get(IngestDocument.MetaData.ID));
-                indexRequest.routing((String) metadataMap.get(IngestDocument.MetaData.ROUTING));
-                indexRequest.version(((Number) metadataMap.get(IngestDocument.MetaData.VERSION)).longValue());
-                if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) {
-                    indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE)));
-                }
-                indexRequest.source(ingestDocument.getSourceAndMetadata());
-            } catch (Exception e) {
-                totalStats.ingestFailed();
-                pipelineStats.ifPresent(StatsHolder::ingestFailed);
-                throw e;
-            } finally {
-                long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
-                totalStats.postIngest(ingestTimeInMillis);
-                pipelineStats.ifPresent(statsHolder -> statsHolder.postIngest(ingestTimeInMillis));
-            }
+        if (changed) {
+            statsHolderPerPipeline = Collections.unmodifiableMap(newStatsPerPipeline);
+        }
+    }
+
+    private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws Exception {
+        if (pipeline.getProcessors().isEmpty()) {
+            return;
         }
 
-        private Pipeline getPipeline(String pipelineId) {
-            Pipeline pipeline = store.get(pipelineId);
-            if (pipeline == null) {
-                throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
+        long startTimeInNanos = System.nanoTime();
+        // the pipeline specific stat holder may not exist and that is fine:
+        // (e.g. the pipeline may have been removed while we're ingesting a document
+        Optional<StatsHolder> pipelineStats = Optional.ofNullable(statsHolderPerPipeline.get(pipeline.getId()));
+        try {
+            totalStats.preIngest();
+            pipelineStats.ifPresent(StatsHolder::preIngest);
+            String index = indexRequest.index();
+            String type = indexRequest.type();
+            String id = indexRequest.id();
+            String routing = indexRequest.routing();
+            Long version = indexRequest.version();
+            VersionType versionType = indexRequest.versionType();
+            Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
+            IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap);
+            pipeline.execute(ingestDocument);
+
+            Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata();
+            //it's fine to set all metadata fields all the time, as ingest document holds their starting values
+            //before ingestion, which might also get modified during ingestion.
+            indexRequest.index((String) metadataMap.get(IngestDocument.MetaData.INDEX));
+            indexRequest.type((String) metadataMap.get(IngestDocument.MetaData.TYPE));
+            indexRequest.id((String) metadataMap.get(IngestDocument.MetaData.ID));
+            indexRequest.routing((String) metadataMap.get(IngestDocument.MetaData.ROUTING));
+            indexRequest.version(((Number) metadataMap.get(IngestDocument.MetaData.VERSION)).longValue());
+            if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) {
+                indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE)));
             }
-            return pipeline;
+            indexRequest.source(ingestDocument.getSourceAndMetadata());
+        } catch (Exception e) {
+            totalStats.ingestFailed();
+            pipelineStats.ifPresent(StatsHolder::ingestFailed);
+            throw e;
+        } finally {
+            long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
+            totalStats.postIngest(ingestTimeInMillis);
+            pipelineStats.ifPresent(statsHolder -> statsHolder.postIngest(ingestTimeInMillis));
         }
+    }
 
-        private static class StatsHolder {
+    private void innerUpdatePipelines(ClusterState previousState, ClusterState state) {
+        if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
+            return;
+        }
 
-            private final MeanMetric ingestMetric = new MeanMetric();
-            private final CounterMetric ingestCurrent = new CounterMetric();
-            private final CounterMetric ingestFailed = new CounterMetric();
+        IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE);
+        IngestMetadata previousIngestMetadata = previousState.getMetaData().custom(IngestMetadata.TYPE);
+        if (Objects.equals(ingestMetadata, previousIngestMetadata)) {
+            return;
+        }
 
-            void preIngest() {
-                ingestCurrent.inc();
+        Map<String, Pipeline> pipelines = new HashMap<>();
+        List<ElasticsearchParseException> exceptions = new ArrayList<>();
+        for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) {
+            try {
+                pipelines.put(pipeline.getId(), Pipeline.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories));
+            } catch (ElasticsearchParseException e) {
+                pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), e));
+                exceptions.add(e);
+            } catch (Exception e) {
+                ElasticsearchParseException parseException = new ElasticsearchParseException(
+                    "Error updating pipeline with id [" + pipeline.getId() + "]", e);
+                pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), parseException));
+                exceptions.add(parseException);
             }
+        }
+        this.pipelines = Collections.unmodifiableMap(pipelines);
+        ExceptionsHelper.rethrowAndSuppress(exceptions);
+    }
 
-            void postIngest(long ingestTimeInMillis) {
-                ingestCurrent.dec();
-                ingestMetric.inc(ingestTimeInMillis);
-            }
+    private static class StatsHolder {
 
-            void ingestFailed() {
-                ingestFailed.inc();
-            }
+        private final MeanMetric ingestMetric = new MeanMetric();
+        private final CounterMetric ingestCurrent = new CounterMetric();
+        private final CounterMetric ingestFailed = new CounterMetric();
 
-            IngestStats.Stats createStats() {
-                return new IngestStats.Stats(ingestMetric.count(), ingestMetric.sum(), ingestCurrent.count(), ingestFailed.count());
-            }
+        void preIngest() {
+            ingestCurrent.inc();
+        }
 
+        void postIngest(long ingestTimeInMillis) {
+            ingestCurrent.dec();
+            ingestMetric.inc(ingestTimeInMillis);
         }
 
+        void ingestFailed() {
+            ingestFailed.inc();
+        }
+
+        IngestStats.Stats createStats() {
+            return new IngestStats.Stats(ingestMetric.count(), ingestMetric.sum(), ingestCurrent.count(), ingestFailed.count());
+        }
     }
 }

+ 16 - 16
server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

@@ -206,7 +206,7 @@ public class IngestServiceTests extends ESTestCase {
         PutPipelineRequest putRequest = new PutPipelineRequest(id,
             new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON);
         ClusterState previousClusterState = clusterState;
-        clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState);
+        clusterState = IngestService.innerPut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         pipeline = ingestService.getPipeline(id);
         assertThat(pipeline, notNullValue());
@@ -233,7 +233,7 @@ public class IngestServiceTests extends ESTestCase {
         // add a new pipeline:
         PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"processors\": []}"), XContentType.JSON);
         ClusterState previousClusterState = clusterState;
-        clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState);
+        clusterState = IngestService.innerPut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         pipeline = ingestService.getPipeline(id);
         assertThat(pipeline, notNullValue());
@@ -245,7 +245,7 @@ public class IngestServiceTests extends ESTestCase {
         putRequest =
             new PutPipelineRequest(id, new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), XContentType.JSON);
         previousClusterState = clusterState;
-        clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState);
+        clusterState = IngestService.innerPut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         pipeline = ingestService.getPipeline(id);
         assertThat(pipeline, notNullValue());
@@ -264,7 +264,7 @@ public class IngestServiceTests extends ESTestCase {
         PutPipelineRequest putRequest =
             new PutPipelineRequest(id, new BytesArray("{\"description\": \"empty processors\"}"), XContentType.JSON);
         ClusterState previousClusterState = clusterState;
-        clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState);
+        clusterState = IngestService.innerPut(putRequest, clusterState);
         try {
             ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
             fail("should fail");
@@ -439,7 +439,7 @@ public class IngestServiceTests extends ESTestCase {
         PutPipelineRequest putRequest = new PutPipelineRequest(id,
             new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON);
         ClusterState previousClusterState = clusterState;
-        clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState);
+        clusterState = IngestService.innerPut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         final SetOnce<Boolean> failure = new SetOnce<>();
         final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline(id);
@@ -467,7 +467,7 @@ public class IngestServiceTests extends ESTestCase {
             new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON);
         ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
         ClusterState previousClusterState = clusterState;
-        clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState);
+        clusterState = IngestService.innerPut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
 
         BulkRequest bulkRequest = new BulkRequest();
@@ -507,7 +507,7 @@ public class IngestServiceTests extends ESTestCase {
             new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON);
         ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
         ClusterState previousClusterState = clusterState;
-        clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState);
+        clusterState = IngestService.innerPut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
         @SuppressWarnings("unchecked")
@@ -525,7 +525,7 @@ public class IngestServiceTests extends ESTestCase {
             new PutPipelineRequest("_id", new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), XContentType.JSON);
         ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
         ClusterState previousClusterState = clusterState;
-        clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState);
+        clusterState = IngestService.innerPut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
         @SuppressWarnings("unchecked")
@@ -545,7 +545,7 @@ public class IngestServiceTests extends ESTestCase {
             new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON);
         ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
         ClusterState previousClusterState = clusterState;
-        clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState);
+        clusterState = IngestService.innerPut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         final long newVersion = randomLong();
         final String versionType = randomFrom("internal", "external", "external_gt", "external_gte");
@@ -587,7 +587,7 @@ public class IngestServiceTests extends ESTestCase {
             new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON);
         ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
         ClusterState previousClusterState = clusterState;
-        clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState);
+        clusterState = IngestService.innerPut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
         doThrow(new RuntimeException())
@@ -616,7 +616,7 @@ public class IngestServiceTests extends ESTestCase {
             new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON);
         ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
         ClusterState previousClusterState = clusterState;
-        clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState);
+        clusterState = IngestService.innerPut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
         doThrow(new RuntimeException()).when(processor).execute(eqIndexTypeId(emptyMap()));
@@ -645,7 +645,7 @@ public class IngestServiceTests extends ESTestCase {
             new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON);
         ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
         ClusterState previousClusterState = clusterState;
-        clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState);
+        clusterState = IngestService.innerPut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
         doThrow(new RuntimeException())
@@ -700,7 +700,7 @@ public class IngestServiceTests extends ESTestCase {
             new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON);
         ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
         ClusterState previousClusterState = clusterState;
-        clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState);
+        clusterState = IngestService.innerPut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
 
         @SuppressWarnings("unchecked")
@@ -734,7 +734,7 @@ public class IngestServiceTests extends ESTestCase {
             new PutPipelineRequest("_id", new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), XContentType.JSON);
         ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
         ClusterState previousClusterState = clusterState;
-        clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState);
+        clusterState = IngestService.innerPut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
 
         @SuppressWarnings("unchecked")
@@ -762,12 +762,12 @@ public class IngestServiceTests extends ESTestCase {
             new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON);
         ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
         ClusterState previousClusterState = clusterState;
-        clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState);
+        clusterState = IngestService.innerPut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         putRequest = new PutPipelineRequest("_id2",
             new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON);
         previousClusterState = clusterState;
-        clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState);
+        clusterState = IngestService.innerPut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         final Map<String, PipelineConfiguration> configurationMap = new HashMap<>();
         configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON));