فهرست منبع

Reconstruct set of indices in BulkRequest (#110672)

Reconstruct the indices set in the BulkRequest deserialization constructor so that the correct thread pool is used for forwarded bulk requests. Before this fix, forwarded bulk requests always ran on the system_write thread pool because the indices set was empty after deserialization.

Fixes issue https://github.com/elastic/elasticsearch/issues/102792
Ankita Kumar 1 سال پیش
والد
کامیت
5761c4afb5

+ 99 - 0
modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java

@@ -8,14 +8,23 @@
 package org.elasticsearch.ingest.common;
 
 import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
+import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.get.MultiGetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.ingest.PutPipelineRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.client.internal.Requests;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.gateway.GatewayService;
@@ -26,6 +35,7 @@ import org.elasticsearch.script.MockScriptEngine;
 import org.elasticsearch.script.MockScriptPlugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.XContentType;
 
 import java.util.Arrays;
@@ -33,11 +43,15 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.IntStream;
 
+import static org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric.INGEST;
 import static org.elasticsearch.test.NodeRoles.onlyRole;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 
 // Ideally I like this test to live in the server module, but otherwise a large part of the ScriptProcessor
@@ -326,4 +340,89 @@ public class IngestRestartIT extends ESIntegTestCase {
         source = client().prepareGet("index", "fails").get(timeout).getSource();
         assertNull(source);
     }
+
+    /**
+     * Verifies that a bulk request forwarded from a master-only node to an ingest node does not run on the
+     * system_write thread pool when it targets a non-system index. Before this fix, all forwarded bulk
+     * requests used system_write, causing that pool to get overloaded; here the pool is blocked on purpose.
+     */
+    public void testForwardBulkWithSystemWritePoolDisabled() throws Exception {
+        // Start a master-only node and a second node that acts as the ingest node
+        final String masterOnlyNode = internalCluster().startMasterOnlyNode();
+        final String ingestNode = internalCluster().startNode();
+
+        ensureStableCluster(2);
+
+        // Set up the target index and an ingest pipeline, then build the bulk request
+        createIndex("index");
+
+        BytesReference source = new BytesArray("""
+            {
+              "processors" : [
+                  {"set" : {"field": "y", "value": 0}}
+              ]
+            }""");
+
+        PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, XContentType.JSON);
+        clusterAdmin().putPipeline(putPipelineRequest).get();
+
+        int numRequests = scaledRandomIntBetween(32, 128);
+        BulkRequest bulkRequest = new BulkRequest();
+        BulkResponse response;
+        for (int i = 0; i < numRequests; i++) {
+            IndexRequest indexRequest = new IndexRequest("index").id(Integer.toString(i)).setPipeline("_id");
+            indexRequest.source(Requests.INDEX_CONTENT_TYPE, "x", 1);
+            bulkRequest.add(indexRequest);
+        }
+        assertThat(numRequests, equalTo(bulkRequest.requests().size()));
+
+        // Saturate the system_write thread pool on the ingest node; the latch is released in the finally block
+        final ThreadPool ingestNodeThreadPool = internalCluster().getInstance(ThreadPool.class, ingestNode);
+        final var blockingLatch = new CountDownLatch(1);
+        try {
+            blockSystemWriteThreadPool(blockingLatch, ingestNodeThreadPool);
+            // Send the bulk request to the master-only node, which is expected to forward it to the ingest node.
+            response = safeGet(client(masterOnlyNode).bulk(bulkRequest));
+        } finally {
+            blockingLatch.countDown();
+        }
+
+        // Every item must have succeeded even though system_write was blocked while the request ran.
+        assertThat(response.getItems().length, equalTo(numRequests));
+        assertFalse(response.hasFailures());
+
+        // The ingest node's stats must show every document passing through pipeline "_id" without failures
+        NodesStatsResponse nodesStatsResponse = clusterAdmin().nodesStats(new NodesStatsRequest(ingestNode).addMetric(INGEST)).actionGet();
+        assertThat(nodesStatsResponse.getNodes().size(), equalTo(1));
+
+        NodeStats stats = nodesStatsResponse.getNodes().get(0);
+        assertThat(stats.getIngestStats().totalStats().ingestCount(), equalTo((long) numRequests));
+        assertThat(stats.getIngestStats().totalStats().ingestFailedCount(), equalTo(0L));
+        final var pipelineStats = stats.getIngestStats().pipelineStats().get(0);
+        assertThat(pipelineStats.pipelineId(), equalTo("_id"));
+        assertThat(pipelineStats.stats().ingestCount(), equalTo((long) numRequests));
+
+        MultiGetResponse docListResponse = safeGet(
+            client().prepareMultiGet().addIds("index", IntStream.range(0, numRequests).mapToObj(String::valueOf).toList()).execute()
+        );
+
+        assertThat(docListResponse.getResponses().length, equalTo(numRequests));
+        Map<String, Object> document;
+        for (int i = 0; i < numRequests; i++) {
+            document = docListResponse.getResponses()[i].getResponse().getSourceAsMap();
+            assertThat(document.get("y"), equalTo(0)); // "y" is the field written by the pipeline's set processor
+        }
+    }
+
+    private void blockSystemWriteThreadPool(CountDownLatch blockingLatch, ThreadPool threadPool) {
+        assertThat(blockingLatch.getCount(), greaterThan(0L)); // the latch must not have been released yet
+        final var executor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE);
+        // Saturate system_write: submit tasks that wait on the latch until one is rejected, i.e. the pool and its queue are full.
+        expectThrows(EsRejectedExecutionException.class, () -> {
+            // noinspection InfiniteLoopStatement
+            while (true) {
+                executor.execute(() -> safeAwait(blockingLatch)); // each task waits until the caller counts the latch down
+            }
+        });
+    }
 }

+ 3 - 0
server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

@@ -87,6 +87,9 @@ public class BulkRequest extends ActionRequest
         requests.addAll(in.readCollectionAsList(i -> DocWriteRequest.readDocumentRequest(null, i)));
         refreshPolicy = RefreshPolicy.readFrom(in);
         timeout = in.readTimeValue();
+        for (DocWriteRequest<?> request : requests) {
+            indices.add(Objects.requireNonNull(request.index(), "request index must not be null"));
+        }
     }
 
     public BulkRequest(@Nullable String globalIndex) {

+ 20 - 1
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java

@@ -33,6 +33,8 @@ import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.features.FeatureService;
@@ -54,6 +56,7 @@ import org.elasticsearch.transport.TransportService;
 import org.junit.After;
 import org.junit.Before;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -287,7 +290,7 @@ public class TransportBulkActionTests extends ESTestCase {
         prohibitCustomRoutingOnDataStream(writeRequestAgainstIndex, metadata.getIndicesLookup().get(writeRequestAgainstIndex.index()));
     }
 
-    public void testOnlySystem() {
+    public void testOnlySystem() throws IOException {
         SortedMap<String, IndexAbstraction> indicesLookup = new TreeMap<>();
         Settings settings = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()).build();
         indicesLookup.put(
@@ -303,15 +306,23 @@ public class TransportBulkActionTests extends ESTestCase {
         );
         List<String> onlySystem = List.of(".foo", ".bar");
         assertTrue(TransportBulkAction.isOnlySystem(buildBulkRequest(onlySystem), indicesLookup, systemIndices));
+        /* Test forwarded bulk requests (that are serialized then deserialized) */
+        assertTrue(TransportBulkAction.isOnlySystem(buildBulkStreamRequest(onlySystem), indicesLookup, systemIndices));
 
         onlySystem = List.of(".foo", ".bar", ".test");
         assertTrue(TransportBulkAction.isOnlySystem(buildBulkRequest(onlySystem), indicesLookup, systemIndices));
+        /* Test forwarded bulk requests (that are serialized then deserialized) */
+        assertTrue(TransportBulkAction.isOnlySystem(buildBulkStreamRequest(onlySystem), indicesLookup, systemIndices));
 
         List<String> nonSystem = List.of("foo", "bar");
         assertFalse(TransportBulkAction.isOnlySystem(buildBulkRequest(nonSystem), indicesLookup, systemIndices));
+        /* Test forwarded bulk requests (that are serialized then deserialized) */
+        assertFalse(TransportBulkAction.isOnlySystem(buildBulkStreamRequest(nonSystem), indicesLookup, systemIndices));
 
         List<String> mixed = List.of(".foo", ".test", "other");
         assertFalse(TransportBulkAction.isOnlySystem(buildBulkRequest(mixed), indicesLookup, systemIndices));
+        /* Test forwarded bulk requests (that are serialized then deserialized) */
+        assertFalse(TransportBulkAction.isOnlySystem(buildBulkStreamRequest(mixed), indicesLookup, systemIndices));
     }
 
     private void blockWriteThreadPool(CountDownLatch blockingLatch) {
@@ -463,4 +474,12 @@ public class TransportBulkActionTests extends ESTestCase {
         }
         return request;
     }
+
+    private BulkRequest buildBulkStreamRequest(List<String> indices) throws IOException {
+        BulkRequest request = buildBulkRequest(indices);
+        BytesStreamOutput out = new BytesStreamOutput();
+        request.writeTo(out); // serialize the request, mimicking a bulk request that is forwarded to another node
+        StreamInput streamInput = out.bytes().streamInput();
+        return (new BulkRequest(streamInput)); // deserialize through the StreamInput constructor
+    }
 }