12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087 |
- import time
- import logging
- import sys
- import os
- import base64
- import textwrap
- import asyncio
- from aiocache import cached
- from typing import Any, Optional
- import random
- import json
- import html
- import inspect
- import re
- import ast
- from uuid import uuid4
- from concurrent.futures import ThreadPoolExecutor
- from fastapi import Request, HTTPException
- from fastapi.responses import HTMLResponse
- from starlette.responses import Response, StreamingResponse, JSONResponse
- from open_webui.models.oauth_sessions import OAuthSessions
- from open_webui.models.chats import Chats
- from open_webui.models.folders import Folders
- from open_webui.models.users import Users
- from open_webui.socket.main import (
- get_event_call,
- get_event_emitter,
- get_active_status_by_user_id,
- )
- from open_webui.routers.tasks import (
- generate_queries,
- generate_title,
- generate_follow_ups,
- generate_image_prompt,
- generate_chat_tags,
- )
- from open_webui.routers.retrieval import (
- process_web_search,
- SearchForm,
- )
- from open_webui.routers.images import (
- load_b64_image_data,
- image_generations,
- GenerateImageForm,
- upload_image,
- )
- from open_webui.routers.pipelines import (
- process_pipeline_inlet_filter,
- process_pipeline_outlet_filter,
- )
- from open_webui.routers.memories import query_memory, QueryMemoryForm
- from open_webui.utils.webhook import post_webhook
- from open_webui.utils.files import (
- get_audio_url_from_base64,
- get_file_url_from_base64,
- get_image_url_from_base64,
- )
- from open_webui.models.users import UserModel
- from open_webui.models.functions import Functions
- from open_webui.models.models import Models
- from open_webui.retrieval.utils import get_sources_from_items
- from open_webui.utils.chat import generate_chat_completion
- from open_webui.utils.task import (
- get_task_model_id,
- rag_template,
- tools_function_calling_generation_template,
- )
- from open_webui.utils.misc import (
- deep_update,
- extract_urls,
- get_message_list,
- add_or_update_system_message,
- add_or_update_user_message,
- get_last_user_message,
- get_last_user_message_item,
- get_last_assistant_message,
- get_system_message,
- prepend_to_first_user_message_content,
- convert_logit_bias_input_to_json,
- get_content_from_message,
- )
- from open_webui.utils.tools import get_tools
- from open_webui.utils.plugin import load_function_module_by_id
- from open_webui.utils.filter import (
- get_sorted_filter_ids,
- process_filter_functions,
- )
- from open_webui.utils.code_interpreter import execute_code_jupyter
- from open_webui.utils.payload import apply_system_prompt_to_body
- from open_webui.utils.mcp.client import MCPClient
- from open_webui.config import (
- CACHE_DIR,
- DEFAULT_TOOLS_FUNCTION_CALLING_PROMPT_TEMPLATE,
- DEFAULT_CODE_INTERPRETER_PROMPT,
- CODE_INTERPRETER_BLOCKED_MODULES,
- )
- from open_webui.env import (
- SRC_LOG_LEVELS,
- GLOBAL_LOG_LEVEL,
- CHAT_RESPONSE_STREAM_DELTA_CHUNK_SIZE,
- CHAT_RESPONSE_MAX_TOOL_CALL_RETRIES,
- BYPASS_MODEL_ACCESS_CONTROL,
- ENABLE_REALTIME_CHAT_SAVE,
- ENABLE_QUERIES_CACHE,
- )
- from open_webui.constants import TASKS
# Send log records to stdout at the globally configured level.
logging.basicConfig(stream=sys.stdout, level=GLOBAL_LOG_LEVEL)
log = logging.getLogger(__name__)
# This module's logger uses the level configured under the "MAIN" source key.
log.setLevel(SRC_LOG_LEVELS["MAIN"])

# Each entry below is an (opening marker, closing marker) pair.
# NOTE(review): the code that consumes these constants is outside this view;
# presumably they delimit reasoning / solution / code-interpreter sections in
# model output -- confirm against the callers.
DEFAULT_REASONING_TAGS = [
    ("<think>", "</think>"),
    ("<thinking>", "</thinking>"),
    ("<reason>", "</reason>"),
    ("<reasoning>", "</reasoning>"),
    ("<thought>", "</thought>"),
    ("<Thought>", "</Thought>"),
    ("<|begin_of_thought|>", "<|end_of_thought|>"),
    ("◁think▷", "◁/think▷"),
]
DEFAULT_SOLUTION_TAGS = [("<|begin_of_solution|>", "<|end_of_solution|>")]
DEFAULT_CODE_INTERPRETER_TAGS = [("<code_interpreter>", "</code_interpreter>")]
def process_tool_result(
    request,
    tool_function_name,
    tool_result,
    tool_type,
    direct_tool=False,
    metadata=None,
    user=None,
):
    """Normalize a raw tool return value into text, files and UI embeds.

    Args:
        request: Forwarded unchanged to ``get_file_url_from_base64``.
        tool_function_name: Tool name, used as the prefix of status messages.
        tool_result: Raw value returned by the tool. Handled shapes are an
            ``HTMLResponse``, a ``(result, headers)`` pair, a list, a dict,
            or any other value (passed through untouched).
        tool_type: ``"external"`` enables ``(result, headers)`` tuple
            unpacking; ``"mcp"`` selects the MCP item-list format; any other
            value treats a list as a plain list of results.
        direct_tool: When true, a two-element list is also unpacked as
            ``(result, headers)``.
        metadata: Optional mapping; ``chat_id``, ``message_id`` and
            ``session_id`` are read from it when storing MCP media items.
        user: Forwarded unchanged to ``get_file_url_from_base64``.

    Returns:
        A ``(tool_result, tool_result_files, tool_result_embeds)`` tuple.
        ``tool_result`` is serialized to a JSON string when it ends up being
        a dict or a list; the other two elements are lists.
    """
    # `metadata` defaults to None, but the MCP branch reads ids from it.
    metadata = metadata or {}

    def ui_component(status, detail):
        # Summary handed back in place of the embedded UI payload.
        return {
            "status": status,
            "code": "ui_component",
            "message": f"{tool_function_name}: {detail}",
        }

    embedded_ok = "Embedded UI result is active and visible to the user."
    tool_result_embeds = []

    if isinstance(tool_result, HTMLResponse):
        content_disposition = tool_result.headers.get("Content-Disposition", "")
        if "inline" in content_disposition:
            tool_result_embeds.append(tool_result.body.decode("utf-8", "replace"))

            status_code = tool_result.status_code
            if 200 <= status_code < 300:
                tool_result = ui_component("success", embedded_ok)
            elif 400 <= status_code < 500:
                tool_result = ui_component(
                    "error",
                    f"Client error {status_code} from embedded UI result.",
                )
            elif 500 <= status_code < 600:
                tool_result = ui_component(
                    "error",
                    f"Server error {status_code} from embedded UI result.",
                )
            else:
                tool_result = ui_component(
                    "error",
                    f"Unexpected status code {status_code} from embedded UI result.",
                )
        else:
            # Non-inline HTML is passed on as plain text.
            tool_result = tool_result.body.decode("utf-8", "replace")
    elif (tool_type == "external" and isinstance(tool_result, tuple)) or (
        direct_tool and isinstance(tool_result, list) and len(tool_result) == 2
    ):
        tool_result, tool_response_headers = tool_result

        try:
            if not isinstance(tool_response_headers, dict):
                tool_response_headers = dict(tool_response_headers)
        except Exception as e:
            tool_response_headers = {}
            log.debug(e)

        if tool_response_headers and isinstance(tool_response_headers, dict):
            # Header names are looked up in both canonical and lower case.
            content_disposition = tool_response_headers.get(
                "Content-Disposition",
                tool_response_headers.get("content-disposition", ""),
            )

            if "inline" in content_disposition:
                content_type = tool_response_headers.get(
                    "Content-Type",
                    tool_response_headers.get("content-type", ""),
                )
                location = tool_response_headers.get(
                    "Location",
                    tool_response_headers.get("location", ""),
                )

                if "text/html" in content_type:
                    # Display as iframe embed
                    tool_result_embeds.append(tool_result)
                    tool_result = ui_component("success", embedded_ok)
                elif location:
                    tool_result_embeds.append(location)
                    tool_result = ui_component("success", embedded_ok)

    tool_result_files = []

    if isinstance(tool_result, list):
        if tool_type == "mcp":  # MCP
            tool_response = []
            for item in tool_result:
                if not isinstance(item, dict):
                    continue

                item_type = item.get("type")
                if item_type == "text":
                    text = item.get("text", "")
                    if isinstance(text, str):
                        # Text that is valid JSON is decoded; anything else
                        # is kept verbatim.
                        try:
                            text = json.loads(text)
                        except json.JSONDecodeError:
                            pass
                    tool_response.append(text)
                elif item_type in ["image", "audio"]:
                    file_url = get_file_url_from_base64(
                        request,
                        f"data:{item.get('mimeType')};base64,{item.get('data', item.get('blob', ''))}",
                        {
                            "chat_id": metadata.get("chat_id", None),
                            "message_id": metadata.get("message_id", None),
                            "session_id": metadata.get("session_id", None),
                            "result": item,
                        },
                        user,
                    )
                    tool_result_files.append(
                        {
                            "type": item.get("type", "data"),
                            "url": file_url,
                        }
                    )

            tool_result = tool_response[0] if len(tool_response) == 1 else tool_response
        else:  # OpenAPI
            # Split into a new list instead of removing from the list being
            # iterated, which skipped the element after every removed one.
            remaining = []
            for item in tool_result:
                if isinstance(item, str) and item.startswith("data:"):
                    tool_result_files.append(
                        {
                            "type": "data",
                            "content": item,
                        }
                    )
                else:
                    remaining.append(item)
            tool_result = remaining

    if isinstance(tool_result, list):
        tool_result = {"results": tool_result}

    if isinstance(tool_result, (dict, list)):
        tool_result = json.dumps(tool_result, indent=2, ensure_ascii=False)

    return tool_result, tool_result_files, tool_result_embeds
async def chat_completion_tools_handler(
    request: Request, body: dict, extra_params: dict, user: UserModel, models, tools
) -> tuple[dict, dict]:
    """Ask the task model which tool(s) to call, run them, and record output.

    A function-calling prompt built from the tool specs is sent to the task
    model. The JSON object found in its reply is read either as a single
    call (``name`` / ``parameters``) or as ``{"tool_calls": [...]}``. Each
    call naming a known tool is executed; its processed output is appended
    to ``sources`` and to the user message in ``body["messages"]``.

    Args:
        request: Current request; ``request.app.state.config`` supplies the
            task-model settings and the prompt template.
        body: Chat completion payload; ``messages`` and ``metadata`` may be
            modified in place.
        extra_params: Must contain ``__event_call__``, ``__event_emitter__``
            and ``__metadata__``.
        user: User on whose behalf the completion and tools run.
        models: Passed to ``get_task_model_id``.
        tools: Mapping of tool function name to a tool dict (``spec``,
            ``callable``, and optionally ``type``, ``direct``, ``server``,
            ``tool_id``, ``metadata``).

    Returns:
        ``(body, {"sources": sources})``, or ``(body, {})`` when the task
        model returned no content. Errors are logged at debug level and
        never raised to the caller.
    """

    async def get_content_from_response(response) -> Optional[str]:
        # Works for both streaming-style responses and plain dict responses.
        content = None
        if hasattr(response, "body_iterator"):
            async for chunk in response.body_iterator:
                data = json.loads(chunk.decode("utf-8", "replace"))
                content = data["choices"][0]["message"]["content"]

            # Cleanup any remaining background tasks if necessary
            if response.background is not None:
                await response.background()
        else:
            content = response["choices"][0]["message"]["content"]
        return content

    def get_tools_function_calling_payload(messages, task_model_id, content):
        user_message = get_last_user_message(messages)

        # Only the last four messages are shown to the task model.
        recent_messages = messages[-4:]
        chat_history = "\n".join(
            f"{message['role'].upper()}: \"\"\"{get_content_from_message(message)}\"\"\""
            for message in recent_messages
        )

        prompt = f"History:\n{chat_history}\nQuery: {user_message}"

        return {
            "model": task_model_id,
            "messages": [
                {"role": "system", "content": content},
                {"role": "user", "content": f"Query: {prompt}"},
            ],
            "stream": False,
            "metadata": {"task": str(TASKS.FUNCTION_CALLING)},
        }

    event_caller = extra_params["__event_call__"]
    event_emitter = extra_params["__event_emitter__"]
    metadata = extra_params["__metadata__"]

    task_model_id = get_task_model_id(
        body["model"],
        request.app.state.config.TASK_MODEL,
        request.app.state.config.TASK_MODEL_EXTERNAL,
        models,
    )

    skip_files = False
    sources = []

    specs = [tool["spec"] for tool in tools.values()]
    tools_specs = json.dumps(specs)

    if request.app.state.config.TOOLS_FUNCTION_CALLING_PROMPT_TEMPLATE != "":
        template = request.app.state.config.TOOLS_FUNCTION_CALLING_PROMPT_TEMPLATE
    else:
        template = DEFAULT_TOOLS_FUNCTION_CALLING_PROMPT_TEMPLATE

    tools_function_calling_prompt = tools_function_calling_generation_template(
        template, tools_specs
    )
    payload = get_tools_function_calling_payload(
        body["messages"], task_model_id, tools_function_calling_prompt
    )

    async def tool_call_handler(tool_call):
        # Executes one tool call and records its output on `sources`/`body`.
        nonlocal skip_files

        log.debug(f"{tool_call=}")

        tool_function_name = tool_call.get("name", None)
        if tool_function_name not in tools:
            return

        tool = tools[tool_function_name]
        tool_function_params = tool_call.get("parameters", {})
        tool_type = ""
        direct_tool = False

        try:
            tool_type = tool.get("type", "")
            direct_tool = tool.get("direct", False)

            # Drop any parameter the tool's spec does not declare.
            spec = tool.get("spec", {})
            allowed_params = spec.get("parameters", {}).get("properties", {}).keys()
            tool_function_params = {
                k: v for k, v in tool_function_params.items() if k in allowed_params
            }

            if direct_tool:
                tool_result = await event_caller(
                    {
                        "type": "execute:tool",
                        "data": {
                            "id": str(uuid4()),
                            "name": tool_function_name,
                            "params": tool_function_params,
                            "server": tool.get("server", {}),
                            "session_id": metadata.get("session_id", None),
                        },
                    }
                )
            else:
                tool_function = tool["callable"]
                tool_result = await tool_function(**tool_function_params)
        except Exception as e:
            # The error text becomes the tool output.
            tool_result = str(e)

        tool_result, tool_result_files, tool_result_embeds = process_tool_result(
            request,
            tool_function_name,
            tool_result,
            tool_type,
            direct_tool,
            metadata,
            user,
        )

        if event_emitter:
            if tool_result_files:
                await event_emitter(
                    {
                        "type": "files",
                        "data": {
                            "files": tool_result_files,
                        },
                    }
                )

            if tool_result_embeds:
                await event_emitter(
                    {
                        "type": "embeds",
                        "data": {
                            "embeds": tool_result_embeds,
                        },
                    }
                )

        # Logged through the module logger rather than print(), so tool
        # output respects the configured log level instead of going
        # unconditionally to stdout.
        log.debug(
            f"Tool {tool_function_name} result: {tool_result} "
            f"files={tool_result_files} embeds={tool_result_embeds}"
        )

        if tool_result:
            tool_id = tool.get("tool_id", "")
            tool_name = (
                f"{tool_id}/{tool_function_name}"
                if tool_id
                else f"{tool_function_name}"
            )

            # Recorded as a citation source ...
            sources.append(
                {
                    "source": {
                        "name": (f"{tool_name}"),
                    },
                    "document": [str(tool_result)],
                    "metadata": [
                        {
                            "source": (f"{tool_name}"),
                            "parameters": tool_function_params,
                        }
                    ],
                    "tool_result": True,
                }
            )

            # ... and also appended to the user message.
            body["messages"] = add_or_update_user_message(
                f"\nTool `{tool_name}` Output: {tool_result}",
                body["messages"],
            )

            if tool.get("metadata", {}).get("file_handler", False):
                skip_files = True

    try:
        response = await generate_chat_completion(request, form_data=payload, user=user)
        log.debug(f"{response=}")
        content = await get_content_from_response(response)
        log.debug(f"{content=}")

        if not content:
            return body, {}

        try:
            # Keep only the outermost {...} span of the reply.
            content = content[content.find("{") : content.rfind("}") + 1]
            if not content:
                raise Exception("No JSON object found in the response")

            result = json.loads(content)

            # check if "tool_calls" in result
            if result.get("tool_calls"):
                for tool_call in result.get("tool_calls"):
                    await tool_call_handler(tool_call)
            else:
                await tool_call_handler(result)

        except Exception as e:
            log.debug(f"Error: {e}")
            content = None
    except Exception as e:
        log.debug(f"Error: {e}")
        content = None

    log.debug(f"tool_contexts: {sources}")

    if skip_files and "files" in body.get("metadata", {}):
        del body["metadata"]["files"]

    return body, {"sources": sources}
async def chat_memory_handler(
    request: Request, form_data: dict, extra_params: dict, user
):
    """Append stored user memories to the system message of ``form_data``.

    Queries memory with the last user message (``k`` = 3) and formats each
    returned document as ``"<n>. [<date>] <text>"``. A failing query is
    logged at debug level and treated as "no results". The
    ``"User Context:"`` system message is added in every case, with an empty
    body when nothing was found.

    Returns:
        The same ``form_data`` dict with ``messages`` replaced.
    """
    try:
        memory_query = {
            "content": get_last_user_message(form_data["messages"]) or "",
            "k": 3,
        }
        results = await query_memory(request, QueryMemoryForm(**memory_query), user)
    except Exception as e:
        log.debug(e)
        results = None

    context_lines = []
    has_documents = (
        results
        and hasattr(results, "documents")
        and results.documents
        and len(results.documents) > 0
    )
    if has_documents:
        for position, memory_text in enumerate(results.documents[0], start=1):
            memory_meta = results.metadatas[0][position - 1]
            if memory_meta.get("created_at"):
                date_label = time.strftime(
                    "%Y-%m-%d", time.localtime(memory_meta["created_at"])
                )
            else:
                date_label = "Unknown Date"
            context_lines.append(f"{position}. [{date_label}] {memory_text}\n")

    user_context = "".join(context_lines)
    form_data["messages"] = add_or_update_system_message(
        f"User Context:\n{user_context}\n", form_data["messages"], append=True
    )

    return form_data
async def chat_web_search_handler(
    request: Request, form_data: dict, extra_params: dict, user
):
    """Generate search queries, run a web search and attach the results.

    Progress is reported through ``extra_params["__event_emitter__"]`` as
    ``status`` events. On success one entry per returned collection (or a
    single ``docs`` entry) is appended to ``form_data["files"]``.

    Args:
        request: Current request; generated queries are cached on
            ``request.state.cached_queries`` when ``ENABLE_QUERIES_CACHE``
            is set.
        form_data: Chat payload; ``model`` and ``messages`` are read and
            ``files`` may be written.
        extra_params: Must contain ``__event_emitter__``.
        user: User on whose behalf the search runs.

    Returns:
        The same ``form_data`` dict. Failures are logged and reported as
        status events rather than raised.
    """
    event_emitter = extra_params["__event_emitter__"]
    await event_emitter(
        {
            "type": "status",
            "data": {
                "action": "web_search",
                "description": "Searching the web",
                "done": False,
            },
        }
    )

    messages = form_data["messages"]
    user_message = get_last_user_message(messages)

    queries = []
    try:
        res = await generate_queries(
            request,
            {
                "model": form_data["model"],
                "messages": messages,
                "prompt": user_message,
                "type": "web_search",
            },
            user,
        )

        response = res["choices"][0]["message"]["content"]

        try:
            bracket_start = response.find("{")
            bracket_end = response.rfind("}") + 1

            # rfind() returns -1 when "}" is absent, so after the +1 the
            # "not found" value is 0, never -1.
            if bracket_start == -1 or bracket_end == 0:
                raise Exception("No JSON object found in the response")

            response = response[bracket_start:bracket_end]
            queries = json.loads(response)
            queries = queries.get("queries", [])
        except Exception:
            # Not JSON: use the response text itself as the query.
            queries = [response]

        if ENABLE_QUERIES_CACHE:
            request.state.cached_queries = queries

    except Exception as e:
        log.exception(e)
        queries = [user_message]

    # Check if generated queries are empty. A missing user message yields no
    # query at all instead of crashing on None.strip().
    if len(queries) == 1 and not str(queries[0] or "").strip():
        queries = [user_message] if user_message else []

    # Check if queries are not found
    if len(queries) == 0:
        await event_emitter(
            {
                "type": "status",
                "data": {
                    "action": "web_search",
                    "description": "No search query generated",
                    "done": True,
                },
            }
        )
        return form_data

    await event_emitter(
        {
            "type": "status",
            "data": {
                "action": "web_search_queries_generated",
                "queries": queries,
                "done": False,
            },
        }
    )

    try:
        results = await process_web_search(
            request,
            SearchForm(queries=queries),
            user=user,
        )

        if results:
            files = form_data.get("files", [])

            if results.get("collection_names"):
                for collection_name in results.get("collection_names"):
                    files.append(
                        {
                            "collection_name": collection_name,
                            "name": ", ".join(queries),
                            "type": "web_search",
                            "urls": results["filenames"],
                            "queries": queries,
                        }
                    )
            elif results.get("docs"):
                # Invoked when bypass embedding and retrieval is set to True
                docs = results["docs"]
                files.append(
                    {
                        "docs": docs,
                        "name": ", ".join(queries),
                        "type": "web_search",
                        "urls": results["filenames"],
                        "queries": queries,
                    }
                )

            form_data["files"] = files

            await event_emitter(
                {
                    "type": "status",
                    "data": {
                        "action": "web_search",
                        "description": "Searched {{count}} sites",
                        "urls": results["filenames"],
                        "items": results.get("items", []),
                        "done": True,
                    },
                }
            )
        else:
            await event_emitter(
                {
                    "type": "status",
                    "data": {
                        "action": "web_search",
                        "description": "No search results found",
                        "done": True,
                        "error": True,
                    },
                }
            )

    except Exception as e:
        log.exception(e)
        await event_emitter(
            {
                "type": "status",
                "data": {
                    "action": "web_search",
                    "description": "An error occurred while searching the web",
                    "queries": queries,
                    "done": True,
                    "error": True,
                },
            }
        )

    return form_data
async def chat_image_generation_handler(
    request: Request, form_data: dict, extra_params: dict, user
):
    """Generate an image for the last user message and report it via events.

    When ``ENABLE_IMAGE_PROMPT_GENERATION`` is set, an image prompt is first
    requested from the model; if that fails or yields no prompt, the last
    user message is used as the prompt. The outcome (success or failure) is
    emitted as ``status`` / ``files`` events and summarised in a system
    message added to ``form_data["messages"]``.

    Args:
        request: Current request; ``request.app.state.config`` is read.
        form_data: Chat payload; ``model`` and ``messages`` are read and
            ``messages`` is replaced.
        extra_params: Must contain ``__event_emitter__``.
        user: User on whose behalf the image is generated.

    Returns:
        The same ``form_data`` dict.
    """
    __event_emitter__ = extra_params["__event_emitter__"]
    await __event_emitter__(
        {
            "type": "status",
            "data": {"description": "Creating image", "done": False},
        }
    )

    messages = form_data["messages"]
    user_message = get_last_user_message(messages)

    prompt = user_message

    if request.app.state.config.ENABLE_IMAGE_PROMPT_GENERATION:
        try:
            res = await generate_image_prompt(
                request,
                {
                    "model": form_data["model"],
                    "messages": messages,
                },
                user,
            )

            response = res["choices"][0]["message"]["content"]

            try:
                bracket_start = response.find("{")
                bracket_end = response.rfind("}") + 1

                # rfind() returns -1 when "}" is absent, so after the +1 the
                # "not found" value is 0, never -1.
                if bracket_start == -1 or bracket_end == 0:
                    raise Exception("No JSON object found in the response")

                response = json.loads(response[bracket_start:bracket_end])
                # A missing or empty "prompt" falls back to the user message
                # rather than handing an empty list to the image form.
                prompt = response.get("prompt") or user_message
            except Exception:
                prompt = user_message

        except Exception as e:
            log.exception(e)
            prompt = user_message

    system_message_content = ""

    try:
        images = await image_generations(
            request=request,
            form_data=GenerateImageForm(**{"prompt": prompt}),
            user=user,
        )

        await __event_emitter__(
            {
                "type": "status",
                "data": {"description": "Image created", "done": True},
            }
        )

        await __event_emitter__(
            {
                "type": "files",
                "data": {
                    "files": [
                        {
                            "type": "image",
                            "url": image["url"],
                        }
                        for image in images
                    ]
                },
            }
        )

        system_message_content = "<context>User is shown the generated image, tell the user that the image has been generated</context>"
    except Exception as e:
        log.exception(e)
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "description": "An error occurred while generating an image",
                    "done": True,
                },
            }
        )

        system_message_content = "<context>Unable to generate an image, tell the user that an error occurred</context>"

    if system_message_content:
        form_data["messages"] = add_or_update_system_message(
            system_message_content, form_data["messages"]
        )

    return form_data
async def chat_completion_files_handler(
    request: Request, body: dict, extra_params: dict, user: UserModel
) -> tuple[dict, dict[str, list]]:
    """Retrieve context sources for the files attached to a chat request.

    Does nothing beyond returning empty sources when
    ``body["metadata"]["files"]`` is missing or empty. Otherwise retrieval
    queries are generated (skipped when every ``type == "file"`` item has
    ``context == "full"``), ``get_sources_from_items`` runs in a worker
    thread, and the number of distinct source ids is reported through a
    ``sources_retrieved`` status event.

    Args:
        request: Current request; retrieval settings and the embedding /
            reranking functions are read from ``request.app.state``.
        body: Chat payload; ``model``, ``messages`` and ``metadata`` are read.
        extra_params: Must contain ``__event_emitter__``.
        user: User on whose behalf retrieval runs.

    Returns:
        ``(body, {"sources": sources})``. Retrieval errors are logged and
        leave ``sources`` empty.
    """
    __event_emitter__ = extra_params["__event_emitter__"]
    sources = []

    if files := body.get("metadata", {}).get("files", None):
        # Check if all files are in full context mode
        all_full_context = all(
            item.get("context") == "full"
            for item in files
            if item.get("type") == "file"
        )

        queries = []
        if not all_full_context:
            try:
                queries_response = await generate_queries(
                    request,
                    {
                        "model": body["model"],
                        "messages": body["messages"],
                        "type": "retrieval",
                    },
                    user,
                )
                queries_response = queries_response["choices"][0]["message"]["content"]

                try:
                    bracket_start = queries_response.find("{")
                    bracket_end = queries_response.rfind("}") + 1

                    # rfind() returns -1 when "}" is absent, so after the +1
                    # the "not found" value is 0, never -1.
                    if bracket_start == -1 or bracket_end == 0:
                        raise Exception("No JSON object found in the response")

                    queries_response = queries_response[bracket_start:bracket_end]
                    queries_response = json.loads(queries_response)
                except Exception:
                    # Not JSON: use the response text itself as the query.
                    queries_response = {"queries": [queries_response]}

                queries = queries_response.get("queries", [])
            except Exception as e:
                # Best effort: fall back to the last user message below.
                # `Exception` (not a bare except) lets task cancellation
                # propagate.
                log.debug(e)

            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "action": "queries_generated",
                        "queries": queries,
                        "done": False,
                    },
                }
            )

        if len(queries) == 0:
            queries = [get_last_user_message(body["messages"])]

        try:
            # Offload get_sources_from_items to a separate thread
            loop = asyncio.get_running_loop()
            with ThreadPoolExecutor() as executor:
                sources = await loop.run_in_executor(
                    executor,
                    lambda: get_sources_from_items(
                        request=request,
                        items=files,
                        queries=queries,
                        embedding_function=lambda query, prefix: request.app.state.EMBEDDING_FUNCTION(
                            query, prefix=prefix, user=user
                        ),
                        k=request.app.state.config.TOP_K,
                        reranking_function=(
                            (
                                lambda sentences: request.app.state.RERANKING_FUNCTION(
                                    sentences, user=user
                                )
                            )
                            if request.app.state.RERANKING_FUNCTION
                            else None
                        ),
                        k_reranker=request.app.state.config.TOP_K_RERANKER,
                        r=request.app.state.config.RELEVANCE_THRESHOLD,
                        hybrid_bm25_weight=request.app.state.config.HYBRID_BM25_WEIGHT,
                        hybrid_search=request.app.state.config.ENABLE_RAG_HYBRID_SEARCH,
                        full_context=all_full_context
                        or request.app.state.config.RAG_FULL_CONTEXT,
                        user=user,
                    ),
                )
        except Exception as e:
            log.exception(e)

        log.debug(f"rag_contexts:sources: {sources}")

        # Count distinct sources: prefer the per-document metadata "source",
        # then the source's own "id", else the placeholder "N/A".
        unique_ids = set()
        for source in sources or []:
            if not source or len(source.keys()) == 0:
                continue

            documents = source.get("document") or []
            metadatas = source.get("metadata") or []
            src_info = source.get("source") or {}

            for index, _ in enumerate(documents):
                metadata = metadatas[index] if index < len(metadatas) else None
                _id = (
                    (metadata or {}).get("source")
                    or (src_info or {}).get("id")
                    or "N/A"
                )
                unique_ids.add(_id)

        sources_count = len(unique_ids)
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "action": "sources_retrieved",
                    "count": sources_count,
                    "done": True,
                },
            }
        )

    return body, {"sources": sources}
def apply_params_to_form_data(form_data, model):
    """Move the ``params`` entry of ``form_data`` into the request payload.

    ``form_data["params"]`` is removed and processed as follows:

    * Open WebUI-only keys (``stream_response``, ``stream_delta_chunk_size``,
      ``function_calling``, ``reasoning_tags``, ``system``) are dropped.
    * ``custom_params`` values that are strings are parsed as JSON when
      possible (otherwise kept as-is) and deep-merged into the params.
    * For models with ``owned_by == "ollama"`` the result is stored under
      ``form_data["options"]``; for every other model each non-``None`` value
      is copied to the top level of ``form_data``, and ``logit_bias`` is
      additionally converted through ``convert_logit_bias_input_to_json``.

    A missing, ``None`` or non-dict ``params`` value is treated as empty, and
    a non-dict ``custom_params`` value is ignored. The dict that was stored
    under ``params`` is not modified.

    Args:
        form_data: Chat completion payload; modified in place.
        model: Model description dict; only ``owned_by`` is read.

    Returns:
        The same ``form_data`` object.
    """
    raw_params = form_data.pop("params", None)
    if not isinstance(raw_params, dict):
        # Covers an explicit `"params": null` as well as malformed payloads.
        raw_params = {}

    custom_params = raw_params.get("custom_params")

    # Keys that configure Open WebUI itself and must not reach the provider.
    open_webui_params = {
        "stream_response": bool,
        "stream_delta_chunk_size": int,
        "function_calling": str,
        "reasoning_tags": list,
        "system": str,
    }

    # Work on a copy so the caller's dict is left untouched.
    params = {
        key: value
        for key, value in raw_params.items()
        if key != "custom_params" and key not in open_webui_params
    }

    if isinstance(custom_params, dict) and custom_params:
        parsed_custom_params = {}
        for key, value in custom_params.items():
            if isinstance(value, str):
                try:
                    # Attempt to parse the string as JSON
                    value = json.loads(value)
                except json.JSONDecodeError:
                    # If it fails, keep the original string
                    pass
            parsed_custom_params[key] = value

        # If custom_params are provided, merge them into params
        params = deep_update(params, parsed_custom_params)

    if model.get("owned_by") == "ollama":
        # Ollama specific parameters
        form_data["options"] = params
    else:
        if isinstance(params, dict):
            for key, value in params.items():
                if value is not None:
                    form_data[key] = value

            if params.get("logit_bias") is not None:
                try:
                    form_data["logit_bias"] = json.loads(
                        convert_logit_bias_input_to_json(params["logit_bias"])
                    )
                except Exception as e:
                    # The raw value copied above stays in place on failure.
                    log.exception(f"Error parsing logit_bias: {e}")

    return form_data
async def process_chat_payload(request, form_data, user, metadata, model):
    """Prepare a chat completion payload before it is sent to the model.

    Steps, in order: apply request params, re-apply the system prompt, merge
    folder ("project") and model knowledge files, run the pipeline inlet and
    inlet filter functions, run the enabled feature handlers (memory, web
    search, image generation, code interpreter prompt), resolve server-side,
    MCP and client-side tools, retrieve file sources, and finally inject the
    retrieved context into the messages.

    Args:
        request: Incoming request (app state, cookies, request state).
        form_data: Chat completion payload; updated and returned.
        user: The requesting user.
        metadata: Request metadata (chat/message/session ids, params, ...).
        model: Description dict of the selected model.

    Returns:
        ``(form_data, metadata, events)`` where ``metadata`` is a new dict
        extended with ``tool_ids``, ``files`` and, when applicable,
        ``mcp_clients`` / ``tools``, and ``events`` is a list of extra events
        (currently at most one ``{"sources": [...]}`` entry).

    Raises:
        Exception: If an inlet filter function fails (message preserved,
            original exception chained), or if sources were retrieved but no
            user message exists ("No user message found"). Errors from the
            pipeline inlet propagate unchanged.
    """
    # Pipeline Inlet -> Filter Inlet -> Chat Memory -> Chat Web Search -> Chat Image Generation
    # -> Chat Code Interpreter (Form Data Update) -> (Default) Chat Tools Function Calling
    # -> Chat Files

    form_data = apply_params_to_form_data(form_data, model)
    log.debug(f"form_data: {form_data}")

    system_message = get_system_message(form_data.get("messages", []))
    if system_message:  # Chat Controls/User Settings
        try:
            form_data = apply_system_prompt_to_body(
                system_message.get("content"), form_data, metadata, user, replace=True
            )  # Required to handle system prompt variables
        except Exception as e:
            # Best effort: keep the payload as-is, but leave a trace.
            log.debug(f"Error applying system prompt: {e}")

    event_emitter = get_event_emitter(metadata)
    event_call = get_event_call(metadata)

    oauth_token = None
    try:
        if request.cookies.get("oauth_session_id", None):
            oauth_token = await request.app.state.oauth_manager.get_oauth_token(
                user.id,
                request.cookies.get("oauth_session_id", None),
            )
    except Exception as e:
        log.error(f"Error getting OAuth token: {e}")

    extra_params = {
        "__event_emitter__": event_emitter,
        "__event_call__": event_call,
        "__user__": user.model_dump() if isinstance(user, UserModel) else {},
        "__metadata__": metadata,
        "__request__": request,
        "__model__": model,
        "__oauth_token__": oauth_token,
    }

    # Direct connections only know about the single model on the request state.
    if getattr(request.state, "direct", False) and hasattr(request.state, "model"):
        models = {
            request.state.model["id"]: request.state.model,
        }
    else:
        models = request.app.state.MODELS

    task_model_id = get_task_model_id(
        form_data["model"],
        request.app.state.config.TASK_MODEL,
        request.app.state.config.TASK_MODEL_EXTERNAL,
        models,
    )

    # Initialize events to store additional event to be sent to the client
    # Initialize contexts and citation
    events = []
    sources = []

    # Folder "Project" handling
    # Check if the request has chat_id and is inside of a folder
    chat_id = metadata.get("chat_id", None)
    if chat_id and user:
        chat = Chats.get_chat_by_id_and_user_id(chat_id, user.id)
        if chat and chat.folder_id:
            folder = Folders.get_folder_by_id_and_user_id(chat.folder_id, user.id)

            if folder and folder.data:
                if "system_prompt" in folder.data:
                    form_data = apply_system_prompt_to_body(
                        folder.data["system_prompt"], form_data, metadata, user
                    )
                if "files" in folder.data:
                    # `or []` tolerates explicit None values on either side.
                    form_data["files"] = [
                        *(folder.data["files"] or []),
                        *(form_data.get("files") or []),
                    ]

    # Model "Knowledge" handling
    user_message = get_last_user_message(form_data["messages"])
    model_knowledge = model.get("info", {}).get("meta", {}).get("knowledge", False)

    if model_knowledge:
        await event_emitter(
            {
                "type": "status",
                "data": {
                    "action": "knowledge_search",
                    "query": user_message,
                    "done": False,
                },
            }
        )

        knowledge_files = []
        for item in model_knowledge:
            if item.get("collection_name"):
                knowledge_files.append(
                    {
                        "id": item.get("collection_name"),
                        "name": item.get("name"),
                        "legacy": True,
                    }
                )
            elif item.get("collection_names"):
                knowledge_files.append(
                    {
                        "name": item.get("name"),
                        "type": "collection",
                        "collection_names": item.get("collection_names"),
                        "legacy": True,
                    }
                )
            else:
                knowledge_files.append(item)

        files = form_data.get("files") or []
        files.extend(knowledge_files)
        form_data["files"] = files

    # Removed from the payload so it is not forwarded; the value is not used here.
    variables = form_data.pop("variables", None)

    # Process the form_data through the pipeline
    form_data = await process_pipeline_inlet_filter(request, form_data, user, models)

    try:
        filter_functions = [
            Functions.get_function_by_id(filter_id)
            for filter_id in get_sorted_filter_ids(
                request, model, metadata.get("filter_ids", [])
            )
        ]

        form_data, flags = await process_filter_functions(
            request=request,
            filter_functions=filter_functions,
            filter_type="inlet",
            form_data=form_data,
            extra_params=extra_params,
        )
    except Exception as e:
        # Same exception type/message as before, with the cause preserved.
        raise Exception(f"{e}") from e

    features = form_data.pop("features", None)
    if features:
        if "memory" in features and features["memory"]:
            form_data = await chat_memory_handler(
                request, form_data, extra_params, user
            )

        if "web_search" in features and features["web_search"]:
            form_data = await chat_web_search_handler(
                request, form_data, extra_params, user
            )

        if "image_generation" in features and features["image_generation"]:
            form_data = await chat_image_generation_handler(
                request, form_data, extra_params, user
            )

        if "code_interpreter" in features and features["code_interpreter"]:
            form_data["messages"] = add_or_update_user_message(
                (
                    request.app.state.config.CODE_INTERPRETER_PROMPT_TEMPLATE
                    if request.app.state.config.CODE_INTERPRETER_PROMPT_TEMPLATE != ""
                    else DEFAULT_CODE_INTERPRETER_PROMPT
                ),
                form_data["messages"],
            )

    tool_ids = form_data.pop("tool_ids", None)
    files = form_data.pop("files", None)

    prompt = get_last_user_message(form_data["messages"])
    urls = extract_urls(prompt)

    if files or urls:
        if not files:
            files = []

        files = [*files, *[{"type": "url", "url": url, "name": url} for url in urls]]
        # Remove duplicate files based on their content
        files = list({json.dumps(f, sort_keys=True): f for f in files}.values())

    metadata = {
        **metadata,
        "tool_ids": tool_ids,
        "files": files,
    }
    form_data["metadata"] = metadata

    # Server side tools
    tool_ids = metadata.get("tool_ids", None)
    # Client side tools
    direct_tool_servers = metadata.get("tool_servers", None)

    log.debug(f"{tool_ids=}")
    log.debug(f"{direct_tool_servers=}")

    tools_dict = {}

    mcp_clients = {}
    mcp_tools_dict = {}

    def make_tool_function(client, function_name):
        # Factory so every callable is bound to its own client/tool name
        # instead of closing over loop variables.
        async def tool_function(**kwargs):
            return await client.call_tool(
                function_name,
                function_args=kwargs,
            )

        return tool_function

    if tool_ids:
        for tool_id in tool_ids:
            if tool_id.startswith("server:mcp:"):
                try:
                    server_id = tool_id[len("server:mcp:") :]

                    mcp_server_connection = None
                    for (
                        server_connection
                    ) in request.app.state.config.TOOL_SERVER_CONNECTIONS:
                        if (
                            server_connection.get("type", "") == "mcp"
                            and server_connection.get("info", {}).get("id") == server_id
                        ):
                            mcp_server_connection = server_connection
                            break

                    if not mcp_server_connection:
                        log.error(f"MCP server with id {server_id} not found")
                        continue

                    auth_type = mcp_server_connection.get("auth_type", "")

                    headers = {}
                    if auth_type == "bearer":
                        headers["Authorization"] = (
                            f"Bearer {mcp_server_connection.get('key', '')}"
                        )
                    elif auth_type == "none":
                        # No authentication
                        pass
                    elif auth_type == "session":
                        headers["Authorization"] = (
                            f"Bearer {request.state.token.credentials}"
                        )
                    elif auth_type == "system_oauth":
                        mcp_oauth_token = extra_params.get("__oauth_token__", None)
                        if mcp_oauth_token:
                            headers["Authorization"] = (
                                f"Bearer {mcp_oauth_token.get('access_token', '')}"
                            )
                    elif auth_type == "oauth_2.1":
                        try:
                            # Only the last ":"-separated segment identifies the
                            # OAuth client; it is also used as key from here on.
                            splits = server_id.split(":")
                            server_id = splits[-1] if len(splits) > 1 else server_id

                            mcp_oauth_token = await request.app.state.oauth_client_manager.get_oauth_token(
                                user.id, f"mcp:{server_id}"
                            )

                            if mcp_oauth_token:
                                headers["Authorization"] = (
                                    f"Bearer {mcp_oauth_token.get('access_token', '')}"
                                )
                        except Exception as e:
                            log.error(f"Error getting OAuth token: {e}")

                    mcp_clients[server_id] = MCPClient()
                    await mcp_clients[server_id].connect(
                        url=mcp_server_connection.get("url", ""),
                        headers=headers if headers else None,
                    )

                    tool_specs = await mcp_clients[server_id].list_tool_specs()
                    for tool_spec in tool_specs:
                        tool_function = make_tool_function(
                            mcp_clients[server_id], tool_spec["name"]
                        )

                        # Prefix with the server id to keep names unique
                        # across MCP servers.
                        mcp_tools_dict[f"{server_id}_{tool_spec['name']}"] = {
                            "spec": {
                                **tool_spec,
                                "name": f"{server_id}_{tool_spec['name']}",
                            },
                            "callable": tool_function,
                            "type": "mcp",
                            "client": mcp_clients[server_id],
                            "direct": False,
                        }
                except Exception as e:
                    # A broken MCP server must not fail the whole request,
                    # but the failure should be visible in the logs.
                    log.error(f"Error setting up MCP tools for {tool_id}: {e}")
                    continue

        tools_dict = await get_tools(
            request,
            tool_ids,
            user,
            {
                **extra_params,
                "__model__": models[task_model_id],
                "__messages__": form_data["messages"],
                "__files__": metadata.get("files", []),
            },
        )
        if mcp_tools_dict:
            tools_dict = {**tools_dict, **mcp_tools_dict}

    if direct_tool_servers:
        for tool_server in direct_tool_servers:
            # NOTE: pop() also removes "specs" from the server entry kept in metadata.
            tool_specs = tool_server.pop("specs", [])

            for tool in tool_specs:
                tools_dict[tool["name"]] = {
                    "spec": tool,
                    "direct": True,
                    "server": tool_server,
                }

    if mcp_clients:
        metadata["mcp_clients"] = mcp_clients

    if tools_dict:
        if metadata.get("params", {}).get("function_calling") == "native":
            # If the function calling is native, then call the tools function calling handler
            metadata["tools"] = tools_dict
            form_data["tools"] = [
                {"type": "function", "function": tool.get("spec", {})}
                for tool in tools_dict.values()
            ]
        else:
            # If the function calling is not native, then call the tools function calling handler
            try:
                form_data, flags = await chat_completion_tools_handler(
                    request, form_data, extra_params, user, models, tools_dict
                )
                sources.extend(flags.get("sources", []))
            except Exception as e:
                log.exception(e)

    try:
        form_data, flags = await chat_completion_files_handler(
            request, form_data, extra_params, user
        )
        sources.extend(flags.get("sources", []))
    except Exception as e:
        log.exception(e)

    # If context is not empty, insert it into the messages
    if len(sources) > 0:
        context_parts = []
        citation_idx_map = {}

        for source in sources:
            if "document" not in source:
                continue

            # Same defensive reads as chat_completion_files_handler: missing
            # or None metadata must not abort the request.
            documents = source.get("document") or []
            metadatas = source.get("metadata") or []
            src_info = source.get("source") or {}
            source_name = src_info.get("name", None)

            for index, document_text in enumerate(documents):
                document_metadata = (
                    metadatas[index] if index < len(metadatas) else None
                )
                source_id = (
                    (document_metadata or {}).get("source", None)
                    or src_info.get("id", None)
                    or "N/A"
                )

                # Citation numbers are assigned in order of first appearance.
                if source_id not in citation_idx_map:
                    citation_idx_map[source_id] = len(citation_idx_map) + 1

                context_parts.append(
                    f'<source id="{citation_idx_map[source_id]}"'
                    + (f' name="{source_name}"' if source_name else "")
                    + f">{document_text}</source>\n"
                )

        context_string = "".join(context_parts).strip()
        if prompt is None:
            raise Exception("No user message found")

        if context_string != "":
            form_data["messages"] = add_or_update_user_message(
                rag_template(
                    request.app.state.config.RAG_TEMPLATE,
                    context_string,
                    prompt,
                ),
                form_data["messages"],
                append=False,
            )

    # If there are citations, add them to the data_items
    sources = [
        source
        for source in sources
        if (source.get("source") or {}).get("name", "")
        or (source.get("source") or {}).get("id", "")
    ]

    if len(sources) > 0:
        events.append({"sources": sources})

    if model_knowledge:
        await event_emitter(
            {
                "type": "status",
                "data": {
                    "action": "knowledge_search",
                    "query": user_message,
                    "done": True,
                    "hidden": True,
                },
            }
        )

    return form_data, metadata, events
- async def process_chat_response(
- request, response, form_data, user, metadata, model, events, tasks
- ):
- async def background_tasks_handler():
- message = None
- messages = []
- if "chat_id" in metadata and not metadata["chat_id"].startswith("local:"):
- messages_map = Chats.get_messages_map_by_chat_id(metadata["chat_id"])
- message = messages_map.get(metadata["message_id"]) if messages_map else None
- message_list = get_message_list(messages_map, metadata["message_id"])
- # Remove details tags and files from the messages.
- # as get_message_list creates a new list, it does not affect
- # the original messages outside of this handler
- messages = []
- for message in message_list:
- content = message.get("content", "")
- if isinstance(content, list):
- for item in content:
- if item.get("type") == "text":
- content = item["text"]
- break
- if isinstance(content, str):
- content = re.sub(
- r"<details\b[^>]*>.*?<\/details>|!\[.*?\]\(.*?\)",
- "",
- content,
- flags=re.S | re.I,
- ).strip()
- messages.append(
- {
- **message,
- "role": message.get(
- "role", "assistant"
- ), # Safe fallback for missing role
- "content": content,
- }
- )
- else:
- # Local temp chat, get the model and message from the form_data
- message = get_last_user_message_item(form_data.get("messages", []))
- messages = form_data.get("messages", [])
- if message:
- message["model"] = form_data.get("model")
- if message and "model" in message:
- if tasks and messages:
- if (
- TASKS.FOLLOW_UP_GENERATION in tasks
- and tasks[TASKS.FOLLOW_UP_GENERATION]
- ):
- print("Generating follow ups")
- res = await generate_follow_ups(
- request,
- {
- "model": message["model"],
- "messages": messages,
- "message_id": metadata["message_id"],
- "chat_id": metadata["chat_id"],
- },
- user,
- )
- if res and isinstance(res, dict):
- if len(res.get("choices", [])) == 1:
- follow_ups_string = (
- res.get("choices", [])[0]
- .get("message", {})
- .get("content", "")
- )
- else:
- follow_ups_string = ""
- follow_ups_string = follow_ups_string[
- follow_ups_string.find("{") : follow_ups_string.rfind("}")
- + 1
- ]
- try:
- follow_ups = json.loads(follow_ups_string).get(
- "follow_ups", []
- )
- await event_emitter(
- {
- "type": "chat:message:follow_ups",
- "data": {
- "follow_ups": follow_ups,
- },
- }
- )
- if not metadata.get("chat_id", "").startswith("local:"):
- Chats.upsert_message_to_chat_by_id_and_message_id(
- metadata["chat_id"],
- metadata["message_id"],
- {
- "followUps": follow_ups,
- },
- )
- except Exception as e:
- pass
- if not metadata.get("chat_id", "").startswith(
- "local:"
- ): # Only update titles and tags for non-temp chats
- if (
- TASKS.TITLE_GENERATION in tasks
- and tasks[TASKS.TITLE_GENERATION]
- ):
- user_message = get_last_user_message(messages)
- if user_message and len(user_message) > 100:
- user_message = user_message[:100] + "..."
- if tasks[TASKS.TITLE_GENERATION]:
- res = await generate_title(
- request,
- {
- "model": message["model"],
- "messages": messages,
- "chat_id": metadata["chat_id"],
- },
- user,
- )
- if res and isinstance(res, dict):
- if len(res.get("choices", [])) == 1:
- title_string = (
- res.get("choices", [])[0]
- .get("message", {})
- .get(
- "content",
- message.get("content", user_message),
- )
- )
- else:
- title_string = ""
- title_string = title_string[
- title_string.find("{") : title_string.rfind("}") + 1
- ]
- try:
- title = json.loads(title_string).get(
- "title", user_message
- )
- except Exception as e:
- title = ""
- if not title:
- title = messages[0].get("content", user_message)
- Chats.update_chat_title_by_id(
- metadata["chat_id"], title
- )
- await event_emitter(
- {
- "type": "chat:title",
- "data": title,
- }
- )
- elif len(messages) == 2:
- title = messages[0].get("content", user_message)
- Chats.update_chat_title_by_id(metadata["chat_id"], title)
- await event_emitter(
- {
- "type": "chat:title",
- "data": message.get("content", user_message),
- }
- )
- if TASKS.TAGS_GENERATION in tasks and tasks[TASKS.TAGS_GENERATION]:
- res = await generate_chat_tags(
- request,
- {
- "model": message["model"],
- "messages": messages,
- "chat_id": metadata["chat_id"],
- },
- user,
- )
- if res and isinstance(res, dict):
- if len(res.get("choices", [])) == 1:
- tags_string = (
- res.get("choices", [])[0]
- .get("message", {})
- .get("content", "")
- )
- else:
- tags_string = ""
- tags_string = tags_string[
- tags_string.find("{") : tags_string.rfind("}") + 1
- ]
- try:
- tags = json.loads(tags_string).get("tags", [])
- Chats.update_chat_tags_by_id(
- metadata["chat_id"], tags, user
- )
- await event_emitter(
- {
- "type": "chat:tags",
- "data": tags,
- }
- )
- except Exception as e:
- pass
- event_emitter = None
- event_caller = None
- if (
- "session_id" in metadata
- and metadata["session_id"]
- and "chat_id" in metadata
- and metadata["chat_id"]
- and "message_id" in metadata
- and metadata["message_id"]
- ):
- event_emitter = get_event_emitter(metadata)
- event_caller = get_event_call(metadata)
- # Non-streaming response
- if not isinstance(response, StreamingResponse):
- if event_emitter:
- try:
- if isinstance(response, dict) or isinstance(response, JSONResponse):
- if isinstance(response, list) and len(response) == 1:
- # If the response is a single-item list, unwrap it #17213
- response = response[0]
- if isinstance(response, JSONResponse) and isinstance(
- response.body, bytes
- ):
- try:
- response_data = json.loads(
- response.body.decode("utf-8", "replace")
- )
- except json.JSONDecodeError:
- response_data = {
- "error": {"detail": "Invalid JSON response"}
- }
- else:
- response_data = response
- if "error" in response_data:
- error = response_data.get("error")
- if isinstance(error, dict):
- error = error.get("detail", error)
- else:
- error = str(error)
- Chats.upsert_message_to_chat_by_id_and_message_id(
- metadata["chat_id"],
- metadata["message_id"],
- {
- "error": {"content": error},
- },
- )
- if isinstance(error, str) or isinstance(error, dict):
- await event_emitter(
- {
- "type": "chat:message:error",
- "data": {"error": {"content": error}},
- }
- )
- if "selected_model_id" in response_data:
- Chats.upsert_message_to_chat_by_id_and_message_id(
- metadata["chat_id"],
- metadata["message_id"],
- {
- "selectedModelId": response_data["selected_model_id"],
- },
- )
- choices = response_data.get("choices", [])
- if choices and choices[0].get("message", {}).get("content"):
- content = response_data["choices"][0]["message"]["content"]
- if content:
- await event_emitter(
- {
- "type": "chat:completion",
- "data": response_data,
- }
- )
- title = Chats.get_chat_title_by_id(metadata["chat_id"])
- await event_emitter(
- {
- "type": "chat:completion",
- "data": {
- "done": True,
- "content": content,
- "title": title,
- },
- }
- )
- # Save message in the database
- Chats.upsert_message_to_chat_by_id_and_message_id(
- metadata["chat_id"],
- metadata["message_id"],
- {
- "role": "assistant",
- "content": content,
- },
- )
- # Send a webhook notification if the user is not active
- if not get_active_status_by_user_id(user.id):
- webhook_url = Users.get_user_webhook_url_by_id(user.id)
- if webhook_url:
- await post_webhook(
- request.app.state.WEBUI_NAME,
- webhook_url,
- f"{title} - {request.app.state.config.WEBUI_URL}/c/{metadata['chat_id']}\n\n{content}",
- {
- "action": "chat",
- "message": content,
- "title": title,
- "url": f"{request.app.state.config.WEBUI_URL}/c/{metadata['chat_id']}",
- },
- )
- await background_tasks_handler()
- if events and isinstance(events, list):
- extra_response = {}
- for event in events:
- if isinstance(event, dict):
- extra_response.update(event)
- else:
- extra_response[event] = True
- response_data = {
- **extra_response,
- **response_data,
- }
- if isinstance(response, dict):
- response = response_data
- if isinstance(response, JSONResponse):
- response = JSONResponse(
- content=response_data,
- headers=response.headers,
- status_code=response.status_code,
- )
- except Exception as e:
- log.debug(f"Error occurred while processing request: {e}")
- pass
- return response
- else:
- if events and isinstance(events, list) and isinstance(response, dict):
- extra_response = {}
- for event in events:
- if isinstance(event, dict):
- extra_response.update(event)
- else:
- extra_response[event] = True
- response = {
- **extra_response,
- **response,
- }
- return response
- # Non standard response
- if not any(
- content_type in response.headers["Content-Type"]
- for content_type in ["text/event-stream", "application/x-ndjson"]
- ):
- return response
- oauth_token = None
- try:
- if request.cookies.get("oauth_session_id", None):
- oauth_token = await request.app.state.oauth_manager.get_oauth_token(
- user.id,
- request.cookies.get("oauth_session_id", None),
- )
- except Exception as e:
- log.error(f"Error getting OAuth token: {e}")
- extra_params = {
- "__event_emitter__": event_emitter,
- "__event_call__": event_caller,
- "__user__": user.model_dump() if isinstance(user, UserModel) else {},
- "__metadata__": metadata,
- "__oauth_token__": oauth_token,
- "__request__": request,
- "__model__": model,
- }
- filter_functions = [
- Functions.get_function_by_id(filter_id)
- for filter_id in get_sorted_filter_ids(
- request, model, metadata.get("filter_ids", [])
- )
- ]
- # Streaming response
- if event_emitter and event_caller:
- task_id = str(uuid4()) # Create a unique task ID.
- model_id = form_data.get("model", "")
- def split_content_and_whitespace(content):
- content_stripped = content.rstrip()
- original_whitespace = (
- content[len(content_stripped) :]
- if len(content) > len(content_stripped)
- else ""
- )
- return content_stripped, original_whitespace
- def is_opening_code_block(content):
- backtick_segments = content.split("```")
- # Even number of segments means the last backticks are opening a new block
- return len(backtick_segments) > 1 and len(backtick_segments) % 2 == 0
- # Handle as a background task
- async def response_handler(response, events):
- def serialize_content_blocks(content_blocks, raw=False):
- content = ""
- for block in content_blocks:
- if block["type"] == "text":
- block_content = block["content"].strip()
- if block_content:
- content = f"{content}{block_content}\n"
- elif block["type"] == "tool_calls":
- attributes = block.get("attributes", {})
- tool_calls = block.get("content", [])
- results = block.get("results", [])
- if content and not content.endswith("\n"):
- content += "\n"
- if results:
- tool_calls_display_content = ""
- for tool_call in tool_calls:
- tool_call_id = tool_call.get("id", "")
- tool_name = tool_call.get("function", {}).get(
- "name", ""
- )
- tool_arguments = tool_call.get("function", {}).get(
- "arguments", ""
- )
- tool_result = None
- tool_result_files = None
- for result in results:
- if tool_call_id == result.get("tool_call_id", ""):
- tool_result = result.get("content", None)
- tool_result_files = result.get("files", None)
- break
- if tool_result is not None:
- tool_result_embeds = result.get("embeds", "")
- tool_calls_display_content = f'{tool_calls_display_content}<details type="tool_calls" done="true" id="{tool_call_id}" name="{tool_name}" arguments="{html.escape(json.dumps(tool_arguments))}" result="{html.escape(json.dumps(tool_result, ensure_ascii=False))}" files="{html.escape(json.dumps(tool_result_files)) if tool_result_files else ""}" embeds="{html.escape(json.dumps(tool_result_embeds))}">\n<summary>Tool Executed</summary>\n</details>\n'
- else:
- tool_calls_display_content = f'{tool_calls_display_content}<details type="tool_calls" done="false" id="{tool_call_id}" name="{tool_name}" arguments="{html.escape(json.dumps(tool_arguments))}">\n<summary>Executing...</summary>\n</details>\n'
- if not raw:
- content = f"{content}{tool_calls_display_content}"
- else:
- tool_calls_display_content = ""
- for tool_call in tool_calls:
- tool_call_id = tool_call.get("id", "")
- tool_name = tool_call.get("function", {}).get(
- "name", ""
- )
- tool_arguments = tool_call.get("function", {}).get(
- "arguments", ""
- )
- tool_calls_display_content = f'{tool_calls_display_content}\n<details type="tool_calls" done="false" id="{tool_call_id}" name="{tool_name}" arguments="{html.escape(json.dumps(tool_arguments))}">\n<summary>Executing...</summary>\n</details>\n'
- if not raw:
- content = f"{content}{tool_calls_display_content}"
- elif block["type"] == "reasoning":
- reasoning_display_content = "\n".join(
- (f"> {line}" if not line.startswith(">") else line)
- for line in block["content"].splitlines()
- )
- reasoning_duration = block.get("duration", None)
- start_tag = block.get("start_tag", "")
- end_tag = block.get("end_tag", "")
- if content and not content.endswith("\n"):
- content += "\n"
- if reasoning_duration is not None:
- if raw:
- content = (
- f'{content}{start_tag}{block["content"]}{end_tag}\n'
- )
- else:
- content = f'{content}<details type="reasoning" done="true" duration="{reasoning_duration}">\n<summary>Thought for {reasoning_duration} seconds</summary>\n{reasoning_display_content}\n</details>\n'
- else:
- if raw:
- content = (
- f'{content}{start_tag}{block["content"]}{end_tag}\n'
- )
- else:
- content = f'{content}<details type="reasoning" done="false">\n<summary>Thinking…</summary>\n{reasoning_display_content}\n</details>\n'
- elif block["type"] == "code_interpreter":
- attributes = block.get("attributes", {})
- output = block.get("output", None)
- lang = attributes.get("lang", "")
- content_stripped, original_whitespace = (
- split_content_and_whitespace(content)
- )
- if is_opening_code_block(content_stripped):
- # Remove trailing backticks that would open a new block
- content = (
- content_stripped.rstrip("`").rstrip()
- + original_whitespace
- )
- else:
- # Keep content as is - either closing backticks or no backticks
- content = content_stripped + original_whitespace
- if content and not content.endswith("\n"):
- content += "\n"
- if output:
- output = html.escape(json.dumps(output))
- if raw:
- content = f'{content}<code_interpreter type="code" lang="{lang}">\n{block["content"]}\n</code_interpreter>\n```output\n{output}\n```\n'
- else:
- content = f'{content}<details type="code_interpreter" done="true" output="{output}">\n<summary>Analyzed</summary>\n```{lang}\n{block["content"]}\n```\n</details>\n'
- else:
- if raw:
- content = f'{content}<code_interpreter type="code" lang="{lang}">\n{block["content"]}\n</code_interpreter>\n'
- else:
- content = f'{content}<details type="code_interpreter" done="false">\n<summary>Analyzing...</summary>\n```{lang}\n{block["content"]}\n```\n</details>\n'
- else:
- block_content = str(block["content"]).strip()
- if block_content:
- content = f"{content}{block['type']}: {block_content}\n"
- return content.strip()
- def convert_content_blocks_to_messages(content_blocks, raw=False):
- messages = []
- temp_blocks = []
- for idx, block in enumerate(content_blocks):
- if block["type"] == "tool_calls":
- messages.append(
- {
- "role": "assistant",
- "content": serialize_content_blocks(temp_blocks, raw),
- "tool_calls": block.get("content"),
- }
- )
- results = block.get("results", [])
- for result in results:
- messages.append(
- {
- "role": "tool",
- "tool_call_id": result["tool_call_id"],
- "content": result.get("content", "") or "",
- }
- )
- temp_blocks = []
- else:
- temp_blocks.append(block)
- if temp_blocks:
- content = serialize_content_blocks(temp_blocks, raw)
- if content:
- messages.append(
- {
- "role": "assistant",
- "content": content,
- }
- )
- return messages
- def tag_content_handler(content_type, tags, content, content_blocks):
- end_flag = False
- def extract_attributes(tag_content):
- """Extract attributes from a tag if they exist."""
- attributes = {}
- if not tag_content: # Ensure tag_content is not None
- return attributes
- # Match attributes in the format: key="value" (ignores single quotes for simplicity)
- matches = re.findall(r'(\w+)\s*=\s*"([^"]+)"', tag_content)
- for key, value in matches:
- attributes[key] = value
- return attributes
- if content_blocks[-1]["type"] == "text":
- for start_tag, end_tag in tags:
- start_tag_pattern = rf"{re.escape(start_tag)}"
- if start_tag.startswith("<") and start_tag.endswith(">"):
- # Match start tag e.g., <tag> or <tag attr="value">
- # remove both '<' and '>' from start_tag
- # Match start tag with attributes
- start_tag_pattern = (
- rf"<{re.escape(start_tag[1:-1])}(\s.*?)?>"
- )
- match = re.search(start_tag_pattern, content)
- if match:
- try:
- attr_content = (
- match.group(1) if match.group(1) else ""
- ) # Ensure it's not None
- except:
- attr_content = ""
- attributes = extract_attributes(
- attr_content
- ) # Extract attributes safely
- # Capture everything before and after the matched tag
- before_tag = content[
- : match.start()
- ] # Content before opening tag
- after_tag = content[
- match.end() :
- ] # Content after opening tag
- # Remove the start tag and after from the currently handling text block
- content_blocks[-1]["content"] = content_blocks[-1][
- "content"
- ].replace(match.group(0) + after_tag, "")
- if before_tag:
- content_blocks[-1]["content"] = before_tag
- if not content_blocks[-1]["content"]:
- content_blocks.pop()
- # Append the new block
- content_blocks.append(
- {
- "type": content_type,
- "start_tag": start_tag,
- "end_tag": end_tag,
- "attributes": attributes,
- "content": "",
- "started_at": time.time(),
- }
- )
- if after_tag:
- content_blocks[-1]["content"] = after_tag
- tag_content_handler(
- content_type, tags, after_tag, content_blocks
- )
- break
- elif content_blocks[-1]["type"] == content_type:
- start_tag = content_blocks[-1]["start_tag"]
- end_tag = content_blocks[-1]["end_tag"]
- if end_tag.startswith("<") and end_tag.endswith(">"):
- # Match end tag e.g., </tag>
- end_tag_pattern = rf"{re.escape(end_tag)}"
- else:
- # Handle cases where end_tag is just a tag name
- end_tag_pattern = rf"{re.escape(end_tag)}"
- # Check if the content has the end tag
- if re.search(end_tag_pattern, content):
- end_flag = True
- block_content = content_blocks[-1]["content"]
- # Strip start and end tags from the content
- start_tag_pattern = rf"<{re.escape(start_tag)}(.*?)>"
- block_content = re.sub(
- start_tag_pattern, "", block_content
- ).strip()
- end_tag_regex = re.compile(end_tag_pattern, re.DOTALL)
- split_content = end_tag_regex.split(block_content, maxsplit=1)
- # Content inside the tag
- block_content = (
- split_content[0].strip() if split_content else ""
- )
- # Leftover content (everything after `</tag>`)
- leftover_content = (
- split_content[1].strip() if len(split_content) > 1 else ""
- )
- if block_content:
- content_blocks[-1]["content"] = block_content
- content_blocks[-1]["ended_at"] = time.time()
- content_blocks[-1]["duration"] = int(
- content_blocks[-1]["ended_at"]
- - content_blocks[-1]["started_at"]
- )
- # Reset the content_blocks by appending a new text block
- if content_type != "code_interpreter":
- if leftover_content:
- content_blocks.append(
- {
- "type": "text",
- "content": leftover_content,
- }
- )
- else:
- content_blocks.append(
- {
- "type": "text",
- "content": "",
- }
- )
- else:
- # Remove the block if content is empty
- content_blocks.pop()
- if leftover_content:
- content_blocks.append(
- {
- "type": "text",
- "content": leftover_content,
- }
- )
- else:
- content_blocks.append(
- {
- "type": "text",
- "content": "",
- }
- )
- # Clean processed content
- start_tag_pattern = rf"{re.escape(start_tag)}"
- if start_tag.startswith("<") and start_tag.endswith(">"):
- # Match start tag e.g., <tag> or <tag attr="value">
- # remove both '<' and '>' from start_tag
- # Match start tag with attributes
- start_tag_pattern = (
- rf"<{re.escape(start_tag[1:-1])}(\s.*?)?>"
- )
- content = re.sub(
- rf"{start_tag_pattern}(.|\n)*?{re.escape(end_tag)}",
- "",
- content,
- flags=re.DOTALL,
- )
- return content, content_blocks, end_flag
- message = Chats.get_message_by_id_and_message_id(
- metadata["chat_id"], metadata["message_id"]
- )
- tool_calls = []
- last_assistant_message = None
- try:
- if form_data["messages"][-1]["role"] == "assistant":
- last_assistant_message = get_last_assistant_message(
- form_data["messages"]
- )
- except Exception as e:
- pass
- content = (
- message.get("content", "")
- if message
- else last_assistant_message if last_assistant_message else ""
- )
- content_blocks = [
- {
- "type": "text",
- "content": content,
- }
- ]
- reasoning_tags_param = metadata.get("params", {}).get("reasoning_tags")
- DETECT_REASONING_TAGS = reasoning_tags_param is not False
- DETECT_CODE_INTERPRETER = metadata.get("features", {}).get(
- "code_interpreter", False
- )
- reasoning_tags = []
- if DETECT_REASONING_TAGS:
- if (
- isinstance(reasoning_tags_param, list)
- and len(reasoning_tags_param) == 2
- ):
- reasoning_tags = [
- (reasoning_tags_param[0], reasoning_tags_param[1])
- ]
- else:
- reasoning_tags = DEFAULT_REASONING_TAGS
- try:
- for event in events:
- await event_emitter(
- {
- "type": "chat:completion",
- "data": event,
- }
- )
- # Save message in the database
- Chats.upsert_message_to_chat_by_id_and_message_id(
- metadata["chat_id"],
- metadata["message_id"],
- {
- **event,
- },
- )
- async def stream_body_handler(response, form_data):
- nonlocal content
- nonlocal content_blocks
- response_tool_calls = []
- delta_count = 0
- delta_chunk_size = max(
- CHAT_RESPONSE_STREAM_DELTA_CHUNK_SIZE,
- int(
- metadata.get("params", {}).get("stream_delta_chunk_size")
- or 1
- ),
- )
- last_delta_data = None
- async def flush_pending_delta_data(threshold: int = 0):
- nonlocal delta_count
- nonlocal last_delta_data
- if delta_count >= threshold and last_delta_data:
- await event_emitter(
- {
- "type": "chat:completion",
- "data": last_delta_data,
- }
- )
- delta_count = 0
- last_delta_data = None
- async for line in response.body_iterator:
- line = (
- line.decode("utf-8", "replace")
- if isinstance(line, bytes)
- else line
- )
- data = line
- # Skip empty lines
- if not data.strip():
- continue
- # "data:" is the prefix for each event
- if not data.startswith("data:"):
- continue
- # Remove the prefix
- data = data[len("data:") :].strip()
- try:
- data = json.loads(data)
- data, _ = await process_filter_functions(
- request=request,
- filter_functions=filter_functions,
- filter_type="stream",
- form_data=data,
- extra_params={"__body__": form_data, **extra_params},
- )
- if data:
- if "event" in data:
- await event_emitter(data.get("event", {}))
- if "selected_model_id" in data:
- model_id = data["selected_model_id"]
- Chats.upsert_message_to_chat_by_id_and_message_id(
- metadata["chat_id"],
- metadata["message_id"],
- {
- "selectedModelId": model_id,
- },
- )
- await event_emitter(
- {
- "type": "chat:completion",
- "data": data,
- }
- )
- else:
- choices = data.get("choices", [])
- # 17421
- usage = data.get("usage", {}) or {}
- usage.update(data.get("timings", {})) # llama.cpp
- if usage:
- await event_emitter(
- {
- "type": "chat:completion",
- "data": {
- "usage": usage,
- },
- }
- )
- if not choices:
- error = data.get("error", {})
- if error:
- await event_emitter(
- {
- "type": "chat:completion",
- "data": {
- "error": error,
- },
- }
- )
- continue
- delta = choices[0].get("delta", {})
- delta_tool_calls = delta.get("tool_calls", None)
- if delta_tool_calls:
- for delta_tool_call in delta_tool_calls:
- tool_call_index = delta_tool_call.get(
- "index"
- )
- if tool_call_index is not None:
- # Check if the tool call already exists
- current_response_tool_call = None
- for (
- response_tool_call
- ) in response_tool_calls:
- if (
- response_tool_call.get("index")
- == tool_call_index
- ):
- current_response_tool_call = (
- response_tool_call
- )
- break
- if current_response_tool_call is None:
- # Add the new tool call
- delta_tool_call.setdefault(
- "function", {}
- )
- delta_tool_call[
- "function"
- ].setdefault("name", "")
- delta_tool_call[
- "function"
- ].setdefault("arguments", "")
- response_tool_calls.append(
- delta_tool_call
- )
- else:
- # Update the existing tool call
- delta_name = delta_tool_call.get(
- "function", {}
- ).get("name")
- delta_arguments = (
- delta_tool_call.get(
- "function", {}
- ).get("arguments")
- )
- if delta_name:
- current_response_tool_call[
- "function"
- ]["name"] += delta_name
- if delta_arguments:
- current_response_tool_call[
- "function"
- ][
- "arguments"
- ] += delta_arguments
- value = delta.get("content")
- reasoning_content = (
- delta.get("reasoning_content")
- or delta.get("reasoning")
- or delta.get("thinking")
- )
- if reasoning_content:
- if (
- not content_blocks
- or content_blocks[-1]["type"] != "reasoning"
- ):
- reasoning_block = {
- "type": "reasoning",
- "start_tag": "<think>",
- "end_tag": "</think>",
- "attributes": {
- "type": "reasoning_content"
- },
- "content": "",
- "started_at": time.time(),
- }
- content_blocks.append(reasoning_block)
- else:
- reasoning_block = content_blocks[-1]
- reasoning_block["content"] += reasoning_content
- data = {
- "content": serialize_content_blocks(
- content_blocks
- )
- }
- if value:
- if (
- content_blocks
- and content_blocks[-1]["type"]
- == "reasoning"
- and content_blocks[-1]
- .get("attributes", {})
- .get("type")
- == "reasoning_content"
- ):
- reasoning_block = content_blocks[-1]
- reasoning_block["ended_at"] = time.time()
- reasoning_block["duration"] = int(
- reasoning_block["ended_at"]
- - reasoning_block["started_at"]
- )
- content_blocks.append(
- {
- "type": "text",
- "content": "",
- }
- )
- content = f"{content}{value}"
- if not content_blocks:
- content_blocks.append(
- {
- "type": "text",
- "content": "",
- }
- )
- content_blocks[-1]["content"] = (
- content_blocks[-1]["content"] + value
- )
- if DETECT_REASONING_TAGS:
- content, content_blocks, _ = (
- tag_content_handler(
- "reasoning",
- reasoning_tags,
- content,
- content_blocks,
- )
- )
- content, content_blocks, _ = (
- tag_content_handler(
- "solution",
- DEFAULT_SOLUTION_TAGS,
- content,
- content_blocks,
- )
- )
- if DETECT_CODE_INTERPRETER:
- content, content_blocks, end = (
- tag_content_handler(
- "code_interpreter",
- DEFAULT_CODE_INTERPRETER_TAGS,
- content,
- content_blocks,
- )
- )
- if end:
- break
- if ENABLE_REALTIME_CHAT_SAVE:
- # Save message in the database
- Chats.upsert_message_to_chat_by_id_and_message_id(
- metadata["chat_id"],
- metadata["message_id"],
- {
- "content": serialize_content_blocks(
- content_blocks
- ),
- },
- )
- else:
- data = {
- "content": serialize_content_blocks(
- content_blocks
- ),
- }
- if delta:
- delta_count += 1
- last_delta_data = data
- if delta_count >= delta_chunk_size:
- await flush_pending_delta_data(delta_chunk_size)
- else:
- await event_emitter(
- {
- "type": "chat:completion",
- "data": data,
- }
- )
- except Exception as e:
- done = "data: [DONE]" in line
- if done:
- pass
- else:
- log.debug(f"Error: {e}")
- continue
- await flush_pending_delta_data()
- if content_blocks:
- # Clean up the last text block
- if content_blocks[-1]["type"] == "text":
- content_blocks[-1]["content"] = content_blocks[-1][
- "content"
- ].strip()
- if not content_blocks[-1]["content"]:
- content_blocks.pop()
- if not content_blocks:
- content_blocks.append(
- {
- "type": "text",
- "content": "",
- }
- )
- if content_blocks[-1]["type"] == "reasoning":
- reasoning_block = content_blocks[-1]
- if reasoning_block.get("ended_at") is None:
- reasoning_block["ended_at"] = time.time()
- reasoning_block["duration"] = int(
- reasoning_block["ended_at"]
- - reasoning_block["started_at"]
- )
- if response_tool_calls:
- tool_calls.append(response_tool_calls)
- if response.background:
- await response.background()
- await stream_body_handler(response, form_data)
- tool_call_retries = 0
- while (
- len(tool_calls) > 0
- and tool_call_retries < CHAT_RESPONSE_MAX_TOOL_CALL_RETRIES
- ):
- tool_call_retries += 1
- response_tool_calls = tool_calls.pop(0)
- content_blocks.append(
- {
- "type": "tool_calls",
- "content": response_tool_calls,
- }
- )
- await event_emitter(
- {
- "type": "chat:completion",
- "data": {
- "content": serialize_content_blocks(content_blocks),
- },
- }
- )
- tools = metadata.get("tools", {})
- results = []
- for tool_call in response_tool_calls:
- print("tool_call", tool_call)
- tool_call_id = tool_call.get("id", "")
- tool_function_name = tool_call.get("function", {}).get(
- "name", ""
- )
- tool_args = tool_call.get("function", {}).get("arguments", "{}")
- tool_function_params = {}
- try:
- # json.loads cannot be used because some models do not produce valid JSON
- tool_function_params = ast.literal_eval(tool_args)
- except Exception as e:
- log.debug(e)
- # Fallback to JSON parsing
- try:
- tool_function_params = json.loads(tool_args)
- except Exception as e:
- log.error(
- f"Error parsing tool call arguments: {tool_args}"
- )
- # Mutate the original tool call response params, as they are passed
- # back to the LLM via the content blocks. If they are in a json block and are invalid json,
- # this can cause downstream LLM integrations to fail (e.g. bedrock gateway) where response
- # params are not valid json.
- # Main case so far is no args = "" = invalid json.
- log.debug(
- f"Parsed args from {tool_args} to {tool_function_params}"
- )
- tool_call.setdefault("function", {})["arguments"] = json.dumps(
- tool_function_params
- )
- tool_result = None
- tool = None
- tool_type = None
- direct_tool = False
- if tool_function_name in tools:
- tool = tools[tool_function_name]
- spec = tool.get("spec", {})
- tool_type = tool.get("type", "")
- direct_tool = tool.get("direct", False)
- try:
- allowed_params = (
- spec.get("parameters", {})
- .get("properties", {})
- .keys()
- )
- tool_function_params = {
- k: v
- for k, v in tool_function_params.items()
- if k in allowed_params
- }
- if direct_tool:
- tool_result = await event_caller(
- {
- "type": "execute:tool",
- "data": {
- "id": str(uuid4()),
- "name": tool_function_name,
- "params": tool_function_params,
- "server": tool.get("server", {}),
- "session_id": metadata.get(
- "session_id", None
- ),
- },
- }
- )
- else:
- tool_function = tool["callable"]
- tool_result = await tool_function(
- **tool_function_params
- )
- except Exception as e:
- tool_result = str(e)
- tool_result, tool_result_files, tool_result_embeds = (
- process_tool_result(
- request,
- tool_function_name,
- tool_result,
- tool_type,
- direct_tool,
- metadata,
- user,
- )
- )
- results.append(
- {
- "tool_call_id": tool_call_id,
- "content": tool_result or "",
- **(
- {"files": tool_result_files}
- if tool_result_files
- else {}
- ),
- **(
- {"embeds": tool_result_embeds}
- if tool_result_embeds
- else {}
- ),
- }
- )
- content_blocks[-1]["results"] = results
- content_blocks.append(
- {
- "type": "text",
- "content": "",
- }
- )
- await event_emitter(
- {
- "type": "chat:completion",
- "data": {
- "content": serialize_content_blocks(content_blocks),
- },
- }
- )
- try:
- new_form_data = {
- "model": model_id,
- "stream": True,
- "tools": form_data["tools"],
- "messages": [
- *form_data["messages"],
- *convert_content_blocks_to_messages(
- content_blocks, True
- ),
- ],
- }
- res = await generate_chat_completion(
- request,
- new_form_data,
- user,
- )
- if isinstance(res, StreamingResponse):
- await stream_body_handler(res, new_form_data)
- else:
- break
- except Exception as e:
- log.debug(e)
- break
- if DETECT_CODE_INTERPRETER:
- MAX_RETRIES = 5
- retries = 0
- while (
- content_blocks[-1]["type"] == "code_interpreter"
- and retries < MAX_RETRIES
- ):
- await event_emitter(
- {
- "type": "chat:completion",
- "data": {
- "content": serialize_content_blocks(content_blocks),
- },
- }
- )
- retries += 1
- log.debug(f"Attempt count: {retries}")
- output = ""
- try:
- if content_blocks[-1]["attributes"].get("type") == "code":
- code = content_blocks[-1]["content"]
- if CODE_INTERPRETER_BLOCKED_MODULES:
- blocking_code = textwrap.dedent(
- f"""
- import builtins
- BLOCKED_MODULES = {CODE_INTERPRETER_BLOCKED_MODULES}
- _real_import = builtins.__import__
- def restricted_import(name, globals=None, locals=None, fromlist=(), level=0):
- if name.split('.')[0] in BLOCKED_MODULES:
- importer_name = globals.get('__name__') if globals else None
- if importer_name == '__main__':
- raise ImportError(
- f"Direct import of module {{name}} is restricted."
- )
- return _real_import(name, globals, locals, fromlist, level)
- builtins.__import__ = restricted_import
- """
- )
- code = blocking_code + "\n" + code
- if (
- request.app.state.config.CODE_INTERPRETER_ENGINE
- == "pyodide"
- ):
- output = await event_caller(
- {
- "type": "execute:python",
- "data": {
- "id": str(uuid4()),
- "code": code,
- "session_id": metadata.get(
- "session_id", None
- ),
- },
- }
- )
- elif (
- request.app.state.config.CODE_INTERPRETER_ENGINE
- == "jupyter"
- ):
- output = await execute_code_jupyter(
- request.app.state.config.CODE_INTERPRETER_JUPYTER_URL,
- code,
- (
- request.app.state.config.CODE_INTERPRETER_JUPYTER_AUTH_TOKEN
- if request.app.state.config.CODE_INTERPRETER_JUPYTER_AUTH
- == "token"
- else None
- ),
- (
- request.app.state.config.CODE_INTERPRETER_JUPYTER_AUTH_PASSWORD
- if request.app.state.config.CODE_INTERPRETER_JUPYTER_AUTH
- == "password"
- else None
- ),
- request.app.state.config.CODE_INTERPRETER_JUPYTER_TIMEOUT,
- )
- else:
- output = {
- "stdout": "Code interpreter engine not configured."
- }
- log.debug(f"Code interpreter output: {output}")
- if isinstance(output, dict):
- stdout = output.get("stdout", "")
- if isinstance(stdout, str):
- stdoutLines = stdout.split("\n")
- for idx, line in enumerate(stdoutLines):
- if "data:image/png;base64" in line:
- image_url = get_image_url_from_base64(
- request,
- line,
- metadata,
- user,
- )
- if image_url:
- stdoutLines[idx] = (
- f""
- )
- output["stdout"] = "\n".join(stdoutLines)
- result = output.get("result", "")
- if isinstance(result, str):
- resultLines = result.split("\n")
- for idx, line in enumerate(resultLines):
- if "data:image/png;base64" in line:
- image_url = get_image_url_from_base64(
- request,
- line,
- metadata,
- user,
- )
- resultLines[idx] = (
- f""
- )
- output["result"] = "\n".join(resultLines)
- except Exception as e:
- output = str(e)
- content_blocks[-1]["output"] = output
- content_blocks.append(
- {
- "type": "text",
- "content": "",
- }
- )
- await event_emitter(
- {
- "type": "chat:completion",
- "data": {
- "content": serialize_content_blocks(content_blocks),
- },
- }
- )
- try:
- new_form_data = {
- "model": model_id,
- "stream": True,
- "messages": [
- *form_data["messages"],
- {
- "role": "assistant",
- "content": serialize_content_blocks(
- content_blocks, raw=True
- ),
- },
- ],
- }
- res = await generate_chat_completion(
- request,
- new_form_data,
- user,
- )
- if isinstance(res, StreamingResponse):
- await stream_body_handler(res, new_form_data)
- else:
- break
- except Exception as e:
- log.debug(e)
- break
- title = Chats.get_chat_title_by_id(metadata["chat_id"])
- data = {
- "done": True,
- "content": serialize_content_blocks(content_blocks),
- "title": title,
- }
- if not ENABLE_REALTIME_CHAT_SAVE:
- # Save message in the database
- Chats.upsert_message_to_chat_by_id_and_message_id(
- metadata["chat_id"],
- metadata["message_id"],
- {
- "content": serialize_content_blocks(content_blocks),
- },
- )
- # Send a webhook notification if the user is not active
- if not get_active_status_by_user_id(user.id):
- webhook_url = Users.get_user_webhook_url_by_id(user.id)
- if webhook_url:
- await post_webhook(
- request.app.state.WEBUI_NAME,
- webhook_url,
- f"{title} - {request.app.state.config.WEBUI_URL}/c/{metadata['chat_id']}\n\n{content}",
- {
- "action": "chat",
- "message": content,
- "title": title,
- "url": f"{request.app.state.config.WEBUI_URL}/c/{metadata['chat_id']}",
- },
- )
- await event_emitter(
- {
- "type": "chat:completion",
- "data": data,
- }
- )
- await background_tasks_handler()
- except asyncio.CancelledError:
- log.warning("Task was cancelled!")
- await event_emitter({"type": "chat:tasks:cancel"})
- if not ENABLE_REALTIME_CHAT_SAVE:
- # Save message in the database
- Chats.upsert_message_to_chat_by_id_and_message_id(
- metadata["chat_id"],
- metadata["message_id"],
- {
- "content": serialize_content_blocks(content_blocks),
- },
- )
- if response.background is not None:
- await response.background()
- return await response_handler(response, events)
- else:
- # Fallback to the original response
async def stream_wrapper(original_generator, events):
    """Yield the queued events, then the upstream stream, both filtered.

    Each entry of ``events`` is passed through the "stream" filter functions
    and, if the result is truthy, yielded as a JSON server-sent-event frame.
    Items from ``original_generator`` are passed through the same filters and
    yielded as returned, without re-framing.
    """

    async def run_stream_filters(payload):
        # Only the filtered payload is used; the second return value is ignored.
        filtered, _ = await process_filter_functions(
            request=request,
            filter_functions=filter_functions,
            filter_type="stream",
            form_data=payload,
            extra_params=extra_params,
        )
        return filtered

    for queued_event in events:
        filtered_event = await run_stream_filters(queued_event)
        if filtered_event:
            yield f"data: {json.dumps(filtered_event)}\n\n"

    async for chunk in original_generator:
        filtered_chunk = await run_stream_filters(chunk)
        if filtered_chunk:
            yield filtered_chunk
- return StreamingResponse(
- stream_wrapper(response.body_iterator, events),
- headers=dict(response.headers),
- background=response.background,
- )
|