Browse Source

[ML] Disable child span for streaming tasks (#132945) (#133071)

There is the potential for a memory leak, depending on which thread
handles the onComplete message. Currently, the child span does not add
anything to debugging, so we will disable it until we can figure out
a clean way to propagate the child span's context throughout the stream.

In any case, it would be better to replace it entirely with a child span
capturing the outbound service call and response for both streaming and
non-streaming requests, so this may remain disabled in the long run
anyway in favor of that child span.
Pat Whelan 1 month ago
parent
commit
2de3769314

+ 5 - 0
docs/changelog/132945.yaml

@@ -0,0 +1,5 @@
+pr: 132945
+summary: Disable child span for streaming tasks
+area: Machine Learning
+type: bug
+issues: []

+ 1 - 1
x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/task/StreamingTaskManager.java

@@ -105,7 +105,7 @@ public class StreamingTaskManager {
                             flowTask.addListener(TaskBackedProcessor.this::cancelTask);
                             return flowTask;
                         }
-                    });
+                    }, false);
                 }
             }
         }

+ 4 - 4
x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/action/task/StreamingTaskManagerTests.java

@@ -54,7 +54,7 @@ public class StreamingTaskManagerTests extends ESTestCase {
         doAnswer(ans -> {
             TaskAwareRequest taskAwareRequest = ans.getArgument(2);
             return taskAwareRequest.createTask(1L, taskType, taskAction, TaskId.EMPTY_TASK_ID, Map.of());
-        }).when(taskManager).register(any(), any(), any());
+        }).when(taskManager).register(any(), any(), any(), eq(false));
     }
 
     @After
@@ -67,7 +67,7 @@ public class StreamingTaskManagerTests extends ESTestCase {
 
         processor.subscribe(mock());
 
-        verify(taskManager, only()).register(eq(taskType), eq(taskAction), any());
+        verify(taskManager, only()).register(eq(taskType), eq(taskAction), any(), eq(false));
     }
 
     public void testCancelPropagatesUpstreamAndDownstream() {
@@ -77,7 +77,7 @@ public class StreamingTaskManagerTests extends ESTestCase {
             var registeredTask = (CancellableTask) taskAwareRequest.createTask(1L, taskType, taskAction, TaskId.EMPTY_TASK_ID, Map.of());
             task.set(registeredTask);
             return registeredTask;
-        }).when(taskManager).register(any(), any(), any());
+        }).when(taskManager).register(any(), any(), any(), eq(false));
 
         Flow.Subscriber<Object> downstream = mock();
         Flow.Subscription upstream = mock();
@@ -173,7 +173,7 @@ public class StreamingTaskManagerTests extends ESTestCase {
             var registeredTask = (CancellableTask) taskAwareRequest.createTask(1L, taskType, taskAction, TaskId.EMPTY_TASK_ID, Map.of());
             task.set(registeredTask);
             return registeredTask;
-        }).when(taskManager).register(any(), any(), any());
+        }).when(taskManager).register(any(), any(), any(), eq(false));
 
         var processor = streamingTaskManager.create(taskType, taskAction);
         var downstream = establishFlow(processor);