|
@@ -761,8 +761,13 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
|
|
|
}
|
|
|
Pipeline pipeline = holder.pipeline;
|
|
|
String originalIndex = indexRequest.indices()[0];
|
|
|
+ long startTimeInNanos = System.nanoTime();
|
|
|
+ totalMetrics.preIngest();
|
|
|
innerExecute(slot, indexRequest, pipeline, onDropped, e -> {
|
|
|
+ long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
|
|
|
+ totalMetrics.postIngest(ingestTimeInNanos);
|
|
|
if (e != null) {
|
|
|
+ totalMetrics.ingestFailed();
|
|
|
logger.debug(
|
|
|
() -> format(
|
|
|
"failed to execute pipeline [%s] for document [%s/%s]",
|
|
@@ -893,10 +898,6 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- long startTimeInNanos = System.nanoTime();
|
|
|
- // the pipeline specific stat holder may not exist and that is fine:
|
|
|
- // (e.g. the pipeline may have been removed while we're ingesting a document
|
|
|
- totalMetrics.preIngest();
|
|
|
String index = indexRequest.index();
|
|
|
String id = indexRequest.id();
|
|
|
String routing = indexRequest.routing();
|
|
@@ -916,10 +917,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
|
|
|
logger.warn("A listener was unexpectedly called more than once", new RuntimeException(e));
|
|
|
assert false : "A listener was unexpectedly called more than once";
|
|
|
} else {
|
|
|
- long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
|
|
|
- totalMetrics.postIngest(ingestTimeInNanos);
|
|
|
if (e != null) {
|
|
|
- totalMetrics.ingestFailed();
|
|
|
handler.accept(e);
|
|
|
} else if (result == null) {
|
|
|
itemDroppedHandler.accept(slot);
|
|
@@ -951,7 +949,6 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
|
|
|
// processor creates a source map that is self-referencing.
|
|
|
// In that case, we catch and wrap the exception so we can
|
|
|
// include which pipeline failed.
|
|
|
- totalMetrics.ingestFailed();
|
|
|
handler.accept(
|
|
|
new IllegalArgumentException(
|
|
|
"Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]",
|