瀏覽代碼

Don't close rest client from its callback (#22061)

If you try to close the rest client inside one of its callbacks then
it blocks itself. The thread pool switches the status to one that
requests a shutdown and then waits for the pool to shutdown. When
another thread attempts to honor the shutdown request it waits
for all the threads in the pool to finish what they are working on.
Thus thread a is waiting on thread b while thread b is waiting
on thread a. It isn't quite that simple, but it is close.

Relates to #22027
Nik Everett 8 年之前
父節點
當前提交
fc2060ba7e

+ 4 - 0
client/rest/src/main/java/org/elasticsearch/client/ResponseListener.java

@@ -23,6 +23,10 @@ package org.elasticsearch.client;
  * Listener to be provided when calling async performRequest methods provided by {@link RestClient}.
  * Those methods that do accept a listener will return immediately, execute asynchronously, and notify
  * the listener whenever the request yielded a response, or failed with an exception.
+ *
+ * <p>
+ * Note that it is <strong>not</strong> safe to call {@link RestClient#close()} from either of these
+ * callbacks.
  */
 public interface ResponseListener {
 

+ 11 - 5
modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java

@@ -83,11 +83,17 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
 
     @Override
     public void close() {
-        try {
-            client.close();
-        } catch (IOException e) {
-            fail.accept(new IOException("couldn't close the remote connection", e));
-        }
+        super.close();
+        /* This might be called on the RestClient's thread pool and attempting to close the client on its own threadpool causes it to fail
+         * to close. So we always shutdown the RestClient asynchronously on a thread in Elasticsearch's generic thread pool. That way we
+         * never close the client in its own thread pool. */
+        threadPool.generic().submit(() -> {
+            try {
+                client.close();
+            } catch (IOException e) {
+                logger.error("Failed to shutdown the remote connection", e);
+            }
+        });
     }
 
     @Override

+ 0 - 1
modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWithAuthTests.java

@@ -70,7 +70,6 @@ public class ReindexFromRemoteWithAuthTests extends ESSingleNodeTestCase {
     @Override
     protected Settings nodeSettings() {
         Settings.Builder settings = Settings.builder().put(super.nodeSettings());
-        // Weird incantation required to test with netty
         settings.put(NetworkModule.HTTP_ENABLED.getKey(), true);
         // Whitelist reindexing from the http host we're going to use
         settings.put(TransportReindexAction.REMOTE_CLUSTER_WHITELIST.getKey(), "127.0.0.1:*");

+ 53 - 0
modules/reindex/src/test/resources/rest-api-spec/test/reindex/90_remote.yaml

@@ -257,3 +257,56 @@
             match:
               text: test
   - match: {hits.total: 1}
+
+---
+"Reindex from remote with broken query":
+  - do:
+      index:
+        index:   source
+        type:    foo
+        id:      1
+        body:    { "text": "test" }
+        refresh: true
+
+  # Fetch the http host. We use the host of the master because we know there will always be a master.
+  - do:
+      cluster.state: {}
+  - set: { master_node: master }
+  - do:
+      nodes.info:
+        metric: [ http ]
+  - is_true: nodes.$master.http.publish_address
+  - set: {nodes.$master.http.publish_address: host}
+  - do:
+      catch: /query malformed, no start_object after query name/
+      reindex:
+        body:
+          source:
+            remote:
+              host: http://${host}
+            index: source
+            query:
+              garbage: not a query
+          dest:
+            index: dest
+
+---
+"Reindex from remote that you can't connect to":
+  - do:
+      index:
+        index:   source
+        type:    foo
+        id:      1
+        body:    { "text": "test" }
+        refresh: true
+
+  - do:
+      catch: /Connection refused/
+      reindex:
+        body:
+          source:
+            remote:
+              host: http://127.0.0.1:0
+            index: source
+          dest:
+            index: dest