1
0
Эх сурвалжийг харах

[ML] Mitigate IOSession timeouts (#115414) (#115526)

We are seeing exceptions ~0.03% of the time in our integration tests:
```
org.apache.http.ConnectionClosedException: Connection closed unexpectedly
```

The `contentDecoder` does not always fully consume the body within
`SimpleInputBuffer.consumeContent`. When we return back to Apache, the
rest of the body is never delivered, and the IOSession eventually times
out and gets cleaned up. During that cleanup process, Apache calls our
Consumer with the above exception.

If we read 0 bytes and return back immediately, Apache has a better
chance to load the rest of the body/footer, and it will call
`consumeContent` again. This reduces the exception rate
down to ~0.001%.

Fix #114105
Fix #114232
Fix #114327
Fix #114385
Pat Whelan 1 жил өмнө
parent
commit
d2eebff6e9

+ 9 - 0
docs/changelog/115414.yaml

@@ -0,0 +1,9 @@
+pr: 115414
+summary: Mitigate IOSession timeouts
+area: Machine Learning
+type: bug
+issues:
+ - 114385
+ - 114327
+ - 114105
+ - 114232

+ 12 - 14
x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/StreamingHttpResultPublisher.java

@@ -96,11 +96,10 @@ class StreamingHttpResultPublisher implements HttpAsyncResponseConsumer<HttpResp
 
         try {
             var consumed = inputBuffer.consumeContent(contentDecoder);
-            var allBytes = new byte[consumed];
-            inputBuffer.read(allBytes);
-
-            // we can have empty bytes, don't bother sending them
-            if (allBytes.length > 0) {
+            // we could have read 0 bytes if the body was delayed getting in, we need to return out so apache can load the body/footer
+            if (consumed > 0) {
+                var allBytes = new byte[consumed];
+                inputBuffer.read(allBytes);
                 queue.offer(() -> {
                     subscriber.onNext(new HttpResult(response, allBytes));
                     var currentBytesInQueue = bytesInQueue.updateAndGet(current -> Long.max(0, current - allBytes.length));
@@ -111,18 +110,17 @@ class StreamingHttpResultPublisher implements HttpAsyncResponseConsumer<HttpResp
                         }
                     }
                 });
-            }
 
-            // always check if totalByteSize > the configured setting in case the settings change
-            if (bytesInQueue.accumulateAndGet(allBytes.length, Long::sum) >= settings.getMaxResponseSize().getBytes()) {
-                pauseProducer(ioControl);
-            }
+                // always check if totalByteSize > the configured setting in case the settings change
+                if (bytesInQueue.accumulateAndGet(allBytes.length, Long::sum) >= settings.getMaxResponseSize().getBytes()) {
+                    pauseProducer(ioControl);
+                }
 
-            // always run in case we're waking up from a pause and need to start a new thread
-            taskRunner.requestNextRun();
+                taskRunner.requestNextRun();
 
-            if (listenerCalled.compareAndSet(false, true)) {
-                listener.onResponse(this);
+                if (listenerCalled.compareAndSet(false, true)) {
+                    listener.onResponse(this);
+                }
             }
         } finally {
             inputBuffer.reset();

+ 6 - 4
x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/InferenceEventsAssertion.java

@@ -12,7 +12,6 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.xcontent.ChunkedToXContent;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.inference.InferenceServiceResults;
-import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xcontent.XContentFactory;
 import org.hamcrest.MatcherAssert;
 import org.hamcrest.Matchers;
@@ -26,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
 
+import static org.elasticsearch.test.ESTestCase.fail;
 import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
 import static org.hamcrest.CoreMatchers.is;
 
@@ -47,7 +47,9 @@ public record InferenceEventsAssertion(Iterator<String> events, Throwable error,
     }
 
     public InferenceEventsAssertion hasNoErrors() {
-        MatcherAssert.assertThat("Expected no errors from stream.", error, Matchers.nullValue());
+        if (error != null) {
+            fail(error, "Expected no errors from stream.");
+        }
         return this;
     }
 
@@ -66,7 +68,7 @@ public record InferenceEventsAssertion(Iterator<String> events, Throwable error,
             }
             t = t.getCause();
         }
-        ESTestCase.fail(error, "Expected an underlying ElasticsearchStatusException.");
+        fail(error, "Expected an underlying ElasticsearchStatusException.");
         return this;
     }
 
@@ -79,7 +81,7 @@ public record InferenceEventsAssertion(Iterator<String> events, Throwable error,
             }
             t = t.getCause();
         }
-        ESTestCase.fail(error, "Expected exception to contain string: " + message);
+        fail(error, "Expected exception to contain string: " + message);
         return this;
     }
 

+ 0 - 2
x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/anthropic/AnthropicServiceTests.java

@@ -532,7 +532,6 @@ public class AnthropicServiceTests extends ESTestCase {
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/114385")
     public void testInfer_StreamRequest() throws Exception {
         String responseJson = """
             data: {"type": "message_start", "message": {"model": "claude, probably"}}
@@ -578,7 +577,6 @@ public class AnthropicServiceTests extends ESTestCase {
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/114385")
     public void testInfer_StreamRequest_ErrorResponse() throws Exception {
         String responseJson = """
             data: {"type": "error", "error": {"type": "request_too_large", "message": "blah"}}

+ 0 - 2
x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/azureaistudio/AzureAiStudioServiceTests.java

@@ -1197,7 +1197,6 @@ public class AzureAiStudioServiceTests extends ESTestCase {
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/114385")
     public void testInfer_StreamRequest() throws Exception {
         String responseJson = """
             data: {\
@@ -1253,7 +1252,6 @@ public class AzureAiStudioServiceTests extends ESTestCase {
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/114385")
     public void testInfer_StreamRequest_ErrorResponse() throws Exception {
         String responseJson = """
             {

+ 0 - 2
x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/azureopenai/AzureOpenAiServiceTests.java

@@ -1322,7 +1322,6 @@ public class AzureOpenAiServiceTests extends ESTestCase {
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/114385")
     public void testInfer_StreamRequest() throws Exception {
         String responseJson = """
             data: {\
@@ -1381,7 +1380,6 @@ public class AzureOpenAiServiceTests extends ESTestCase {
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/114385")
     public void testInfer_StreamRequest_ErrorResponse() throws Exception {
         String responseJson = """
             {

+ 0 - 2
x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/cohere/CohereServiceTests.java

@@ -1532,7 +1532,6 @@ public class CohereServiceTests extends ESTestCase {
         assertEquals(SimilarityMeasure.DOT_PRODUCT, CohereService.defaultSimilarity());
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/114385")
     public void testInfer_StreamRequest() throws Exception {
         String responseJson = """
             {"event_type":"text-generation", "text":"hello"}
@@ -1566,7 +1565,6 @@ public class CohereServiceTests extends ESTestCase {
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/114385")
     public void testInfer_StreamRequest_ErrorResponse() throws Exception {
         String responseJson = """
             { "event_type":"stream-end", "finish_reason":"ERROR", "response":{ "text": "how dare you" } }

+ 0 - 2
x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/openai/OpenAiServiceTests.java

@@ -914,7 +914,6 @@ public class OpenAiServiceTests extends ESTestCase {
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/114385")
     public void testInfer_StreamRequest() throws Exception {
         String responseJson = """
             data: {\
@@ -964,7 +963,6 @@ public class OpenAiServiceTests extends ESTestCase {
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/114385")
     public void testInfer_StreamRequest_ErrorResponse() throws Exception {
         String responseJson = """
             {