
fix(utils/middleware): flush pending chat deltas on stream termination

Guarantees the last partial delta chunk is emitted when the SSE stream closes (EOF, break, or `[DONE]` sentinel).

* Buffer `last_delta_data` and track `delta_count`
* Flush automatically once `delta_count >= delta_chunk_size`
* Perform a final flush after the iterator ends

Signed-off-by: Sihyeon Jang <sihyeon.jang@navercorp.com>
Sihyeon Jang, 1 month ago
parent
commit
3da22af859
1 file changed with 18 additions and 7 deletions

+ 18 - 7
backend/open_webui/utils/middleware.py

@@ -1849,6 +1849,21 @@ async def process_chat_response(
                             or 1
                         ),
                     )
+                    last_delta_data = None
+
+                    async def flush_pending_delta_data(threshold: int = 0):
+                        nonlocal delta_count
+                        nonlocal last_delta_data
+
+                        if delta_count >= threshold and last_delta_data:
+                            await event_emitter(
+                                {
+                                    "type": "chat:completion",
+                                    "data": last_delta_data,
+                                }
+                            )
+                            delta_count = 0
+                            last_delta_data = None
 
                     async for line in response.body_iterator:
                         line = line.decode("utf-8") if isinstance(line, bytes) else line
@@ -2099,14 +2114,9 @@ async def process_chat_response(
 
                                 if delta:
                                     delta_count += 1
+                                    last_delta_data = data
                                     if delta_count >= delta_chunk_size:
-                                        await event_emitter(
-                                            {
-                                                "type": "chat:completion",
-                                                "data": data,
-                                            }
-                                        )
-                                        delta_count = 0
+                                        await flush_pending_delta_data(delta_chunk_size)
                                 else:
                                     await event_emitter(
                                         {
@@ -2121,6 +2131,7 @@ async def process_chat_response(
                             else:
                                 log.debug(f"Error: {e}")
                                 continue
+                    await flush_pending_delta_data()
 
                     if content_blocks:
                         # Clean up the last text block
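
Note on the change: the buffering-plus-final-flush pattern in this diff can be sketched in isolation as below. This is a minimal illustration, not the code from `middleware.py`. The names `relay_deltas`, `emit`, `stream`, and `demo` are hypothetical. It also assumes each payload supersedes the previous one, which is what the diff's tracking of only `last_delta_data` suggests.

```python
import asyncio


async def relay_deltas(chunks, emit, chunk_size=3):
    """Emit the latest payload every `chunk_size` deltas, then flush the rest."""
    delta_count = 0
    last_delta = None  # most recent payload not yet emitted

    async def flush(threshold=0):
        nonlocal delta_count, last_delta
        # threshold=0 means "emit whatever is pending", used for the final flush.
        if delta_count >= threshold and last_delta is not None:
            await emit(last_delta)
            delta_count = 0
            last_delta = None

    async for chunk in chunks:
        delta_count += 1
        last_delta = chunk
        await flush(chunk_size)  # emits only once enough deltas accumulated

    # The stream has ended: emit any remainder smaller than chunk_size.
    await flush()


async def demo():
    emitted = []

    async def emit(data):
        emitted.append(data)

    async def stream():
        # Five deltas with chunk_size=3 leaves two pending at end of stream.
        for i in range(1, 6):
            yield f"delta-{i}"

    await relay_deltas(stream(), emit, chunk_size=3)
    return emitted


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['delta-3', 'delta-5']
```

Without the trailing `await flush()`, the sketch would emit only `delta-3` and drop the last two deltas, which is the behavior this commit is meant to fix.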