Переглянути джерело

[ML] Reduce input stream buffer from 8KB to 2KB (#72412)

When we flush the input stream the java side writes
enough spaces to fill the input stream buffer.
However, in the case of data frame analytics, this may
cause the job to freeze. The reason is that java
writes the data and flushes in the same thread that
goes on to then restore the state. However, when
c++ reads in the end-of-data control message, it stops
reading from the stream and goes on to perform the analysis.
If the 8KB of spaces do not fit in the OS buffer for the
names pipe, the java side blocks. It never proceeds with
restoring the state and this causes a job that is
being restarted and has state to freeze.

In elastic/ml-cpp#1881 the buffer has been reduced to 2KB.
This means the buffer is smaller than the buffer of all supported OS.
Note that it is 4KB on Windows.

Thus in this commit we also reduce the number of spaces we write
in order to flush the buffer to match that of the buffer size.

Closes #70698
Dimitris Athanasiou 4 роки тому
батько
коміт
3da72d46f4

+ 0 - 1
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java

@@ -493,7 +493,6 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
             "classification_training_percent_is_50_boolean", BOOLEAN_FIELD, BOOLEAN_FIELD_VALUES, "boolean");
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/70698")
     public void testStopAndRestart() throws Exception {
         initialize("classification_stop_and_restart");
         String predictedClassField = KEYWORD_FIELD + "_prediction";

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/AbstractControlMsgWriter.java

@@ -18,7 +18,7 @@ public abstract class AbstractControlMsgWriter {
     /**
      * This should be the same size as the buffer in the C++ native process.
      */
-    public static final int FLUSH_SPACES_LENGTH = 8192;
+    public static final int FLUSH_SPACES_LENGTH = 2048;
 
     protected final LengthEncodedWriter lengthEncodedWriter;
     private final int numberOfFields;

+ 1 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsControlMessageWriterTests.java

@@ -39,7 +39,7 @@ public class AnalyticsControlMessageWriterTests extends ESTestCase {
         inOrder.verify(lengthEncodedWriter).writeField("$");
 
         StringBuilder spaces = new StringBuilder();
-        IntStream.rangeClosed(1, 8192).forEach(i -> spaces.append(' '));
+        IntStream.rangeClosed(1, 2048).forEach(i -> spaces.append(' '));
         inOrder.verify(lengthEncodedWriter).writeNumFields(4);
         inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
         inOrder.verify(lengthEncodedWriter).writeField(spaces.toString());

+ 1 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AutodetectControlMsgWriterTests.java

@@ -159,7 +159,7 @@ public class AutodetectControlMsgWriterTests extends ESTestCase {
         inOrder.verify(lengthEncodedWriter).writeNumFields(4);
         inOrder.verify(lengthEncodedWriter, times(3)).writeField("");
         StringBuilder spaces = new StringBuilder();
-        IntStream.rangeClosed(1, 8192).forEach(i -> spaces.append(' '));
+        IntStream.rangeClosed(1, 2048).forEach(i -> spaces.append(' '));
         inOrder.verify(lengthEncodedWriter).writeField(spaces.toString());
 
         inOrder.verify(lengthEncodedWriter).flush();