فهرست منبع

[ML] Offload request to generic threadpool (#109104)

buildTextStructureResponse can take a long time, presumably up to 25s,
and is currently blocking the transport thread for that amount of time.
Offloading the request to the generic threadpool will at least let the
transport thread handle other requests concurrently.

Co-authored-by: David Turner <david.turner@elastic.co>
Pat Whelan 1 سال پیش
والد
کامیت
1582c645bf

+ 6 - 0
docs/changelog/109104.yaml

@@ -0,0 +1,6 @@
+pr: 109104
+summary: Offload request to generic threadpool
+area: Machine Learning
+type: bug
+issues:
+ - 109100

+ 47 - 0
x-pack/plugin/text-structure/src/main/java/org/elasticsearch/xpack/textstructure/transport/TextStructExecutor.java

@@ -0,0 +1,47 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.textstructure.transport;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
+import org.elasticsearch.common.CheckedSupplier;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.concurrent.ExecutorService;
+
+import static org.elasticsearch.common.util.concurrent.EsExecutors.DIRECT_EXECUTOR_SERVICE;
+
+/**
+ * workaround for https://github.com/elastic/elasticsearch/issues/97916
+ * TODO delete this entire class when we can
+ */
+public class TextStructExecutor {
+    private final ThreadPool threadPool;
+
+    @Inject
+    public TextStructExecutor(ThreadPool threadPool) {
+        this.threadPool = threadPool;
+    }
+
+    /**
+     * when the workaround is removed, change the value in each consuming class's constructor passes to the super constructor from
+     * DIRECT_EXECUTOR_SERVICE back to threadpool.generic() so that we continue to fork off of the transport thread.
+     */
+    ExecutorService handledTransportActionExecutorService() {
+        return DIRECT_EXECUTOR_SERVICE;
+    }
+
+    /**
+     * when the workaround is removed, change the callers of this function to
+     * {@link ActionListener#completeWith(ActionListener, CheckedSupplier)}.
+     */
+    <T> void execute(ActionListener<T> listener, CheckedSupplier<T, Exception> supplier) {
+        threadPool.generic().execute(ActionRunnable.supply(listener, supplier));
+    }
+}

+ 11 - 10
x-pack/plugin/text-structure/src/main/java/org/elasticsearch/xpack/textstructure/transport/TransportFindFieldStructureAction.java

@@ -7,6 +7,7 @@
 package org.elasticsearch.xpack.textstructure.transport;
 
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
@@ -31,6 +32,8 @@ import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.threadpool.ThreadPool.Names.GENERIC;
+
 public class TransportFindFieldStructureAction extends HandledTransportAction<FindFieldStructureAction.Request, FindStructureResponse> {
 
     private final Client client;
@@ -58,21 +61,18 @@ public class TransportFindFieldStructureAction extends HandledTransportAction<Fi
             .setFetchSource(true)
             .setQuery(QueryBuilders.existsQuery(request.getField()))
             .setFetchSource(new String[] { request.getField() }, null)
-            .execute(ActionListener.wrap(searchResponse -> {
-                long hitCount = searchResponse.getHits().getHits().length;
+            .execute(listener.delegateFailureAndWrap((delegate, searchResponse) -> {
+                var hitCount = searchResponse.getHits().getHits().length;
                 if (hitCount < AbstractFindStructureRequest.MIN_SAMPLE_LINE_COUNT) {
-                    listener.onFailure(
+                    delegate.onFailure(
                         new IllegalArgumentException("Input contained too few lines [" + hitCount + "] to obtain a meaningful sample")
                     );
                     return;
                 }
-                List<String> messages = getMessages(searchResponse, request.getField());
-                try {
-                    listener.onResponse(buildTextStructureResponse(messages, request));
-                } catch (Exception e) {
-                    listener.onFailure(e);
-                }
-            }, listener::onFailure));
+                var messages = getMessages(searchResponse, request.getField());
+                // As matching a regular expression might take a while, we run in a different thread to avoid blocking the network thread.
+                threadPool.generic().execute(ActionRunnable.supply(delegate, () -> buildTextStructureResponse(messages, request)));
+            }));
     }
 
     private List<String> getMessages(SearchResponse searchResponse, String field) {
@@ -83,6 +83,7 @@ public class TransportFindFieldStructureAction extends HandledTransportAction<Fi
 
     private FindStructureResponse buildTextStructureResponse(List<String> messages, FindFieldStructureAction.Request request)
         throws Exception {
+        assert ThreadPool.assertCurrentThreadPool(GENERIC);
         TextStructureFinderManager structureFinderManager = new TextStructureFinderManager(threadPool.scheduler());
         TextStructureFinder textStructureFinder = structureFinderManager.findTextStructure(
             messages,

+ 13 - 7
x-pack/plugin/text-structure/src/main/java/org/elasticsearch/xpack/textstructure/transport/TransportFindMessageStructureAction.java

@@ -19,32 +19,38 @@ import org.elasticsearch.xpack.textstructure.structurefinder.TextStructureFinder
 import org.elasticsearch.xpack.textstructure.structurefinder.TextStructureFinderManager;
 import org.elasticsearch.xpack.textstructure.structurefinder.TextStructureOverrides;
 
+import static org.elasticsearch.threadpool.ThreadPool.Names.GENERIC;
+
 public class TransportFindMessageStructureAction extends HandledTransportAction<FindMessageStructureAction.Request, FindStructureResponse> {
 
     private final ThreadPool threadPool;
+    private final TextStructExecutor executor;
 
     @Inject
-    public TransportFindMessageStructureAction(TransportService transportService, ActionFilters actionFilters, ThreadPool threadPool) {
+    public TransportFindMessageStructureAction(
+        TransportService transportService,
+        ActionFilters actionFilters,
+        ThreadPool threadPool,
+        TextStructExecutor executor
+    ) {
         super(
             FindMessageStructureAction.NAME,
             transportService,
             actionFilters,
             FindMessageStructureAction.Request::new,
-            threadPool.generic()
+            executor.handledTransportActionExecutorService()
         );
         this.threadPool = threadPool;
+        this.executor = executor;
     }
 
     @Override
     protected void doExecute(Task task, FindMessageStructureAction.Request request, ActionListener<FindStructureResponse> listener) {
-        try {
-            listener.onResponse(buildTextStructureResponse(request));
-        } catch (Exception e) {
-            listener.onFailure(e);
-        }
+        executor.execute(listener, () -> buildTextStructureResponse(request));
     }
 
     private FindStructureResponse buildTextStructureResponse(FindMessageStructureAction.Request request) throws Exception {
+        assert ThreadPool.assertCurrentThreadPool(GENERIC);
         TextStructureFinderManager structureFinderManager = new TextStructureFinderManager(threadPool.scheduler());
         TextStructureFinder textStructureFinder = structureFinderManager.findTextStructure(
             request.getMessages(),

+ 19 - 7
x-pack/plugin/text-structure/src/main/java/org/elasticsearch/xpack/textstructure/transport/TransportFindStructureAction.java

@@ -21,26 +21,38 @@ import org.elasticsearch.xpack.textstructure.structurefinder.TextStructureOverri
 
 import java.io.InputStream;
 
+import static org.elasticsearch.threadpool.ThreadPool.Names.GENERIC;
+
 public class TransportFindStructureAction extends HandledTransportAction<FindStructureAction.Request, FindStructureResponse> {
 
     private final ThreadPool threadPool;
+    private final TextStructExecutor executor;
 
     @Inject
-    public TransportFindStructureAction(TransportService transportService, ActionFilters actionFilters, ThreadPool threadPool) {
-        super(FindStructureAction.NAME, transportService, actionFilters, FindStructureAction.Request::new, threadPool.generic());
+    public TransportFindStructureAction(
+        TransportService transportService,
+        ActionFilters actionFilters,
+        ThreadPool threadPool,
+        TextStructExecutor executor
+    ) {
+        super(
+            FindStructureAction.NAME,
+            transportService,
+            actionFilters,
+            FindStructureAction.Request::new,
+            executor.handledTransportActionExecutorService()
+        );
         this.threadPool = threadPool;
+        this.executor = executor;
     }
 
     @Override
     protected void doExecute(Task task, FindStructureAction.Request request, ActionListener<FindStructureResponse> listener) {
-        try {
-            listener.onResponse(buildTextStructureResponse(request));
-        } catch (Exception e) {
-            listener.onFailure(e);
-        }
+        executor.execute(listener, () -> buildTextStructureResponse(request));
     }
 
     private FindStructureResponse buildTextStructureResponse(FindStructureAction.Request request) throws Exception {
+        assert ThreadPool.assertCurrentThreadPool(GENERIC);
         TextStructureFinderManager structureFinderManager = new TextStructureFinderManager(threadPool.scheduler());
         try (InputStream sampleStream = request.getSample().streamInput()) {
             TextStructureFinder textStructureFinder = structureFinderManager.findTextStructure(