Quellcode durchsuchen

Add stable ThreadPool constructor to LogstashInternalBridge (#105163)

Ry Biesemeyer vor 1 Jahr
Ursprung
Commit
0022005e17

+ 5 - 0
docs/changelog/105163.yaml

@@ -0,0 +1,5 @@
+pr: 105163
+summary: Add stable `ThreadPool` constructor to `LogstashInternalBridge`
+area: Ingest Node
+type: bug
+issues: []

+ 12 - 0
server/src/main/java/org/elasticsearch/ingest/LogstashInternalBridge.java

@@ -8,6 +8,10 @@
 
 package org.elasticsearch.ingest;
 
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.telemetry.metric.MeterRegistry;
+import org.elasticsearch.threadpool.ThreadPool;
+
 /**
  * This Elastic-internal API bridge class exposes package-private components of Ingest
  * in a way that can be consumed by Logstash's Elastic Integration Filter without
@@ -33,4 +37,12 @@ public class LogstashInternalBridge {
     public static void resetReroute(final IngestDocument ingestDocument) {
         ingestDocument.resetReroute();
     }
+
+    /**
+     * @param settings
+     * @return a new {@link ThreadPool} with a noop {@link MeterRegistry}
+     */
+    public static ThreadPool createThreadPool(final Settings settings) {
+        return new ThreadPool(settings, MeterRegistry.NOOP);
+    }
 }

+ 20 - 0
server/src/test/java/org/elasticsearch/ingest/LogstashInternalBridgeTests.java

@@ -8,11 +8,18 @@
 
 package org.elasticsearch.ingest;
 
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.Node;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.ingest.TestIngestDocument.emptyIngestDocument;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 
 public class LogstashInternalBridgeTests extends ESTestCase {
     public void testIngestDocumentRerouteBridge() {
@@ -29,4 +36,17 @@ public class LogstashInternalBridgeTests extends ESTestCase {
         assertThat(ingestDocument.getFieldValue("_index", String.class), is(equalTo("somewhere")));
         assertThat(LogstashInternalBridge.isReroute(ingestDocument), is(false));
     }
+
+    public void testCreateThreadPool() {
+        final Settings settings = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "TEST").build();
+        ThreadPool threadPool = null;
+        try {
+            threadPool = LogstashInternalBridge.createThreadPool(settings);
+            assertThat(threadPool, is(notNullValue()));
+        } finally {
+            if (Objects.nonNull(threadPool)) {
+                ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS);
+            }
+        }
+    }
 }