瀏覽代碼

Handle Bulk Requests on Write Threadpool (#40866)

* Bulk requests can be thousands of items large and take more than O(10ms) time to handle => we should not handle them on the transport threadpool to not block select loops
* relates #39128
* relates #39658
Armin Braun 6 年之前
父節點
當前提交
2931e9ca45

+ 3 - 2
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -117,7 +117,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                                TransportShardBulkAction shardBulkAction, NodeClient client,
                                ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
                                AutoCreateIndex autoCreateIndex, LongSupplier relativeTimeProvider) {
-        super(BulkAction.NAME, transportService, actionFilters, (Supplier<BulkRequest>) BulkRequest::new);
+        super(BulkAction.NAME, transportService, actionFilters, (Supplier<BulkRequest>) BulkRequest::new, ThreadPool.Names.WRITE);
         Objects.requireNonNull(relativeTimeProvider);
         this.threadPool = threadPool;
         this.clusterService = clusterService;
@@ -258,7 +258,8 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                         @Override
                         public void onResponse(CreateIndexResponse result) {
                             if (counter.decrementAndGet() == 0) {
-                                executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
+                                threadPool.executor(ThreadPool.Names.WRITE).execute(
+                                    () -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated));
                             }
                         }
 

+ 8 - 3
server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java

@@ -18,7 +18,6 @@
  */
 package org.elasticsearch.action.support;
 
-import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -57,6 +56,13 @@ public abstract class HandledTransportAction<Request extends ActionRequest, Resp
             new TransportHandler());
     }
 
+    protected HandledTransportAction(String actionName, TransportService transportService, ActionFilters actionFilters,
+                                     Supplier<Request> request, String executor) {
+        super(actionName, actionFilters, transportService.getTaskManager());
+        transportService.registerRequestHandler(actionName, request, executor, false, true,
+            new TransportHandler());
+    }
+
     protected HandledTransportAction(String actionName, boolean canTripCircuitBreaker,
                                      TransportService transportService, ActionFilters actionFilters,
                                      Writeable.Reader<Request> requestReader) {
@@ -73,9 +79,8 @@ public abstract class HandledTransportAction<Request extends ActionRequest, Resp
 
     class TransportHandler implements TransportRequestHandler<Request> {
         @Override
-        public final void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
+        public final void messageReceived(final Request request, final TransportChannel channel, Task task) {
             // We already got the task created on the network layer - no need to create it again on the transport layer
-            Logger logger = HandledTransportAction.this.logger;
             execute(task, request, new ChannelActionListener<>(channel, actionName, request));
         }
     }

+ 9 - 3
server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java

@@ -25,6 +25,7 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.transport.RemoteTransportException;
 
 import java.util.Collections;
 import java.util.Iterator;
@@ -133,9 +134,14 @@ public class BulkProcessorRetryIT extends ESIntegTestCase {
                     }
                 }
             } else {
-                Throwable t = (Throwable) response;
-                // we're not expecting any other errors
-                throw new AssertionError("Unexpected failure", t);
+                if (response instanceof RemoteTransportException
+                    && ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS && rejectedExecutionExpected) {
+                    // ignored, we exceeded the write queue size with dispatching the initial bulk request
+                } else {
+                    Throwable t = (Throwable) response;
+                    // we're not expecting any other errors
+                    throw new AssertionError("Unexpected failure", t);
+                }
             }
         }
 

+ 8 - 1
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java

@@ -30,20 +30,24 @@ import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 
 import static java.util.Collections.emptySet;
 import static java.util.Collections.singleton;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -102,7 +106,10 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa
         ClusterState state = mock(ClusterState.class);
         when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
         when(clusterService.state()).thenReturn(state);
-        TransportBulkAction action = new TransportBulkAction(null, mock(TransportService.class), clusterService,
+        final ThreadPool threadPool = mock(ThreadPool.class);
+        final ExecutorService direct = EsExecutors.newDirectExecutorService();
+        when(threadPool.executor(anyString())).thenReturn(direct);
+        TransportBulkAction action = new TransportBulkAction(threadPool, mock(TransportService.class), clusterService,
                 null, null, null, mock(ActionFilters.class), null, null) {
             @Override
             void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener<BulkResponse> listener,

+ 9 - 1
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

@@ -45,11 +45,13 @@ import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;
 import org.junit.Before;
@@ -61,6 +63,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -68,6 +71,7 @@ import java.util.function.Consumer;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -92,6 +96,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
     TransportService transportService;
     ClusterService clusterService;
     IngestService ingestService;
+    ThreadPool threadPool;
 
     /** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */
     @Captor
@@ -126,7 +131,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
         boolean indexCreated = true; // If set to false, will be set to true by call to createIndex
 
         TestTransportBulkAction() {
-            super(null, transportService, clusterService, ingestService,
+            super(threadPool, transportService, clusterService, ingestService,
                 null, null, new ActionFilters(Collections.emptySet()), null,
                 new AutoCreateIndex(
                     SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
@@ -163,6 +168,9 @@ public class TransportBulkActionIngestTests extends ESTestCase {
     @Before
     public void setupAction() {
         // initialize captors, which must be members to use @Capture because of generics
+        threadPool = mock(ThreadPool.class);
+        final ExecutorService direct = EsExecutors.newDirectExecutorService();
+        when(threadPool.executor(anyString())).thenReturn(direct);
         MockitoAnnotations.initMocks(this);
         // setup services that will be called by action
         transportService = mock(TransportService.class);