浏览代码

Propogate rejected execution during bulk actions (#64842)

Currently a rejected execution exception can be swallowed when async
actions return during transport bulk actions. This includes scenarios
where we went async to perform ingest pipelines or index creation. This
commit resolves the issue by propogating a rejected exeception.
Tim Brooks 5 年之前
父节点
当前提交
01217e929e

+ 25 - 15
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -57,7 +57,6 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexNotFoundException;
@@ -253,8 +252,13 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                     @Override
                     @Override
                     public void onResponse(CreateIndexResponse result) {
                     public void onResponse(CreateIndexResponse result) {
                         if (counter.decrementAndGet() == 0) {
                         if (counter.decrementAndGet() == 0) {
-                            threadPool.executor(executorName).execute(
-                                () -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated));
+                            threadPool.executor(executorName).execute(new ActionRunnable<>(listener) {
+
+                                @Override
+                                protected void doRun() {
+                                    executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
+                                }
+                            });
                         }
                         }
                     }
                     }
 
 
@@ -274,11 +278,22 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                             }
                             }
                         }
                         }
                         if (counter.decrementAndGet() == 0) {
                         if (counter.decrementAndGet() == 0) {
-                            threadPool.executor(executorName).execute(() -> executeBulk(task, bulkRequest, startTime,
-                                ActionListener.wrap(listener::onResponse, inner -> {
-                                    inner.addSuppressed(e);
-                                    listener.onFailure(inner);
-                                }), responses, indicesThatCannotBeCreated));
+                            final ActionListener<BulkResponse> wrappedListener = ActionListener.wrap(listener::onResponse, inner -> {
+                                inner.addSuppressed(e);
+                                listener.onFailure(inner);
+                            });
+                            threadPool.executor(executorName).execute(new ActionRunnable<>(wrappedListener) {
+                                @Override
+                                protected void doRun() {
+                                    executeBulk(task, bulkRequest, startTime, wrappedListener, responses, indicesThatCannotBeCreated);
+                                }
+
+                                @Override
+                                public void onRejection(Exception rejectedException) {
+                                    rejectedException.addSuppressed(e);
+                                    super.onRejection(rejectedException);
+                                }
+                            });
                         }
                         }
                     }
                     }
                 });
                 });
@@ -698,14 +713,9 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                             assert Thread.currentThread().getName().contains(executorName);
                             assert Thread.currentThread().getName().contains(executorName);
                             doInternalExecute(task, bulkRequest, executorName, actionListener);
                             doInternalExecute(task, bulkRequest, executorName, actionListener);
                         } else {
                         } else {
-                            threadPool.executor(executorName).execute(new AbstractRunnable() {
-                                @Override
-                                public void onFailure(Exception e) {
-                                    listener.onFailure(e);
-                                }
-
+                            threadPool.executor(executorName).execute(new ActionRunnable<>(listener) {
                                 @Override
                                 @Override
-                                protected void doRun() throws Exception {
+                                protected void doRun() {
                                     doInternalExecute(task, bulkRequest, executorName, actionListener);
                                     doInternalExecute(task, bulkRequest, executorName, actionListener);
                                 }
                                 }
 
 

+ 23 - 2
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java

@@ -19,6 +19,7 @@
 
 
 package org.elasticsearch.action.bulk;
 package org.elasticsearch.action.bulk;
 
 
+import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.Version;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.DocWriteRequest;
@@ -41,6 +42,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.VersionType;
@@ -73,12 +75,13 @@ public class TransportBulkActionTests extends ESTestCase {
     /** Services needed by bulk action */
     /** Services needed by bulk action */
     private TransportService transportService;
     private TransportService transportService;
     private ClusterService clusterService;
     private ClusterService clusterService;
-    private ThreadPool threadPool;
+    private TestThreadPool threadPool;
 
 
     private TestTransportBulkAction bulkAction;
     private TestTransportBulkAction bulkAction;
 
 
     class TestTransportBulkAction extends TransportBulkAction {
     class TestTransportBulkAction extends TransportBulkAction {
 
 
+        volatile boolean failIndexCreation = false;
         boolean indexCreated = false; // set when the "real" index is created
         boolean indexCreated = false; // set when the "real" index is created
 
 
         TestTransportBulkAction() {
         TestTransportBulkAction() {
@@ -90,7 +93,11 @@ public class TransportBulkActionTests extends ESTestCase {
         @Override
         @Override
         void createIndex(String index, TimeValue timeout, Version minNodeVersion, ActionListener<CreateIndexResponse> listener) {
         void createIndex(String index, TimeValue timeout, Version minNodeVersion, ActionListener<CreateIndexResponse> listener) {
             indexCreated = true;
             indexCreated = true;
-            listener.onResponse(null);
+            if (failIndexCreation) {
+                listener.onFailure(new ResourceAlreadyExistsException("index already exists"));
+            } else {
+                listener.onResponse(null);
+            }
         }
         }
     }
     }
 
 
@@ -261,6 +268,20 @@ public class TransportBulkActionTests extends ESTestCase {
         assertFalse(bulkAction.isOnlySystem(buildBulkRequest(mixed), indicesLookup, systemIndices));
         assertFalse(bulkAction.isOnlySystem(buildBulkRequest(mixed), indicesLookup, systemIndices));
     }
     }
 
 
+    public void testRejectionAfterCreateIndexIsPropagated() throws Exception {
+        BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap()));
+        bulkAction.failIndexCreation = randomBoolean();
+
+        try {
+            threadPool.startForcingRejections();
+            PlainActionFuture<BulkResponse> future = PlainActionFuture.newFuture();
+            ActionTestUtils.execute(bulkAction, null, bulkRequest, future);
+            expectThrows(EsRejectedExecutionException.class, future::actionGet);
+        } finally {
+            threadPool.stopForcingRejections();
+        }
+    }
+
     private BulkRequest buildBulkRequest(List<String> indices) {
     private BulkRequest buildBulkRequest(List<String> indices) {
         BulkRequest request = new BulkRequest();
         BulkRequest request = new BulkRequest();
         for (String index : indices) {
         for (String index : indices) {

+ 70 - 0
test/framework/src/main/java/org/elasticsearch/threadpool/TestThreadPool.java

@@ -20,10 +20,20 @@
 package org.elasticsearch.threadpool;
 package org.elasticsearch.threadpool;
 
 
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.node.Node;
 
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+
 public class TestThreadPool extends ThreadPool {
 public class TestThreadPool extends ThreadPool {
 
 
+    private final CountDownLatch blockingLatch = new CountDownLatch(1);
+    private volatile boolean returnRejectingExecutor = false;
+    private volatile ThreadPoolExecutor rejectingExecutor;
+
     public TestThreadPool(String name, ExecutorBuilder<?>... customBuilders) {
     public TestThreadPool(String name, ExecutorBuilder<?>... customBuilders) {
         this(name, Settings.EMPTY, customBuilders);
         this(name, Settings.EMPTY, customBuilders);
     }
     }
@@ -32,4 +42,64 @@ public class TestThreadPool extends ThreadPool {
         super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).put(settings).build(), customBuilders);
         super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), name).put(settings).build(), customBuilders);
     }
     }
 
 
+    @Override
+    public ExecutorService executor(String name) {
+        if (returnRejectingExecutor) {
+            return rejectingExecutor;
+        } else {
+            return super.executor(name);
+        }
+    }
+
+    public void startForcingRejections() {
+        if (rejectingExecutor == null) {
+            createRejectingExecutor();
+        }
+        returnRejectingExecutor = true;
+    }
+
+    public void stopForcingRejections() {
+        returnRejectingExecutor = false;
+    }
+
+    @Override
+    public void shutdown() {
+        blockingLatch.countDown();
+        if (rejectingExecutor != null) {
+            rejectingExecutor.shutdown();
+        }
+        super.shutdown();
+    }
+
+    @Override
+    public void shutdownNow() {
+        blockingLatch.countDown();
+        if (rejectingExecutor != null) {
+            rejectingExecutor.shutdownNow();
+        }
+        super.shutdownNow();
+    }
+
+    private synchronized void createRejectingExecutor() {
+        if (rejectingExecutor != null) {
+            return;
+        }
+        ThreadFactory factory = EsExecutors.daemonThreadFactory("reject_thread");
+        rejectingExecutor = EsExecutors.newFixed("rejecting", 1, 0, factory, getThreadContext(), false);
+
+        CountDownLatch startedLatch = new CountDownLatch(1);
+        rejectingExecutor.execute(() -> {
+            try {
+                startedLatch.countDown();
+                blockingLatch.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        try {
+            startedLatch.await();
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
 }