Jelajahi Sumber

DocumentSizeObserver infrastructure to allow not reporting upon failures (#104859)

We want to report that observation of document parsing has finished only after the document has been indexed successfully.
To achieve this, we need to perform reporting in only one place (not, as previously, in both IngestService and the 'bulk action').

This commit splits the DocumentParsingObserver in two: the DocumentSizeObserver, which wraps an XContentParser and returns the observed state, and the DocumentSizeReporter, which performs an action once parsing has completed and indexing has succeeded.

To perform reporting in one place, we need to pass the state from IngestService to the 'bulk action'. The state is currently represented as a long - normalisedBytesParsed.

In TransportShardBulkAction we get the normalisedBytesParsed information, and in the serverless plugin, if the value indicates that parsing already happened in IngestService (value != -1), we create a DocumentSizeObserver with the fixed normalisedBytesParsed and won't increment it.

When indexing has completed successfully, we report the observed state for the index with the DocumentSizeReporter.

Small nit: by passing the DocumentSizeObserver via SourceToParse, we no longer have to inject it through the complex hierarchy for DocumentParser, hence some constructor changes.
Przemyslaw Gomulka 1 tahun lalu
induk
melakukan
11f3c29089
46 mengubah file dengan 453 tambahan dan 305 penghapusan
  1. 1 3
      benchmarks/src/main/java/org/elasticsearch/benchmark/index/mapper/MapperServiceFactory.java
  2. 1 3
      benchmarks/src/main/java/org/elasticsearch/benchmark/search/QueryParserHelperBenchmark.java
  3. 5 0
      docs/changelog/104859.yaml
  4. 36 27
      modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/plugins/internal/DocumentSizeObserverWithPipelinesIT.java
  5. 37 20
      server/src/internalClusterTest/java/org/elasticsearch/plugins/internal/DocumentSizeObserverIT.java
  6. 1 0
      server/src/main/java/org/elasticsearch/TransportVersions.java
  7. 10 0
      server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java
  8. 65 13
      server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
  9. 22 12
      server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
  10. 1 3
      server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadataVerifier.java
  11. 6 6
      server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java
  12. 3 11
      server/src/main/java/org/elasticsearch/index/IndexModule.java
  13. 2 7
      server/src/main/java/org/elasticsearch/index/IndexService.java
  14. 2 1
      server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java
  15. 4 19
      server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java
  16. 4 12
      server/src/main/java/org/elasticsearch/index/mapper/MapperService.java
  17. 9 8
      server/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java
  18. 2 1
      server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
  19. 2 1
      server/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java
  20. 2 8
      server/src/main/java/org/elasticsearch/indices/IndicesService.java
  21. 0 9
      server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java
  22. 11 14
      server/src/main/java/org/elasticsearch/ingest/IngestService.java
  23. 8 8
      server/src/main/java/org/elasticsearch/node/NodeConstruction.java
  24. 47 0
      server/src/main/java/org/elasticsearch/plugins/internal/DocumentParsingProvider.java
  25. 4 6
      server/src/main/java/org/elasticsearch/plugins/internal/DocumentParsingProviderPlugin.java
  26. 9 21
      server/src/main/java/org/elasticsearch/plugins/internal/DocumentSizeObserver.java
  27. 25 0
      server/src/main/java/org/elasticsearch/plugins/internal/DocumentSizeReporter.java
  28. 28 14
      server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java
  29. 2 2
      server/src/test/java/org/elasticsearch/action/ingest/ReservedPipelineActionTests.java
  30. 6 13
      server/src/test/java/org/elasticsearch/index/IndexModuleTests.java
  31. 1 3
      server/src/test/java/org/elasticsearch/index/codec/CodecTests.java
  32. 1 2
      server/src/test/java/org/elasticsearch/index/mapper/DocumentMapperTests.java
  33. 11 1
      server/src/test/java/org/elasticsearch/index/mapper/DynamicTemplatesTests.java
  34. 2 1
      server/src/test/java/org/elasticsearch/index/mapper/RoutingFieldMapperTests.java
  35. 37 25
      server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
  36. 2 2
      server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java
  37. 4 4
      server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
  38. 1 3
      test/framework/src/main/java/org/elasticsearch/index/MapperTestUtils.java
  39. 3 4
      test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java
  40. 20 5
      test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java
  41. 9 1
      test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
  42. 1 3
      test/framework/src/main/java/org/elasticsearch/test/AbstractBuilderTestCase.java
  43. 2 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java
  44. 2 2
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java
  45. 1 3
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java
  46. 1 3
      x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherPluginTests.java

+ 1 - 3
benchmarks/src/main/java/org/elasticsearch/benchmark/index/mapper/MapperServiceFactory.java

@@ -26,7 +26,6 @@ import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.ProvidedIdFieldMapper;
 import org.elasticsearch.index.similarity.SimilarityService;
 import org.elasticsearch.indices.IndicesModule;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptCompiler;
 import org.elasticsearch.script.ScriptContext;
@@ -72,8 +71,7 @@ public class MapperServiceFactory {
                 public <T> T compile(Script script, ScriptContext<T> scriptContext) {
                     throw new UnsupportedOperationException();
                 }
-            },
-            () -> DocumentParsingObserver.EMPTY_INSTANCE
+            }
         );
 
         try {

+ 1 - 3
benchmarks/src/main/java/org/elasticsearch/benchmark/search/QueryParserHelperBenchmark.java

@@ -40,7 +40,6 @@ import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.similarity.SimilarityService;
 import org.elasticsearch.indices.IndicesModule;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptCompiler;
 import org.elasticsearch.script.ScriptContext;
@@ -187,8 +186,7 @@ public class QueryParserHelperBenchmark {
                 public <T> T compile(Script script, ScriptContext<T> scriptContext) {
                     throw new UnsupportedOperationException();
                 }
-            },
-            () -> DocumentParsingObserver.EMPTY_INSTANCE
+            }
         );
 
         try {

+ 5 - 0
docs/changelog/104859.yaml

@@ -0,0 +1,5 @@
+pr: 104859
+summary: ES - document observing with rejections
+area: Infra/Core
+type: enhancement
+issues: []

+ 36 - 27
modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/plugins/internal/DocumentParsingObserverWithPipelinesIT.java → modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/plugins/internal/DocumentSizeObserverWithPipelinesIT.java

@@ -24,13 +24,12 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Supplier;
 
 import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.Matchers.equalTo;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
-public class DocumentParsingObserverWithPipelinesIT extends ESIntegTestCase {
+public class DocumentSizeObserverWithPipelinesIT extends ESIntegTestCase {
 
     private static String TEST_INDEX_NAME = "test-index-name";
     // the assertions are done in plugin which is static and will be created by ES server.
@@ -66,32 +65,55 @@ public class DocumentParsingObserverWithPipelinesIT extends ESIntegTestCase {
                 .source(jsonBuilder().startObject().field("test", "I am sam i am").endObject())
         ).actionGet();
         assertTrue(hasWrappedParser);
-        // there are more assertions in a TestDocumentParsingObserver
+        // there are more assertions in a TestDocumentSizeObserver
     }
 
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return List.of(TestDocumentParsingObserverPlugin.class, IngestCommonPlugin.class);
+        return List.of(TestDocumentParsingProviderPlugin.class, IngestCommonPlugin.class);
     }
 
-    public static class TestDocumentParsingObserverPlugin extends Plugin implements DocumentParsingObserverPlugin, IngestPlugin {
+    public static class TestDocumentParsingProviderPlugin extends Plugin implements DocumentParsingProviderPlugin, IngestPlugin {
 
-        private static final TestDocumentParsingObserver DOCUMENT_PARSING_OBSERVER = new TestDocumentParsingObserver();
-
-        public TestDocumentParsingObserverPlugin() {}
+        public TestDocumentParsingProviderPlugin() {}
 
         @Override
-        public Supplier<DocumentParsingObserver> getDocumentParsingObserverSupplier() {
+        public DocumentParsingProvider getDocumentParsingSupplier() {
             // returns a static instance, because we want to assert that the wrapping is called only once
-            return () -> DOCUMENT_PARSING_OBSERVER;
+            return new DocumentParsingProvider() {
+                @Override
+                public DocumentSizeObserver newFixedSizeDocumentObserver(long normalisedBytesParsed) {
+                    return new TestDocumentSizeObserver(normalisedBytesParsed);
+                }
+
+                @Override
+                public DocumentSizeObserver newDocumentSizeObserver() {
+                    return new TestDocumentSizeObserver(0L);
+                }
+
+                @Override
+                public DocumentSizeReporter getDocumentParsingReporter() {
+                    return new TestDocumentSizeReporter();
+                }
+            };
         }
+    }
 
+    public static class TestDocumentSizeReporter implements DocumentSizeReporter {
+        @Override
+        public void onCompleted(String indexName, long normalizedBytesParsed) {
+            assertThat(indexName, equalTo(TEST_INDEX_NAME));
+            assertThat(normalizedBytesParsed, equalTo(1L));
+        }
     }
 
-    public static class TestDocumentParsingObserver implements DocumentParsingObserver {
+    public static class TestDocumentSizeObserver implements DocumentSizeObserver {
         long mapCounter = 0;
         long wrapperCounter = 0;
-        String indexName;
+
+        public TestDocumentSizeObserver(long mapCounter) {
+            this.mapCounter = mapCounter;
+        }
 
         @Override
         public XContentParser wrapParser(XContentParser xContentParser) {
@@ -108,22 +130,9 @@ public class DocumentParsingObserverWithPipelinesIT extends ESIntegTestCase {
         }
 
         @Override
-        public void setIndexName(String indexName) {
-            this.indexName = indexName;
+        public long normalisedBytesParsed() {
+            return mapCounter;
         }
-
-        @Override
-        public void close() {
-            assertThat(indexName, equalTo(TEST_INDEX_NAME));
-            assertThat(mapCounter, equalTo(1L));
-
-            assertThat(
-                "we only want to use a wrapped counter once, once document is reported it no longer needs to wrap",
-                wrapperCounter,
-                equalTo(1L)
-            );
-        }
-
     }
 
 }

+ 37 - 20
server/src/internalClusterTest/java/org/elasticsearch/plugins/internal/DocumentParsingObserverIT.java → server/src/internalClusterTest/java/org/elasticsearch/plugins/internal/DocumentSizeObserverIT.java

@@ -19,14 +19,13 @@ import org.elasticsearch.xcontent.XContentType;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
-import java.util.function.Supplier;
 
 import static org.elasticsearch.xcontent.XContentFactory.cborBuilder;
 import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.Matchers.equalTo;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
-public class DocumentParsingObserverIT extends ESIntegTestCase {
+public class DocumentSizeObserverIT extends ESIntegTestCase {
 
     private static String TEST_INDEX_NAME = "test-index-name";
 
@@ -40,7 +39,7 @@ public class DocumentParsingObserverIT extends ESIntegTestCase {
             new IndexRequest(TEST_INDEX_NAME).id("1").source(jsonBuilder().startObject().field("test", "I am sam i am").endObject())
         ).actionGet();
         assertTrue(hasWrappedParser);
-        // there are more assertions in a TestDocumentParsingObserver
+        // there are more assertions in a TestDocumentParsingSupplierPlugin
 
         hasWrappedParser = false;
         // the format of the request does not matter
@@ -48,7 +47,7 @@ public class DocumentParsingObserverIT extends ESIntegTestCase {
             new IndexRequest(TEST_INDEX_NAME).id("2").source(cborBuilder().startObject().field("test", "I am sam i am").endObject())
         ).actionGet();
         assertTrue(hasWrappedParser);
-        // there are more assertions in a TestDocumentParsingObserver
+        // there are more assertions in a TestDocumentParsingSupplierPlugin
 
         hasWrappedParser = false;
         // white spaces does not matter
@@ -60,27 +59,51 @@ public class DocumentParsingObserverIT extends ESIntegTestCase {
             }
             """, XContentType.JSON)).actionGet();
         assertTrue(hasWrappedParser);
-        // there are more assertions in a TestDocumentParsingObserver
+        // there are more assertions in a TestDocumentParsingSupplierPlugin
     }
 
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return List.of(TestDocumentParsingObserverPlugin.class);
+        return List.of(TestDocumentParsingProviderPlugin.class);
     }
 
-    public static class TestDocumentParsingObserverPlugin extends Plugin implements DocumentParsingObserverPlugin, IngestPlugin {
+    public static class TestDocumentParsingProviderPlugin extends Plugin implements DocumentParsingProviderPlugin, IngestPlugin {
 
-        public TestDocumentParsingObserverPlugin() {}
+        public TestDocumentParsingProviderPlugin() {}
 
         @Override
-        public Supplier<DocumentParsingObserver> getDocumentParsingObserverSupplier() {
-            return () -> new TestDocumentParsingObserver();
+        public DocumentParsingProvider getDocumentParsingSupplier() {
+            return new DocumentParsingProvider() {
+
+                @Override
+                public DocumentSizeObserver newFixedSizeDocumentObserver(long normalisedBytesParsed) {
+                    return new TestDocumentSizeObserver();
+                }
+
+                @Override
+                public DocumentSizeObserver newDocumentSizeObserver() {
+                    return new TestDocumentSizeObserver();
+                }
+
+                @Override
+                public DocumentSizeReporter getDocumentParsingReporter() {
+                    return new TestDocumentSizeReporter();
+                }
+            };
         }
     }
 
-    public static class TestDocumentParsingObserver implements DocumentParsingObserver {
+    public static class TestDocumentSizeReporter implements DocumentSizeReporter {
+
+        @Override
+        public void onCompleted(String indexName, long normalizedBytesParsed) {
+            assertThat(indexName, equalTo(TEST_INDEX_NAME));
+            assertThat(normalizedBytesParsed, equalTo(5L));
+        }
+    }
+
+    public static class TestDocumentSizeObserver implements DocumentSizeObserver {
         long counter = 0;
-        String indexName;
 
         @Override
         public XContentParser wrapParser(XContentParser xContentParser) {
@@ -95,14 +118,8 @@ public class DocumentParsingObserverIT extends ESIntegTestCase {
         }
 
         @Override
-        public void setIndexName(String indexName) {
-            this.indexName = indexName;
-        }
-
-        @Override
-        public void close() {
-            assertThat(indexName, equalTo(TEST_INDEX_NAME));
-            assertThat(counter, equalTo(5L));
+        public long normalisedBytesParsed() {
+            return counter;
         }
     }
 }

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -130,6 +130,7 @@ public class TransportVersions {
     public static final TransportVersion FIELD_CAPS_FIELD_HAS_VALUE = def(8_590_00_0);
     public static final TransportVersion ML_INFERENCE_REQUEST_INPUT_TYPE_CLASS_CLUSTER_ADDED = def(8_591_00_0);
     public static final TransportVersion ML_DIMENSIONS_SET_BY_USER_ADDED = def(8_592_00_0);
+    public static final TransportVersion INDEX_REQUEST_NORMALIZED_BYTES_PARSED = def(8_593_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 10 - 0
server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java

@@ -18,6 +18,7 @@ import org.elasticsearch.action.support.replication.TransportWriteAction;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.plugins.internal.DocumentSizeObserver;
 
 import java.util.Arrays;
 import java.util.List;
@@ -62,6 +63,7 @@ class BulkPrimaryExecutionContext {
     private BulkItemResponse executionResult;
     private int updateRetryCounter;
     private long noopMappingUpdateRetryForMappingVersion;
+    private DocumentSizeObserver documentSizeObserver = DocumentSizeObserver.EMPTY_INSTANCE;
 
     BulkPrimaryExecutionContext(BulkShardRequest request, IndexShard primary) {
         this.request = request;
@@ -367,4 +369,12 @@ class BulkPrimaryExecutionContext {
         }
         return true;
     }
+
+    public void setDocumentSizeObserver(DocumentSizeObserver documentSizeObserver) {
+        this.documentSizeObserver = documentSizeObserver;
+    }
+
+    public DocumentSizeObserver getDocumentSizeObserver() {
+        return documentSizeObserver;
+    }
 }

+ 65 - 13
server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

@@ -58,6 +58,9 @@ import org.elasticsearch.indices.ExecutorSelector;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.SystemIndices;
 import org.elasticsearch.node.NodeClosedException;
+import org.elasticsearch.plugins.internal.DocumentParsingProvider;
+import org.elasticsearch.plugins.internal.DocumentSizeObserver;
+import org.elasticsearch.plugins.internal.DocumentSizeReporter;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportService;
@@ -84,6 +87,8 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
     private final MappingUpdatedAction mappingUpdatedAction;
     private final Consumer<Runnable> postWriteAction;
 
+    private final DocumentParsingProvider documentParsingProvider;
+
     @Inject
     public TransportShardBulkAction(
         Settings settings,
@@ -96,7 +101,8 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
         UpdateHelper updateHelper,
         ActionFilters actionFilters,
         IndexingPressure indexingPressure,
-        SystemIndices systemIndices
+        SystemIndices systemIndices,
+        DocumentParsingProvider documentParsingProvider
     ) {
         super(
             settings,
@@ -117,6 +123,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
         this.updateHelper = updateHelper;
         this.mappingUpdatedAction = mappingUpdatedAction;
         this.postWriteAction = WriteAckDelay.create(settings, threadPool);
+        this.documentParsingProvider = documentParsingProvider;
     }
 
     private static final TransportRequestOptions TRANSPORT_REQUEST_OPTIONS = TransportRequestOptions.of(
@@ -160,7 +167,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
             public void onTimeout(TimeValue timeout) {
                 mappingUpdateListener.onFailure(new MapperException("timed out while waiting for a dynamic mapping update"));
             }
-        }), listener, threadPool, executor(primary), postWriteRefresh, postWriteAction);
+        }), listener, threadPool, executor(primary), postWriteRefresh, postWriteAction, documentParsingProvider);
     }
 
     @Override
@@ -195,7 +202,8 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
             threadPool,
             executorName,
             null,
-            null
+            null,
+            DocumentParsingProvider.EMPTY_INSTANCE
         );
     }
 
@@ -210,7 +218,8 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
         ThreadPool threadPool,
         String executorName,
         @Nullable PostWriteRefresh postWriteRefresh,
-        @Nullable Consumer<Runnable> postWriteAction
+        @Nullable Consumer<Runnable> postWriteAction,
+        DocumentParsingProvider documentParsingProvider
     ) {
         new ActionRunnable<>(listener) {
 
@@ -229,7 +238,9 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
                         nowInMillisSupplier,
                         mappingUpdater,
                         waitForMappingUpdate,
-                        ActionListener.wrap(v -> executor.execute(this), this::onRejection)
+
+                        ActionListener.wrap(v -> executor.execute(this), this::onRejection),
+                        documentParsingProvider
                     ) == false) {
                         // We are waiting for a mapping update on another thread, that will invoke this action again once its done
                         // so we just break out here.
@@ -263,7 +274,8 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
                                     docWriteRequest.id()
                                 ),
                                 context,
-                                null
+                                null,
+                                documentParsingProvider
                             );
                         }
                         finishRequest();
@@ -304,7 +316,8 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
         LongSupplier nowInMillisSupplier,
         MappingUpdatePerformer mappingUpdater,
         Consumer<ActionListener<Void>> waitForMappingUpdate,
-        ActionListener<Void> itemDoneListener
+        ActionListener<Void> itemDoneListener,
+        DocumentParsingProvider documentParsingProvider
     ) throws Exception {
         final DocWriteRequest.OpType opType = context.getCurrent().opType();
 
@@ -351,13 +364,16 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
             );
         } else {
             final IndexRequest request = context.getRequestToExecute();
+            DocumentSizeObserver documentSizeObserver = getDocumentSizeObserver(documentParsingProvider, request);
+
+            context.setDocumentSizeObserver(documentSizeObserver);
             final SourceToParse sourceToParse = new SourceToParse(
                 request.id(),
                 request.source(),
                 request.getContentType(),
                 request.routing(),
                 request.getDynamicTemplates(),
-                request.pipelinesHaveRun() == false
+                documentSizeObserver
             );
             result = primary.applyIndexOperationOnPrimary(
                 version,
@@ -368,6 +384,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
                 request.getAutoGeneratedTimestamp(),
                 request.isRetry()
             );
+
         }
         if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
 
@@ -390,7 +407,12 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
             } catch (Exception e) {
                 logger.info(() -> format("%s mapping update rejected by primary", primary.shardId()), e);
                 assert result.getId() != null;
-                onComplete(exceptionToResult(e, primary, isDelete, version, result.getId()), context, updateResult);
+                onComplete(
+                    exceptionToResult(e, primary, isDelete, version, result.getId()),
+                    context,
+                    updateResult,
+                    documentParsingProvider
+                );
                 return true;
             }
 
@@ -414,7 +436,12 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
 
                 @Override
                 public void onFailure(Exception e) {
-                    onComplete(exceptionToResult(e, primary, isDelete, version, result.getId()), context, updateResult);
+                    onComplete(
+                        exceptionToResult(e, primary, isDelete, version, result.getId()),
+                        context,
+                        updateResult,
+                        documentParsingProvider
+                    );
                     // Requesting mapping update failed, so we don't have to wait for a cluster state update
                     assert context.isInitial();
                     itemDoneListener.onResponse(null);
@@ -422,23 +449,48 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
             });
             return false;
         } else {
-            onComplete(result, context, updateResult);
+            onComplete(result, context, updateResult, documentParsingProvider);
         }
         return true;
     }
 
+    /**
+     * Creates a new document size observer.
+     * @param documentParsingProvider a provider to create a new observer.
+     * @param request an index request to provide information about bytes being already parsed.
+     * @return a Fixed version of DocumentSizeObserver if parsing already happened (in IngestService).
+     * It would be pre-populated with information about how many bytes were already parsed
+     * or return a new 'empty' DocumentSizeObserver.
+     */
+    private static DocumentSizeObserver getDocumentSizeObserver(DocumentParsingProvider documentParsingProvider, IndexRequest request) {
+        if (request.getNormalisedBytesParsed() != -1) {
+            return documentParsingProvider.newFixedSizeDocumentObserver(request.getNormalisedBytesParsed());
+        }
+        return documentParsingProvider.newDocumentSizeObserver();
+    }
+
     private static Engine.Result exceptionToResult(Exception e, IndexShard primary, boolean isDelete, long version, String id) {
         assert id != null;
         return isDelete ? primary.getFailedDeleteResult(e, version, id) : primary.getFailedIndexResult(e, version, id);
     }
 
-    private static void onComplete(Engine.Result r, BulkPrimaryExecutionContext context, UpdateHelper.Result updateResult) {
+    private static void onComplete(
+        Engine.Result r,
+        BulkPrimaryExecutionContext context,
+        UpdateHelper.Result updateResult,
+        DocumentParsingProvider documentParsingProvider
+    ) {
         context.markOperationAsExecuted(r);
         final DocWriteRequest<?> docWriteRequest = context.getCurrent();
         final DocWriteRequest.OpType opType = docWriteRequest.opType();
         final boolean isUpdate = opType == DocWriteRequest.OpType.UPDATE;
         final BulkItemResponse executionResult = context.getExecutionResult();
         final boolean isFailed = executionResult.isFailed();
+        if (isFailed == false && opType != DocWriteRequest.OpType.DELETE) {
+            DocumentSizeReporter documentSizeReporter = documentParsingProvider.getDocumentParsingReporter();
+            DocumentSizeObserver documentSizeObserver = context.getDocumentSizeObserver();
+            documentSizeReporter.onCompleted(docWriteRequest.index(), documentSizeObserver.normalisedBytesParsed());
+        }
         if (isUpdate
             && isFailed
             && isConflictException(executionResult.getFailure().getCause())
@@ -635,7 +687,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
                     indexRequest.getContentType(),
                     indexRequest.routing(),
                     Map.of(),
-                    false
+                    DocumentSizeObserver.EMPTY_INSTANCE
                 );
                 result = replica.applyIndexOperationOnReplica(
                     primaryResponse.getSeqNo(),

+ 22 - 12
server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

@@ -39,7 +39,7 @@ import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.ingest.IngestService;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
+import org.elasticsearch.plugins.internal.DocumentSizeObserver;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentFactory;
 import org.elasticsearch.xcontent.XContentType;
@@ -52,6 +52,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.elasticsearch.TransportVersions.INDEX_REQUEST_NORMALIZED_BYTES_PARSED;
 import static org.elasticsearch.action.ValidateActions.addValidationError;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
@@ -146,7 +147,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
      * rawTimestamp field is used on the coordinate node, it doesn't need to be serialised.
      */
     private Object rawTimestamp;
-    private boolean pipelinesHaveRun = false;
+    private long normalisedBytesParsed = -1;
 
     public IndexRequest(StreamInput in) throws IOException {
         this(null, in);
@@ -187,8 +188,9 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
         if (in.getTransportVersion().onOrAfter(TransportVersions.V_7_13_0)) {
             dynamicTemplates = in.readMap(StreamInput::readString);
         }
-        if (in.getTransportVersion().onOrAfter(PIPELINES_HAVE_RUN_FIELD_ADDED)) {
-            pipelinesHaveRun = in.readBoolean();
+        if (in.getTransportVersion().onOrAfter(PIPELINES_HAVE_RUN_FIELD_ADDED)
+            && in.getTransportVersion().before(INDEX_REQUEST_NORMALIZED_BYTES_PARSED)) {
+            in.readBoolean();
         }
         if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
             this.listExecutedPipelines = in.readBoolean();
@@ -204,6 +206,9 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
         } else {
             requireDataStream = false;
         }
+        if (in.getTransportVersion().onOrAfter(INDEX_REQUEST_NORMALIZED_BYTES_PARSED)) {
+            normalisedBytesParsed = in.readZLong();
+        }
     }
 
     public IndexRequest() {
@@ -407,8 +412,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
         return XContentHelper.convertToMap(source, false, contentType).v2();
     }
 
-    public Map<String, Object> sourceAsMap(DocumentParsingObserver documentParsingObserver) {
-        return XContentHelper.convertToMap(source, false, contentType, documentParsingObserver).v2();
+    public Map<String, Object> sourceAsMap(DocumentSizeObserver documentSizeObserver) {
+        return XContentHelper.convertToMap(source, false, contentType, documentSizeObserver).v2();
     }
 
     /**
@@ -766,8 +771,9 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
                 );
             }
         }
-        if (out.getTransportVersion().onOrAfter(PIPELINES_HAVE_RUN_FIELD_ADDED)) {
-            out.writeBoolean(pipelinesHaveRun);
+        if (out.getTransportVersion().onOrAfter(PIPELINES_HAVE_RUN_FIELD_ADDED)
+            && out.getTransportVersion().before(INDEX_REQUEST_NORMALIZED_BYTES_PARSED)) {
+            out.writeBoolean(normalisedBytesParsed != -1L);
         }
         if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
             out.writeBoolean(listExecutedPipelines);
@@ -775,9 +781,13 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
                 out.writeOptionalCollection(executedPipelines, StreamOutput::writeString);
             }
         }
+
         if (out.getTransportVersion().onOrAfter(TransportVersions.REQUIRE_DATA_STREAM_ADDED)) {
             out.writeBoolean(requireDataStream);
         }
+        if (out.getTransportVersion().onOrAfter(INDEX_REQUEST_NORMALIZED_BYTES_PARSED)) {
+            out.writeZLong(normalisedBytesParsed);
+        }
     }
 
     @Override
@@ -923,12 +933,12 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
         this.rawTimestamp = rawTimestamp;
     }
 
-    public void setPipelinesHaveRun() {
-        pipelinesHaveRun = true;
+    public long getNormalisedBytesParsed() {
+        return normalisedBytesParsed;
     }
 
-    public boolean pipelinesHaveRun() {
-        return pipelinesHaveRun;
+    public void setNormalisedBytesParsed(long normalisedBytesParsed) {
+        this.normalisedBytesParsed = normalisedBytesParsed;
     }
 
     /**

+ 1 - 3
server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadataVerifier.java

@@ -26,7 +26,6 @@ import org.elasticsearch.index.analysis.NamedAnalyzer;
 import org.elasticsearch.index.mapper.MapperRegistry;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.similarity.SimilarityService;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
 import org.elasticsearch.script.ScriptCompiler;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
@@ -183,8 +182,7 @@ public class IndexMetadataVerifier {
                     mapperRegistry,
                     () -> null,
                     indexSettings.getMode().idFieldMapperWithoutFieldData(),
-                    scriptService,
-                    () -> DocumentParsingObserver.EMPTY_INSTANCE
+                    scriptService
                 )
             ) {
                 mapperService.merge(indexMetadata, MapperService.MergeReason.MAPPING_RECOVERY);

+ 6 - 6
server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java

@@ -20,7 +20,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.core.CheckedFunction;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Tuple;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
+import org.elasticsearch.plugins.internal.DocumentSizeObserver;
 import org.elasticsearch.xcontent.DeprecationHandler;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 import org.elasticsearch.xcontent.ToXContent;
@@ -152,14 +152,14 @@ public class XContentHelper {
         BytesReference bytes,
         boolean ordered,
         XContentType xContentType,
-        DocumentParsingObserver documentParsingObserver
+        DocumentSizeObserver documentSizeObserver
     ) {
         return parseToType(
             ordered ? XContentParser::mapOrdered : XContentParser::map,
             bytes,
             xContentType,
             XContentParserConfiguration.EMPTY,
-            documentParsingObserver
+            documentSizeObserver
         );
     }
 
@@ -207,7 +207,7 @@ public class XContentHelper {
         @Nullable XContentType xContentType,
         @Nullable XContentParserConfiguration config
     ) throws ElasticsearchParseException {
-        return parseToType(extractor, bytes, xContentType, config, DocumentParsingObserver.EMPTY_INSTANCE);
+        return parseToType(extractor, bytes, xContentType, config, DocumentSizeObserver.EMPTY_INSTANCE);
     }
 
     public static <T> Tuple<XContentType, T> parseToType(
@@ -215,11 +215,11 @@ public class XContentHelper {
         BytesReference bytes,
         @Nullable XContentType xContentType,
         @Nullable XContentParserConfiguration config,
-        DocumentParsingObserver documentParsingObserver
+        DocumentSizeObserver documentSizeObserver
     ) throws ElasticsearchParseException {
         config = config != null ? config : XContentParserConfiguration.EMPTY;
         try (
-            XContentParser parser = documentParsingObserver.wrapParser(
+            XContentParser parser = documentSizeObserver.wrapParser(
                 xContentType != null ? createParser(config, bytes, xContentType) : createParser(config, bytes)
             )
         ) {

+ 3 - 11
server/src/main/java/org/elasticsearch/index/IndexModule.java

@@ -57,7 +57,6 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.plugins.IndexStorePlugin;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -77,7 +76,6 @@ import java.util.function.BiFunction;
 import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.function.Supplier;
 
 /**
  * IndexModule represents the central extension point for index level custom implementations like:
@@ -165,7 +163,6 @@ public final class IndexModule {
     private final IndexSettings indexSettings;
     private final AnalysisRegistry analysisRegistry;
     private final EngineFactory engineFactory;
-    private final Supplier<DocumentParsingObserver> documentParsingObserverSupplier;
     private final SetOnce<DirectoryWrapper> indexDirectoryWrapper = new SetOnce<>();
     private final SetOnce<Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>>> indexReaderWrapper =
         new SetOnce<>();
@@ -189,7 +186,6 @@ public final class IndexModule {
      * @param analysisRegistry   the analysis registry
      * @param engineFactory      the engine factory
      * @param directoryFactories the available store types
-     * @param documentParsingObserverSupplier the document reporter factory
      */
     public IndexModule(
         final IndexSettings indexSettings,
@@ -198,13 +194,11 @@ public final class IndexModule {
         final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
         final BooleanSupplier allowExpensiveQueries,
         final IndexNameExpressionResolver expressionResolver,
-        final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
-        final Supplier<DocumentParsingObserver> documentParsingObserverSupplier
+        final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories
     ) {
         this.indexSettings = indexSettings;
         this.analysisRegistry = analysisRegistry;
         this.engineFactory = Objects.requireNonNull(engineFactory);
-        this.documentParsingObserverSupplier = documentParsingObserverSupplier;
         this.searchOperationListeners.add(new SearchSlowLog(indexSettings));
         this.indexOperationListeners.add(new IndexingSlowLog(indexSettings));
         this.directoryFactories = Collections.unmodifiableMap(directoryFactories);
@@ -541,8 +535,7 @@ public final class IndexModule {
                 recoveryStateFactory,
                 indexFoldersDeletionListener,
                 snapshotCommitSupplier,
-                indexCommitListener.get(),
-                documentParsingObserverSupplier
+                indexCommitListener.get()
             );
             success = true;
             return indexService;
@@ -652,8 +645,7 @@ public final class IndexModule {
                 throw new UnsupportedOperationException("no index query shard context available");
             },
             indexSettings.getMode().idFieldMapperWithoutFieldData(),
-            scriptService,
-            documentParsingObserverSupplier
+            scriptService
         );
     }
 

+ 2 - 7
server/src/main/java/org/elasticsearch/index/IndexService.java

@@ -82,7 +82,6 @@ import org.elasticsearch.indices.cluster.IndicesClusterStateService;
 import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.plugins.IndexStorePlugin;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -159,7 +158,6 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
     private final IndexNameExpressionResolver expressionResolver;
     private final Supplier<Sort> indexSortSupplier;
     private final ValuesSourceRegistry valuesSourceRegistry;
-    private final Supplier<DocumentParsingObserver> documentParsingObserverSupplier;
 
     @SuppressWarnings("this-escape")
     public IndexService(
@@ -193,11 +191,9 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
         IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
         IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener,
         IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
-        Engine.IndexCommitListener indexCommitListener,
-        Supplier<DocumentParsingObserver> documentParsingObserverSupplier
+        Engine.IndexCommitListener indexCommitListener
     ) {
         super(indexSettings);
-        this.documentParsingObserverSupplier = documentParsingObserverSupplier;
         assert indexCreationContext != IndexCreationContext.RELOAD_ANALYZERS
             : "IndexCreationContext.RELOAD_ANALYZERS should only be used when reloading analysers";
         this.allowExpensiveQueries = allowExpensiveQueries;
@@ -222,8 +218,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
                 // we parse all percolator queries as they would be parsed on shard 0
                 () -> newSearchExecutionContext(0, 0, null, System::currentTimeMillis, null, emptyMap()),
                 idFieldMapper,
-                scriptService,
-                documentParsingObserverSupplier
+                scriptService
             );
             this.indexFieldData = new IndexFieldDataService(indexSettings, indicesFieldDataCache, circuitBreakerService);
             if (indexSettings.getIndexSortConfig().hasIndexSort()) {

+ 2 - 1
server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java

@@ -59,6 +59,7 @@ import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.mapper.VersionFieldMapper;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.plugins.internal.DocumentSizeObserver;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -256,7 +257,7 @@ final class TranslogDirectoryReader extends DirectoryReader {
                     XContentHelper.xContentType(operation.source()),
                     operation.routing(),
                     Map.of(),
-                    false
+                    DocumentSizeObserver.EMPTY_INSTANCE
                 ),
                 mappingLookup
             );

+ 4 - 19
server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java

@@ -21,7 +21,7 @@ import org.elasticsearch.index.fielddata.IndexFieldDataCache;
 import org.elasticsearch.index.mapper.vectors.DenseVectorFieldMapper;
 import org.elasticsearch.index.query.SearchExecutionContext;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
+import org.elasticsearch.plugins.internal.DocumentSizeObserver;
 import org.elasticsearch.search.lookup.SearchLookup;
 import org.elasticsearch.search.lookup.Source;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -40,7 +40,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
 
 import static org.elasticsearch.index.mapper.vectors.DenseVectorFieldMapper.MAX_DIMS_COUNT;
 import static org.elasticsearch.index.mapper.vectors.DenseVectorFieldMapper.MIN_DIMS_FOR_DYNAMIC_FLOAT_MAPPING;
@@ -53,17 +52,11 @@ public final class DocumentParser {
     public static final IndexVersion DYNAMICALLY_MAP_DENSE_VECTORS_INDEX_VERSION = IndexVersions.FIRST_DETACHED_INDEX_VERSION;
 
     private final XContentParserConfiguration parserConfiguration;
-    private final Supplier<DocumentParsingObserver> documentParsingObserverSupplier;
     private final MappingParserContext mappingParserContext;
 
-    DocumentParser(
-        XContentParserConfiguration parserConfiguration,
-        MappingParserContext mappingParserContext,
-        Supplier<DocumentParsingObserver> documentParsingObserverSupplier
-    ) {
+    DocumentParser(XContentParserConfiguration parserConfiguration, MappingParserContext mappingParserContext) {
         this.mappingParserContext = mappingParserContext;
         this.parserConfiguration = parserConfiguration;
-        this.documentParsingObserverSupplier = documentParsingObserverSupplier;
     }
 
     /**
@@ -81,12 +74,9 @@ public final class DocumentParser {
         final RootDocumentParserContext context;
         final XContentType xContentType = source.getXContentType();
 
-        // only observe a document if it was not already reported (done in IngestService)
-        DocumentParsingObserver documentParsingObserver = source.toBeReported()
-            ? documentParsingObserverSupplier.get()
-            : DocumentParsingObserver.EMPTY_INSTANCE;
+        DocumentSizeObserver documentSizeObserver = source.getDocumentSizeObserver();
         try (
-            XContentParser parser = documentParsingObserver.wrapParser(
+            XContentParser parser = documentSizeObserver.wrapParser(
                 XContentHelper.createParser(parserConfiguration, source.source(), xContentType)
             )
         ) {
@@ -105,11 +95,6 @@ public final class DocumentParser {
 
         Mapping dynamicUpdate = createDynamicUpdate(context);
 
-        // if a mappingUpdate is required, the parsing will be triggered again
-        if (dynamicUpdate == null) {
-            documentParsingObserver.setIndexName(mappingParserContext.getIndexSettings().getIndex().getName());
-            documentParsingObserver.close();
-        }
         return new ParsedDocument(
             context.version(),
             context.seqID(),

+ 4 - 12
server/src/main/java/org/elasticsearch/index/mapper/MapperService.java

@@ -28,7 +28,6 @@ import org.elasticsearch.index.analysis.NamedAnalyzer;
 import org.elasticsearch.index.query.SearchExecutionContext;
 import org.elasticsearch.index.similarity.SimilarityService;
 import org.elasticsearch.indices.IndicesModule;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
 import org.elasticsearch.script.ScriptCompiler;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 import org.elasticsearch.xcontent.ToXContent;
@@ -166,8 +165,7 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
         MapperRegistry mapperRegistry,
         Supplier<SearchExecutionContext> searchExecutionContextSupplier,
         IdFieldMapper idFieldMapper,
-        ScriptCompiler scriptCompiler,
-        Supplier<DocumentParsingObserver> documentParsingObserverSupplier
+        ScriptCompiler scriptCompiler
     ) {
         this(
             () -> clusterService.state().getMinTransportVersion(),
@@ -178,8 +176,7 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
             mapperRegistry,
             searchExecutionContextSupplier,
             idFieldMapper,
-            scriptCompiler,
-            documentParsingObserverSupplier
+            scriptCompiler
         );
     }
 
@@ -193,8 +190,7 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
         MapperRegistry mapperRegistry,
         Supplier<SearchExecutionContext> searchExecutionContextSupplier,
         IdFieldMapper idFieldMapper,
-        ScriptCompiler scriptCompiler,
-        Supplier<DocumentParsingObserver> documentParsingObserverSupplier
+        ScriptCompiler scriptCompiler
     ) {
         super(indexSettings);
         this.indexVersionCreated = indexSettings.getIndexVersionCreated();
@@ -212,11 +208,7 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
             indexSettings,
             idFieldMapper
         );
-        this.documentParser = new DocumentParser(
-            parserConfiguration,
-            this.mappingParserContextSupplier.get(),
-            documentParsingObserverSupplier
-        );
+        this.documentParser = new DocumentParser(parserConfiguration, this.mappingParserContextSupplier.get());
         Map<String, MetadataFieldMapper.TypeParser> metadataMapperParsers = mapperRegistry.getMetadataMapperParsers(
             indexSettings.getIndexVersionCreated()
         );

+ 9 - 8
server/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java

@@ -11,6 +11,7 @@ package org.elasticsearch.index.mapper;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.plugins.internal.DocumentSizeObserver;
 import org.elasticsearch.xcontent.XContentType;
 
 import java.util.Map;
@@ -27,7 +28,7 @@ public class SourceToParse {
     private final XContentType xContentType;
 
     private final Map<String, String> dynamicTemplates;
-    private boolean toBeReported;
+    private final DocumentSizeObserver documentSizeObserver;
 
     public SourceToParse(
         @Nullable String id,
@@ -35,7 +36,7 @@ public class SourceToParse {
         XContentType xContentType,
         @Nullable String routing,
         Map<String, String> dynamicTemplates,
-        boolean toBeReported
+        DocumentSizeObserver documentSizeObserver
     ) {
         this.id = id;
         // we always convert back to byte array, since we store it and Field only supports bytes..
@@ -44,15 +45,11 @@ public class SourceToParse {
         this.xContentType = Objects.requireNonNull(xContentType);
         this.routing = routing;
         this.dynamicTemplates = Objects.requireNonNull(dynamicTemplates);
-        this.toBeReported = toBeReported;
+        this.documentSizeObserver = documentSizeObserver;
     }
 
     public SourceToParse(String id, BytesReference source, XContentType xContentType) {
-        this(id, source, xContentType, null, Map.of(), false);
-    }
-
-    public boolean toBeReported() {
-        return toBeReported;
+        this(id, source, xContentType, null, Map.of(), DocumentSizeObserver.EMPTY_INSTANCE);
     }
 
     public BytesReference source() {
@@ -88,4 +85,8 @@ public class SourceToParse {
     public XContentType getXContentType() {
         return this.xContentType;
     }
+
+    public DocumentSizeObserver getDocumentSizeObserver() {
+        return documentSizeObserver;
+    }
 }

+ 2 - 1
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -144,6 +144,7 @@ import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.indices.recovery.RecoveryTarget;
 import org.elasticsearch.plugins.IndexStorePlugin;
+import org.elasticsearch.plugins.internal.DocumentSizeObserver;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.rest.RestStatus;
@@ -1956,7 +1957,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                         XContentHelper.xContentType(index.source()),
                         index.routing(),
                         Map.of(),
-                        false
+                        DocumentSizeObserver.EMPTY_INSTANCE
                     )
                 );
             }

+ 2 - 1
server/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java

@@ -41,6 +41,7 @@ import org.elasticsearch.index.mapper.SourceValueFetcher;
 import org.elasticsearch.index.mapper.StringFieldType;
 import org.elasticsearch.index.mapper.TextSearchInfo;
 import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.plugins.internal.DocumentSizeObserver;
 import org.elasticsearch.search.lookup.Source;
 import org.elasticsearch.xcontent.XContentType;
 
@@ -310,7 +311,7 @@ public class TermVectorsService {
             request.xContentType(),
             request.routing(),
             Map.of(),
-            false
+            DocumentSizeObserver.EMPTY_INSTANCE
         );
         DocumentParser documentParser = indexShard.mapperService().documentParser();
         MappingLookup mappingLookup = indexShard.mapperService().mappingLookup();

+ 2 - 8
server/src/main/java/org/elasticsearch/indices/IndicesService.java

@@ -129,7 +129,6 @@ import org.elasticsearch.indices.store.CompositeIndexFoldersDeletionListener;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.plugins.IndexStorePlugin;
 import org.elasticsearch.plugins.PluginsService;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
@@ -169,7 +168,6 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.function.Predicate;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyList;
@@ -234,7 +232,6 @@ public class IndicesService extends AbstractLifecycleComponent
     private final OldShardsStats oldShardsStats = new OldShardsStats();
     private final MapperRegistry mapperRegistry;
     private final NamedWriteableRegistry namedWriteableRegistry;
-    private final Supplier<DocumentParsingObserver> documentParsingObserverSupplier;
     private final Map<String, IndexStorePlugin.SnapshotCommitSupplier> snapshotCommitSuppliers;
     private final IndexingMemoryController indexingMemoryController;
     private final TimeValue cleanInterval;
@@ -285,7 +282,6 @@ public class IndicesService extends AbstractLifecycleComponent
         this.indicesQueryCache = new IndicesQueryCache(settings);
         this.mapperRegistry = builder.mapperRegistry;
         this.namedWriteableRegistry = builder.namedWriteableRegistry;
-        this.documentParsingObserverSupplier = builder.documentParsingObserverSupplier;
         indexingMemoryController = new IndexingMemoryController(
             settings,
             threadPool,
@@ -740,8 +736,7 @@ public class IndicesService extends AbstractLifecycleComponent
             directoryFactories,
             () -> allowExpensiveQueries,
             indexNameExpressionResolver,
-            recoveryStateFactories,
-            documentParsingObserverSupplier
+            recoveryStateFactories
         );
         for (IndexingOperationListener operationListener : indexingOperationListeners) {
             indexModule.addIndexOperationListener(operationListener);
@@ -817,8 +812,7 @@ public class IndicesService extends AbstractLifecycleComponent
             directoryFactories,
             () -> allowExpensiveQueries,
             indexNameExpressionResolver,
-            recoveryStateFactories,
-            documentParsingObserverSupplier
+            recoveryStateFactories
         );
         pluginsService.forEach(p -> p.onIndexModule(indexModule));
         return indexModule.newIndexMapperService(clusterService, parserConfig, mapperRegistry, scriptService);

+ 0 - 9
server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java

@@ -29,7 +29,6 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.plugins.EnginePlugin;
 import org.elasticsearch.plugins.IndexStorePlugin;
 import org.elasticsearch.plugins.PluginsService;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
 import org.elasticsearch.search.internal.ShardSearchRequest;
@@ -43,7 +42,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.function.Function;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 public class IndicesServiceBuilder {
@@ -73,7 +71,6 @@ public class IndicesServiceBuilder {
     Map<String, IndexStorePlugin.SnapshotCommitSupplier> snapshotCommitSuppliers = Map.of();
     @Nullable
     CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> requestCacheKeyDifferentiator;
-    Supplier<DocumentParsingObserver> documentParsingObserverSupplier;
 
     public IndicesServiceBuilder settings(Settings settings) {
         this.settings = settings;
@@ -172,11 +169,6 @@ public class IndicesServiceBuilder {
         return this;
     }
 
-    public IndicesServiceBuilder documentParsingObserverSupplier(Supplier<DocumentParsingObserver> documentParsingObserverSupplier) {
-        this.documentParsingObserverSupplier = documentParsingObserverSupplier;
-        return this;
-    }
-
     public IndicesService build() {
         Objects.requireNonNull(settings);
         Objects.requireNonNull(pluginsService);
@@ -200,7 +192,6 @@ public class IndicesServiceBuilder {
         Objects.requireNonNull(recoveryStateFactories);
         Objects.requireNonNull(indexFoldersDeletionListeners);
         Objects.requireNonNull(snapshotCommitSuppliers);
-        Objects.requireNonNull(documentParsingObserverSupplier);
 
         // collect engine factory providers from plugins
         engineFactoryProviders = pluginsService.filterPlugins(EnginePlugin.class)

+ 11 - 14
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -60,7 +60,8 @@ import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.node.ReportingService;
 import org.elasticsearch.plugins.IngestPlugin;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
+import org.elasticsearch.plugins.internal.DocumentParsingProvider;
+import org.elasticsearch.plugins.internal.DocumentSizeObserver;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -87,7 +88,6 @@ import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.IntConsumer;
 import java.util.function.Predicate;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.core.Strings.format;
@@ -106,7 +106,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
     private final MasterServiceTaskQueue<PipelineClusterStateUpdateTask> taskQueue;
     private final ClusterService clusterService;
     private final ScriptService scriptService;
-    private final Supplier<DocumentParsingObserver> documentParsingObserverSupplier;
+    private final DocumentParsingProvider documentParsingProvider;
     private final Map<String, Processor.Factory> processorFactories;
     // Ideally this should be in IngestMetadata class, but we don't have the processor factories around there.
     // We know of all the processor factories when a node with all its plugin have been initialized. Also some
@@ -183,11 +183,11 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
         List<IngestPlugin> ingestPlugins,
         Client client,
         MatcherWatchdog matcherWatchdog,
-        Supplier<DocumentParsingObserver> documentParsingObserverSupplier
+        DocumentParsingProvider documentParsingProvider
     ) {
         this.clusterService = clusterService;
         this.scriptService = scriptService;
-        this.documentParsingObserverSupplier = documentParsingObserverSupplier;
+        this.documentParsingProvider = documentParsingProvider;
         this.processorFactories = processorFactories(
             ingestPlugins,
             new Processor.Parameters(
@@ -215,7 +215,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
     IngestService(IngestService ingestService) {
         this.clusterService = ingestService.clusterService;
         this.scriptService = ingestService.scriptService;
-        this.documentParsingObserverSupplier = ingestService.documentParsingObserverSupplier;
+        this.documentParsingProvider = ingestService.documentParsingProvider;
         this.processorFactories = ingestService.processorFactories;
         this.threadPool = ingestService.threadPool;
         this.taskQueue = ingestService.taskQueue;
@@ -740,8 +740,8 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                         totalMetrics.preIngest();
                         final int slot = i;
                         final Releasable ref = refs.acquire();
-                        DocumentParsingObserver documentParsingObserver = documentParsingObserverSupplier.get();
-                        final IngestDocument ingestDocument = newIngestDocument(indexRequest, documentParsingObserver);
+                        final DocumentSizeObserver documentSizeObserver = documentParsingProvider.newDocumentSizeObserver();
+                        final IngestDocument ingestDocument = newIngestDocument(indexRequest, documentSizeObserver);
                         final org.elasticsearch.script.Metadata originalDocumentMetadata = ingestDocument.getMetadata().clone();
                         // the document listener gives us three-way logic: a document can fail processing (1), or it can
                         // be successfully processed. a successfully processed document can be kept (2) or dropped (3).
@@ -779,11 +779,8 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                         );
 
                         executePipelines(pipelines, indexRequest, ingestDocument, shouldStoreFailure, documentListener);
-                        indexRequest.setPipelinesHaveRun();
-
+                        indexRequest.setNormalisedBytesParsed(documentSizeObserver.normalisedBytesParsed());
                         assert actionRequest.index() != null;
-                        documentParsingObserver.setIndexName(actionRequest.index());
-                        documentParsingObserver.close();
 
                         i++;
                     }
@@ -1080,14 +1077,14 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
     /**
      * Builds a new ingest document from the passed-in index request.
      */
-    private static IngestDocument newIngestDocument(final IndexRequest request, DocumentParsingObserver documentParsingObserver) {
+    private static IngestDocument newIngestDocument(final IndexRequest request, DocumentSizeObserver documentSizeObserver) {
         return new IngestDocument(
             request.index(),
             request.id(),
             request.version(),
             request.routing(),
             request.versionType(),
-            request.sourceAsMap(documentParsingObserver)
+            request.sourceAsMap(documentSizeObserver)
         );
     }
 

+ 8 - 8
server/src/main/java/org/elasticsearch/node/NodeConstruction.java

@@ -162,8 +162,8 @@ import org.elasticsearch.plugins.SearchPlugin;
 import org.elasticsearch.plugins.ShutdownAwarePlugin;
 import org.elasticsearch.plugins.SystemIndexPlugin;
 import org.elasticsearch.plugins.TelemetryPlugin;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
-import org.elasticsearch.plugins.internal.DocumentParsingObserverPlugin;
+import org.elasticsearch.plugins.internal.DocumentParsingProvider;
+import org.elasticsearch.plugins.internal.DocumentParsingProviderPlugin;
 import org.elasticsearch.plugins.internal.ReloadAwarePlugin;
 import org.elasticsearch.plugins.internal.RestExtension;
 import org.elasticsearch.plugins.internal.SettingsExtension;
@@ -617,7 +617,8 @@ class NodeConstruction {
         ClusterService clusterService = createClusterService(settingsModule, threadPool, taskManager);
         clusterService.addStateApplier(scriptService);
 
-        Supplier<DocumentParsingObserver> documentParsingObserverSupplier = getDocumentParsingObserverSupplier();
+        DocumentParsingProvider documentParsingProvider = getDocumentParsingSupplier();
+        modules.bindToInstance(DocumentParsingProvider.class, documentParsingProvider);
 
         final IngestService ingestService = new IngestService(
             clusterService,
@@ -628,7 +629,7 @@ class NodeConstruction {
             pluginsService.filterPlugins(IngestPlugin.class).toList(),
             client,
             IngestService.createGrokThreadWatchdog(environment, threadPool),
-            documentParsingObserverSupplier
+            documentParsingProvider
         );
 
         SystemIndices systemIndices = createSystemIndices(settings);
@@ -723,7 +724,6 @@ class NodeConstruction {
             .metaStateService(metaStateService)
             .valuesSourceRegistry(searchModule.getValuesSourceRegistry())
             .requestCacheKeyDifferentiator(searchModule.getRequestCacheKeyDifferentiator())
-            .documentParsingObserverSupplier(documentParsingObserverSupplier)
             .build();
 
         final var parameters = new IndexSettingProvider.Parameters(indicesService::createIndexMapperServiceForValidation);
@@ -1298,9 +1298,9 @@ class NodeConstruction {
         logger.info("initialized");
     }
 
-    private Supplier<DocumentParsingObserver> getDocumentParsingObserverSupplier() {
-        return getSinglePlugin(DocumentParsingObserverPlugin.class).map(DocumentParsingObserverPlugin::getDocumentParsingObserverSupplier)
-            .orElse(() -> DocumentParsingObserver.EMPTY_INSTANCE);
+    private DocumentParsingProvider getDocumentParsingSupplier() {
+        return getSinglePlugin(DocumentParsingProviderPlugin.class).map(DocumentParsingProviderPlugin::getDocumentParsingSupplier)
+            .orElse(DocumentParsingProvider.EMPTY_INSTANCE);
     }
 
     /**

+ 47 - 0
server/src/main/java/org/elasticsearch/plugins/internal/DocumentParsingProvider.java

@@ -0,0 +1,47 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.plugins.internal;
+
+/**
+ * An interface to provide instances of document parsing observer and reporter
+ */
+public interface DocumentParsingProvider {
+    DocumentParsingProvider EMPTY_INSTANCE = new DocumentParsingProvider() {
+        @Override
+        public DocumentSizeObserver newDocumentSizeObserver() {
+            return DocumentSizeObserver.EMPTY_INSTANCE;
+        }
+
+        @Override
+        public DocumentSizeReporter getDocumentParsingReporter() {
+            return DocumentSizeReporter.EMPTY_INSTANCE;
+        }
+
+        @Override
+        public DocumentSizeObserver newFixedSizeDocumentObserver(long normalisedBytesParsed) {
+            return DocumentSizeObserver.EMPTY_INSTANCE;
+        }
+    };
+
+    /**
+     * @return a new 'empty' observer to use when observing parsing
+     */
+    DocumentSizeObserver newDocumentSizeObserver();
+
+    /**
+     * @return an observer with a previously observed value (fixed to this value, not continuing)
+     */
+    DocumentSizeObserver newFixedSizeDocumentObserver(long normalisedBytesParsed);
+
+    /**
+     * @return an instance of a reporter to use when parsing has been completed and indexing successful
+     */
+    DocumentSizeReporter getDocumentParsingReporter();
+
+}

+ 4 - 6
server/src/main/java/org/elasticsearch/plugins/internal/DocumentParsingObserverPlugin.java → server/src/main/java/org/elasticsearch/plugins/internal/DocumentParsingProviderPlugin.java

@@ -8,15 +8,13 @@
 
 package org.elasticsearch.plugins.internal;
 
-import java.util.function.Supplier;
-
 /**
- * An internal plugin that will return a supplier of DocumentParsingObserver.
+ * An internal plugin that will return a DocumentParsingProvider.
  */
-public interface DocumentParsingObserverPlugin {
+public interface DocumentParsingProviderPlugin {
 
     /**
-     * @return a supplier of DocumentParsingObserver to allow observing parsing events
+     * @return a DocumentParsingProvider to create instances of observer and reporter of parsing events
      */
-    Supplier<DocumentParsingObserver> getDocumentParsingObserverSupplier();
+    DocumentParsingProvider getDocumentParsingSupplier();
 }

+ 9 - 21
server/src/main/java/org/elasticsearch/plugins/internal/DocumentParsingObserver.java → server/src/main/java/org/elasticsearch/plugins/internal/DocumentSizeObserver.java

@@ -10,35 +10,28 @@ package org.elasticsearch.plugins.internal;
 
 import org.elasticsearch.xcontent.XContentParser;
 
-import java.io.Closeable;
-
 /**
  * An interface to allow wrapping an XContentParser and observe the events emitted while parsing
- * A default implementation returns a noop DocumentParsingObserver - does not wrap a XContentParser and
- * does not do anything upon finishing parsing.
+ * A default implementation returns a noop DocumentSizeObserver
  */
-public interface DocumentParsingObserver extends Closeable {
+public interface DocumentSizeObserver {
     /**
      * a default noop implementation
      */
-    DocumentParsingObserver EMPTY_INSTANCE = new DocumentParsingObserver() {
+    DocumentSizeObserver EMPTY_INSTANCE = new DocumentSizeObserver() {
         @Override
         public XContentParser wrapParser(XContentParser xContentParser) {
             return xContentParser;
         }
 
         @Override
-        public void setIndexName(String indexName) {}
-
-        @Override
-        public void close() {}
+        public long normalisedBytesParsed() {
+            return 0;
+        }
     };
 
     /**
      * Decorates a provided xContentParser with additional logic (gather some state).
-     * The Decorator parser should use a state from DocumentParsingObserver
-     * in order to perform an action upon finished parsing which will be aware of the state
-     * gathered during parsing
      *
      * @param xContentParser to be decorated
      * @return a decorator xContentParser
@@ -46,13 +39,8 @@ public interface DocumentParsingObserver extends Closeable {
     XContentParser wrapParser(XContentParser xContentParser);
 
     /**
-     * Sets an indexName associated with parsed document.
-     * @param indexName an index name that is associated with the parsed document
-     */
-    void setIndexName(String indexName);
-
-    /**
-     * An action to be performed upon finished parsing.
+     * Returns the state gathered during parsing
+     * @return a number representing the state gathered during parsing
      */
-    void close();
+    long normalisedBytesParsed();
 }

+ 25 - 0
server/src/main/java/org/elasticsearch/plugins/internal/DocumentSizeReporter.java

@@ -0,0 +1,25 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.plugins.internal;
+
+/**
+ * An interface to allow performing an action when parsing has been completed and indexing was successful
+ */
+public interface DocumentSizeReporter {
+    /**
+     * a default noop implementation
+     */
+    DocumentSizeReporter EMPTY_INSTANCE = (indexName, normalizedBytesParsed) -> {};
+
+    /**
+     * An action to be performed once parsing has finished and indexing was successful.
+     */
+    void onCompleted(String indexName, long normalizedBytesParsed);
+
+}

+ 28 - 14
server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java

@@ -46,6 +46,7 @@ import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.plugins.internal.DocumentParsingProvider;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -120,7 +121,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             threadPool::absoluteTimeInMillis,
             new NoopMappingUpdatePerformer(),
             listener -> {},
-            ASSERTING_DONE_LISTENER
+            ASSERTING_DONE_LISTENER,
+            DocumentParsingProvider.EMPTY_INSTANCE
         );
         assertFalse(context.hasMoreOperationsToExecute());
 
@@ -151,7 +153,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             threadPool::absoluteTimeInMillis,
             new ThrowingMappingUpdatePerformer(new RuntimeException("fail")),
             listener -> {},
-            ASSERTING_DONE_LISTENER
+            ASSERTING_DONE_LISTENER,
+            DocumentParsingProvider.EMPTY_INSTANCE
         );
         assertFalse(context.hasMoreOperationsToExecute());
 
@@ -282,7 +285,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             assertNotNull(update);
             updateCalled.incrementAndGet();
             listener.onResponse(null);
-        }, listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER);
+        }, listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER, DocumentParsingProvider.EMPTY_INSTANCE);
         assertTrue(context.isInitial());
         assertTrue(context.hasMoreOperationsToExecute());
         assertThat(context.getUpdateRetryCounter(), equalTo(0));
@@ -302,7 +305,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             threadPool::absoluteTimeInMillis,
             (update, shardId, listener) -> fail("should not have had to update the mappings"),
             listener -> {},
-            ASSERTING_DONE_LISTENER
+            ASSERTING_DONE_LISTENER,
+            DocumentParsingProvider.EMPTY_INSTANCE
         );
 
         // Verify that the shard "executed" the operation only once (1 for previous invocations plus
@@ -350,7 +354,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
                 public void onFailure(final Exception e) {
                     assertEquals(err, e);
                 }
-            }, latch)
+            }, latch),
+            DocumentParsingProvider.EMPTY_INSTANCE
         );
         latch.await();
         assertFalse(context.hasMoreOperationsToExecute());
@@ -394,7 +399,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             threadPool::absoluteTimeInMillis,
             new NoopMappingUpdatePerformer(),
             listener -> {},
-            ASSERTING_DONE_LISTENER
+            ASSERTING_DONE_LISTENER,
+            DocumentParsingProvider.EMPTY_INSTANCE
         );
         assertFalse(context.hasMoreOperationsToExecute());
 
@@ -441,7 +447,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             threadPool::absoluteTimeInMillis,
             new NoopMappingUpdatePerformer(),
             listener -> {},
-            ASSERTING_DONE_LISTENER
+            ASSERTING_DONE_LISTENER,
+            DocumentParsingProvider.EMPTY_INSTANCE
         );
         assertFalse(context.hasMoreOperationsToExecute());
 
@@ -504,7 +511,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             threadPool::absoluteTimeInMillis,
             new NoopMappingUpdatePerformer(),
             listener -> {},
-            ASSERTING_DONE_LISTENER
+            ASSERTING_DONE_LISTENER,
+            DocumentParsingProvider.EMPTY_INSTANCE
         );
 
         assertFalse(context.hasMoreOperationsToExecute());
@@ -559,7 +567,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             threadPool::absoluteTimeInMillis,
             new NoopMappingUpdatePerformer(),
             listener -> {},
-            ASSERTING_DONE_LISTENER
+            ASSERTING_DONE_LISTENER,
+            DocumentParsingProvider.EMPTY_INSTANCE
         );
         assertFalse(context.hasMoreOperationsToExecute());
 
@@ -620,7 +629,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
                 threadPool::absoluteTimeInMillis,
                 new NoopMappingUpdatePerformer(),
                 listener -> listener.onResponse(null),
-                ASSERTING_DONE_LISTENER
+                ASSERTING_DONE_LISTENER,
+                DocumentParsingProvider.EMPTY_INSTANCE
             );
         }
         assertFalse(context.hasMoreOperationsToExecute());
@@ -678,7 +688,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             threadPool::absoluteTimeInMillis,
             new NoopMappingUpdatePerformer(),
             listener -> {},
-            ASSERTING_DONE_LISTENER
+            ASSERTING_DONE_LISTENER,
+            DocumentParsingProvider.EMPTY_INSTANCE
         );
         assertFalse(context.hasMoreOperationsToExecute());
 
@@ -734,7 +745,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             threadPool::absoluteTimeInMillis,
             new NoopMappingUpdatePerformer(),
             listener -> listener.onResponse(null),
-            ASSERTING_DONE_LISTENER
+            ASSERTING_DONE_LISTENER,
+            DocumentParsingProvider.EMPTY_INSTANCE
         );
         assertFalse(context.hasMoreOperationsToExecute());
 
@@ -771,7 +783,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             threadPool::absoluteTimeInMillis,
             new NoopMappingUpdatePerformer(),
             listener -> {},
-            ASSERTING_DONE_LISTENER
+            ASSERTING_DONE_LISTENER,
+            DocumentParsingProvider.EMPTY_INSTANCE
         );
         assertFalse(context.hasMoreOperationsToExecute());
 
@@ -810,7 +823,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
                     threadPool::absoluteTimeInMillis,
                     new NoopMappingUpdatePerformer(),
                     listener -> {},
-                    ASSERTING_DONE_LISTENER
+                    ASSERTING_DONE_LISTENER,
+                    DocumentParsingProvider.EMPTY_INSTANCE
                 );
             }
 

+ 2 - 2
server/src/test/java/org/elasticsearch/action/ingest/ReservedPipelineActionTests.java

@@ -28,7 +28,7 @@ import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.ingest.Processor;
 import org.elasticsearch.ingest.ProcessorInfo;
 import org.elasticsearch.plugins.IngestPlugin;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
+import org.elasticsearch.plugins.internal.DocumentParsingProvider;
 import org.elasticsearch.reservedstate.TransformState;
 import org.elasticsearch.reservedstate.service.FileSettingsService;
 import org.elasticsearch.reservedstate.service.ReservedClusterStateService;
@@ -90,7 +90,7 @@ public class ReservedPipelineActionTests extends ESTestCase {
             Collections.singletonList(DUMMY_PLUGIN),
             client,
             null,
-            () -> DocumentParsingObserver.EMPTY_INSTANCE
+            DocumentParsingProvider.EMPTY_INSTANCE
         );
         Map<String, Processor.Factory> factories = ingestService.getProcessorFactories();
         assertTrue(factories.containsKey("set"));

+ 6 - 13
server/src/test/java/org/elasticsearch/index/IndexModuleTests.java

@@ -85,7 +85,6 @@ import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedInd
 import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.plugins.IndexStorePlugin;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.internal.ReaderContext;
 import org.elasticsearch.test.ClusterServiceUtils;
@@ -233,8 +232,7 @@ public class IndexModuleTests extends ESTestCase {
             Collections.emptyMap(),
             () -> true,
             indexNameExpressionResolver,
-            Collections.emptyMap(),
-            () -> DocumentParsingObserver.EMPTY_INSTANCE
+            Collections.emptyMap()
         );
         module.setReaderWrapper(s -> new Wrapper());
 
@@ -259,8 +257,7 @@ public class IndexModuleTests extends ESTestCase {
             indexStoreFactories,
             () -> true,
             indexNameExpressionResolver,
-            Collections.emptyMap(),
-            () -> DocumentParsingObserver.EMPTY_INSTANCE
+            Collections.emptyMap()
         );
 
         final IndexService indexService = newIndexService(module);
@@ -283,8 +280,7 @@ public class IndexModuleTests extends ESTestCase {
             Map.of(),
             () -> true,
             indexNameExpressionResolver,
-            Collections.emptyMap(),
-            () -> DocumentParsingObserver.EMPTY_INSTANCE
+            Collections.emptyMap()
         );
 
         module.setDirectoryWrapper(new TestDirectoryWrapper());
@@ -635,8 +631,7 @@ public class IndexModuleTests extends ESTestCase {
             Collections.emptyMap(),
             () -> true,
             indexNameExpressionResolver,
-            recoveryStateFactories,
-            () -> DocumentParsingObserver.EMPTY_INSTANCE
+            recoveryStateFactories
         );
 
         final IndexService indexService = newIndexService(module);
@@ -656,8 +651,7 @@ public class IndexModuleTests extends ESTestCase {
             Collections.emptyMap(),
             () -> true,
             indexNameExpressionResolver,
-            Collections.emptyMap(),
-            () -> DocumentParsingObserver.EMPTY_INSTANCE
+            Collections.emptyMap()
         );
 
         final AtomicLong lastAcquiredPrimaryTerm = new AtomicLong();
@@ -757,8 +751,7 @@ public class IndexModuleTests extends ESTestCase {
             Collections.emptyMap(),
             () -> true,
             indexNameExpressionResolver,
-            Collections.emptyMap(),
-            () -> DocumentParsingObserver.EMPTY_INSTANCE
+            Collections.emptyMap()
         );
     }
 

+ 1 - 3
server/src/test/java/org/elasticsearch/index/codec/CodecTests.java

@@ -28,7 +28,6 @@ import org.elasticsearch.index.mapper.MapperRegistry;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.similarity.SimilarityService;
 import org.elasticsearch.plugins.MapperPlugin;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
 import org.elasticsearch.script.ScriptCompiler;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.IndexSettingsModule;
@@ -95,8 +94,7 @@ public class CodecTests extends ESTestCase {
             mapperRegistry,
             () -> null,
             settings.getMode().idFieldMapperWithoutFieldData(),
-            ScriptCompiler.NONE,
-            () -> DocumentParsingObserver.EMPTY_INSTANCE
+            ScriptCompiler.NONE
         );
         return new CodecService(service, BigArrays.NON_RECYCLING_INSTANCE);
     }

+ 1 - 2
server/src/test/java/org/elasticsearch/index/mapper/DocumentMapperTests.java

@@ -21,7 +21,6 @@ import org.elasticsearch.index.analysis.AnalyzerScope;
 import org.elasticsearch.index.analysis.IndexAnalyzers;
 import org.elasticsearch.index.analysis.NamedAnalyzer;
 import org.elasticsearch.index.mapper.MapperService.MergeReason;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentFactory;
 
@@ -69,7 +68,7 @@ public class DocumentMapperTests extends MapperServiceTestCase {
         assertThat(stage1.mappers().getMapper("age"), nullValue());
         assertThat(stage1.mappers().getMapper("obj1.prop1"), nullValue());
         // but merged should
-        DocumentParser documentParser = new DocumentParser(null, null, () -> DocumentParsingObserver.EMPTY_INSTANCE);
+        DocumentParser documentParser = new DocumentParser(null, null);
         DocumentMapper mergedMapper = new DocumentMapper(documentParser, merged, merged.toCompressedXContent(), IndexVersion.current());
         assertThat(mergedMapper.mappers().getMapper("age"), notNullValue());
         assertThat(mergedMapper.mappers().getMapper("obj1.prop1"), notNullValue());

+ 11 - 1
server/src/test/java/org/elasticsearch/index/mapper/DynamicTemplatesTests.java

@@ -21,6 +21,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.IndexVersions;
+import org.elasticsearch.plugins.internal.DocumentSizeObserver;
 import org.elasticsearch.test.XContentTestUtils;
 import org.elasticsearch.test.index.IndexVersionUtils;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -732,7 +733,16 @@ public class DynamicTemplatesTests extends MapperServiceTestCase {
             {"foo": "41.12,-71.34", "bar": "41.12,-71.34"}
             """;
         ParsedDocument doc = mapperService.documentMapper()
-            .parse(new SourceToParse("1", new BytesArray(json), XContentType.JSON, null, Map.of("foo", "geo_point"), false));
+            .parse(
+                new SourceToParse(
+                    "1",
+                    new BytesArray(json),
+                    XContentType.JSON,
+                    null,
+                    Map.of("foo", "geo_point"),
+                    DocumentSizeObserver.EMPTY_INSTANCE
+                )
+            );
         assertThat(doc.rootDoc().getFields("foo"), hasSize(2));
         assertThat(doc.rootDoc().getFields("bar"), hasSize(1));
     }

+ 2 - 1
server/src/test/java/org/elasticsearch/index/mapper/RoutingFieldMapperTests.java

@@ -12,6 +12,7 @@ import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.IndexSearcher;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.index.query.SearchExecutionContext;
+import org.elasticsearch.plugins.internal.DocumentSizeObserver;
 import org.elasticsearch.search.lookup.SearchLookup;
 import org.elasticsearch.search.lookup.Source;
 import org.elasticsearch.xcontent.XContentFactory;
@@ -54,7 +55,7 @@ public class RoutingFieldMapperTests extends MetadataMapperTestCase {
                 XContentType.JSON,
                 "routing_value",
                 Map.of(),
-                false
+                DocumentSizeObserver.EMPTY_INSTANCE
             )
         );
 

+ 37 - 25
server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

@@ -54,7 +54,9 @@ import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.plugins.IngestPlugin;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
+import org.elasticsearch.plugins.internal.DocumentParsingProvider;
+import org.elasticsearch.plugins.internal.DocumentSizeObserver;
+import org.elasticsearch.plugins.internal.DocumentSizeReporter;
 import org.elasticsearch.script.MockScriptEngine;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptModule;
@@ -93,7 +95,6 @@ import java.util.function.Consumer;
 import java.util.function.IntConsumer;
 import java.util.function.LongSupplier;
 import java.util.function.Predicate;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils.executeAndAssertSuccessful;
@@ -155,7 +156,7 @@ public class IngestServiceTests extends ESTestCase {
             List.of(DUMMY_PLUGIN),
             client,
             null,
-            () -> DocumentParsingObserver.EMPTY_INSTANCE
+            DocumentParsingProvider.EMPTY_INSTANCE
         );
         Map<String, Processor.Factory> factories = ingestService.getProcessorFactories();
         assertTrue(factories.containsKey("foo"));
@@ -175,7 +176,7 @@ public class IngestServiceTests extends ESTestCase {
                 List.of(DUMMY_PLUGIN, DUMMY_PLUGIN),
                 client,
                 null,
-                () -> DocumentParsingObserver.EMPTY_INSTANCE
+                DocumentParsingProvider.EMPTY_INSTANCE
             )
         );
         assertTrue(e.getMessage(), e.getMessage().contains("already registered"));
@@ -192,7 +193,7 @@ public class IngestServiceTests extends ESTestCase {
             List.of(DUMMY_PLUGIN),
             client,
             null,
-            () -> DocumentParsingObserver.EMPTY_INSTANCE
+            DocumentParsingProvider.EMPTY_INSTANCE
         );
         final IndexRequest indexRequest = new IndexRequest("_index").id("_id")
             .source(Map.of())
@@ -1180,33 +1181,44 @@ public class IngestServiceTests extends ESTestCase {
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
     }
 
-    public void testExecuteBulkRequestCallsDocumentParsingObserver() {
+    public void testExecuteBulkRequestCallsDocumentSizeObserver() {
         /*
-         * This test makes sure that for both insert and upsert requests, when we call executeBulkRequest DocumentParsingObserver is
+         * This test makes sure that for both insert and upsert requests, when we call executeBulkRequest DocumentSizeObserver is
+         * used to wrap the parser and its normalisedBytesParsed value is read.
          */
-        AtomicInteger setNameCalledCount = new AtomicInteger(0);
-        AtomicInteger closeCalled = new AtomicInteger(0);
-        Supplier<DocumentParsingObserver> documentParsingObserverSupplier = () -> new DocumentParsingObserver() {
+        AtomicInteger wrappedObserverWasUsed = new AtomicInteger(0);
+        AtomicInteger parsedValueWasUsed = new AtomicInteger(0);
+        DocumentParsingProvider documentParsingProvider = new DocumentParsingProvider() {
             @Override
-            public XContentParser wrapParser(XContentParser xContentParser) {
-                return xContentParser;
+            public DocumentSizeObserver newDocumentSizeObserver() {
+                return new DocumentSizeObserver() {
+                    @Override
+                    public XContentParser wrapParser(XContentParser xContentParser) {
+                        wrappedObserverWasUsed.incrementAndGet();
+                        return xContentParser;
+                    }
+
+                    @Override
+                    public long normalisedBytesParsed() {
+                        parsedValueWasUsed.incrementAndGet();
+                        return 0;
+                    }
+                };
             }
 
             @Override
-            public void setIndexName(String indexName) {
-                assertNotNull(indexName);
-                setNameCalledCount.incrementAndGet();
+            public DocumentSizeReporter getDocumentParsingReporter() {
+                return null;
             }
 
             @Override
-            public void close() {
-                closeCalled.incrementAndGet();
+            public DocumentSizeObserver newFixedSizeDocumentObserver(long normalisedBytesParsed) {
+                return null;
             }
         };
         IngestService ingestService = createWithProcessors(
             Map.of("mock", (factories, tag, description, config) -> mockCompoundProcessor()),
-            documentParsingObserverSupplier
+            documentParsingProvider
         );
 
         PutPipelineRequest putRequest = new PutPipelineRequest(
@@ -1239,8 +1251,8 @@ public class IngestServiceTests extends ESTestCase {
             completionHandler,
             Names.WRITE
         );
-        assertThat(setNameCalledCount.get(), equalTo(2));
-        assertThat(closeCalled.get(), equalTo(2));
+        assertThat(wrappedObserverWasUsed.get(), equalTo(2));
+        assertThat(parsedValueWasUsed.get(), equalTo(2));
     }
 
     public void testExecuteSuccess() {
@@ -2292,7 +2304,7 @@ public class IngestServiceTests extends ESTestCase {
             List.of(testPlugin),
             client,
             null,
-            () -> DocumentParsingObserver.EMPTY_INSTANCE
+            DocumentParsingProvider.EMPTY_INSTANCE
         );
         ingestService.addIngestClusterStateListener(ingestClusterStateListener);
 
@@ -2647,7 +2659,7 @@ public class IngestServiceTests extends ESTestCase {
             List.of(DUMMY_PLUGIN),
             client,
             null,
-            () -> DocumentParsingObserver.EMPTY_INSTANCE
+            DocumentParsingProvider.EMPTY_INSTANCE
         );
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, clusterState));
 
@@ -2921,12 +2933,12 @@ public class IngestServiceTests extends ESTestCase {
     }
 
     private static IngestService createWithProcessors(Map<String, Processor.Factory> processors) {
-        return createWithProcessors(processors, () -> DocumentParsingObserver.EMPTY_INSTANCE);
+        return createWithProcessors(processors, DocumentParsingProvider.EMPTY_INSTANCE);
     }
 
     private static IngestService createWithProcessors(
         Map<String, Processor.Factory> processors,
-        Supplier<DocumentParsingObserver> documentParsingObserverSupplier
+        DocumentParsingProvider documentParsingProvider
     ) {
         Client client = mock(Client.class);
         ThreadPool threadPool = mock(ThreadPool.class);
@@ -2946,7 +2958,7 @@ public class IngestServiceTests extends ESTestCase {
             }),
             client,
             null,
-            documentParsingObserverSupplier
+            documentParsingProvider
         );
         if (randomBoolean()) {
             /*

+ 2 - 2
server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java

@@ -14,7 +14,7 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.plugins.IngestPlugin;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
+import org.elasticsearch.plugins.internal.DocumentParsingProvider;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.XContentType;
@@ -138,6 +138,6 @@ public class SimulateIngestServiceTests extends ESTestCase {
             public Map<String, Processor.Factory> getProcessors(final Processor.Parameters parameters) {
                 return processors;
             }
-        }), client, null, () -> DocumentParsingObserver.EMPTY_INSTANCE);
+        }), client, null, DocumentParsingProvider.EMPTY_INSTANCE);
     }
 }

+ 4 - 4
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -165,7 +165,7 @@ import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.monitor.StatusInfo;
 import org.elasticsearch.node.ResponseCollectorService;
 import org.elasticsearch.plugins.PluginsService;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
+import org.elasticsearch.plugins.internal.DocumentParsingProvider;
 import org.elasticsearch.plugins.scanners.StablePluginsRegistry;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
@@ -1963,7 +1963,6 @@ public class SnapshotResiliencyTests extends ESTestCase {
                     .client(client)
                     .featureService(new FeatureService(List.of(new IndicesFeatures())))
                     .metaStateService(new MetaStateService(nodeEnv, namedXContentRegistry))
-                    .documentParsingObserverSupplier(() -> DocumentParsingObserver.EMPTY_INSTANCE)
                     .build();
                 final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
                 snapshotShardsService = new SnapshotShardsService(
@@ -2105,7 +2104,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                             Collections.emptyList(),
                             client,
                             null,
-                            () -> DocumentParsingObserver.EMPTY_INSTANCE
+                            DocumentParsingProvider.EMPTY_INSTANCE
                         ),
                         mockFeatureService,
                         client,
@@ -2126,7 +2125,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
                     new UpdateHelper(scriptService),
                     actionFilters,
                     indexingMemoryLimits,
-                    EmptySystemIndices.INSTANCE
+                    EmptySystemIndices.INSTANCE,
+                    DocumentParsingProvider.EMPTY_INSTANCE
                 );
                 actions.put(TransportShardBulkAction.TYPE, transportShardBulkAction);
                 final RestoreService restoreService = new RestoreService(

+ 1 - 3
test/framework/src/main/java/org/elasticsearch/index/MapperTestUtils.java

@@ -18,7 +18,6 @@ import org.elasticsearch.index.mapper.MapperRegistry;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.similarity.SimilarityService;
 import org.elasticsearch.indices.IndicesModule;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
 import org.elasticsearch.script.ScriptCompiler;
 import org.elasticsearch.test.IndexSettingsModule;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
@@ -67,8 +66,7 @@ public class MapperTestUtils {
             mapperRegistry,
             () -> null,
             indexSettings.getMode().idFieldMapperWithoutFieldData(),
-            ScriptCompiler.NONE,
-            () -> DocumentParsingObserver.EMPTY_INSTANCE
+            ScriptCompiler.NONE
         );
     }
 }

+ 3 - 4
test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java

@@ -22,7 +22,7 @@ import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.similarity.SimilarityService;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.IndicesModule;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
+import org.elasticsearch.plugins.internal.DocumentSizeObserver;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
 import org.elasticsearch.xcontent.XContentParserConfiguration;
 
@@ -55,8 +55,7 @@ public class TranslogHandler implements Engine.TranslogRecoveryRunner {
             mapperRegistry,
             () -> null,
             indexSettings.getMode().idFieldMapperWithoutFieldData(),
-            null,
-            () -> DocumentParsingObserver.EMPTY_INSTANCE
+            null
         );
     }
 
@@ -96,7 +95,7 @@ public class TranslogHandler implements Engine.TranslogRecoveryRunner {
                         XContentHelper.xContentType(index.source()),
                         index.routing(),
                         Map.of(),
-                        false
+                        DocumentSizeObserver.EMPTY_INSTANCE
                     ),
                     index.seqNo(),
                     index.primaryTerm(),

+ 20 - 5
test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java

@@ -58,7 +58,7 @@ import org.elasticsearch.indices.IndicesModule;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.plugins.MapperPlugin;
 import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
+import org.elasticsearch.plugins.internal.DocumentSizeObserver;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptContext;
 import org.elasticsearch.search.aggregations.Aggregator;
@@ -219,8 +219,7 @@ public abstract class MapperServiceTestCase extends FieldTypeTestCase {
                 throw new UnsupportedOperationException();
             },
             indexSettings.getMode().buildIdFieldMapper(idFieldDataEnabled),
-            this::compileScript,
-            () -> DocumentParsingObserver.EMPTY_INSTANCE
+            this::compileScript
         );
     }
 
@@ -284,7 +283,14 @@ public abstract class MapperServiceTestCase extends FieldTypeTestCase {
         XContentBuilder builder = JsonXContent.contentBuilder().startObject();
         build.accept(builder);
         builder.endObject();
-        return new SourceToParse(id, BytesReference.bytes(builder), XContentType.JSON, routing, dynamicTemplates, false);
+        return new SourceToParse(
+            id,
+            BytesReference.bytes(builder),
+            XContentType.JSON,
+            routing,
+            dynamicTemplates,
+            DocumentSizeObserver.EMPTY_INSTANCE
+        );
     }
 
     /**
@@ -709,7 +715,16 @@ public abstract class MapperServiceTestCase extends FieldTypeTestCase {
         try (Directory roundTripDirectory = newDirectory()) {
             RandomIndexWriter roundTripIw = new RandomIndexWriter(random(), roundTripDirectory);
             roundTripIw.addDocument(
-                mapper.parse(new SourceToParse("1", new BytesArray(syntheticSource), XContentType.JSON, null, Map.of(), false)).rootDoc()
+                mapper.parse(
+                    new SourceToParse(
+                        "1",
+                        new BytesArray(syntheticSource),
+                        XContentType.JSON,
+                        null,
+                        Map.of(),
+                        DocumentSizeObserver.EMPTY_INSTANCE
+                    )
+                ).rootDoc()
             );
             roundTripIw.close();
             try (DirectoryReader roundTripReader = DirectoryReader.open(roundTripDirectory)) {

+ 9 - 1
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -71,6 +71,7 @@ import org.elasticsearch.indices.recovery.RecoveryTarget;
 import org.elasticsearch.indices.recovery.StartRecoveryRequest;
 import org.elasticsearch.indices.recovery.plan.PeerOnlyRecoveryPlannerService;
 import org.elasticsearch.indices.recovery.plan.RecoveryPlannerService;
+import org.elasticsearch.plugins.internal.DocumentSizeObserver;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.ShardGeneration;
@@ -961,7 +962,14 @@ public abstract class IndexShardTestCase extends ESTestCase {
             id = UUIDs.base64UUID();
             autoGeneratedTimestamp = System.currentTimeMillis();
         }
-        SourceToParse sourceToParse = new SourceToParse(id, new BytesArray(source), xContentType, routing, Map.of(), false);
+        SourceToParse sourceToParse = new SourceToParse(
+            id,
+            new BytesArray(source),
+            xContentType,
+            routing,
+            Map.of(),
+            DocumentSizeObserver.EMPTY_INSTANCE
+        );
         Engine.IndexResult result;
         if (shard.routingEntry().primary()) {
             result = shard.applyIndexOperationOnPrimary(

+ 1 - 3
test/framework/src/main/java/org/elasticsearch/test/AbstractBuilderTestCase.java

@@ -65,7 +65,6 @@ import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.plugins.PluginsService;
 import org.elasticsearch.plugins.ScriptPlugin;
 import org.elasticsearch.plugins.SearchPlugin;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
 import org.elasticsearch.plugins.scanners.StablePluginsRegistry;
 import org.elasticsearch.script.MockScriptEngine;
 import org.elasticsearch.script.MockScriptService;
@@ -471,8 +470,7 @@ public abstract class AbstractBuilderTestCase extends ESTestCase {
                 mapperRegistry,
                 () -> createShardContext(null),
                 idxSettings.getMode().idFieldMapperWithoutFieldData(),
-                ScriptCompiler.NONE,
-                () -> DocumentParsingObserver.EMPTY_INSTANCE
+                ScriptCompiler.NONE
             );
             IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(nodeSettings, new IndexFieldDataCache.Listener() {
             });

+ 2 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java

@@ -63,6 +63,7 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.plugins.internal.DocumentSizeObserver;
 import org.elasticsearch.repositories.FinalizeSnapshotContext;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.Repository;
@@ -487,7 +488,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
                                 XContentHelper.xContentType(source),
                                 rootFieldsVisitor.routing(),
                                 Map.of(),
-                                false
+                                DocumentSizeObserver.EMPTY_INSTANCE
                             ),
                             SequenceNumbers.UNASSIGNED_SEQ_NO,
                             0,

+ 2 - 2
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java

@@ -24,7 +24,7 @@ import org.elasticsearch.ingest.IngestStats;
 import org.elasticsearch.ingest.Processor;
 import org.elasticsearch.license.MockLicenseState;
 import org.elasticsearch.plugins.IngestPlugin;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
+import org.elasticsearch.plugins.internal.DocumentParsingProvider;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ml.MachineLearningField;
@@ -135,7 +135,7 @@ public class TransportGetTrainedModelsStatsActionTests extends ESTestCase {
             Collections.singletonList(SKINNY_INGEST_PLUGIN),
             client,
             null,
-            () -> DocumentParsingObserver.EMPTY_INSTANCE
+            DocumentParsingProvider.EMPTY_INSTANCE
         );
     }
 

+ 1 - 3
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java

@@ -52,7 +52,6 @@ import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.license.internal.XPackLicenseStatus;
 import org.elasticsearch.plugins.ExtensiblePlugin;
 import org.elasticsearch.plugins.MapperPlugin;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
 import org.elasticsearch.plugins.internal.RestExtension;
 import org.elasticsearch.rest.RestChannel;
 import org.elasticsearch.rest.RestHandler;
@@ -374,8 +373,7 @@ public class SecurityTests extends ESTestCase {
             Collections.emptyMap(),
             () -> true,
             TestIndexNameExpressionResolver.newInstance(threadPool.getThreadContext()),
-            Collections.emptyMap(),
-            () -> DocumentParsingObserver.EMPTY_INSTANCE
+            Collections.emptyMap()
         );
         security.onIndexModule(indexModule);
         // indexReaderWrapper is a SetOnce so if Security#onIndexModule had already set an ReaderWrapper we would get an exception here

+ 1 - 3
x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherPluginTests.java

@@ -15,7 +15,6 @@ import org.elasticsearch.index.engine.InternalEngineFactory;
 import org.elasticsearch.indices.SystemIndexDescriptor;
 import org.elasticsearch.indices.TestIndexNameExpressionResolver;
 import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.plugins.internal.DocumentParsingObserver;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.IndexSettingsModule;
 import org.elasticsearch.threadpool.ExecutorBuilder;
@@ -67,8 +66,7 @@ public class WatcherPluginTests extends ESTestCase {
             Collections.emptyMap(),
             () -> true,
             TestIndexNameExpressionResolver.newInstance(),
-            Collections.emptyMap(),
-            () -> DocumentParsingObserver.EMPTY_INSTANCE
+            Collections.emptyMap()
         );
         // this will trip an assertion if the watcher indexing operation listener is null (which it is) but we try to add it
         watcher.onIndexModule(indexModule);