import logging import os import uuid import json from fnmatch import fnmatch from pathlib import Path from typing import Optional from urllib.parse import quote import asyncio from fastapi import ( BackgroundTasks, APIRouter, Depends, File, Form, HTTPException, Request, UploadFile, status, Query, ) from fastapi.responses import FileResponse, StreamingResponse from open_webui.constants import ERROR_MESSAGES from open_webui.env import SRC_LOG_LEVELS from open_webui.retrieval.vector.factory import VECTOR_DB_CLIENT from open_webui.models.users import Users from open_webui.models.files import ( FileForm, FileModel, FileModelResponse, Files, ) from open_webui.models.knowledge import Knowledges from open_webui.routers.knowledge import get_knowledge, get_knowledge_list from open_webui.routers.retrieval import ProcessFileForm, process_file from open_webui.routers.audio import transcribe from open_webui.storage.provider import Storage from open_webui.utils.auth import get_admin_user, get_verified_user from pydantic import BaseModel log = logging.getLogger(__name__) log.setLevel(SRC_LOG_LEVELS["MODELS"]) router = APIRouter() ############################ # Check if the current user has access to a file through any knowledge bases the user may be in. ############################ def has_access_to_file( file_id: Optional[str], access_type: str, user=Depends(get_verified_user) ) -> bool: file = Files.get_file_by_id(file_id) log.debug(f"Checking if user has {access_type} access to file") if not file: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND, ) has_access = False knowledge_base_id = file.meta.get("collection_name") if file.meta else None if knowledge_base_id: knowledge_bases = Knowledges.get_knowledge_bases_by_user_id( user.id, access_type ) for knowledge_base in knowledge_bases: if knowledge_base.id == knowledge_base_id: has_access = True break return has_access ############################ # Upload File ############################ def process_uploaded_file(request, file, file_path, file_item, file_metadata, user): try: if file.content_type: stt_supported_content_types = getattr( request.app.state.config, "STT_SUPPORTED_CONTENT_TYPES", [] ) if any( fnmatch(file.content_type, content_type) for content_type in ( stt_supported_content_types if stt_supported_content_types and any(t.strip() for t in stt_supported_content_types) else ["audio/*", "video/webm"] ) ): file_path = Storage.get_file(file_path) result = transcribe(request, file_path, file_metadata) process_file( request, ProcessFileForm( file_id=file_item.id, content=result.get("text", "") ), user=user, ) elif (not file.content_type.startswith(("image/", "video/"))) or ( request.app.state.config.CONTENT_EXTRACTION_ENGINE == "external" ): process_file(request, ProcessFileForm(file_id=file_item.id), user=user) else: log.info( f"File type {file.content_type} is not provided, but trying to process anyway" ) process_file(request, ProcessFileForm(file_id=file_item.id), user=user) except Exception as e: log.error(f"Error processing file: {file_item.id}") Files.update_file_data_by_id( file_item.id, { "status": "failed", "error": str(e.detail) if hasattr(e, "detail") else str(e), }, ) @router.post("/", response_model=FileModelResponse) def upload_file( request: Request, background_tasks: BackgroundTasks, file: UploadFile = File(...), metadata: Optional[dict | str] = Form(None), process: bool = Query(True), process_in_background: bool = Query(True), user=Depends(get_verified_user), ): return upload_file_handler( request, file=file, metadata=metadata, process=process, process_in_background=process_in_background, user=user, background_tasks=background_tasks, ) def upload_file_handler( request: Request, file: UploadFile = File(...), metadata: Optional[dict | str] = Form(None), process: bool = Query(True), process_in_background: bool = Query(True), user=Depends(get_verified_user), background_tasks: Optional[BackgroundTasks] = None, ): log.info(f"file.content_type: {file.content_type}") if isinstance(metadata, str): try: metadata = json.loads(metadata) except json.JSONDecodeError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT("Invalid metadata format"), ) file_metadata = metadata if metadata else {} try: unsanitized_filename = file.filename filename = os.path.basename(unsanitized_filename) file_extension = os.path.splitext(filename)[1] # Remove the leading dot from the file extension file_extension = file_extension[1:] if file_extension else "" if process and request.app.state.config.ALLOWED_FILE_EXTENSIONS: request.app.state.config.ALLOWED_FILE_EXTENSIONS = [ ext for ext in request.app.state.config.ALLOWED_FILE_EXTENSIONS if ext ] if file_extension not in request.app.state.config.ALLOWED_FILE_EXTENSIONS: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT( f"File type {file_extension} is not allowed" ), ) # replace filename with uuid id = str(uuid.uuid4()) name = filename filename = f"{id}_{filename}" contents, file_path = Storage.upload_file( file.file, filename, { "OpenWebUI-User-Email": user.email, "OpenWebUI-User-Id": user.id, "OpenWebUI-User-Name": user.name, "OpenWebUI-File-Id": id, }, ) file_item = Files.insert_new_file( user.id, FileForm( **{ "id": id, "filename": name, "path": file_path, "data": { **({"status": "pending"} if process else {}), }, "meta": { "name": name, "content_type": file.content_type, "size": len(contents), "data": file_metadata, }, } ), ) if process: if background_tasks and process_in_background: background_tasks.add_task( process_uploaded_file, request, file, file_path, file_item, file_metadata, user, ) return {"status": True, **file_item.model_dump()} else: process_uploaded_file( request, file, file_path, file_item, file_metadata, user, ) return {"status": True, **file_item.model_dump()} else: if file_item: return file_item else: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT("Error uploading file"), ) except Exception as e: log.exception(e) raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT("Error uploading file"), ) ############################ # List Files ############################ @router.get("/", response_model=list[FileModelResponse]) async def list_files(user=Depends(get_verified_user), content: bool = Query(True)): if user.role == "admin": files = Files.get_files() else: files = Files.get_files_by_user_id(user.id) if not content: for file in files: if "content" in file.data: del file.data["content"] return files ############################ # Search Files ############################ @router.get("/search", response_model=list[FileModelResponse]) async def search_files( filename: str = Query( ..., description="Filename pattern to search for. Supports wildcards such as '*.txt'", ), content: bool = Query(True), user=Depends(get_verified_user), ): """ Search for files by filename with support for wildcard patterns. """ # Get files according to user role if user.role == "admin": files = Files.get_files() else: files = Files.get_files_by_user_id(user.id) # Get matching files matching_files = [ file for file in files if fnmatch(file.filename.lower(), filename.lower()) ] if not matching_files: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="No files found matching the pattern.", ) if not content: for file in matching_files: if "content" in file.data: del file.data["content"] return matching_files ############################ # Delete All Files ############################ @router.delete("/all") async def delete_all_files(user=Depends(get_admin_user)): result = Files.delete_all_files() if result: try: Storage.delete_all_files() VECTOR_DB_CLIENT.reset() except Exception as e: log.exception(e) log.error("Error deleting files") raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT("Error deleting files"), ) return {"message": "All files deleted successfully"} else: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT("Error deleting files"), ) ############################ # Get File By Id ############################ @router.get("/{id}", response_model=Optional[FileModel]) async def get_file_by_id(id: str, user=Depends(get_verified_user)): file = Files.get_file_by_id(id) if not file: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND, ) if ( file.user_id == user.id or user.role == "admin" or has_access_to_file(id, "read", user) ): return file else: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND, ) @router.get("/{id}/process/status") async def get_file_process_status( id: str, stream: bool = Query(False), user=Depends(get_verified_user) ): file = Files.get_file_by_id(id) if not file: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND, ) if ( file.user_id == user.id or user.role == "admin" or has_access_to_file(id, "read", user) ): if stream: MAX_FILE_PROCESSING_DURATION = 3600 * 2 async def event_stream(file_item): if file_item: for _ in range(MAX_FILE_PROCESSING_DURATION): file_item = Files.get_file_by_id(file_item.id) if file_item: data = file_item.model_dump().get("data", {}) status = data.get("status") if status: event = {"status": status} if status == "failed": event["error"] = data.get("error") yield f"data: {json.dumps(event)}\n\n" if status in ("completed", "failed"): break else: # Legacy break await asyncio.sleep(0.5) else: yield f"data: {json.dumps({'status': 'not_found'})}\n\n" return StreamingResponse( event_stream(file), media_type="text/event-stream", ) else: return {"status": file.data.get("status", "pending")} else: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND, ) ############################ # Get File Data Content By Id ############################ @router.get("/{id}/data/content") async def get_file_data_content_by_id(id: str, user=Depends(get_verified_user)): file = Files.get_file_by_id(id) if not file: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND, ) if ( file.user_id == user.id or user.role == "admin" or has_access_to_file(id, "read", user) ): return {"content": file.data.get("content", "")} else: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND, ) ############################ # Update File Data Content By Id ############################ class ContentForm(BaseModel): content: str @router.post("/{id}/data/content/update") async def update_file_data_content_by_id( request: Request, id: str, form_data: ContentForm, user=Depends(get_verified_user) ): file = Files.get_file_by_id(id) if not file: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND, ) if ( file.user_id == user.id or user.role == "admin" or has_access_to_file(id, "write", user) ): try: process_file( request, ProcessFileForm(file_id=id, content=form_data.content), user=user, ) file = Files.get_file_by_id(id=id) except Exception as e: log.exception(e) log.error(f"Error processing file: {file.id}") return {"content": file.data.get("content", "")} else: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND, ) ############################ # Get File Content By Id ############################ @router.get("/{id}/content") async def get_file_content_by_id( id: str, user=Depends(get_verified_user), attachment: bool = Query(False) ): file = Files.get_file_by_id(id) if not file: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND, ) if ( file.user_id == user.id or user.role == "admin" or has_access_to_file(id, "read", user) ): try: file_path = Storage.get_file(file.path) file_path = Path(file_path) # Check if the file already exists in the cache if file_path.is_file(): # Handle Unicode filenames filename = file.meta.get("name", file.filename) encoded_filename = quote(filename) # RFC5987 encoding content_type = file.meta.get("content_type") filename = file.meta.get("name", file.filename) encoded_filename = quote(filename) headers = {} if attachment: headers["Content-Disposition"] = ( f"attachment; filename*=UTF-8''{encoded_filename}" ) else: if content_type == "application/pdf" or filename.lower().endswith( ".pdf" ): headers["Content-Disposition"] = ( f"inline; filename*=UTF-8''{encoded_filename}" ) content_type = "application/pdf" elif content_type != "text/plain": headers["Content-Disposition"] = ( f"attachment; filename*=UTF-8''{encoded_filename}" ) return FileResponse(file_path, headers=headers, media_type=content_type) else: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND, ) except Exception as e: log.exception(e) log.error("Error getting file content") raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT("Error getting file content"), ) else: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND, ) @router.get("/{id}/content/html") async def get_html_file_content_by_id(id: str, user=Depends(get_verified_user)): file = Files.get_file_by_id(id) if not file: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND, ) file_user = Users.get_user_by_id(file.user_id) if not file_user.role == "admin": raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND, ) if ( file.user_id == user.id or user.role == "admin" or has_access_to_file(id, "read", user) ): try: file_path = Storage.get_file(file.path) file_path = Path(file_path) # Check if the file already exists in the cache if file_path.is_file(): log.info(f"file_path: {file_path}") return FileResponse(file_path) else: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND, ) except Exception as e: log.exception(e) log.error("Error getting file content") raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT("Error getting file content"), ) else: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND, ) @router.get("/{id}/content/{file_name}") async def get_file_content_by_id(id: str, user=Depends(get_verified_user)): file = Files.get_file_by_id(id) if not file: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND, ) if ( file.user_id == user.id or user.role == "admin" or has_access_to_file(id, "read", user) ): file_path = file.path # Handle Unicode filenames filename = file.meta.get("name", file.filename) encoded_filename = quote(filename) # RFC5987 encoding headers = { "Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}" } if file_path: file_path = Storage.get_file(file_path) file_path = Path(file_path) # Check if the file already exists in the cache if file_path.is_file(): return FileResponse(file_path, headers=headers) else: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND, ) else: # File path doesn’t exist, return the content as .txt if possible file_content = file.content.get("content", "") file_name = file.filename # Create a generator that encodes the file content def generator(): yield file_content.encode("utf-8") return StreamingResponse( generator(), media_type="text/plain", headers=headers, ) else: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND, ) ############################ # Delete File By Id ############################ @router.delete("/{id}") async def delete_file_by_id(id: str, user=Depends(get_verified_user)): file = Files.get_file_by_id(id) if not file: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND, ) if ( file.user_id == user.id or user.role == "admin" or has_access_to_file(id, "write", user) ): result = Files.delete_file_by_id(id) if result: try: Storage.delete_file(file.path) VECTOR_DB_CLIENT.delete(collection_name=f"file-{id}") except Exception as e: log.exception(e) log.error("Error deleting files") raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT("Error deleting files"), ) return {"message": "File deleted successfully"} else: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT("Error deleting file"), ) else: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND, )