Quellcode durchsuchen

[8.19] Fix systemd notify to use a shared arena (#135235) (#135281)

* Fix systemd notify to use a shared arena (#135235)

CloseableByteBuffer currently only uses confined Arena 
for buffer allocation meaning only the thread creating 
the Arena is allowed to access the native memory. When 
using elasticsearch with systemd, our notify-extend 
message is executed on a thread separate from where 
the native memory was allocated. This is causing a 
RuntimeException to be thrown. This changes 
CloseableByteBuffer to allow for a shared Arena as 
well for systemd.

* fix backport
Jack Conradson vor 2 Wochen
Ursprung
Commit
8552488846

+ 5 - 0
docs/changelog/135235.yaml

@@ -0,0 +1,5 @@
+pr: 135235
+summary: Fix systemd notify to use a shared arena
+area: Infra/Node Lifecycle
+type: bug
+issues: []

+ 7 - 1
libs/native/jna/src/main/java/org/elasticsearch/nativeaccess/jna/JnaJavaLibrary.java

@@ -13,8 +13,14 @@ import org.elasticsearch.nativeaccess.CloseableByteBuffer;
 import org.elasticsearch.nativeaccess.lib.JavaLibrary;
 
 class JnaJavaLibrary implements JavaLibrary {
+
+    @Override
+    public CloseableByteBuffer newConfinedBuffer(int len) {
+        return new JnaCloseableByteBuffer(len);
+    }
+
     @Override
-    public CloseableByteBuffer newBuffer(int len) {
+    public CloseableByteBuffer newSharedBuffer(int len) {
         return new JnaCloseableByteBuffer(len);
     }
 }

+ 8 - 2
libs/native/src/main/java/org/elasticsearch/nativeaccess/AbstractNativeAccess.java

@@ -46,9 +46,15 @@ abstract class AbstractNativeAccess implements NativeAccess {
     }
 
     @Override
-    public CloseableByteBuffer newBuffer(int len) {
+    public CloseableByteBuffer newSharedBuffer(int len) {
         assert len > 0;
-        return javaLib.newBuffer(len);
+        return javaLib.newSharedBuffer(len);
+    }
+
+    @Override
+    public CloseableByteBuffer newConfinedBuffer(int len) {
+        assert len > 0;
+        return javaLib.newConfinedBuffer(len);
     }
 
     @Override

+ 1 - 1
libs/native/src/main/java/org/elasticsearch/nativeaccess/LinuxNativeAccess.java

@@ -98,7 +98,7 @@ class LinuxNativeAccess extends PosixNativeAccess {
             this.systemd = null; // not running under systemd
         } else {
             logger.debug("Systemd socket path: {}", socketPath);
-            var buffer = newBuffer(64);
+            var buffer = newSharedBuffer(64);
             this.systemd = new Systemd(libraryProvider.getLibrary(PosixCLibrary.class), socketPath, buffer);
         }
     }

+ 11 - 3
libs/native/src/main/java/org/elasticsearch/nativeaccess/NativeAccess.java

@@ -88,12 +88,20 @@ public interface NativeAccess {
     Optional<VectorSimilarityFunctions> getVectorSimilarityFunctions();
 
     /**
-     * Creates a new {@link CloseableByteBuffer}. The buffer must be used within the same thread
-     * that it is created.
+     * Creates a new {@link CloseableByteBuffer} using a shared arena. The buffer can be used
+     * across multiple threads.
      * @param len the number of bytes the buffer should allocate
      * @return the buffer
      */
-    CloseableByteBuffer newBuffer(int len);
+    CloseableByteBuffer newSharedBuffer(int len);
+
+    /**
+     * Creates a new {@link CloseableByteBuffer} using a confined arena. The buffer must be
+     * used within the same thread that it is created.
+     * @param len the number of bytes the buffer should allocate
+     * @return the buffer
+     */
+    CloseableByteBuffer newConfinedBuffer(int len);
 
     /**
      * Possible stats for execution filtering.

+ 7 - 1
libs/native/src/main/java/org/elasticsearch/nativeaccess/NoopNativeAccess.java

@@ -78,7 +78,13 @@ class NoopNativeAccess implements NativeAccess {
     }
 
     @Override
-    public CloseableByteBuffer newBuffer(int len) {
+    public CloseableByteBuffer newSharedBuffer(int len) {
+        logger.warn("cannot allocate buffer because native access is not available");
+        return null;
+    }
+
+    @Override
+    public CloseableByteBuffer newConfinedBuffer(int len) {
         logger.warn("cannot allocate buffer because native access is not available");
         return null;
     }

+ 3 - 1
libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/JavaLibrary.java

@@ -12,5 +12,7 @@ package org.elasticsearch.nativeaccess.lib;
 import org.elasticsearch.nativeaccess.CloseableByteBuffer;
 
 public non-sealed interface JavaLibrary extends NativeLibrary {
-    CloseableByteBuffer newBuffer(int len);
+    CloseableByteBuffer newSharedBuffer(int len);
+
+    CloseableByteBuffer newConfinedBuffer(int len);
 }

+ 10 - 2
libs/native/src/main21/java/org/elasticsearch/nativeaccess/jdk/JdkCloseableByteBuffer.java

@@ -20,8 +20,16 @@ class JdkCloseableByteBuffer implements CloseableByteBuffer {
     final MemorySegment segment;
     private final ByteBuffer bufferView;
 
-    JdkCloseableByteBuffer(int len) {
-        this.arena = Arena.ofConfined();
+    static JdkCloseableByteBuffer ofShared(int len) {
+        return new JdkCloseableByteBuffer(len, true);
+    }
+
+    static JdkCloseableByteBuffer ofConfined(int len) {
+        return new JdkCloseableByteBuffer(len, false);
+    }
+
+    private JdkCloseableByteBuffer(int len, boolean shared) {
+        this.arena = shared ? Arena.ofShared() : Arena.ofConfined();
         this.segment = arena.allocate(len);
         this.bufferView = segment.asByteBuffer();
     }

+ 8 - 2
libs/native/src/main21/java/org/elasticsearch/nativeaccess/jdk/JdkJavaLibrary.java

@@ -13,8 +13,14 @@ import org.elasticsearch.nativeaccess.CloseableByteBuffer;
 import org.elasticsearch.nativeaccess.lib.JavaLibrary;
 
 class JdkJavaLibrary implements JavaLibrary {
+
+    @Override
+    public CloseableByteBuffer newSharedBuffer(int len) {
+        return JdkCloseableByteBuffer.ofShared(len);
+    }
+
     @Override
-    public CloseableByteBuffer newBuffer(int len) {
-        return new JdkCloseableByteBuffer(len);
+    public CloseableByteBuffer newConfinedBuffer(int len) {
+        return JdkCloseableByteBuffer.ofConfined(len);
     }
 }

+ 10 - 10
libs/native/src/test/java/org/elasticsearch/nativeaccess/ZstdTests.java

@@ -38,7 +38,7 @@ public class ZstdTests extends ESTestCase {
     }
 
     public void testCompressValidation() {
-        try (var src = nativeAccess.newBuffer(1000); var dst = nativeAccess.newBuffer(500)) {
+        try (var src = nativeAccess.newConfinedBuffer(1000); var dst = nativeAccess.newConfinedBuffer(500)) {
             var srcBuf = src.buffer();
             var dstBuf = dst.buffer();
 
@@ -58,9 +58,9 @@ public class ZstdTests extends ESTestCase {
 
     public void testDecompressValidation() {
         try (
-            var original = nativeAccess.newBuffer(1000);
-            var compressed = nativeAccess.newBuffer(500);
-            var restored = nativeAccess.newBuffer(500)
+            var original = nativeAccess.newConfinedBuffer(1000);
+            var compressed = nativeAccess.newConfinedBuffer(500);
+            var restored = nativeAccess.newConfinedBuffer(500)
         ) {
             var originalBuf = original.buffer();
             var compressedBuf = compressed.buffer();
@@ -105,9 +105,9 @@ public class ZstdTests extends ESTestCase {
 
     private void doTestRoundtrip(byte[] data) {
         try (
-            var original = nativeAccess.newBuffer(data.length);
-            var compressed = nativeAccess.newBuffer(zstd.compressBound(data.length));
-            var restored = nativeAccess.newBuffer(data.length)
+            var original = nativeAccess.newConfinedBuffer(data.length);
+            var compressed = nativeAccess.newConfinedBuffer(zstd.compressBound(data.length));
+            var restored = nativeAccess.newConfinedBuffer(data.length)
         ) {
             original.buffer().put(0, data);
             int compressedLength = zstd.compress(compressed, original, randomIntBetween(-3, 9));
@@ -121,9 +121,9 @@ public class ZstdTests extends ESTestCase {
         final int compressedOffset = randomIntBetween(1, 1000);
         final int decompressedOffset = randomIntBetween(1, 1000);
         try (
-            var original = nativeAccess.newBuffer(decompressedOffset + data.length);
-            var compressed = nativeAccess.newBuffer(compressedOffset + zstd.compressBound(data.length));
-            var restored = nativeAccess.newBuffer(decompressedOffset + data.length)
+            var original = nativeAccess.newConfinedBuffer(decompressedOffset + data.length);
+            var compressed = nativeAccess.newConfinedBuffer(compressedOffset + zstd.compressBound(data.length));
+            var restored = nativeAccess.newConfinedBuffer(decompressedOffset + data.length)
         ) {
             original.buffer().put(decompressedOffset, data);
             original.buffer().position(decompressedOffset);

+ 4 - 4
server/src/main/java/org/elasticsearch/index/codec/zstd/Zstd814StoredFieldsFormat.java

@@ -128,8 +128,8 @@ public final class Zstd814StoredFieldsFormat extends Lucene90CompressingStoredFi
             final int compressedLength = in.readVInt();
 
             try (
-                CloseableByteBuffer src = nativeAccess.newBuffer(compressedLength);
-                CloseableByteBuffer dest = nativeAccess.newBuffer(originalLength)
+                CloseableByteBuffer src = nativeAccess.newConfinedBuffer(compressedLength);
+                CloseableByteBuffer dest = nativeAccess.newConfinedBuffer(originalLength)
             ) {
 
                 while (src.buffer().position() < compressedLength) {
@@ -187,8 +187,8 @@ public final class Zstd814StoredFieldsFormat extends Lucene90CompressingStoredFi
             // identify duplicate strings. So if we wanted to avoid allocating memory on every compress call, we should also look into
             // reusing compression contexts, which are not small and would increase permanent memory usage as well.
             try (
-                CloseableByteBuffer src = nativeAccess.newBuffer(srcLen);
-                CloseableByteBuffer dest = nativeAccess.newBuffer(compressBound)
+                CloseableByteBuffer src = nativeAccess.newConfinedBuffer(srcLen);
+                CloseableByteBuffer dest = nativeAccess.newConfinedBuffer(compressBound)
             ) {
 
                 while (buffersInput.position() < buffersInput.length()) {