浏览代码

Use response executor when handling failure in field-caps (#107370)

The transport service may call onFailure with a thread pool other than 
search_coordinator. This change adds a workaround to ensure that the
merging of field-caps always occurs on the search_coordinator.
Nhat Nguyen 1 年之前
父节点
当前提交
9810b65d96

+ 5 - 0
docs/changelog/107370.yaml

@@ -0,0 +1,5 @@
+pr: 107370
+summary: Fork when handling remote field-caps responses
+area: Search
+type: bug
+issues: []

+ 20 - 0
server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/CCSFieldCapabilitiesIT.java

@@ -25,8 +25,10 @@ import java.util.Collection;
 import java.util.List;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.arrayContaining;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 
 public class CCSFieldCapabilitiesIT extends AbstractMultiClustersTestCase {
 
@@ -35,6 +37,11 @@ public class CCSFieldCapabilitiesIT extends AbstractMultiClustersTestCase {
         return List.of("remote_cluster");
     }
 
+    @Override
+    protected boolean reuseClusters() {
+        return false;
+    }
+
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
         final List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
@@ -105,4 +112,17 @@ public class CCSFieldCapabilitiesIT extends AbstractMultiClustersTestCase {
         assertEquals(IllegalArgumentException.class, ex.getClass());
         assertEquals("I throw because I choose to.", ex.getMessage());
     }
+
+    public void testFailedToConnectToRemoteCluster() throws Exception {
+        String localIndex = "local_index";
+        assertAcked(client(LOCAL_CLUSTER).admin().indices().prepareCreate(localIndex));
+        client(LOCAL_CLUSTER).prepareIndex(localIndex).setId("1").setSource("foo", "bar").get();
+        client(LOCAL_CLUSTER).admin().indices().prepareRefresh(localIndex).get();
+        cluster("remote_cluster").close();
+        FieldCapabilitiesResponse response = client().prepareFieldCaps("*", "remote_cluster:*").setFields("*").get();
+        assertThat(response.getIndices(), arrayContaining(localIndex));
+        List<FieldCapabilitiesFailure> failures = response.getFailures();
+        assertThat(failures, hasSize(1));
+        assertThat(failures.get(0).getIndices(), arrayContaining("remote_cluster:*"));
+    }
 }

+ 20 - 1
server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

@@ -15,6 +15,7 @@ import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.action.RemoteClusterActionType;
+import org.elasticsearch.action.support.AbstractThreadedActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ChannelActionListener;
 import org.elasticsearch.action.support.HandledTransportAction;
@@ -252,7 +253,14 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
                 remoteClusterClient.execute(
                     TransportFieldCapabilitiesAction.REMOTE_TYPE,
                     remoteRequest,
-                    ActionListener.releaseAfter(remoteListener, refs.acquire())
+                    // The underlying transport service may call onFailure with a thread pool other than search_coordinator.
+                    // This fork is a workaround to ensure that the merging of field-caps always occurs on the search_coordinator.
+                    // TODO: remove this workaround after we fixed https://github.com/elastic/elasticsearch/issues/107439
+                    new ForkingOnFailureActionListener<>(
+                        searchCoordinationExecutor,
+                        true,
+                        ActionListener.releaseAfter(remoteListener, refs.acquire())
+                    )
                 );
             }
         }
@@ -569,4 +577,15 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
             });
         }
     }
+
+    private static class ForkingOnFailureActionListener<Response> extends AbstractThreadedActionListener<Response> {
+        ForkingOnFailureActionListener(Executor executor, boolean forceExecution, ActionListener<Response> delegate) {
+            super(executor, forceExecution, delegate);
+        }
+
+        @Override
+        public void onResponse(Response response) {
+            delegate.onResponse(response);
+        }
+    }
 }

+ 0 - 3
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java

@@ -494,9 +494,6 @@ public class RemoteClusterSecurityEsqlIT extends AbstractRemoteClusterSecurityTe
         assertThat(flatList, containsInAnyOrder("engineering"));
     }
 
-    @SuppressWarnings("unchecked")
-    @AwaitsFix(bugUrl = "this trips ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH_COORDINATION)")
-    // comment out those assertions in EsqlIndexResolver and TransportFieldCapabilitiesAction to see this test pass
     public void testCrossClusterQueryAgainstInvalidRemote() throws Exception {
         configureRemoteCluster();
         populateData();