Переглянути джерело

Infrastructure for metering the update requests (#105063)

udpate request that are sending a document (or part of it) should allow for metering the size of that doc
the update request that are using a script should not be metered - reported size 0.

this commit is following up on #104859

The parsing is of the update's document is being done in UpdateHelper - the same pattern we use to meter parsing in IngestService. If the script is being used, the size observed will be 0.
The value observed is then reported in the TransportShardBulkAction and thanks to the value being 0 or positive it will not be metering the modified document again.

This commit also renames the getDocumentParsingSupplier to getDocumentParsingProvider (this was accidentally omitted in the #104859)
Przemyslaw Gomulka 1 рік тому
батько
коміт
a103e3c7a4

+ 5 - 0
docs/changelog/105063.yaml

@@ -0,0 +1,5 @@
+pr: 105063
+summary: Infrastructure for metering the update requests
+area: Infra/Metrics
+type: enhancement
+issues: []

+ 1 - 1
modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/plugins/internal/DocumentSizeObserverWithPipelinesIT.java

@@ -78,7 +78,7 @@ public class DocumentSizeObserverWithPipelinesIT extends ESIntegTestCase {
         public TestDocumentParsingProviderPlugin() {}
 
         @Override
-        public DocumentParsingProvider getDocumentParsingSupplier() {
+        public DocumentParsingProvider getDocumentParsingProvider() {
             // returns a static instance, because we want to assert that the wrapping is called only once
             return new DocumentParsingProvider() {
                 @Override

+ 4 - 4
server/src/internalClusterTest/java/org/elasticsearch/plugins/internal/DocumentSizeObserverIT.java

@@ -39,7 +39,7 @@ public class DocumentSizeObserverIT extends ESIntegTestCase {
             new IndexRequest(TEST_INDEX_NAME).id("1").source(jsonBuilder().startObject().field("test", "I am sam i am").endObject())
         ).actionGet();
         assertTrue(hasWrappedParser);
-        // there are more assertions in a TestDocumentParsingSupplierPlugin
+        // there are more assertions in a TestDocumentParsingProviderPlugin
 
         hasWrappedParser = false;
         // the format of the request does not matter
@@ -47,7 +47,7 @@ public class DocumentSizeObserverIT extends ESIntegTestCase {
             new IndexRequest(TEST_INDEX_NAME).id("2").source(cborBuilder().startObject().field("test", "I am sam i am").endObject())
         ).actionGet();
         assertTrue(hasWrappedParser);
-        // there are more assertions in a TestDocumentParsingSupplierPlugin
+        // there are more assertions in a TestDocumentParsingProviderPlugin
 
         hasWrappedParser = false;
         // white spaces does not matter
@@ -59,7 +59,7 @@ public class DocumentSizeObserverIT extends ESIntegTestCase {
             }
             """, XContentType.JSON)).actionGet();
         assertTrue(hasWrappedParser);
-        // there are more assertions in a TestDocumentParsingSupplierPlugin
+        // there are more assertions in a TestDocumentParsingProviderPlugin
     }
 
     @Override
@@ -72,7 +72,7 @@ public class DocumentSizeObserverIT extends ESIntegTestCase {
         public TestDocumentParsingProviderPlugin() {}
 
         @Override
-        public DocumentParsingProvider getDocumentParsingSupplier() {
+        public DocumentParsingProvider getDocumentParsingProvider() {
             return new DocumentParsingProvider() {
 
                 @Override

+ 7 - 3
server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

@@ -455,16 +455,20 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
     }
 
     /**
-     * Creates a new document size observerl
+     * Creates a new document size observer
      * @param documentParsingProvider a provider to create a new observer.
      * @param request an index request to provide information about bytes being already parsed.
-     * @return a Fixed version of DocumentSizeObserver if parsing already happened (in IngestService).
+     * @return a Fixed version of DocumentSizeObserver if parsing already happened (in IngestService, UpdateHelper)
+     * and there is a value to be reported >0
      * It would be pre-populated with information about how many bytes were already parsed
-     * or return a new 'empty' DocumentSizeObserver.
+     * or a noop instance if parsed bytes in IngestService/UpdateHelper was 0 (like when empty doc or script in update)
+     * or return a new DocumentSizeObserver that will be used when parsing.
      */
     private static DocumentSizeObserver getDocumentSizeObserver(DocumentParsingProvider documentParsingProvider, IndexRequest request) {
         if (request.getNormalisedBytesParsed() != -1) {
             return documentParsingProvider.newFixedSizeDocumentObserver(request.getNormalisedBytesParsed());
+        } else if (request.getNormalisedBytesParsed() == 0) {
+            return DocumentSizeObserver.EMPTY_INSTANCE;
         }
         return documentParsingProvider.newDocumentSizeObserver();
     }

+ 29 - 8
server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

@@ -59,14 +59,14 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 
 /**
  * Index request to index a typed JSON document into a specific index and make it searchable.
- *
+ * <p>
  * The index requires the {@link #index()}, {@link #id(String)} and
  * {@link #source(byte[], XContentType)} to be set.
- *
+ * <p>
  * The source (content to index) can be set in its bytes form using ({@link #source(byte[], XContentType)}),
  * its string form ({@link #source(String, XContentType)}) or using a {@link org.elasticsearch.xcontent.XContentBuilder}
  * ({@link #source(org.elasticsearch.xcontent.XContentBuilder)}).
- *
+ * <p>
  * If the {@link #id(String)} is not set, it will be automatically generated.
  *
  * @see IndexResponse
@@ -453,7 +453,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
 
     /**
      * Sets the document source to index.
-     *
+     * <p>
      * Note, its preferable to either set it using {@link #source(org.elasticsearch.xcontent.XContentBuilder)}
      * or using the {@link #source(byte[], XContentType)}.
      */
@@ -632,7 +632,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
     /**
      * only perform this indexing request if the document was last modification was assigned the given
      * sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
-     *
+     * <p>
      * If the document last modification was assigned a different sequence number a
      * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
      */
@@ -647,7 +647,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
     /**
      * only performs this indexing request if the document was last modification was assigned the given
      * primary term. Must be used in combination with {@link #setIfSeqNo(long)}
-     *
+     * <p>
      * If the document last modification was assigned a different term a
      * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
      */
@@ -670,7 +670,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
 
     /**
      * If set, only perform this indexing request if the document was last modification was assigned this primary term.
-     *
+     * <p>
      * If the document last modification was assigned a different term a
      * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
      */
@@ -933,16 +933,36 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
         this.rawTimestamp = rawTimestamp;
     }
 
+    /**
+     * Returns a number of bytes observed when parsing a document in earlier stages of ingestion (like update/ingest service)
+     * Defaults to -1 when a document size was not observed in earlier stages.
+     * @return a number of bytes observed
+     */
     public long getNormalisedBytesParsed() {
         return normalisedBytesParsed;
     }
 
-    public void setNormalisedBytesParsed(long normalisedBytesParsed) {
+    /**
+     * Sets number of bytes observed by a <code>DocumentSizeObserver</code>
+     * @return an index request
+     */
+    public IndexRequest setNormalisedBytesParsed(long normalisedBytesParsed) {
         this.normalisedBytesParsed = normalisedBytesParsed;
+        return this;
+    }
+
+    /**
+     * when observing document size while parsing, this method indicates that this request should not be recorded.
+     * @return an index request
+     */
+    public IndexRequest noParsedBytesToReport() {
+        this.normalisedBytesParsed = 0;
+        return this;
     }
 
     /**
      * Adds the pipeline to the list of executed pipelines, if listExecutedPipelines is true
+     *
      * @param pipeline
      */
     public void addPipeline(String pipeline) {
@@ -957,6 +977,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
     /**
      * This returns the list of pipelines executed on the document for this request. If listExecutedPipelines is false, the response will be
      * null, even if pipelines were executed. If listExecutedPipelines is true but no pipelines were executed, the list will be empty.
+     *
      * @return
      */
     @Nullable

+ 16 - 5
server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java

@@ -26,6 +26,8 @@ import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.RoutingFieldMapper;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.plugins.internal.DocumentParsingProvider;
+import org.elasticsearch.plugins.internal.DocumentSizeObserver;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.script.UpdateCtxMap;
@@ -47,9 +49,11 @@ public class UpdateHelper {
     private static final Logger logger = LogManager.getLogger(UpdateHelper.class);
 
     private final ScriptService scriptService;
+    private final DocumentParsingProvider documentParsingProvider;
 
-    public UpdateHelper(ScriptService scriptService) {
+    public UpdateHelper(ScriptService scriptService, DocumentParsingProvider documentParsingProvider) {
         this.scriptService = scriptService;
+        this.documentParsingProvider = documentParsingProvider;
     }
 
     /**
@@ -174,14 +178,19 @@ public class UpdateHelper {
      * Prepare the request for merging the existing document with a new one, can optionally detect a noop change. Returns a {@code Result}
      * containing a new {@code IndexRequest} to be executed on the primary and replicas.
      */
-    static Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResult getResult, boolean detectNoop) {
+    Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResult getResult, boolean detectNoop) {
         final IndexRequest currentRequest = request.doc();
         final String routing = calculateRouting(getResult, currentRequest);
+        final DocumentSizeObserver documentSizeObserver = documentParsingProvider.newDocumentSizeObserver();
         final Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
         final XContentType updateSourceContentType = sourceAndContent.v1();
         final Map<String, Object> updatedSourceAsMap = sourceAndContent.v2();
 
-        final boolean noop = XContentHelper.update(updatedSourceAsMap, currentRequest.sourceAsMap(), detectNoop) == false;
+        final boolean noop = XContentHelper.update(
+            updatedSourceAsMap,
+            currentRequest.sourceAsMap(documentSizeObserver),
+            detectNoop
+        ) == false;
 
         // We can only actually turn the update into a noop if detectNoop is true to preserve backwards compatibility and to handle cases
         // where users repopulating multi-fields or adding synonyms, etc.
@@ -216,7 +225,8 @@ public class UpdateHelper {
                 .setIfPrimaryTerm(getResult.getPrimaryTerm())
                 .waitForActiveShards(request.waitForActiveShards())
                 .timeout(request.timeout())
-                .setRefreshPolicy(request.getRefreshPolicy());
+                .setRefreshPolicy(request.getRefreshPolicy())
+                .setNormalisedBytesParsed(documentSizeObserver.normalisedBytesParsed());
             return new Result(finalIndexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
         }
     }
@@ -258,7 +268,8 @@ public class UpdateHelper {
                     .setIfPrimaryTerm(getResult.getPrimaryTerm())
                     .waitForActiveShards(request.waitForActiveShards())
                     .timeout(request.timeout())
-                    .setRefreshPolicy(request.getRefreshPolicy());
+                    .setRefreshPolicy(request.getRefreshPolicy())
+                    .noParsedBytesToReport();
                 return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
             }
             case DELETE -> {

+ 17 - 10
server/src/main/java/org/elasticsearch/node/NodeConstruction.java

@@ -256,9 +256,12 @@ class NodeConstruction {
 
             SearchModule searchModule = constructor.createSearchModule(settingsModule.getSettings(), threadPool);
             constructor.createClientAndRegistries(settingsModule.getSettings(), threadPool, searchModule);
+            DocumentParsingProvider documentParsingProvider = constructor.getDocumentParsingProvider();
 
             ScriptService scriptService = constructor.createScriptService(settingsModule, threadPool, serviceProvider);
 
+            constructor.createUpdateHelper(documentParsingProvider, scriptService);
+
             constructor.construct(
                 threadPool,
                 settingsModule,
@@ -267,7 +270,8 @@ class NodeConstruction {
                 constructor.createAnalysisRegistry(),
                 serviceProvider,
                 forbidPrivateIndexSettings,
-                telemetryProvider
+                telemetryProvider,
+                documentParsingProvider
             );
 
             return constructor;
@@ -570,14 +574,18 @@ class NodeConstruction {
             threadPool::absoluteTimeInMillis
         );
         ScriptModule.registerClusterSettingsListeners(scriptService, settingsModule.getClusterSettings());
-        modules.add(b -> {
-            b.bind(ScriptService.class).toInstance(scriptService);
-            b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptService));
-        });
+        modules.add(b -> { b.bind(ScriptService.class).toInstance(scriptService); });
 
         return scriptService;
     }
 
+    private UpdateHelper createUpdateHelper(DocumentParsingProvider documentParsingProvider, ScriptService scriptService) {
+        UpdateHelper updateHelper = new UpdateHelper(scriptService, documentParsingProvider);
+
+        modules.add(b -> { b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptService, documentParsingProvider)); });
+        return updateHelper;
+    }
+
     private AnalysisRegistry createAnalysisRegistry() throws IOException {
         AnalysisRegistry registry = new AnalysisModule(
             environment,
@@ -596,7 +604,8 @@ class NodeConstruction {
         AnalysisRegistry analysisRegistry,
         NodeServiceProvider serviceProvider,
         boolean forbidPrivateIndexSettings,
-        TelemetryProvider telemetryProvider
+        TelemetryProvider telemetryProvider,
+        DocumentParsingProvider documentParsingProvider
     ) throws IOException {
 
         Settings settings = settingsModule.getSettings();
@@ -612,12 +621,10 @@ class NodeConstruction {
             ).collect(Collectors.toSet()),
             telemetryProvider.getTracer()
         );
-        final Tracer tracer = telemetryProvider.getTracer();
 
         ClusterService clusterService = createClusterService(settingsModule, threadPool, taskManager);
         clusterService.addStateApplier(scriptService);
 
-        DocumentParsingProvider documentParsingProvider = getDocumentParsingSupplier();
         modules.bindToInstance(DocumentParsingProvider.class, documentParsingProvider);
 
         final IngestService ingestService = new IngestService(
@@ -1298,8 +1305,8 @@ class NodeConstruction {
         logger.info("initialized");
     }
 
-    private DocumentParsingProvider getDocumentParsingSupplier() {
-        return getSinglePlugin(DocumentParsingProviderPlugin.class).map(DocumentParsingProviderPlugin::getDocumentParsingSupplier)
+    private DocumentParsingProvider getDocumentParsingProvider() {
+        return getSinglePlugin(DocumentParsingProviderPlugin.class).map(DocumentParsingProviderPlugin::getDocumentParsingProvider)
             .orElse(DocumentParsingProvider.EMPTY_INSTANCE);
     }
 

+ 3 - 3
server/src/main/java/org/elasticsearch/plugins/internal/DocumentParsingProviderPlugin.java

@@ -9,12 +9,12 @@
 package org.elasticsearch.plugins.internal;
 
 /**
- * An internal plugin that will return a supplier of DocumentParsingSupplier.
+ * An internal plugin that will return a DocumentParsingProvider.
  */
 public interface DocumentParsingProviderPlugin {
 
     /**
-     * @return a DocumentParsingSupplier to create instances of observer and reporter of parsing events
+     * @return a DocumentParsingProvider to create instances of observer and reporter of parsing events
      */
-    DocumentParsingProvider getDocumentParsingSupplier();
+    DocumentParsingProvider getDocumentParsingProvider();
 }

+ 7 - 5
server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java

@@ -22,6 +22,7 @@ import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.index.get.GetResult;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.plugins.internal.DocumentParsingProvider;
 import org.elasticsearch.script.MockScriptEngine;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptEngine;
@@ -59,6 +60,7 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.mockito.Mockito.mock;
 
 public class UpdateRequestTests extends ESTestCase {
 
@@ -114,7 +116,7 @@ public class UpdateRequestTests extends ESTestCase {
         final MockScriptEngine engine = new MockScriptEngine("mock", scripts, Collections.emptyMap());
         Map<String, ScriptEngine> engines = Collections.singletonMap(engine.getType(), engine);
         ScriptService scriptService = new ScriptService(baseSettings, engines, ScriptModule.CORE_CONTEXTS, () -> 1L);
-        updateHelper = new UpdateHelper(scriptService);
+        updateHelper = new UpdateHelper(scriptService, DocumentParsingProvider.EMPTY_INSTANCE);
     }
 
     @SuppressWarnings("unchecked")
@@ -590,14 +592,14 @@ public class UpdateRequestTests extends ESTestCase {
         try (var parser = createParser(JsonXContent.jsonXContent, new BytesArray("{\"doc\": {\"body\": \"foo\"}}"))) {
             request = new UpdateRequest("test", "1").fromXContent(parser);
         }
-
-        UpdateHelper.Result result = UpdateHelper.prepareUpdateIndexRequest(shardId, request, getResult, true);
+        UpdateHelper updateHelper = new UpdateHelper(mock(ScriptService.class), DocumentParsingProvider.EMPTY_INSTANCE);
+        UpdateHelper.Result result = updateHelper.prepareUpdateIndexRequest(shardId, request, getResult, true);
 
         assertThat(result.action(), instanceOf(UpdateResponse.class));
         assertThat(result.getResponseResult(), equalTo(DocWriteResponse.Result.NOOP));
 
         // Try again, with detectNoop turned off
-        result = UpdateHelper.prepareUpdateIndexRequest(shardId, request, getResult, false);
+        result = updateHelper.prepareUpdateIndexRequest(shardId, request, getResult, false);
         assertThat(result.action(), instanceOf(IndexRequest.class));
         assertThat(result.getResponseResult(), equalTo(DocWriteResponse.Result.UPDATED));
         assertThat(result.updatedSourceAsMap().get("body").toString(), equalTo("foo"));
@@ -605,7 +607,7 @@ public class UpdateRequestTests extends ESTestCase {
         try (var parser = createParser(JsonXContent.jsonXContent, new BytesArray("{\"doc\": {\"body\": \"bar\"}}"))) {
             // Change the request to be a different doc
             request = new UpdateRequest("test", "1").fromXContent(parser);
-            result = UpdateHelper.prepareUpdateIndexRequest(shardId, request, getResult, true);
+            result = updateHelper.prepareUpdateIndexRequest(shardId, request, getResult, true);
 
             assertThat(result.action(), instanceOf(IndexRequest.class));
             assertThat(result.getResponseResult(), equalTo(DocWriteResponse.Result.UPDATED));

+ 1 - 1
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -2200,7 +2200,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                     threadPool,
                     shardStateAction,
                     mappingUpdatedAction,
-                    new UpdateHelper(scriptService),
+                    new UpdateHelper(scriptService, DocumentParsingProvider.EMPTY_INSTANCE),
                     actionFilters,
                     indexingMemoryLimits,
                     EmptySystemIndices.INSTANCE,

+ 2 - 1
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java

@@ -112,6 +112,7 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.TestIndexNameExpressionResolver;
 import org.elasticsearch.license.MockLicenseState;
 import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.plugins.internal.DocumentParsingProvider;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchShardTarget;
@@ -1576,7 +1577,7 @@ public class AuthorizationServiceTests extends ESTestCase {
         TransportShardBulkAction.performOnPrimary(
             request,
             indexShard,
-            new UpdateHelper(mock(ScriptService.class)),
+            new UpdateHelper(mock(ScriptService.class), DocumentParsingProvider.EMPTY_INSTANCE),
             System::currentTimeMillis,
             mappingUpdater,
             waitForMappingUpdate,