IngestService.java

/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.ingest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.IntConsumer;

/**
 * Holder class for several ingest related services.
 */
public class IngestService implements ClusterStateApplier {

    public static final String NOOP_PIPELINE_NAME = "_none";

    private static final Logger logger = LogManager.getLogger(IngestService.class);

    private final ClusterService clusterService;
    private final ScriptService scriptService;
    private final Map<String, Processor.Factory> processorFactories;
    // Ideally this should be in the IngestMetadata class, but we don't have the processor factories around there.
    // We know of all the processor factories when a node with all its plugins has been initialized. Also some
    // processor factories rely on other node services. Custom metadata is statically registered when classes
    // are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
    private volatile Map<String, PipelineHolder> pipelines = Map.of();
    private final ThreadPool threadPool;
    private final IngestMetric totalMetrics = new IngestMetric();
    private final List<Consumer<ClusterState>> ingestClusterStateListeners = new CopyOnWriteArrayList<>();

    public IngestService(ClusterService clusterService, ThreadPool threadPool,
                         Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
                         List<IngestPlugin> ingestPlugins, Client client) {
        this.clusterService = clusterService;
        this.scriptService = scriptService;
        this.processorFactories = processorFactories(
            ingestPlugins,
            new Processor.Parameters(
                env, scriptService, analysisRegistry,
                threadPool.getThreadContext(), threadPool::relativeTimeInMillis,
                (delay, command) -> threadPool.schedule(
                    command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC
                ), this, client
            )
        );
        this.threadPool = threadPool;
    }

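    // Merges the processor factories contributed by all ingest plugins into one immutable map,
    // rejecting duplicate registrations of the same processor type.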
    private static Map<String, Processor.Factory> processorFactories(List<IngestPlugin> ingestPlugins,
                                                                     Processor.Parameters parameters) {
        Map<String, Processor.Factory> processorFactories = new HashMap<>();
        for (IngestPlugin ingestPlugin : ingestPlugins) {
            Map<String, Processor.Factory> newProcessors = ingestPlugin.getProcessors(parameters);
            for (Map.Entry<String, Processor.Factory> entry : newProcessors.entrySet()) {
                if (processorFactories.put(entry.getKey(), entry.getValue()) != null) {
                    throw new IllegalArgumentException("Ingest processor [" + entry.getKey() + "] is already registered");
                }
            }
        }
        return Collections.unmodifiableMap(processorFactories);
    }

    public ClusterService getClusterService() {
        return clusterService;
    }

    public ScriptService getScriptService() {
        return scriptService;
    }

    /**
     * Deletes the pipeline specified by id in the request.
     */
    public void delete(DeletePipelineRequest request, ActionListener<AcknowledgedResponse> listener) {
        clusterService.submitStateUpdateTask("delete-pipeline-" + request.getId(),
            new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {

                @Override
                protected AcknowledgedResponse newResponse(boolean acknowledged) {
                    return new AcknowledgedResponse(acknowledged);
                }

                @Override
                public ClusterState execute(ClusterState currentState) {
                    return innerDelete(request, currentState);
                }
            });
    }

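    // Removes every pipeline whose id matches the (possibly wildcard) id in the request from the cluster
    // state. Matching nothing is only allowed for the match-all pattern; otherwise it is an error.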
    static ClusterState innerDelete(DeletePipelineRequest request, ClusterState currentState) {
        IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE);
        if (currentIngestMetadata == null) {
            return currentState;
        }
        Map<String, PipelineConfiguration> pipelines = currentIngestMetadata.getPipelines();
        Set<String> toRemove = new HashSet<>();
        for (String pipelineKey : pipelines.keySet()) {
            if (Regex.simpleMatch(request.getId(), pipelineKey)) {
                toRemove.add(pipelineKey);
            }
        }
        if (toRemove.isEmpty() && Regex.isMatchAllPattern(request.getId()) == false) {
            throw new ResourceNotFoundException("pipeline [{}] is missing", request.getId());
        } else if (toRemove.isEmpty()) {
            return currentState;
        }
        final Map<String, PipelineConfiguration> pipelinesCopy = new HashMap<>(pipelines);
        for (String key : toRemove) {
            pipelinesCopy.remove(key);
        }
        ClusterState.Builder newState = ClusterState.builder(currentState);
        newState.metaData(MetaData.builder(currentState.getMetaData())
            .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelinesCopy))
            .build());
        return newState.build();
    }

    /**
     * @return pipeline configuration specified by id. If multiple ids or wildcards are specified, multiple pipelines
     * may be returned
     */
    // Returning PipelineConfiguration instead of Pipeline, because the Pipeline and Processor interfaces don't
    // know how to serialize themselves.
    public static List<PipelineConfiguration> getPipelines(ClusterState clusterState, String... ids) {
        IngestMetadata ingestMetadata = clusterState.getMetaData().custom(IngestMetadata.TYPE);
        return innerGetPipelines(ingestMetadata, ids);
    }

    static List<PipelineConfiguration> innerGetPipelines(IngestMetadata ingestMetadata, String... ids) {
        if (ingestMetadata == null) {
            return Collections.emptyList();
        }

        // if we didn't ask for _any_ ID, then we get them all (this is the same as asking for '*')
        if (ids.length == 0) {
            return new ArrayList<>(ingestMetadata.getPipelines().values());
        }

        List<PipelineConfiguration> result = new ArrayList<>(ids.length);
        for (String id : ids) {
            if (Regex.isSimpleMatchPattern(id)) {
                for (Map.Entry<String, PipelineConfiguration> entry : ingestMetadata.getPipelines().entrySet()) {
                    if (Regex.simpleMatch(id, entry.getKey())) {
                        result.add(entry.getValue());
                    }
                }
            } else {
                PipelineConfiguration pipeline = ingestMetadata.getPipelines().get(id);
                if (pipeline != null) {
                    result.add(pipeline);
                }
            }
        }
        return result;
    }

    /**
     * Stores the pipeline definition specified in the request.
     */
    public void putPipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineRequest request,
                            ActionListener<AcknowledgedResponse> listener) throws Exception {
        // validates the pipeline and processor configuration before submitting a cluster update task:
        validatePipeline(ingestInfos, request);
        clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(),
            new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {

                @Override
                protected AcknowledgedResponse newResponse(boolean acknowledged) {
                    return new AcknowledgedResponse(acknowledged);
                }

                @Override
                public ClusterState execute(ClusterState currentState) {
                    return innerPut(request, currentState);
                }
            });
    }

    /**
     * Returns the pipeline by the specified id
     */
    public Pipeline getPipeline(String id) {
        PipelineHolder holder = pipelines.get(id);
        if (holder != null) {
            return holder.pipeline;
        } else {
            return null;
        }
    }

    public Map<String, Processor.Factory> getProcessorFactories() {
        return processorFactories;
    }

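    // Describes the processor types available on this node.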
    public IngestInfo info() {
        Map<String, Processor.Factory> processorFactories = getProcessorFactories();
        List<ProcessorInfo> processorInfoList = new ArrayList<>(processorFactories.size());
        for (Map.Entry<String, Processor.Factory> entry : processorFactories.entrySet()) {
            processorInfoList.add(new ProcessorInfo(entry.getKey()));
        }
        return new IngestInfo(processorInfoList);
    }

    Map<String, PipelineHolder> pipelines() {
        return pipelines;
    }

    /**
     * Recursive method to obtain all of the non-failure processors for the given compoundProcessor. Since conditionals are
     * implemented as wrappers around the actual processor, prefer the conditional processor's metric, which only counts the
     * runs in which the condition evaluated to true.
     * @param compoundProcessor The compound processor to start walking the non-failure processors
     * @param processorMetrics The list of {@link Processor} {@link IngestMetric} tuples.
     * @return the processorMetrics for all non-failure processors that belong to the original compoundProcessor
     */
    private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(CompoundProcessor compoundProcessor,
                                                                            List<Tuple<Processor, IngestMetric>> processorMetrics) {
        // only surface the top level non-failure processors, on-failure processor times will be included in the top level non-failure
        for (Tuple<Processor, IngestMetric> processorWithMetric : compoundProcessor.getProcessorsWithMetrics()) {
            Processor processor = processorWithMetric.v1();
            IngestMetric metric = processorWithMetric.v2();
            if (processor instanceof CompoundProcessor) {
                getProcessorMetrics((CompoundProcessor) processor, processorMetrics);
            } else {
                // Prefer the conditional's metric since it only includes metrics when the conditional evaluated to true.
                if (processor instanceof ConditionalProcessor) {
                    metric = ((ConditionalProcessor) processor).getMetric();
                }
                processorMetrics.add(new Tuple<>(processor, metric));
            }
        }
        return processorMetrics;
    }

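    // Adds the pipeline from the request to the cluster state's ingest metadata, replacing any existing
    // pipeline with the same id.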
    static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) {
        IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE);
        Map<String, PipelineConfiguration> pipelines;
        if (currentIngestMetadata != null) {
            pipelines = new HashMap<>(currentIngestMetadata.getPipelines());
        } else {
            pipelines = new HashMap<>();
        }

        pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource(), request.getXContentType()));
        ClusterState.Builder newState = ClusterState.builder(currentState);
        newState.metaData(MetaData.builder(currentState.getMetaData())
            .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines))
            .build());
        return newState.build();
    }

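    // Parses the pipeline definition and verifies that every processor type it references is installed
    // on all known ingest nodes, collecting one configuration exception per missing processor.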
    void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineRequest request) throws Exception {
        if (ingestInfos.isEmpty()) {
            throw new IllegalStateException("Ingest info is empty");
        }

        Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
        Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, scriptService);
        List<Exception> exceptions = new ArrayList<>();
        for (Processor processor : pipeline.flattenAllProcessors()) {
            for (Map.Entry<DiscoveryNode, IngestInfo> entry : ingestInfos.entrySet()) {
                String type = processor.getType();
                if (entry.getValue().containsProcessor(type) == false && ConditionalProcessor.TYPE.equals(type) == false) {
                    String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]";
                    exceptions.add(
                        ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message)
                    );
                }
            }
        }
        ExceptionsHelper.rethrowAndSuppress(exceptions);
    }

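    /**
     * Runs the ingest pipelines for all index requests in a bulk on the WRITE thread pool. Requests without
     * a pipeline are skipped. Item failures are reported per slot via {@code itemFailureHandler}; once the
     * counter of outstanding requests reaches zero, {@code completionHandler} is invoked exactly once.
     */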
    public void executeBulkRequest(int numberOfActionRequests,
                                   Iterable<DocWriteRequest<?>> actionRequests,
                                   BiConsumer<Integer, Exception> itemFailureHandler,
                                   BiConsumer<Thread, Exception> completionHandler,
                                   IntConsumer itemDroppedHandler) {
        threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {

            @Override
            public void onFailure(Exception e) {
                completionHandler.accept(null, e);
            }

            @Override
            protected void doRun() {
                final Thread originalThread = Thread.currentThread();
                final AtomicInteger counter = new AtomicInteger(numberOfActionRequests);
                int i = 0;
                for (DocWriteRequest<?> actionRequest : actionRequests) {
                    IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest);
                    if (indexRequest == null) {
                        if (counter.decrementAndGet() == 0) {
                            completionHandler.accept(originalThread, null);
                        }
                        assert counter.get() >= 0;
                        continue;
                    }

                    String pipelineId = indexRequest.getPipeline();
                    if (NOOP_PIPELINE_NAME.equals(pipelineId)) {
                        if (counter.decrementAndGet() == 0) {
                            completionHandler.accept(originalThread, null);
                        }
                        assert counter.get() >= 0;
                        continue;
                    }

                    final int slot = i;
                    try {
                        PipelineHolder holder = pipelines.get(pipelineId);
                        if (holder == null) {
                            throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
                        }
                        Pipeline pipeline = holder.pipeline;
                        innerExecute(slot, indexRequest, pipeline, itemDroppedHandler, e -> {
                            if (e == null) {
                                // this shouldn't be needed here but we do it for consistency with index api
                                // which requires it to prevent double execution
                                indexRequest.setPipeline(NOOP_PIPELINE_NAME);
                            } else {
                                itemFailureHandler.accept(slot, e);
                            }
                            if (counter.decrementAndGet() == 0) {
                                completionHandler.accept(originalThread, null);
                            }
                            assert counter.get() >= 0;
                        });
                    } catch (Exception e) {
                        itemFailureHandler.accept(slot, e);
                        if (counter.decrementAndGet() == 0) {
                            completionHandler.accept(originalThread, null);
                        }
                        assert counter.get() >= 0;
                    }
                    i++;
                }
            }
        });
    }

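    // Builds a point-in-time snapshot of ingest statistics: the node-wide totals plus per-pipeline
    // and per-processor metrics.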
    public IngestStats stats() {
        IngestStats.Builder statsBuilder = new IngestStats.Builder();
        statsBuilder.addTotalMetrics(totalMetrics);
        pipelines.forEach((id, holder) -> {
            Pipeline pipeline = holder.pipeline;
            CompoundProcessor rootProcessor = pipeline.getCompoundProcessor();
            statsBuilder.addPipelineMetrics(id, pipeline.getMetrics());
            List<Tuple<Processor, IngestMetric>> processorMetrics = new ArrayList<>();
            getProcessorMetrics(rootProcessor, processorMetrics);
            processorMetrics.forEach(t -> {
                Processor processor = t.v1();
                IngestMetric processorMetric = t.v2();
                statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processorMetric);
            });
        });
        return statsBuilder.build();
    }

    /**
     * Adds a listener that gets invoked with the current cluster state before the processor factories
     * are invoked.
     *
     * This is useful for components that ingest processors rely on, giving them the opportunity to update
     * themselves before the ingest processor factories use them.
     */
    public void addIngestClusterStateListener(Consumer<ClusterState> listener) {
        ingestClusterStateListeners.add(listener);
    }

    // package private for testing
    static String getProcessorName(Processor processor) {
        // conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name
        if (processor instanceof ConditionalProcessor) {
            processor = ((ConditionalProcessor) processor).getInnerProcessor();
        }
        StringBuilder sb = new StringBuilder(5);
        sb.append(processor.getType());

        if (processor instanceof PipelineProcessor) {
            String pipelineName = ((PipelineProcessor) processor).getPipelineName();
            sb.append(":");
            sb.append(pipelineName);
        }
        String tag = processor.getTag();
        if (tag != null && !tag.isEmpty()) {
            sb.append(":");
            sb.append(tag);
        }
        return sb.toString();
    }

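    // Runs a single pipeline on one index request: records metrics around the execution and, on success,
    // copies any metadata and source changes made by the pipeline back onto the request.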
    private void innerExecute(int slot, IndexRequest indexRequest, Pipeline pipeline, IntConsumer itemDroppedHandler,
                              Consumer<Exception> handler) {
        if (pipeline.getProcessors().isEmpty()) {
            handler.accept(null);
            return;
        }

        long startTimeInNanos = System.nanoTime();
        // the pipeline specific stat holder may not exist and that is fine:
        // (e.g. the pipeline may have been removed while we're ingesting a document)
        totalMetrics.preIngest();
        String index = indexRequest.index();
        String type = indexRequest.type();
        String id = indexRequest.id();
        String routing = indexRequest.routing();
        Long version = indexRequest.version();
        VersionType versionType = indexRequest.versionType();
        Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
        IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap);
        pipeline.execute(ingestDocument, (result, e) -> {
            long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
            totalMetrics.postIngest(ingestTimeInMillis);
            if (e != null) {
                totalMetrics.ingestFailed();
                handler.accept(e);
            } else if (result == null) {
                itemDroppedHandler.accept(slot);
                handler.accept(null);
            } else {
                Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata();
                // it's fine to set all metadata fields all the time, as the ingest document holds their starting values
                // before ingestion, which might also get modified during ingestion.
                indexRequest.index((String) metadataMap.get(IngestDocument.MetaData.INDEX));
                indexRequest.type((String) metadataMap.get(IngestDocument.MetaData.TYPE));
                indexRequest.id((String) metadataMap.get(IngestDocument.MetaData.ID));
                indexRequest.routing((String) metadataMap.get(IngestDocument.MetaData.ROUTING));
                indexRequest.version(((Number) metadataMap.get(IngestDocument.MetaData.VERSION)).longValue());
                if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) {
                    indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE)));
                }
                indexRequest.source(ingestDocument.getSourceAndMetadata(), indexRequest.getContentType());
                handler.accept(null);
            }
        });
    }

    @Override
    public void applyClusterState(final ClusterChangedEvent event) {
        ClusterState state = event.state();
        if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
            return;
        }

        // Publish cluster state to components that are used by processor factories before letting
        // processor factories create new processor instances.
        // (Note that this also needs to happen when there is no change to the ingest metadata, because
        // an update may only touch the part of the cluster state that a component is interested in.)
        ingestClusterStateListeners.forEach(consumer -> consumer.accept(state));

        IngestMetadata newIngestMetadata = state.getMetaData().custom(IngestMetadata.TYPE);
        if (newIngestMetadata == null) {
            return;
        }

        try {
            innerUpdatePipelines(newIngestMetadata);
        } catch (ElasticsearchParseException e) {
            logger.warn("failed to update ingest pipelines", e);
        }
    }

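    // Rebuilds the local pipelines map from the given ingest metadata: unchanged pipelines are reused,
    // changed ones are recreated (carrying over their metrics where possible), removed ones are dropped,
    // and pipelines that fail to parse are replaced with placeholders that throw on execution.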
    void innerUpdatePipelines(IngestMetadata newIngestMetadata) {
        Map<String, PipelineHolder> existingPipelines = this.pipelines;

        // Lazy initialize these variables in order to favour the most likely scenario, that there are no pipeline changes:
        Map<String, PipelineHolder> newPipelines = null;
        List<ElasticsearchParseException> exceptions = null;
        // Iterate over the pipeline configurations in the ingest metadata and construct a new pipeline if there is no existing
        // pipeline or if the pipeline configuration has been modified
        for (PipelineConfiguration newConfiguration : newIngestMetadata.getPipelines().values()) {
            PipelineHolder previous = existingPipelines.get(newConfiguration.getId());
            if (previous != null && previous.configuration.equals(newConfiguration)) {
                continue;
            }

            if (newPipelines == null) {
                newPipelines = new HashMap<>(existingPipelines);
            }
            try {
                Pipeline newPipeline =
                    Pipeline.create(newConfiguration.getId(), newConfiguration.getConfigAsMap(), processorFactories, scriptService);
                newPipelines.put(
                    newConfiguration.getId(),
                    new PipelineHolder(newConfiguration, newPipeline)
                );

                if (previous == null) {
                    continue;
                }
                Pipeline oldPipeline = previous.pipeline;
                newPipeline.getMetrics().add(oldPipeline.getMetrics());
                List<Tuple<Processor, IngestMetric>> oldPerProcessMetrics = new ArrayList<>();
                List<Tuple<Processor, IngestMetric>> newPerProcessMetrics = new ArrayList<>();
                getProcessorMetrics(oldPipeline.getCompoundProcessor(), oldPerProcessMetrics);
                getProcessorMetrics(newPipeline.getCompoundProcessor(), newPerProcessMetrics);
                // Best attempt to populate new processor metrics using a parallel array of the old metrics. This is not ideal since
                // the per processor metrics may get reset when the arrays don't match. However, to get to an ideal model, unique and
                // consistent ids per processor and/or semantic equals for each processor will be needed.
                if (newPerProcessMetrics.size() == oldPerProcessMetrics.size()) {
                    Iterator<Tuple<Processor, IngestMetric>> oldMetricsIterator = oldPerProcessMetrics.iterator();
                    for (Tuple<Processor, IngestMetric> compositeMetric : newPerProcessMetrics) {
                        String type = compositeMetric.v1().getType();
                        IngestMetric metric = compositeMetric.v2();
                        if (oldMetricsIterator.hasNext()) {
                            Tuple<Processor, IngestMetric> oldCompositeMetric = oldMetricsIterator.next();
                            String oldType = oldCompositeMetric.v1().getType();
                            IngestMetric oldMetric = oldCompositeMetric.v2();
                            if (type.equals(oldType)) {
                                metric.add(oldMetric);
                            }
                        }
                    }
                }
            } catch (ElasticsearchParseException e) {
                Pipeline pipeline = substitutePipeline(newConfiguration.getId(), e);
                newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, pipeline));
                if (exceptions == null) {
                    exceptions = new ArrayList<>();
                }
                exceptions.add(e);
            } catch (Exception e) {
                ElasticsearchParseException parseException = new ElasticsearchParseException(
                    "Error updating pipeline with id [" + newConfiguration.getId() + "]", e);
                Pipeline pipeline = substitutePipeline(newConfiguration.getId(), parseException);
                newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, pipeline));
                if (exceptions == null) {
                    exceptions = new ArrayList<>();
                }
                exceptions.add(parseException);
            }
        }

        // Iterate over the current active pipelines and check whether they are missing in the pipeline configuration;
        // if so, delete the pipeline from the new pipelines map:
        for (Map.Entry<String, PipelineHolder> entry : existingPipelines.entrySet()) {
            if (newIngestMetadata.getPipelines().get(entry.getKey()) == null) {
                if (newPipelines == null) {
                    newPipelines = new HashMap<>(existingPipelines);
                }
                newPipelines.remove(entry.getKey());
            }
        }

        if (newPipelines != null) {
            // Update the pipelines:
            this.pipelines = Map.copyOf(newPipelines);

            // Rethrow errors that may have occurred during creating new pipeline instances:
            if (exceptions != null) {
                ExceptionsHelper.rethrowAndSuppress(exceptions);
            }
        }
    }

    /**
     * Gets all the Processors of the given type from within a Pipeline.
     * @param pipelineId the pipeline to inspect
     * @param clazz the Processor class to look for
     * @return all the instances of the given Processor class within the pipeline, including wrapped inner processors
     */
    public <P extends Processor> List<P> getProcessorsInPipeline(String pipelineId, Class<P> clazz) {
        Pipeline pipeline = getPipeline(pipelineId);
        if (pipeline == null) {
            throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
        }

        List<P> processors = new ArrayList<>();
        for (Processor processor : pipeline.flattenAllProcessors()) {
            if (clazz.isAssignableFrom(processor.getClass())) {
                processors.add(clazz.cast(processor));
            }

            while (processor instanceof WrappingProcessor) {
                WrappingProcessor wrappingProcessor = (WrappingProcessor) processor;
                if (clazz.isAssignableFrom(wrappingProcessor.getInnerProcessor().getClass())) {
                    processors.add(clazz.cast(wrappingProcessor.getInnerProcessor()));
                }
                processor = wrappingProcessor.getInnerProcessor();
                // break in the case of self referencing processors in the event a processor author creates a
                // wrapping processor that has its inner processor refer to itself.
                if (wrappingProcessor == processor) {
                    break;
                }
            }
        }

        return processors;
    }

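    // Creates a placeholder pipeline whose single processor always throws, so that documents sent through a
    // pipeline that failed to load are rejected with a descriptive error instead of silently passing through.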
    private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) {
        String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null;
        String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown";
        String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]";
        Processor failureProcessor = new AbstractProcessor(tag) {
            @Override
            public IngestDocument execute(IngestDocument ingestDocument) {
                throw new IllegalStateException(errorMessage);
            }

            @Override
            public String getType() {
                return type;
            }
        };
        String description = "this is a placeholder pipeline, because pipeline with id [" + id + "] could not be loaded";
        return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor));
    }

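    // Pairs a pipeline instance with the configuration it was built from, so that configuration changes
    // can be detected cheaply when the cluster state is applied.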
    static class PipelineHolder {

        final PipelineConfiguration configuration;
        final Pipeline pipeline;

        PipelineHolder(PipelineConfiguration configuration, Pipeline pipeline) {
            this.configuration = Objects.requireNonNull(configuration);
            this.pipeline = Objects.requireNonNull(pipeline);
        }
    }
}