| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759 | import timeimport loggingimport sysimport osimport base64import textwrapimport asynciofrom aiocache import cachedfrom typing import Any, Optionalimport randomimport jsonimport htmlimport inspectimport reimport astfrom uuid import uuid4from concurrent.futures import ThreadPoolExecutorfrom fastapi import Request, HTTPExceptionfrom starlette.responses import Response, StreamingResponse, JSONResponsefrom open_webui.models.chats import Chatsfrom open_webui.models.folders import Foldersfrom open_webui.models.users import Usersfrom open_webui.socket.main import (    get_event_call,    get_event_emitter,    get_active_status_by_user_id,)from open_webui.routers.tasks import (    generate_queries,    generate_title,    generate_follow_ups,    generate_image_prompt,    generate_chat_tags,)from open_webui.routers.retrieval import process_web_search, SearchFormfrom open_webui.routers.images import (    load_b64_image_data,    image_generations,    GenerateImageForm,    upload_image,)from open_webui.routers.pipelines import (    process_pipeline_inlet_filter,    process_pipeline_outlet_filter,)from open_webui.routers.memories import query_memory, QueryMemoryFormfrom open_webui.utils.webhook import post_webhookfrom open_webui.models.users import UserModelfrom open_webui.models.functions import Functionsfrom open_webui.models.models import Modelsfrom open_webui.retrieval.utils import get_sources_from_itemsfrom open_webui.utils.chat import generate_chat_completionfrom open_webui.utils.task import (    get_task_model_id,    rag_template,    tools_function_calling_generation_template,)from open_webui.utils.misc import (    deep_update,    get_message_list,    add_or_update_system_message,    add_or_update_user_message,    get_last_user_message,    get_last_assistant_message,    get_system_message,    prepend_to_first_user_message_content,    convert_logit_bias_input_to_json,)from open_webui.utils.tools import get_toolsfrom open_webui.utils.plugin import load_function_module_by_idfrom open_webui.utils.filter import (    get_sorted_filter_ids,    process_filter_functions,)from open_webui.utils.code_interpreter import execute_code_jupyterfrom open_webui.utils.payload import apply_system_prompt_to_bodyfrom open_webui.config import (    CACHE_DIR,    DEFAULT_TOOLS_FUNCTION_CALLING_PROMPT_TEMPLATE,    DEFAULT_CODE_INTERPRETER_PROMPT,    CODE_INTERPRETER_BLOCKED_MODULES,)from open_webui.env import (    SRC_LOG_LEVELS,    GLOBAL_LOG_LEVEL,    CHAT_RESPONSE_STREAM_DELTA_CHUNK_SIZE,    CHAT_RESPONSE_MAX_TOOL_CALL_RETRIES,    BYPASS_MODEL_ACCESS_CONTROL,    ENABLE_REALTIME_CHAT_SAVE,    ENABLE_QUERIES_CACHE,)from open_webui.constants import TASKSlogging.basicConfig(stream=sys.stdout, level=GLOBAL_LOG_LEVEL)log = logging.getLogger(__name__)log.setLevel(SRC_LOG_LEVELS["MAIN"])DEFAULT_REASONING_TAGS = [    ("<think>", "</think>"),    ("<thinking>", "</thinking>"),    ("<reason>", "</reason>"),    ("<reasoning>", "</reasoning>"),    ("<thought>", "</thought>"),    ("<Thought>", "</Thought>"),    ("<|begin_of_thought|>", "<|end_of_thought|>"),    ("◁think▷", "◁/think▷"),]DEFAULT_SOLUTION_TAGS = [("<|begin_of_solution|>", "<|end_of_solution|>")]DEFAULT_CODE_INTERPRETER_TAGS = [("<code_interpreter>", "</code_interpreter>")]async def chat_completion_tools_handler(    request: Request, body: dict, extra_params: dict, user: UserModel, models, tools) -> tuple[dict, dict]:    async def get_content_from_response(response) -> Optional[str]:        content = None        if hasattr(response, "body_iterator"):            async for chunk in response.body_iterator:                data = json.loads(chunk.decode("utf-8"))                content = data["choices"][0]["message"]["content"]            # Cleanup any remaining background tasks if necessary            if response.background is not None:                await response.background()        else:            content = response["choices"][0]["message"]["content"]        return content    def get_tools_function_calling_payload(messages, task_model_id, content):        user_message = get_last_user_message(messages)        history = "\n".join(            f"{message['role'].upper()}: \"\"\"{message['content']}\"\"\""            for message in messages[::-1][:4]        )        prompt = f"History:\n{history}\nQuery: {user_message}"        return {            "model": task_model_id,            "messages": [                {"role": "system", "content": content},                {"role": "user", "content": f"Query: {prompt}"},            ],            "stream": False,            "metadata": {"task": str(TASKS.FUNCTION_CALLING)},        }    event_caller = extra_params["__event_call__"]    metadata = extra_params["__metadata__"]    task_model_id = get_task_model_id(        body["model"],        request.app.state.config.TASK_MODEL,        request.app.state.config.TASK_MODEL_EXTERNAL,        models,    )    skip_files = False    sources = []    specs = [tool["spec"] for tool in tools.values()]    tools_specs = json.dumps(specs)    if request.app.state.config.TOOLS_FUNCTION_CALLING_PROMPT_TEMPLATE != "":        template = request.app.state.config.TOOLS_FUNCTION_CALLING_PROMPT_TEMPLATE    else:        template = DEFAULT_TOOLS_FUNCTION_CALLING_PROMPT_TEMPLATE    tools_function_calling_prompt = tools_function_calling_generation_template(        template, tools_specs    )    payload = get_tools_function_calling_payload(        body["messages"], task_model_id, tools_function_calling_prompt    )    try:        response = await generate_chat_completion(request, form_data=payload, user=user)        log.debug(f"{response=}")        content = await get_content_from_response(response)        log.debug(f"{content=}")        if not content:            return body, {}        try:            content = content[content.find("{") : content.rfind("}") + 1]            if not content:                raise Exception("No JSON object found in the response")            result = json.loads(content)            async def tool_call_handler(tool_call):                nonlocal skip_files                log.debug(f"{tool_call=}")                tool_function_name = tool_call.get("name", None)                if tool_function_name not in tools:                    return body, {}                tool_function_params = tool_call.get("parameters", {})                try:                    tool = tools[tool_function_name]                    spec = tool.get("spec", {})                    allowed_params = (                        spec.get("parameters", {}).get("properties", {}).keys()                    )                    tool_function_params = {                        k: v                        for k, v in tool_function_params.items()                        if k in allowed_params                    }                    if tool.get("direct", False):                        tool_result = await event_caller(                            {                                "type": "execute:tool",                                "data": {                                    "id": str(uuid4()),                                    "name": tool_function_name,                                    "params": tool_function_params,                                    "server": tool.get("server", {}),                                    "session_id": metadata.get("session_id", None),                                },                            }                        )                    else:                        tool_function = tool["callable"]                        tool_result = await tool_function(**tool_function_params)                except Exception as e:                    tool_result = str(e)                tool_result_files = []                if isinstance(tool_result, list):                    for item in tool_result:                        # check if string                        if isinstance(item, str) and item.startswith("data:"):                            tool_result_files.append(item)                            tool_result.remove(item)                if isinstance(tool_result, dict) or isinstance(tool_result, list):                    tool_result = json.dumps(tool_result, indent=2)                if isinstance(tool_result, str):                    tool = tools[tool_function_name]                    tool_id = tool.get("tool_id", "")                    tool_name = (                        f"{tool_id}/{tool_function_name}"                        if tool_id                        else f"{tool_function_name}"                    )                    # Citation is enabled for this tool                    sources.append(                        {                            "source": {                                "name": (f"TOOL:{tool_name}"),                            },                            "document": [tool_result],                            "metadata": [                                {                                    "source": (f"TOOL:{tool_name}"),                                    "parameters": tool_function_params,                                }                            ],                            "tool_result": True,                        }                    )                    # Citation is not enabled for this tool                    body["messages"] = add_or_update_user_message(                        f"\nTool `{tool_name}` Output: {tool_result}",                        body["messages"],                    )                    if (                        tools[tool_function_name]                        .get("metadata", {})                        .get("file_handler", False)                    ):                        skip_files = True            # check if "tool_calls" in result            if result.get("tool_calls"):                for tool_call in result.get("tool_calls"):                    await tool_call_handler(tool_call)            else:                await tool_call_handler(result)        except Exception as e:            log.debug(f"Error: {e}")            content = None    except Exception as e:        log.debug(f"Error: {e}")        content = None    log.debug(f"tool_contexts: {sources}")    if skip_files and "files" in body.get("metadata", {}):        del body["metadata"]["files"]    return body, {"sources": sources}async def chat_memory_handler(    request: Request, form_data: dict, extra_params: dict, user):    try:        results = await query_memory(            request,            QueryMemoryForm(                **{                    "content": get_last_user_message(form_data["messages"]) or "",                    "k": 3,                }            ),            user,        )    except Exception as e:        log.debug(e)        results = None    user_context = ""    if results and hasattr(results, "documents"):        if results.documents and len(results.documents) > 0:            for doc_idx, doc in enumerate(results.documents[0]):                created_at_date = "Unknown Date"                if results.metadatas[0][doc_idx].get("created_at"):                    created_at_timestamp = results.metadatas[0][doc_idx]["created_at"]                    created_at_date = time.strftime(                        "%Y-%m-%d", time.localtime(created_at_timestamp)                    )                user_context += f"{doc_idx + 1}. [{created_at_date}] {doc}\n"    form_data["messages"] = add_or_update_system_message(        f"User Context:\n{user_context}\n", form_data["messages"], append=True    )    return form_dataasync def chat_web_search_handler(    request: Request, form_data: dict, extra_params: dict, user):    event_emitter = extra_params["__event_emitter__"]    await event_emitter(        {            "type": "status",            "data": {                "action": "web_search",                "description": "Searching the web",                "done": False,            },        }    )    messages = form_data["messages"]    user_message = get_last_user_message(messages)    queries = []    try:        res = await generate_queries(            request,            {                "model": form_data["model"],                "messages": messages,                "prompt": user_message,                "type": "web_search",            },            user,        )        response = res["choices"][0]["message"]["content"]        try:            bracket_start = response.find("{")            bracket_end = response.rfind("}") + 1            if bracket_start == -1 or bracket_end == -1:                raise Exception("No JSON object found in the response")            response = response[bracket_start:bracket_end]            queries = json.loads(response)            queries = queries.get("queries", [])        except Exception as e:            queries = [response]        if ENABLE_QUERIES_CACHE:            request.state.cached_queries = queries    except Exception as e:        log.exception(e)        queries = [user_message]    # Check if generated queries are empty    if len(queries) == 1 and queries[0].strip() == "":        queries = [user_message]    # Check if queries are not found    if len(queries) == 0:        await event_emitter(            {                "type": "status",                "data": {                    "action": "web_search",                    "description": "No search query generated",                    "done": True,                },            }        )        return form_data    await event_emitter(        {            "type": "status",            "data": {                "action": "web_search_queries_generated",                "queries": queries,                "done": False,            },        }    )    try:        results = await process_web_search(            request,            SearchForm(queries=queries),            user=user,        )        if results:            files = form_data.get("files", [])            if results.get("collection_names"):                for col_idx, collection_name in enumerate(                    results.get("collection_names")                ):                    files.append(                        {                            "collection_name": collection_name,                            "name": ", ".join(queries),                            "type": "web_search",                            "urls": results["filenames"],                            "queries": queries,                        }                    )            elif results.get("docs"):                # Invoked when bypass embedding and retrieval is set to True                docs = results["docs"]                files.append(                    {                        "docs": docs,                        "name": ", ".join(queries),                        "type": "web_search",                        "urls": results["filenames"],                        "queries": queries,                    }                )            form_data["files"] = files            await event_emitter(                {                    "type": "status",                    "data": {                        "action": "web_search",                        "description": "Searched {{count}} sites",                        "urls": results["filenames"],                        "items": results.get("items", []),                        "done": True,                    },                }            )        else:            await event_emitter(                {                    "type": "status",                    "data": {                        "action": "web_search",                        "description": "No search results found",                        "done": True,                        "error": True,                    },                }            )    except Exception as e:        log.exception(e)        await event_emitter(            {                "type": "status",                "data": {                    "action": "web_search",                    "description": "An error occurred while searching the web",                    "queries": queries,                    "done": True,                    "error": True,                },            }        )    return form_dataasync def chat_image_generation_handler(    request: Request, form_data: dict, extra_params: dict, user):    __event_emitter__ = extra_params["__event_emitter__"]    await __event_emitter__(        {            "type": "status",            "data": {"description": "Creating image", "done": False},        }    )    messages = form_data["messages"]    user_message = get_last_user_message(messages)    prompt = user_message    negative_prompt = ""    if request.app.state.config.ENABLE_IMAGE_PROMPT_GENERATION:        try:            res = await generate_image_prompt(                request,                {                    "model": form_data["model"],                    "messages": messages,                },                user,            )            response = res["choices"][0]["message"]["content"]            try:                bracket_start = response.find("{")                bracket_end = response.rfind("}") + 1                if bracket_start == -1 or bracket_end == -1:                    raise Exception("No JSON object found in the response")                response = response[bracket_start:bracket_end]                response = json.loads(response)                prompt = response.get("prompt", [])            except Exception as e:                prompt = user_message        except Exception as e:            log.exception(e)            prompt = user_message    system_message_content = ""    try:        images = await image_generations(            request=request,            form_data=GenerateImageForm(**{"prompt": prompt}),            user=user,        )        await __event_emitter__(            {                "type": "status",                "data": {"description": "Image created", "done": True},            }        )        await __event_emitter__(            {                "type": "files",                "data": {                    "files": [                        {                            "type": "image",                            "url": image["url"],                        }                        for image in images                    ]                },            }        )        system_message_content = "<context>User is shown the generated image, tell the user that the image has been generated</context>"    except Exception as e:        log.exception(e)        await __event_emitter__(            {                "type": "status",                "data": {                    "description": f"An error occurred while generating an image",                    "done": True,                },            }        )        system_message_content = "<context>Unable to generate an image, tell the user that an error occurred</context>"    if system_message_content:        form_data["messages"] = add_or_update_system_message(            system_message_content, form_data["messages"]        )    return form_dataasync def chat_completion_files_handler(    request: Request, body: dict, extra_params: dict, user: UserModel) -> tuple[dict, dict[str, list]]:    __event_emitter__ = extra_params["__event_emitter__"]    sources = []    if files := body.get("metadata", {}).get("files", None):        queries = []        try:            queries_response = await generate_queries(                request,                {                    "model": body["model"],                    "messages": body["messages"],                    "type": "retrieval",                },                user,            )            queries_response = queries_response["choices"][0]["message"]["content"]            try:                bracket_start = queries_response.find("{")                bracket_end = queries_response.rfind("}") + 1                if bracket_start == -1 or bracket_end == -1:                    raise Exception("No JSON object found in the response")                queries_response = queries_response[bracket_start:bracket_end]                queries_response = json.loads(queries_response)            except Exception as e:                queries_response = {"queries": [queries_response]}            queries = queries_response.get("queries", [])        except:            pass        if len(queries) == 0:            queries = [get_last_user_message(body["messages"])]        await __event_emitter__(            {                "type": "status",                "data": {                    "action": "queries_generated",                    "queries": queries,                    "done": False,                },            }        )        try:            # Offload get_sources_from_items to a separate thread            loop = asyncio.get_running_loop()            with ThreadPoolExecutor() as executor:                sources = await loop.run_in_executor(                    executor,                    lambda: get_sources_from_items(                        request=request,                        items=files,                        queries=queries,                        embedding_function=lambda query, prefix: request.app.state.EMBEDDING_FUNCTION(                            query, prefix=prefix, user=user                        ),                        k=request.app.state.config.TOP_K,                        reranking_function=(                            (                                lambda sentences: request.app.state.RERANKING_FUNCTION(                                    sentences, user=user                                )                            )                            if request.app.state.RERANKING_FUNCTION                            else None                        ),                        k_reranker=request.app.state.config.TOP_K_RERANKER,                        r=request.app.state.config.RELEVANCE_THRESHOLD,                        hybrid_bm25_weight=request.app.state.config.HYBRID_BM25_WEIGHT,                        hybrid_search=request.app.state.config.ENABLE_RAG_HYBRID_SEARCH,                        full_context=request.app.state.config.RAG_FULL_CONTEXT,                        user=user,                    ),                )        except Exception as e:            log.exception(e)        log.debug(f"rag_contexts:sources: {sources}")        unique_ids = set()        for source in sources or []:            if not source or len(source.keys()) == 0:                continue            documents = source.get("document") or []            metadatas = source.get("metadata") or []            src_info = source.get("source") or {}            for index, _ in enumerate(documents):                metadata = metadatas[index] if index < len(metadatas) else None                _id = (                    (metadata or {}).get("source")                    or (src_info or {}).get("id")                    or "N/A"                )                unique_ids.add(_id)        sources_count = len(unique_ids)        await __event_emitter__(            {                "type": "status",                "data": {                    "action": "sources_retrieved",                    "count": sources_count,                    "done": True,                },            }        )    return body, {"sources": sources}def apply_params_to_form_data(form_data, model):    params = form_data.pop("params", {})    custom_params = params.pop("custom_params", {})    open_webui_params = {        "stream_response": bool,        "stream_delta_chunk_size": int,        "function_calling": str,        "reasoning_tags": list,        "system": str,    }    for key in list(params.keys()):        if key in open_webui_params:            del params[key]    if custom_params:        # Attempt to parse custom_params if they are strings        for key, value in custom_params.items():            if isinstance(value, str):                try:                    # Attempt to parse the string as JSON                    custom_params[key] = json.loads(value)                except json.JSONDecodeError:                    # If it fails, keep the original string                    pass        # If custom_params are provided, merge them into params        params = deep_update(params, custom_params)    if model.get("owned_by") == "ollama":        # Ollama specific parameters        form_data["options"] = params    else:        if isinstance(params, dict):            for key, value in params.items():                if value is not None:                    form_data[key] = value        if "logit_bias" in params and params["logit_bias"] is not None:            try:                form_data["logit_bias"] = json.loads(                    convert_logit_bias_input_to_json(params["logit_bias"])                )            except Exception as e:                log.exception(f"Error parsing logit_bias: {e}")    return form_dataasync def process_chat_payload(request, form_data, user, metadata, model):    # Pipeline Inlet -> Filter Inlet -> Chat Memory -> Chat Web Search -> Chat Image Generation    # -> Chat Code Interpreter (Form Data Update) -> (Default) Chat Tools Function Calling    # -> Chat Files    form_data = apply_params_to_form_data(form_data, model)    log.debug(f"form_data: {form_data}")    system_message = get_system_message(form_data.get("messages", []))    if system_message:        try:            form_data = apply_system_prompt_to_body(                system_message.get("content"), form_data, metadata, user            )        except:            pass    event_emitter = get_event_emitter(metadata)    event_call = get_event_call(metadata)    oauth_token = None    try:        oauth_token = request.app.state.oauth_manager.get_oauth_token(            user.id,            request.cookies.get("oauth_session_id", None),        )    except Exception as e:        log.error(f"Error getting OAuth token: {e}")    extra_params = {        "__event_emitter__": event_emitter,        "__event_call__": event_call,        "__user__": user.model_dump() if isinstance(user, UserModel) else {},        "__metadata__": metadata,        "__request__": request,        "__model__": model,        "__oauth_token__": oauth_token,    }    # Initialize events to store additional event to be sent to the client    # Initialize contexts and citation    if getattr(request.state, "direct", False) and hasattr(request.state, "model"):        models = {            request.state.model["id"]: request.state.model,        }    else:        models = request.app.state.MODELS    task_model_id = get_task_model_id(        form_data["model"],        request.app.state.config.TASK_MODEL,        request.app.state.config.TASK_MODEL_EXTERNAL,        models,    )    events = []    sources = []    # Folder "Project" handling    # Check if the request has chat_id and is inside of a folder    chat_id = metadata.get("chat_id", None)    if chat_id and user:        chat = Chats.get_chat_by_id_and_user_id(chat_id, user.id)        if chat and chat.folder_id:            folder = Folders.get_folder_by_id_and_user_id(chat.folder_id, user.id)            if folder and folder.data:                if "system_prompt" in folder.data:                    form_data = apply_system_prompt_to_body(                        folder.data["system_prompt"], form_data, metadata, user                    )                if "files" in folder.data:                    form_data["files"] = [                        *folder.data["files"],                        *form_data.get("files", []),                    ]    # Model "Knowledge" handling    user_message = get_last_user_message(form_data["messages"])    model_knowledge = model.get("info", {}).get("meta", {}).get("knowledge", False)    if model_knowledge:        await event_emitter(            {                "type": "status",                "data": {                    "action": "knowledge_search",                    "query": user_message,                    "done": False,                },            }        )        knowledge_files = []        for item in model_knowledge:            if item.get("collection_name"):                knowledge_files.append(                    {                        "id": item.get("collection_name"),                        "name": item.get("name"),                        "legacy": True,                    }                )            elif item.get("collection_names"):                knowledge_files.append(                    {                        "name": item.get("name"),                        "type": "collection",                        "collection_names": item.get("collection_names"),                        "legacy": True,                    }                )            else:                knowledge_files.append(item)        files = form_data.get("files", [])        files.extend(knowledge_files)        form_data["files"] = files    variables = form_data.pop("variables", None)    # Process the form_data through the pipeline    try:        form_data = await process_pipeline_inlet_filter(            request, form_data, user, models        )    except Exception as e:        raise e    try:        filter_functions = [            Functions.get_function_by_id(filter_id)            for filter_id in get_sorted_filter_ids(                request, model, metadata.get("filter_ids", [])            )        ]        form_data, flags = await process_filter_functions(            request=request,            filter_functions=filter_functions,            filter_type="inlet",            form_data=form_data,            extra_params=extra_params,        )    except Exception as e:        raise Exception(f"{e}")    features = form_data.pop("features", None)    if features:        if "memory" in features and features["memory"]:            form_data = await chat_memory_handler(                request, form_data, extra_params, user            )        if "web_search" in features and features["web_search"]:            form_data = await chat_web_search_handler(                request, form_data, extra_params, user            )        if "image_generation" in features and features["image_generation"]:            form_data = await chat_image_generation_handler(                request, form_data, extra_params, user            )        if "code_interpreter" in features and features["code_interpreter"]:            form_data["messages"] = add_or_update_user_message(                (                    request.app.state.config.CODE_INTERPRETER_PROMPT_TEMPLATE                    if request.app.state.config.CODE_INTERPRETER_PROMPT_TEMPLATE != ""                    else DEFAULT_CODE_INTERPRETER_PROMPT                ),                form_data["messages"],            )    tool_ids = form_data.pop("tool_ids", None)    files = form_data.pop("files", None)    # Remove files duplicates    if files:        files = list({json.dumps(f, sort_keys=True): f for f in files}.values())    metadata = {        **metadata,        "tool_ids": tool_ids,        "files": files,    }    form_data["metadata"] = metadata    # Server side tools    tool_ids = metadata.get("tool_ids", None)    # Client side tools    tool_servers = metadata.get("tool_servers", None)    log.debug(f"{tool_ids=}")    log.debug(f"{tool_servers=}")    tools_dict = {}    if tool_ids:        tools_dict = await get_tools(            request,            tool_ids,            user,            {                **extra_params,                "__model__": models[task_model_id],                "__messages__": form_data["messages"],                "__files__": metadata.get("files", []),            },        )    if tool_servers:        for tool_server in tool_servers:            tool_specs = tool_server.pop("specs", [])            for tool in tool_specs:                tools_dict[tool["name"]] = {                    "spec": tool,                    "direct": True,                    "server": tool_server,                }    if tools_dict:        if metadata.get("params", {}).get("function_calling") == "native":            # If the function calling is native, then call the tools function calling handler            metadata["tools"] = tools_dict            form_data["tools"] = [                {"type": "function", "function": tool.get("spec", {})}                for tool in tools_dict.values()            ]        else:            # If the function calling is not native, then call the tools function calling handler            try:                form_data, flags = await chat_completion_tools_handler(                    request, form_data, extra_params, user, models, tools_dict                )                sources.extend(flags.get("sources", []))            except Exception as e:                log.exception(e)    try:        form_data, flags = await chat_completion_files_handler(            request, form_data, extra_params, user        )        sources.extend(flags.get("sources", []))    except Exception as e:        log.exception(e)    # If context is not empty, insert it into the messages    if len(sources) > 0:        context_string = ""        citation_idx_map = {}        for source in sources:            is_tool_result = source.get("tool_result", False)            if "document" in source and not is_tool_result:                for document_text, document_metadata in zip(                    source["document"], source["metadata"]                ):                    source_name = source.get("source", {}).get("name", None)                    source_id = (                        document_metadata.get("source", None)                        or source.get("source", {}).get("id", None)                        or "N/A"                    )                    if source_id not in citation_idx_map:                        citation_idx_map[source_id] = len(citation_idx_map) + 1                    context_string += (                        f'<source id="{citation_idx_map[source_id]}"'                        + (f' name="{source_name}"' if source_name else "")                        + f">{document_text}</source>\n"                    )        context_string = context_string.strip()        prompt = get_last_user_message(form_data["messages"])        if prompt is None:            raise Exception("No user message found")        if context_string != "":            # Workaround for Ollama 2.0+ system prompt issue            # TODO: replace with add_or_update_system_message            if model.get("owned_by") == "ollama":                form_data["messages"] = prepend_to_first_user_message_content(                    rag_template(                        request.app.state.config.RAG_TEMPLATE,                        context_string,                        prompt,                    ),                    form_data["messages"],                )            else:                form_data["messages"] = add_or_update_system_message(                    rag_template(                        request.app.state.config.RAG_TEMPLATE,                        context_string,                        prompt,                    ),                    form_data["messages"],                )    # If there are citations, add them to the data_items    sources = [        source        for source in sources        if source.get("source", {}).get("name", "")        or source.get("source", {}).get("id", "")    ]    if len(sources) > 0:        events.append({"sources": sources})    if model_knowledge:        await event_emitter(            {                "type": "status",                "data": {                    "action": "knowledge_search",                    "query": user_message,                    "done": True,                    "hidden": True,                },            }        )    return form_data, metadata, eventsasync def process_chat_response(    request, response, form_data, user, metadata, model, events, tasks):    async def background_tasks_handler():        message_map = Chats.get_messages_by_chat_id(metadata["chat_id"])        message = message_map.get(metadata["message_id"]) if message_map else None        if message:            message_list = get_message_list(message_map, metadata["message_id"])            # Remove details tags and files from the messages.            # as get_message_list creates a new list, it does not affect            # the original messages outside of this handler            messages = []            for message in message_list:                content = message.get("content", "")                if isinstance(content, list):                    for item in content:                        if item.get("type") == "text":                            content = item["text"]                            break                if isinstance(content, str):                    content = re.sub(                        r"<details\b[^>]*>.*?<\/details>|!\[.*?\]\(.*?\)",                        "",                        content,                        flags=re.S | re.I,                    ).strip()                messages.append(                    {                        **message,                        "role": message.get(                            "role", "assistant"                        ),  # Safe fallback for missing role                        "content": content,                    }                )            if tasks and messages:                if (                    TASKS.FOLLOW_UP_GENERATION in tasks                    and tasks[TASKS.FOLLOW_UP_GENERATION]                ):                    res = await generate_follow_ups(                        request,                        {                            "model": message["model"],                            "messages": messages,                            "message_id": metadata["message_id"],                            "chat_id": metadata["chat_id"],                        },                        user,                    )                    if res and isinstance(res, dict):                        if len(res.get("choices", [])) == 1:                            follow_ups_string = (                                res.get("choices", [])[0]                                .get("message", {})                                .get("content", "")                            )                        else:                            follow_ups_string = ""                        follow_ups_string = follow_ups_string[                            follow_ups_string.find("{") : follow_ups_string.rfind("}")                            + 1                        ]                        try:                            follow_ups = json.loads(follow_ups_string).get(                                "follow_ups", []                            )                            Chats.upsert_message_to_chat_by_id_and_message_id(                                metadata["chat_id"],                                metadata["message_id"],                                {                                    "followUps": follow_ups,                                },                            )                            await event_emitter(                                {                                    "type": "chat:message:follow_ups",                                    "data": {                                        "follow_ups": follow_ups,                                    },                                }                            )                        except Exception as e:                            pass                if TASKS.TITLE_GENERATION in tasks:                    user_message = get_last_user_message(messages)                    if user_message and len(user_message) > 100:                        user_message = user_message[:100] + "..."                    if tasks[TASKS.TITLE_GENERATION]:                        res = await generate_title(                            request,                            {                                "model": message["model"],                                "messages": messages,                                "chat_id": metadata["chat_id"],                            },                            user,                        )                        if res and isinstance(res, dict):                            if len(res.get("choices", [])) == 1:                                title_string = (                                    res.get("choices", [])[0]                                    .get("message", {})                                    .get(                                        "content", message.get("content", user_message)                                    )                                )                            else:                                title_string = ""                            title_string = title_string[                                title_string.find("{") : title_string.rfind("}") + 1                            ]                            try:                                title = json.loads(title_string).get(                                    "title", user_message                                )                            except Exception as e:                                title = ""                            if not title:                                title = messages[0].get("content", user_message)                            Chats.update_chat_title_by_id(metadata["chat_id"], title)                            await event_emitter(                                {                                    "type": "chat:title",                                    "data": title,                                }                            )                    elif len(messages) == 2:                        title = messages[0].get("content", user_message)                        Chats.update_chat_title_by_id(metadata["chat_id"], title)                        await event_emitter(                            {                                "type": "chat:title",                                "data": message.get("content", user_message),                            }                        )                if TASKS.TAGS_GENERATION in tasks and tasks[TASKS.TAGS_GENERATION]:                    res = await generate_chat_tags(                        request,                        {                            "model": message["model"],                            "messages": messages,                            "chat_id": metadata["chat_id"],                        },                        user,                    )                    if res and isinstance(res, dict):                        if len(res.get("choices", [])) == 1:                            tags_string = (                                res.get("choices", [])[0]                                .get("message", {})                                .get("content", "")                            )                        else:                            tags_string = ""                        tags_string = tags_string[                            tags_string.find("{") : tags_string.rfind("}") + 1                        ]                        try:                            tags = json.loads(tags_string).get("tags", [])                            Chats.update_chat_tags_by_id(                                metadata["chat_id"], tags, user                            )                            await event_emitter(                                {                                    "type": "chat:tags",                                    "data": tags,                                }                            )                        except Exception as e:                            pass    event_emitter = None    event_caller = None    if (        "session_id" in metadata        and metadata["session_id"]        and "chat_id" in metadata        and metadata["chat_id"]        and "message_id" in metadata        and metadata["message_id"]    ):        event_emitter = get_event_emitter(metadata)        event_caller = get_event_call(metadata)    # Non-streaming response    if not isinstance(response, StreamingResponse):        if event_emitter:            try:                if isinstance(response, dict) or isinstance(response, JSONResponse):                    if isinstance(response, list) and len(response) == 1:                        # If the response is a single-item list, unwrap it #17213                        response = response[0]                    if isinstance(response, JSONResponse) and isinstance(                        response.body, bytes                    ):                        try:                            response_data = json.loads(response.body.decode("utf-8"))                        except json.JSONDecodeError:                            response_data = {                                "error": {"detail": "Invalid JSON response"}                            }                    else:                        response_data = response                    if "error" in response_data:                        error = response_data.get("error")                        if isinstance(error, dict):                            error = error.get("detail", error)                        else:                            error = str(error)                        Chats.upsert_message_to_chat_by_id_and_message_id(                            metadata["chat_id"],                            metadata["message_id"],                            {                                "error": {"content": error},                            },                        )                        if isinstance(error, str) or isinstance(error, dict):                            await event_emitter(                                {                                    "type": "chat:message:error",                                    "data": {"error": {"content": error}},                                }                            )                    if "selected_model_id" in response_data:                        Chats.upsert_message_to_chat_by_id_and_message_id(                            metadata["chat_id"],                            metadata["message_id"],                            {                                "selectedModelId": response_data["selected_model_id"],                            },                        )                    choices = response_data.get("choices", [])                    if choices and choices[0].get("message", {}).get("content"):                        content = response_data["choices"][0]["message"]["content"]                        if content:                            await event_emitter(                                {                                    "type": "chat:completion",                                    "data": response_data,                                }                            )                            title = Chats.get_chat_title_by_id(metadata["chat_id"])                            await event_emitter(                                {                                    "type": "chat:completion",                                    "data": {                                        "done": True,                                        "content": content,                                        "title": title,                                    },                                }                            )                            # Save message in the database                            Chats.upsert_message_to_chat_by_id_and_message_id(                                metadata["chat_id"],                                metadata["message_id"],                                {                                    "role": "assistant",                                    "content": content,                                },                            )                            # Send a webhook notification if the user is not active                            if not get_active_status_by_user_id(user.id):                                webhook_url = Users.get_user_webhook_url_by_id(user.id)                                if webhook_url:                                    await post_webhook(                                        request.app.state.WEBUI_NAME,                                        webhook_url,                                        f"{title} - {request.app.state.config.WEBUI_URL}/c/{metadata['chat_id']}\n\n{content}",                                        {                                            "action": "chat",                                            "message": content,                                            "title": title,                                            "url": f"{request.app.state.config.WEBUI_URL}/c/{metadata['chat_id']}",                                        },                                    )                            await background_tasks_handler()                    if events and isinstance(events, list):                        extra_response = {}                        for event in events:                            if isinstance(event, dict):                                extra_response.update(event)                            else:                                extra_response[event] = True                        response_data = {                            **extra_response,                            **response_data,                        }                    if isinstance(response, dict):                        response = response_data                    if isinstance(response, JSONResponse):                        response = JSONResponse(                            content=response_data,                            headers=response.headers,                            status_code=response.status_code,                        )            except Exception as e:                log.debug(f"Error occurred while processing request: {e}")                pass            return response        else:            if events and isinstance(events, list) and isinstance(response, dict):                extra_response = {}                for event in events:                    if isinstance(event, dict):                        extra_response.update(event)                    else:                        extra_response[event] = True                response = {                    **extra_response,                    **response,                }            return response    # Non standard response    if not any(        content_type in response.headers["Content-Type"]        for content_type in ["text/event-stream", "application/x-ndjson"]    ):        return response    oauth_token = None    try:        oauth_token = request.app.state.oauth_manager.get_oauth_token(            user.id,            request.cookies.get("oauth_session_id", None),        )    except Exception as e:        log.error(f"Error getting OAuth token: {e}")    extra_params = {        "__event_emitter__": event_emitter,        "__event_call__": event_caller,        "__user__": user.model_dump() if isinstance(user, UserModel) else {},        "__metadata__": metadata,        "__oauth_token__": oauth_token,        "__request__": request,        "__model__": model,    }    filter_functions = [        Functions.get_function_by_id(filter_id)        for filter_id in get_sorted_filter_ids(            request, model, metadata.get("filter_ids", [])        )    ]    # Streaming response    if event_emitter and event_caller:        task_id = str(uuid4())  # Create a unique task ID.        model_id = form_data.get("model", "")        def split_content_and_whitespace(content):            content_stripped = content.rstrip()            original_whitespace = (                content[len(content_stripped) :]                if len(content) > len(content_stripped)                else ""            )            return content_stripped, original_whitespace        def is_opening_code_block(content):            backtick_segments = content.split("```")            # Even number of segments means the last backticks are opening a new block            return len(backtick_segments) > 1 and len(backtick_segments) % 2 == 0        # Handle as a background task        async def response_handler(response, events):            def serialize_content_blocks(content_blocks, raw=False):                content = ""                for block in content_blocks:                    if block["type"] == "text":                        block_content = block["content"].strip()                        if block_content:                            content = f"{content}{block_content}\n"                    elif block["type"] == "tool_calls":                        attributes = block.get("attributes", {})                        tool_calls = block.get("content", [])                        results = block.get("results", [])                        if content and not content.endswith("\n"):                            content += "\n"                        if results:                            tool_calls_display_content = ""                            for tool_call in tool_calls:                                tool_call_id = tool_call.get("id", "")                                tool_name = tool_call.get("function", {}).get(                                    "name", ""                                )                                tool_arguments = tool_call.get("function", {}).get(                                    "arguments", ""                                )                                tool_result = None                                tool_result_files = None                                for result in results:                                    if tool_call_id == result.get("tool_call_id", ""):                                        tool_result = result.get("content", None)                                        tool_result_files = result.get("files", None)                                        break                                if tool_result is not None:                                    tool_calls_display_content = f'{tool_calls_display_content}<details type="tool_calls" done="true" id="{tool_call_id}" name="{tool_name}" arguments="{html.escape(json.dumps(tool_arguments))}" result="{html.escape(json.dumps(tool_result, ensure_ascii=False))}" files="{html.escape(json.dumps(tool_result_files)) if tool_result_files else ""}">\n<summary>Tool Executed</summary>\n</details>\n'                                else:                                    tool_calls_display_content = f'{tool_calls_display_content}<details type="tool_calls" done="false" id="{tool_call_id}" name="{tool_name}" arguments="{html.escape(json.dumps(tool_arguments))}">\n<summary>Executing...</summary>\n</details>\n'                            if not raw:                                content = f"{content}{tool_calls_display_content}"                        else:                            tool_calls_display_content = ""                            for tool_call in tool_calls:                                tool_call_id = tool_call.get("id", "")                                tool_name = tool_call.get("function", {}).get(                                    "name", ""                                )                                tool_arguments = tool_call.get("function", {}).get(                                    "arguments", ""                                )                                tool_calls_display_content = f'{tool_calls_display_content}\n<details type="tool_calls" done="false" id="{tool_call_id}" name="{tool_name}" arguments="{html.escape(json.dumps(tool_arguments))}">\n<summary>Executing...</summary>\n</details>\n'                            if not raw:                                content = f"{content}{tool_calls_display_content}"                    elif block["type"] == "reasoning":                        reasoning_display_content = "\n".join(                            (f"> {line}" if not line.startswith(">") else line)                            for line in block["content"].splitlines()                        )                        reasoning_duration = block.get("duration", None)                        start_tag = block.get("start_tag", "")                        end_tag = block.get("end_tag", "")                        if content and not content.endswith("\n"):                            content += "\n"                        if reasoning_duration is not None:                            if raw:                                content = (                                    f'{content}{start_tag}{block["content"]}{end_tag}\n'                                )                            else:                                content = f'{content}<details type="reasoning" done="true" duration="{reasoning_duration}">\n<summary>Thought for {reasoning_duration} seconds</summary>\n{reasoning_display_content}\n</details>\n'                        else:                            if raw:                                content = (                                    f'{content}{start_tag}{block["content"]}{end_tag}\n'                                )                            else:                                content = f'{content}<details type="reasoning" done="false">\n<summary>Thinking…</summary>\n{reasoning_display_content}\n</details>\n'                    elif block["type"] == "code_interpreter":                        attributes = block.get("attributes", {})                        output = block.get("output", None)                        lang = attributes.get("lang", "")                        content_stripped, original_whitespace = (                            split_content_and_whitespace(content)                        )                        if is_opening_code_block(content_stripped):                            # Remove trailing backticks that would open a new block                            content = (                                content_stripped.rstrip("`").rstrip()                                + original_whitespace                            )                        else:                            # Keep content as is - either closing backticks or no backticks                            content = content_stripped + original_whitespace                        if content and not content.endswith("\n"):                            content += "\n"                        if output:                            output = html.escape(json.dumps(output))                            if raw:                                content = f'{content}<code_interpreter type="code" lang="{lang}">\n{block["content"]}\n</code_interpreter>\n```output\n{output}\n```\n'                            else:                                content = f'{content}<details type="code_interpreter" done="true" output="{output}">\n<summary>Analyzed</summary>\n```{lang}\n{block["content"]}\n```\n</details>\n'                        else:                            if raw:                                content = f'{content}<code_interpreter type="code" lang="{lang}">\n{block["content"]}\n</code_interpreter>\n'                            else:                                content = f'{content}<details type="code_interpreter" done="false">\n<summary>Analyzing...</summary>\n```{lang}\n{block["content"]}\n```\n</details>\n'                    else:                        block_content = str(block["content"]).strip()                        if block_content:                            content = f"{content}{block['type']}: {block_content}\n"                return content.strip()            def convert_content_blocks_to_messages(content_blocks, raw=False):                messages = []                temp_blocks = []                for idx, block in enumerate(content_blocks):                    if block["type"] == "tool_calls":                        messages.append(                            {                                "role": "assistant",                                "content": serialize_content_blocks(temp_blocks, raw),                                "tool_calls": block.get("content"),                            }                        )                        results = block.get("results", [])                        for result in results:                            messages.append(                                {                                    "role": "tool",                                    "tool_call_id": result["tool_call_id"],                                    "content": result.get("content", "") or "",                                }                            )                        temp_blocks = []                    else:                        temp_blocks.append(block)                if temp_blocks:                    content = serialize_content_blocks(temp_blocks, raw)                    if content:                        messages.append(                            {                                "role": "assistant",                                "content": content,                            }                        )                return messages            def tag_content_handler(content_type, tags, content, content_blocks):                end_flag = False                def extract_attributes(tag_content):                    """Extract attributes from a tag if they exist."""                    attributes = {}                    if not tag_content:  # Ensure tag_content is not None                        return attributes                    # Match attributes in the format: key="value" (ignores single quotes for simplicity)                    matches = re.findall(r'(\w+)\s*=\s*"([^"]+)"', tag_content)                    for key, value in matches:                        attributes[key] = value                    return attributes                if content_blocks[-1]["type"] == "text":                    for start_tag, end_tag in tags:                        start_tag_pattern = rf"{re.escape(start_tag)}"                        if start_tag.startswith("<") and start_tag.endswith(">"):                            # Match start tag e.g., <tag> or <tag attr="value">                            # remove both '<' and '>' from start_tag                            # Match start tag with attributes                            start_tag_pattern = (                                rf"<{re.escape(start_tag[1:-1])}(\s.*?)?>"                            )                        match = re.search(start_tag_pattern, content)                        if match:                            try:                                attr_content = (                                    match.group(1) if match.group(1) else ""                                )  # Ensure it's not None                            except:                                attr_content = ""                            attributes = extract_attributes(                                attr_content                            )  # Extract attributes safely                            # Capture everything before and after the matched tag                            before_tag = content[                                : match.start()                            ]  # Content before opening tag                            after_tag = content[                                match.end() :                            ]  # Content after opening tag                            # Remove the start tag and after from the currently handling text block                            content_blocks[-1]["content"] = content_blocks[-1][                                "content"                            ].replace(match.group(0) + after_tag, "")                            if before_tag:                                content_blocks[-1]["content"] = before_tag                            if not content_blocks[-1]["content"]:                                content_blocks.pop()                            # Append the new block                            content_blocks.append(                                {                                    "type": content_type,                                    "start_tag": start_tag,                                    "end_tag": end_tag,                                    "attributes": attributes,                                    "content": "",                                    "started_at": time.time(),                                }                            )                            if after_tag:                                content_blocks[-1]["content"] = after_tag                                tag_content_handler(                                    content_type, tags, after_tag, content_blocks                                )                            break                elif content_blocks[-1]["type"] == content_type:                    start_tag = content_blocks[-1]["start_tag"]                    end_tag = content_blocks[-1]["end_tag"]                    if end_tag.startswith("<") and end_tag.endswith(">"):                        # Match end tag e.g., </tag>                        end_tag_pattern = rf"{re.escape(end_tag)}"                    else:                        # Handle cases where end_tag is just a tag name                        end_tag_pattern = rf"{re.escape(end_tag)}"                    # Check if the content has the end tag                    if re.search(end_tag_pattern, content):                        end_flag = True                        block_content = content_blocks[-1]["content"]                        # Strip start and end tags from the content                        start_tag_pattern = rf"<{re.escape(start_tag)}(.*?)>"                        block_content = re.sub(                            start_tag_pattern, "", block_content                        ).strip()                        end_tag_regex = re.compile(end_tag_pattern, re.DOTALL)                        split_content = end_tag_regex.split(block_content, maxsplit=1)                        # Content inside the tag                        block_content = (                            split_content[0].strip() if split_content else ""                        )                        # Leftover content (everything after `</tag>`)                        leftover_content = (                            split_content[1].strip() if len(split_content) > 1 else ""                        )                        if block_content:                            content_blocks[-1]["content"] = block_content                            content_blocks[-1]["ended_at"] = time.time()                            content_blocks[-1]["duration"] = int(                                content_blocks[-1]["ended_at"]                                - content_blocks[-1]["started_at"]                            )                            # Reset the content_blocks by appending a new text block                            if content_type != "code_interpreter":                                if leftover_content:                                    content_blocks.append(                                        {                                            "type": "text",                                            "content": leftover_content,                                        }                                    )                                else:                                    content_blocks.append(                                        {                                            "type": "text",                                            "content": "",                                        }                                    )                        else:                            # Remove the block if content is empty                            content_blocks.pop()                            if leftover_content:                                content_blocks.append(                                    {                                        "type": "text",                                        "content": leftover_content,                                    }                                )                            else:                                content_blocks.append(                                    {                                        "type": "text",                                        "content": "",                                    }                                )                        # Clean processed content                        start_tag_pattern = rf"{re.escape(start_tag)}"                        if start_tag.startswith("<") and start_tag.endswith(">"):                            # Match start tag e.g., <tag> or <tag attr="value">                            # remove both '<' and '>' from start_tag                            # Match start tag with attributes                            start_tag_pattern = (                                rf"<{re.escape(start_tag[1:-1])}(\s.*?)?>"                            )                        content = re.sub(                            rf"{start_tag_pattern}(.|\n)*?{re.escape(end_tag)}",                            "",                            content,                            flags=re.DOTALL,                        )                return content, content_blocks, end_flag            message = Chats.get_message_by_id_and_message_id(                metadata["chat_id"], metadata["message_id"]            )            tool_calls = []            last_assistant_message = None            try:                if form_data["messages"][-1]["role"] == "assistant":                    last_assistant_message = get_last_assistant_message(                        form_data["messages"]                    )            except Exception as e:                pass            content = (                message.get("content", "")                if message                else last_assistant_message if last_assistant_message else ""            )            content_blocks = [                {                    "type": "text",                    "content": content,                }            ]            reasoning_tags_param = metadata.get("params", {}).get("reasoning_tags")            DETECT_REASONING_TAGS = reasoning_tags_param is not False            DETECT_CODE_INTERPRETER = metadata.get("features", {}).get(                "code_interpreter", False            )            reasoning_tags = []            if DETECT_REASONING_TAGS:                if (                    isinstance(reasoning_tags_param, list)                    and len(reasoning_tags_param) == 2                ):                    reasoning_tags = [                        (reasoning_tags_param[0], reasoning_tags_param[1])                    ]                else:                    reasoning_tags = DEFAULT_REASONING_TAGS            try:                for event in events:                    await event_emitter(                        {                            "type": "chat:completion",                            "data": event,                        }                    )                    # Save message in the database                    Chats.upsert_message_to_chat_by_id_and_message_id(                        metadata["chat_id"],                        metadata["message_id"],                        {                            **event,                        },                    )                async def stream_body_handler(response, form_data):                    nonlocal content                    nonlocal content_blocks                    response_tool_calls = []                    delta_count = 0                    delta_chunk_size = max(                        CHAT_RESPONSE_STREAM_DELTA_CHUNK_SIZE,                        int(                            metadata.get("params", {}).get("stream_delta_chunk_size")                            or 1                        ),                    )                    last_delta_data = None                    async def flush_pending_delta_data(threshold: int = 0):                        nonlocal delta_count                        nonlocal last_delta_data                        if delta_count >= threshold and last_delta_data:                            await event_emitter(                                {                                    "type": "chat:completion",                                    "data": last_delta_data,                                }                            )                            delta_count = 0                            last_delta_data = None                    async for line in response.body_iterator:                        line = line.decode("utf-8") if isinstance(line, bytes) else line                        data = line                        # Skip empty lines                        if not data.strip():                            continue                        # "data:" is the prefix for each event                        if not data.startswith("data:"):                            continue                        # Remove the prefix                        data = data[len("data:") :].strip()                        try:                            data = json.loads(data)                            data, _ = await process_filter_functions(                                request=request,                                filter_functions=filter_functions,                                filter_type="stream",                                form_data=data,                                extra_params={"__body__": form_data, **extra_params},                            )                            if data:                                if "event" in data:                                    await event_emitter(data.get("event", {}))                                if "selected_model_id" in data:                                    model_id = data["selected_model_id"]                                    Chats.upsert_message_to_chat_by_id_and_message_id(                                        metadata["chat_id"],                                        metadata["message_id"],                                        {                                            "selectedModelId": model_id,                                        },                                    )                                    await event_emitter(                                        {                                            "type": "chat:completion",                                            "data": data,                                        }                                    )                                else:                                    choices = data.get("choices", [])                                    if not choices:                                        error = data.get("error", {})                                        if error:                                            await event_emitter(                                                {                                                    "type": "chat:completion",                                                    "data": {                                                        "error": error,                                                    },                                                }                                            )                                        usage = data.get("usage", {})                                        usage.update(                                            data.get("timing", {})                                        )  # llama.cpp                                        if usage:                                            await event_emitter(                                                {                                                    "type": "chat:completion",                                                    "data": {                                                        "usage": usage,                                                    },                                                }                                            )                                        continue                                    delta = choices[0].get("delta", {})                                    delta_tool_calls = delta.get("tool_calls", None)                                    if delta_tool_calls:                                        for delta_tool_call in delta_tool_calls:                                            tool_call_index = delta_tool_call.get(                                                "index"                                            )                                            if tool_call_index is not None:                                                # Check if the tool call already exists                                                current_response_tool_call = None                                                for (                                                    response_tool_call                                                ) in response_tool_calls:                                                    if (                                                        response_tool_call.get("index")                                                        == tool_call_index                                                    ):                                                        current_response_tool_call = (                                                            response_tool_call                                                        )                                                        break                                                if current_response_tool_call is None:                                                    # Add the new tool call                                                    delta_tool_call.setdefault(                                                        "function", {}                                                    )                                                    delta_tool_call[                                                        "function"                                                    ].setdefault("name", "")                                                    delta_tool_call[                                                        "function"                                                    ].setdefault("arguments", "")                                                    response_tool_calls.append(                                                        delta_tool_call                                                    )                                                else:                                                    # Update the existing tool call                                                    delta_name = delta_tool_call.get(                                                        "function", {}                                                    ).get("name")                                                    delta_arguments = (                                                        delta_tool_call.get(                                                            "function", {}                                                        ).get("arguments")                                                    )                                                    if delta_name:                                                        current_response_tool_call[                                                            "function"                                                        ]["name"] += delta_name                                                    if delta_arguments:                                                        current_response_tool_call[                                                            "function"                                                        ][                                                            "arguments"                                                        ] += delta_arguments                                    value = delta.get("content")                                    reasoning_content = (                                        delta.get("reasoning_content")                                        or delta.get("reasoning")                                        or delta.get("thinking")                                    )                                    if reasoning_content:                                        if (                                            not content_blocks                                            or content_blocks[-1]["type"] != "reasoning"                                        ):                                            reasoning_block = {                                                "type": "reasoning",                                                "start_tag": "<think>",                                                "end_tag": "</think>",                                                "attributes": {                                                    "type": "reasoning_content"                                                },                                                "content": "",                                                "started_at": time.time(),                                            }                                            content_blocks.append(reasoning_block)                                        else:                                            reasoning_block = content_blocks[-1]                                        reasoning_block["content"] += reasoning_content                                        data = {                                            "content": serialize_content_blocks(                                                content_blocks                                            )                                        }                                    if value:                                        if (                                            content_blocks                                            and content_blocks[-1]["type"]                                            == "reasoning"                                            and content_blocks[-1]                                            .get("attributes", {})                                            .get("type")                                            == "reasoning_content"                                        ):                                            reasoning_block = content_blocks[-1]                                            reasoning_block["ended_at"] = time.time()                                            reasoning_block["duration"] = int(                                                reasoning_block["ended_at"]                                                - reasoning_block["started_at"]                                            )                                            content_blocks.append(                                                {                                                    "type": "text",                                                    "content": "",                                                }                                            )                                        content = f"{content}{value}"                                        if not content_blocks:                                            content_blocks.append(                                                {                                                    "type": "text",                                                    "content": "",                                                }                                            )                                        content_blocks[-1]["content"] = (                                            content_blocks[-1]["content"] + value                                        )                                        if DETECT_REASONING_TAGS:                                            content, content_blocks, _ = (                                                tag_content_handler(                                                    "reasoning",                                                    reasoning_tags,                                                    content,                                                    content_blocks,                                                )                                            )                                            content, content_blocks, _ = (                                                tag_content_handler(                                                    "solution",                                                    DEFAULT_SOLUTION_TAGS,                                                    content,                                                    content_blocks,                                                )                                            )                                        if DETECT_CODE_INTERPRETER:                                            content, content_blocks, end = (                                                tag_content_handler(                                                    "code_interpreter",                                                    DEFAULT_CODE_INTERPRETER_TAGS,                                                    content,                                                    content_blocks,                                                )                                            )                                            if end:                                                break                                        if ENABLE_REALTIME_CHAT_SAVE:                                            # Save message in the database                                            Chats.upsert_message_to_chat_by_id_and_message_id(                                                metadata["chat_id"],                                                metadata["message_id"],                                                {                                                    "content": serialize_content_blocks(                                                        content_blocks                                                    ),                                                },                                            )                                        else:                                            data = {                                                "content": serialize_content_blocks(                                                    content_blocks                                                ),                                            }                                if delta:                                    delta_count += 1                                    last_delta_data = data                                    if delta_count >= delta_chunk_size:                                        await flush_pending_delta_data(delta_chunk_size)                                else:                                    await event_emitter(                                        {                                            "type": "chat:completion",                                            "data": data,                                        }                                    )                        except Exception as e:                            done = "data: [DONE]" in line                            if done:                                pass                            else:                                log.debug(f"Error: {e}")                                continue                    await flush_pending_delta_data()                    if content_blocks:                        # Clean up the last text block                        if content_blocks[-1]["type"] == "text":                            content_blocks[-1]["content"] = content_blocks[-1][                                "content"                            ].strip()                            if not content_blocks[-1]["content"]:                                content_blocks.pop()                                if not content_blocks:                                    content_blocks.append(                                        {                                            "type": "text",                                            "content": "",                                        }                                    )                        if content_blocks[-1]["type"] == "reasoning":                            reasoning_block = content_blocks[-1]                            if reasoning_block.get("ended_at") is None:                                reasoning_block["ended_at"] = time.time()                                reasoning_block["duration"] = int(                                    reasoning_block["ended_at"]                                    - reasoning_block["started_at"]                                )                    if response_tool_calls:                        tool_calls.append(response_tool_calls)                    if response.background:                        await response.background()                await stream_body_handler(response, form_data)                tool_call_retries = 0                while (                    len(tool_calls) > 0                    and tool_call_retries < CHAT_RESPONSE_MAX_TOOL_CALL_RETRIES                ):                    tool_call_retries += 1                    response_tool_calls = tool_calls.pop(0)                    content_blocks.append(                        {                            "type": "tool_calls",                            "content": response_tool_calls,                        }                    )                    await event_emitter(                        {                            "type": "chat:completion",                            "data": {                                "content": serialize_content_blocks(content_blocks),                            },                        }                    )                    tools = metadata.get("tools", {})                    results = []                    for tool_call in response_tool_calls:                        tool_call_id = tool_call.get("id", "")                        tool_name = tool_call.get("function", {}).get("name", "")                        tool_args = tool_call.get("function", {}).get("arguments", "{}")                        tool_function_params = {}                        try:                            # json.loads cannot be used because some models do not produce valid JSON                            tool_function_params = ast.literal_eval(tool_args)                        except Exception as e:                            log.debug(e)                            # Fallback to JSON parsing                            try:                                tool_function_params = json.loads(tool_args)                            except Exception as e:                                log.error(                                    f"Error parsing tool call arguments: {tool_args}"                                )                        # Mutate the original tool call response params as they are passed back to the passed                        # back to the LLM via the content blocks. If they are in a json block and are invalid json,                        # this can cause downstream LLM integrations to fail (e.g. bedrock gateway) where response                        # params are not valid json.                        # Main case so far is no args = "" = invalid json.                        log.debug(                            f"Parsed args from {tool_args} to {tool_function_params}"                        )                        tool_call.setdefault("function", {})["arguments"] = json.dumps(                            tool_function_params                        )                        tool_result = None                        if tool_name in tools:                            tool = tools[tool_name]                            spec = tool.get("spec", {})                            try:                                allowed_params = (                                    spec.get("parameters", {})                                    .get("properties", {})                                    .keys()                                )                                tool_function_params = {                                    k: v                                    for k, v in tool_function_params.items()                                    if k in allowed_params                                }                                if tool.get("direct", False):                                    tool_result = await event_caller(                                        {                                            "type": "execute:tool",                                            "data": {                                                "id": str(uuid4()),                                                "name": tool_name,                                                "params": tool_function_params,                                                "server": tool.get("server", {}),                                                "session_id": metadata.get(                                                    "session_id", None                                                ),                                            },                                        }                                    )                                else:                                    tool_function = tool["callable"]                                    tool_result = await tool_function(                                        **tool_function_params                                    )                            except Exception as e:                                tool_result = str(e)                        tool_result_files = []                        if isinstance(tool_result, list):                            for item in tool_result:                                # check if string                                if isinstance(item, str) and item.startswith("data:"):                                    tool_result_files.append(item)                                    tool_result.remove(item)                        if isinstance(tool_result, dict) or isinstance(                            tool_result, list                        ):                            tool_result = json.dumps(                                tool_result, indent=2, ensure_ascii=False                            )                        results.append(                            {                                "tool_call_id": tool_call_id,                                "content": tool_result or "",                                **(                                    {"files": tool_result_files}                                    if tool_result_files                                    else {}                                ),                            }                        )                    content_blocks[-1]["results"] = results                    content_blocks.append(                        {                            "type": "text",                            "content": "",                        }                    )                    await event_emitter(                        {                            "type": "chat:completion",                            "data": {                                "content": serialize_content_blocks(content_blocks),                            },                        }                    )                    try:                        new_form_data = {                            "model": model_id,                            "stream": True,                            "tools": form_data["tools"],                            "messages": [                                *form_data["messages"],                                *convert_content_blocks_to_messages(                                    content_blocks, True                                ),                            ],                        }                        res = await generate_chat_completion(                            request,                            new_form_data,                            user,                        )                        if isinstance(res, StreamingResponse):                            await stream_body_handler(res, new_form_data)                        else:                            break                    except Exception as e:                        log.debug(e)                        break                if DETECT_CODE_INTERPRETER:                    MAX_RETRIES = 5                    retries = 0                    while (                        content_blocks[-1]["type"] == "code_interpreter"                        and retries < MAX_RETRIES                    ):                        await event_emitter(                            {                                "type": "chat:completion",                                "data": {                                    "content": serialize_content_blocks(content_blocks),                                },                            }                        )                        retries += 1                        log.debug(f"Attempt count: {retries}")                        output = ""                        try:                            if content_blocks[-1]["attributes"].get("type") == "code":                                code = content_blocks[-1]["content"]                                if CODE_INTERPRETER_BLOCKED_MODULES:                                    blocking_code = textwrap.dedent(                                        f"""                                        import builtins                                        BLOCKED_MODULES = {CODE_INTERPRETER_BLOCKED_MODULES}                                        _real_import = builtins.__import__                                        def restricted_import(name, globals=None, locals=None, fromlist=(), level=0):                                            if name.split('.')[0] in BLOCKED_MODULES:                                                importer_name = globals.get('__name__') if globals else None                                                if importer_name == '__main__':                                                    raise ImportError(                                                        f"Direct import of module {{name}} is restricted."                                                    )                                            return _real_import(name, globals, locals, fromlist, level)                                        builtins.__import__ = restricted_import                                    """                                    )                                    code = blocking_code + "\n" + code                                if (                                    request.app.state.config.CODE_INTERPRETER_ENGINE                                    == "pyodide"                                ):                                    output = await event_caller(                                        {                                            "type": "execute:python",                                            "data": {                                                "id": str(uuid4()),                                                "code": code,                                                "session_id": metadata.get(                                                    "session_id", None                                                ),                                            },                                        }                                    )                                elif (                                    request.app.state.config.CODE_INTERPRETER_ENGINE                                    == "jupyter"                                ):                                    output = await execute_code_jupyter(                                        request.app.state.config.CODE_INTERPRETER_JUPYTER_URL,                                        code,                                        (                                            request.app.state.config.CODE_INTERPRETER_JUPYTER_AUTH_TOKEN                                            if request.app.state.config.CODE_INTERPRETER_JUPYTER_AUTH                                            == "token"                                            else None                                        ),                                        (                                            request.app.state.config.CODE_INTERPRETER_JUPYTER_AUTH_PASSWORD                                            if request.app.state.config.CODE_INTERPRETER_JUPYTER_AUTH                                            == "password"                                            else None                                        ),                                        request.app.state.config.CODE_INTERPRETER_JUPYTER_TIMEOUT,                                    )                                else:                                    output = {                                        "stdout": "Code interpreter engine not configured."                                    }                                log.debug(f"Code interpreter output: {output}")                                if isinstance(output, dict):                                    stdout = output.get("stdout", "")                                    if isinstance(stdout, str):                                        stdoutLines = stdout.split("\n")                                        for idx, line in enumerate(stdoutLines):                                            if "data:image/png;base64" in line:                                                image_url = ""                                                # Extract base64 image data from the line                                                image_data, content_type = (                                                    load_b64_image_data(line)                                                )                                                if image_data is not None:                                                    image_url = upload_image(                                                        request,                                                        image_data,                                                        content_type,                                                        metadata,                                                        user,                                                    )                                                stdoutLines[idx] = (                                                    f""                                                )                                        output["stdout"] = "\n".join(stdoutLines)                                    result = output.get("result", "")                                    if isinstance(result, str):                                        resultLines = result.split("\n")                                        for idx, line in enumerate(resultLines):                                            if "data:image/png;base64" in line:                                                image_url = ""                                                # Extract base64 image data from the line                                                image_data, content_type = (                                                    load_b64_image_data(line)                                                )                                                if image_data is not None:                                                    image_url = upload_image(                                                        request,                                                        image_data,                                                        content_type,                                                        metadata,                                                        user,                                                    )                                                resultLines[idx] = (                                                    f""                                                )                                        output["result"] = "\n".join(resultLines)                        except Exception as e:                            output = str(e)                        content_blocks[-1]["output"] = output                        content_blocks.append(                            {                                "type": "text",                                "content": "",                            }                        )                        await event_emitter(                            {                                "type": "chat:completion",                                "data": {                                    "content": serialize_content_blocks(content_blocks),                                },                            }                        )                        try:                            new_form_data = {                                "model": model_id,                                "stream": True,                                "messages": [                                    *form_data["messages"],                                    {                                        "role": "assistant",                                        "content": serialize_content_blocks(                                            content_blocks, raw=True                                        ),                                    },                                ],                            }                            res = await generate_chat_completion(                                request,                                new_form_data,                                user,                            )                            if isinstance(res, StreamingResponse):                                await stream_body_handler(res, new_form_data)                            else:                                break                        except Exception as e:                            log.debug(e)                            break                title = Chats.get_chat_title_by_id(metadata["chat_id"])                data = {                    "done": True,                    "content": serialize_content_blocks(content_blocks),                    "title": title,                }                if not ENABLE_REALTIME_CHAT_SAVE:                    # Save message in the database                    Chats.upsert_message_to_chat_by_id_and_message_id(                        metadata["chat_id"],                        metadata["message_id"],                        {                            "content": serialize_content_blocks(content_blocks),                        },                    )                # Send a webhook notification if the user is not active                if not get_active_status_by_user_id(user.id):                    webhook_url = Users.get_user_webhook_url_by_id(user.id)                    if webhook_url:                        await post_webhook(                            request.app.state.WEBUI_NAME,                            webhook_url,                            f"{title} - {request.app.state.config.WEBUI_URL}/c/{metadata['chat_id']}\n\n{content}",                            {                                "action": "chat",                                "message": content,                                "title": title,                                "url": f"{request.app.state.config.WEBUI_URL}/c/{metadata['chat_id']}",                            },                        )                await event_emitter(                    {                        "type": "chat:completion",                        "data": data,                    }                )                await background_tasks_handler()            except asyncio.CancelledError:                log.warning("Task was cancelled!")                await event_emitter({"type": "chat:tasks:cancel"})                if not ENABLE_REALTIME_CHAT_SAVE:                    # Save message in the database                    Chats.upsert_message_to_chat_by_id_and_message_id(                        metadata["chat_id"],                        metadata["message_id"],                        {                            "content": serialize_content_blocks(content_blocks),                        },                    )            if response.background is not None:                await response.background()        return await response_handler(response, events)    else:        # Fallback to the original response        async def stream_wrapper(original_generator, events):            def wrap_item(item):                return f"data: {item}\n\n"            for event in events:                event, _ = await process_filter_functions(                    request=request,                    filter_functions=filter_functions,                    filter_type="stream",                    form_data=event,                    extra_params=extra_params,                )                if event:                    yield wrap_item(json.dumps(event))            async for data in original_generator:                data, _ = await process_filter_functions(                    request=request,                    filter_functions=filter_functions,                    filter_type="stream",                    form_data=data,                    extra_params=extra_params,                )                if data:                    yield data        return StreamingResponse(            stream_wrapper(response.body_iterator, events),            headers=dict(response.headers),            background=response.background,        )
 |