123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566 |
- import logging
- import os
- import uuid
- from fnmatch import fnmatch
- from pathlib import Path
- from typing import Optional
- from urllib.parse import quote
- from fastapi import (
- APIRouter,
- Depends,
- File,
- HTTPException,
- Request,
- UploadFile,
- status,
- Query,
- )
- from fastapi.responses import FileResponse, StreamingResponse
- from open_webui.constants import ERROR_MESSAGES
- from open_webui.env import SRC_LOG_LEVELS
- from open_webui.models.files import (
- FileForm,
- FileModel,
- FileModelResponse,
- Files,
- )
- from open_webui.models.knowledge import Knowledges
- from open_webui.routers.knowledge import get_knowledge, get_knowledge_list
- from open_webui.routers.retrieval import ProcessFileForm, process_file
- from open_webui.routers.audio import transcribe
- from open_webui.storage.provider import Storage
- from open_webui.utils.auth import get_admin_user, get_verified_user
- from pydantic import BaseModel
# Module-scoped logger; its level is taken from the "MODELS" entry of SRC_LOG_LEVELS.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MODELS"])

# Router on which every file endpoint in this module is registered.
router = APIRouter()
- ############################
- # Check if the current user has access to a file through any knowledge bases the user may be in.
- ############################
def has_access_to_file(
    file_id: Optional[str], access_type: str, user=Depends(get_verified_user)
) -> bool:
    """Tell whether ``user`` can reach a file through one of their knowledge bases.

    The file's ``meta["collection_name"]`` is used as a knowledge base id and
    compared with the ids of the knowledge bases returned by
    ``Knowledges.get_knowledge_bases_by_user_id(user.id, access_type)``.

    Args:
        file_id: Id of the file to look up.
        access_type: Access level forwarded to the knowledge base lookup
            (the handlers visible in this module pass "read" or "write").
        user: User whose knowledge bases are consulted.

    Returns:
        True when one of the user's knowledge bases has the same id as the
        file's collection name. False otherwise, including when the file has
        no metadata or no collection name.

    Raises:
        HTTPException: 404 when no file exists for ``file_id``.
    """
    target = Files.get_file_by_id(file_id)
    log.debug(f"Checking if user has {access_type} access to file")

    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # Metadata may be absent; in that case there is nothing to match against.
    knowledge_base_id = target.meta.get("collection_name") if target.meta else None
    if not knowledge_base_id:
        return False

    accessible = Knowledges.get_knowledge_bases_by_user_id(user.id, access_type)
    return any(kb.id == knowledge_base_id for kb in accessible)
- ############################
- # Upload File
- ############################
@router.post("/", response_model=FileModelResponse)
def upload_file(
    request: Request,
    file: UploadFile = File(...),
    user=Depends(get_verified_user),
    file_metadata: dict = {},
    process: bool = Query(True),
):
    """Store an uploaded file, record it, and optionally process its content.

    The upload is written through ``Storage.upload_file`` under the name
    ``"<uuid>_<basename>"`` and a record is inserted with ``Files.insert_new_file``.
    When ``process`` is true:

    * audio uploads (``audio/mpeg``, ``audio/wav``, ``audio/ogg``,
      ``audio/x-m4a``) are passed to ``transcribe`` and the ``"text"`` of the
      result is handed to ``process_file``;
    * PNG/JPEG/GIF images are not processed;
    * every other content type is handed to ``process_file`` directly.

    A processing failure does not fail the upload: the stored record is
    returned with an extra ``"error"`` field describing the failure.

    Args:
        request: Incoming request, forwarded to ``transcribe``/``process_file``.
        file: The uploaded file.
        user: Verified user who owns the new file record.
        file_metadata: Extra metadata stored under ``meta["data"]``. The
            mutable default is kept to preserve the signature; it is only
            stored, never mutated, in this function.
        process: Whether to run content processing after storing the file.

    Returns:
        The file record (re-read after processing when processing ran).

    Raises:
        HTTPException: 400 when storing or recording the file raises, or when
            no file record is available at the end.
    """
    log.info(f"file.content_type: {file.content_type}")
    try:
        unsanitized_filename = file.filename
        # Drop any directory components supplied by the client.
        name = os.path.basename(unsanitized_filename)

        # Prefix the stored name with a fresh uuid; the uuid is also the record id.
        file_id = str(uuid.uuid4())
        stored_filename = f"{file_id}_{name}"

        contents, file_path = Storage.upload_file(file.file, stored_filename)

        file_item = Files.insert_new_file(
            user.id,
            FileForm(
                **{
                    "id": file_id,
                    "filename": name,
                    "path": file_path,
                    "meta": {
                        "name": name,
                        "content_type": file.content_type,
                        "size": len(contents),
                        "data": file_metadata,
                    },
                }
            ),
        )

        if process:
            try:
                if file.content_type in [
                    "audio/mpeg",
                    "audio/wav",
                    "audio/ogg",
                    "audio/x-m4a",
                ]:
                    file_path = Storage.get_file(file_path)
                    result = transcribe(request, file_path)
                    process_file(
                        request,
                        ProcessFileForm(
                            file_id=file_id, content=result.get("text", "")
                        ),
                        user=user,
                    )
                elif file.content_type not in ["image/png", "image/jpeg", "image/gif"]:
                    process_file(request, ProcessFileForm(file_id=file_id), user=user)

                # Re-read the record so the response reflects processing results.
                file_item = Files.get_file_by_id(id=file_id)
            except Exception as e:
                # Best effort: keep the upload and report the failure in-band.
                log.exception(e)
                log.error(f"Error processing file: {file_item.id}")
                file_item = FileModelResponse(
                    **{
                        **file_item.model_dump(),
                        "error": str(e.detail) if hasattr(e, "detail") else str(e),
                    }
                )
    except Exception as e:
        log.exception(e)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT(e),
        )

    # This check sits outside the try block on purpose: raised inside it, the
    # HTTPException below would be caught by the broad handler above and
    # wrapped a second time.
    if file_item:
        return file_item

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail=ERROR_MESSAGES.DEFAULT("Error uploading file"),
    )
- ############################
- # List Files
- ############################
@router.get("/", response_model=list[FileModelResponse])
async def list_files(user=Depends(get_verified_user), content: bool = Query(True)):
    """List files visible to the caller.

    Admins get every file (``Files.get_files()``); other users get only the
    files returned by ``Files.get_files_by_user_id`` for their own id.

    Args:
        user: Verified user making the request.
        content: When false, the ``"content"`` entry is stripped from each
            file's ``data`` before the list is returned.

    Returns:
        The list of file records.
    """
    if user.role == "admin":
        files = Files.get_files()
    else:
        files = Files.get_files_by_user_id(user.id)

    if not content:
        for file in files:
            # Files with no data, or no "content" key (e.g. never processed),
            # must not make the whole listing fail.
            if file.data:
                file.data.pop("content", None)

    return files
- ############################
- # Search Files
- ############################
@router.get("/search", response_model=list[FileModelResponse])
async def search_files(
    filename: str = Query(
        ...,
        description="Filename pattern to search for. Supports wildcards such as '*.txt'",
    ),
    content: bool = Query(True),
    user=Depends(get_verified_user),
):
    """
    Search for files by filename with support for wildcard patterns.

    Matching is case-insensitive: both the stored filename and the pattern
    are lower-cased before being compared with ``fnmatch``. Admins search
    across every file; other users search only their own files.

    Args:
        filename: Wildcard pattern to match against each file's filename.
        content: When false, the ``"content"`` entry is stripped from each
            matching file's ``data`` before the list is returned.
        user: Verified user making the request.

    Returns:
        The list of matching file records.

    Raises:
        HTTPException: 404 when no file matches the pattern.
    """
    # Get files according to user role
    if user.role == "admin":
        files = Files.get_files()
    else:
        files = Files.get_files_by_user_id(user.id)

    # Lower-case the pattern once instead of once per candidate file.
    pattern = filename.lower()
    matching_files = [file for file in files if fnmatch(file.filename.lower(), pattern)]

    if not matching_files:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No files found matching the pattern.",
        )

    if not content:
        for file in matching_files:
            # Files with no data, or no "content" key (e.g. never processed),
            # must not make the whole search fail.
            if file.data:
                file.data.pop("content", None)

    return matching_files
- ############################
- # Delete All Files
- ############################
@router.delete("/all")
async def delete_all_files(user=Depends(get_admin_user)):
    """Delete every file record and then every stored file (admin only).

    Records are removed first with ``Files.delete_all_files()``; storage is
    cleared with ``Storage.delete_all_files()`` only if that succeeded.

    Returns:
        A dict with a success message.

    Raises:
        HTTPException: 400 when the record deletion reports failure or when
            clearing storage raises.
    """
    records_deleted = Files.delete_all_files()
    if not records_deleted:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error deleting files"),
        )

    try:
        Storage.delete_all_files()
    except Exception as e:
        log.exception(e)
        log.error("Error deleting files")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error deleting files"),
        )

    return {"message": "All files deleted successfully"}
- ############################
- # Get File By Id
- ############################
@router.get("/{id}", response_model=Optional[FileModel])
async def get_file_by_id(id: str, user=Depends(get_verified_user)):
    """Return the file record for ``id`` if the caller may read it.

    Access is granted to the file's owner, to admins, and to users for whom
    ``has_access_to_file(id, "read", user)`` is true. Callers without access
    get the same 404 as for a missing file.

    Raises:
        HTTPException: 404 when the file does not exist or access is denied.
    """
    record = Files.get_file_by_id(id)
    if not record:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # Evaluated left to right; the knowledge base lookup only runs when the
    # cheaper owner/admin checks fail.
    may_read = (
        record.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "read", user)
    )
    if not may_read:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    return record
- ############################
- # Get File Data Content By Id
- ############################
@router.get("/{id}/data/content")
async def get_file_data_content_by_id(id: str, user=Depends(get_verified_user)):
    """Return the ``"content"`` entry of a file's ``data``.

    Access is granted to the file's owner, to admins, and to users for whom
    ``has_access_to_file(id, "read", user)`` is true.

    Returns:
        ``{"content": <text>}``; the text is an empty string when ``data``
        has no ``"content"`` key.

    Raises:
        HTTPException: 404 when the file does not exist or access is denied.
    """
    record = Files.get_file_by_id(id)
    if not record:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    may_read = (
        record.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "read", user)
    )
    if not may_read:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    return {"content": record.data.get("content", "")}
- ############################
- # Update File Data Content By Id
- ############################
class ContentForm(BaseModel):
    """Request body for the content-update endpoint: the replacement text."""

    # New text content, passed to process_file by the update handler.
    content: str
@router.post("/{id}/data/content/update")
async def update_file_data_content_by_id(
    request: Request, id: str, form_data: ContentForm, user=Depends(get_verified_user)
):
    """Replace a file's text content by re-processing it with new content.

    Access is granted to the file's owner, to admins, and to users for whom
    ``has_access_to_file(id, "write", user)`` is true. The new content is
    handed to ``process_file`` and the record is then re-read.

    A processing failure is logged but not raised; in that case the response
    is built from the record as it was loaded before processing.

    Returns:
        ``{"content": <text>}`` taken from the record's ``data``.

    Raises:
        HTTPException: 404 when the file does not exist or access is denied.
    """
    record = Files.get_file_by_id(id)
    if not record:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    may_write = (
        record.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "write", user)
    )
    if not may_write:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    try:
        process_file(
            request,
            ProcessFileForm(file_id=id, content=form_data.content),
            user=user,
        )
        record = Files.get_file_by_id(id=id)
    except Exception as e:
        # Best effort: report the failure in the log and fall through.
        log.exception(e)
        log.error(f"Error processing file: {record.id}")

    return {"content": record.data.get("content", "")}
- ############################
- # Get File Content By Id
- ############################
@router.get("/{id}/content")
async def get_file_content_by_id(
    id: str, user=Depends(get_verified_user), attachment: bool = Query(False)
):
    """Serve the stored file for ``id``.

    Access is granted to the file's owner, to admins, and to users for whom
    ``has_access_to_file(id, "read", user)`` is true.

    The ``Content-Disposition`` header is chosen as follows:

    * ``attachment=True``: always ``attachment``;
    * PDF (by content type or ``.pdf`` suffix): ``inline``, and the media
      type is forced to ``application/pdf``;
    * ``text/plain``: no ``Content-Disposition`` header;
    * anything else: ``attachment``.

    Args:
        id: Id of the file to serve.
        user: Verified user making the request.
        attachment: Force a download disposition.

    Returns:
        A ``FileResponse`` for the file obtained through ``Storage.get_file``.

    Raises:
        HTTPException: 404 when the file record does not exist, access is
            denied, or the resolved path is not a file; 400 when resolving or
            preparing the file raises any other error.
    """
    file = Files.get_file_by_id(id)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    may_read = (
        file.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "read", user)
    )
    if not may_read:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    try:
        file_path = Path(Storage.get_file(file.path))

        if not file_path.is_file():
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=ERROR_MESSAGES.NOT_FOUND,
            )

        # Percent-encode the name so non-ASCII filenames survive in the
        # RFC 5987 "filename*" parameter.
        filename = file.meta.get("name", file.filename)
        encoded_filename = quote(filename)
        content_type = file.meta.get("content_type")

        disposition = None
        if attachment:
            disposition = "attachment"
        elif content_type == "application/pdf" or filename.lower().endswith(".pdf"):
            disposition = "inline"
            content_type = "application/pdf"
        elif content_type != "text/plain":
            disposition = "attachment"

        headers = {}
        if disposition is not None:
            headers["Content-Disposition"] = (
                f"{disposition}; filename*=UTF-8''{encoded_filename}"
            )

        return FileResponse(file_path, headers=headers, media_type=content_type)
    except HTTPException:
        # Keep the intended status (e.g. the 404 above) instead of letting the
        # generic handler below turn it into a 400.
        raise
    except Exception as e:
        log.exception(e)
        log.error("Error getting file content")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error getting file content"),
        )
@router.get("/{id}/content/html")
async def get_html_file_content_by_id(id: str, user=Depends(get_verified_user)):
    """Serve the stored file for ``id`` with default response headers.

    Access is granted to the file's owner, to admins, and to users for whom
    ``has_access_to_file(id, "read", user)`` is true.

    Returns:
        A ``FileResponse`` for the file obtained through ``Storage.get_file``.

    Raises:
        HTTPException: 404 when the file record does not exist, access is
            denied, or the resolved path is not a file; 400 when resolving the
            file raises any other error.
    """
    file = Files.get_file_by_id(id)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    may_read = (
        file.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "read", user)
    )
    if not may_read:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    try:
        file_path = Path(Storage.get_file(file.path))

        if not file_path.is_file():
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=ERROR_MESSAGES.NOT_FOUND,
            )

        log.info(f"file_path: {file_path}")
        return FileResponse(file_path)
    except HTTPException:
        # Keep the intended status (e.g. the 404 above) instead of letting the
        # generic handler below turn it into a 400.
        raise
    except Exception as e:
        log.exception(e)
        log.error("Error getting file content")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error getting file content"),
        )
@router.get("/{id}/content/{file_name}")
async def get_file_content_by_id(id: str, user=Depends(get_verified_user)):
    """Download the file for ``id`` as an attachment.

    Access is granted to the file's owner, to admins, and to users for whom
    ``has_access_to_file(id, "read", user)`` is true. The ``file_name`` path
    segment is not read by this handler; the download name always comes from
    the file record.

    When the record has a path, the stored file is served. When it has none,
    the text under ``data["content"]`` is streamed as ``text/plain`` instead.

    Returns:
        A ``FileResponse`` for the stored file, or a ``StreamingResponse``
        with the UTF-8 encoded text content.

    Raises:
        HTTPException: 404 when the file record does not exist, access is
            denied, or the resolved path is not a file.
    """
    file = Files.get_file_by_id(id)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    may_read = (
        file.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "read", user)
    )
    if not may_read:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # Percent-encode the name so non-ASCII filenames survive in the
    # RFC 5987 "filename*" parameter.
    filename = file.meta.get("name", file.filename)
    encoded_filename = quote(filename)
    headers = {
        "Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}"
    }

    file_path = file.path
    if file_path:
        file_path = Path(Storage.get_file(file_path))
        if file_path.is_file():
            return FileResponse(file_path, headers=headers)
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # No stored path: fall back to the text content. It is read from
    # ``file.data``, the attribute the other handlers in this module use for
    # text content, and a missing ``data`` is treated as empty.
    file_content = (file.data or {}).get("content", "")

    def generator():
        yield file_content.encode("utf-8")

    return StreamingResponse(
        generator(),
        media_type="text/plain",
        headers=headers,
    )
- ############################
- # Delete File By Id
- ############################
@router.delete("/{id}")
async def delete_file_by_id(id: str, user=Depends(get_verified_user)):
    """Delete a file record and then its stored file.

    Access is granted to the file's owner, to admins, and to users for whom
    ``has_access_to_file(id, "write", user)`` is true. The record is removed
    first with ``Files.delete_file_by_id``; the stored file is removed with
    ``Storage.delete_file`` only if that succeeded.

    Returns:
        A dict with a success message.

    Raises:
        HTTPException: 404 when the file does not exist or access is denied;
            400 when the record deletion reports failure or when removing the
            stored file raises.
    """
    record = Files.get_file_by_id(id)
    if not record:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    may_write = (
        record.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "write", user)
    )
    if not may_write:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # We should add Chroma cleanup here
    record_deleted = Files.delete_file_by_id(id)
    if not record_deleted:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error deleting file"),
        )

    try:
        Storage.delete_file(record.path)
    except Exception as e:
        log.exception(e)
        log.error("Error deleting files")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error deleting files"),
        )

    return {"message": "File deleted successfully"}
|