123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- from typing import Optional
- import io
- import base64
- from open_webui.models.models import (
- ModelForm,
- ModelModel,
- ModelResponse,
- ModelUserResponse,
- Models,
- )
- from pydantic import BaseModel
- from open_webui.constants import ERROR_MESSAGES
- from fastapi import APIRouter, Depends, HTTPException, Request, status, Response
- from fastapi.responses import FileResponse, StreamingResponse
- from open_webui.utils.auth import get_admin_user, get_verified_user
- from open_webui.utils.access_control import has_access, has_permission
- from open_webui.config import BYPASS_ADMIN_ACCESS_CONTROL, STATIC_DIR
- router = APIRouter()
- ###########################
- # GetModels
- ###########################
@router.get("/", response_model=list[ModelUserResponse])
async def get_models(id: Optional[str] = None, user=Depends(get_verified_user)):
    """List the models available to the requesting user.

    An admin receives every model when ``BYPASS_ADMIN_ACCESS_CONTROL`` is
    truthy. Every other caller receives whatever
    ``Models.get_models_by_user_id`` returns for their own user id.

    The ``id`` query parameter is accepted but not read by this handler.
    """
    sees_everything = user.role == "admin" and BYPASS_ADMIN_ACCESS_CONTROL
    if not sees_everything:
        return Models.get_models_by_user_id(user.id)
    return Models.get_models()
- ###########################
- # GetBaseModels
- ###########################
@router.get("/base", response_model=list[ModelResponse])
async def get_base_models(user=Depends(get_admin_user)):
    """Return the result of ``Models.get_base_models()``.

    The route depends on ``get_admin_user``; ``user`` itself is not read.
    """
    base_models = Models.get_base_models()
    return base_models
- ############################
- # CreateNewModel
- ############################
@router.post("/create", response_model=Optional[ModelModel])
async def create_new_model(
    request: Request,
    form_data: ModelForm,
    user=Depends(get_verified_user),
):
    """Create a new model owned by the requesting user.

    Non-admin callers need the ``workspace.models`` permission, looked up in
    ``request.app.state.config.USER_PERMISSIONS``.

    Raises:
        HTTPException: 401 with ``ERROR_MESSAGES.UNAUTHORIZED`` when the
            caller lacks the permission; 401 with
            ``ERROR_MESSAGES.MODEL_ID_TAKEN`` when ``form_data.id`` is already
            in use; 401 with ``ERROR_MESSAGES.DEFAULT()`` when the insert
            returns a falsy value.
    """
    # Admins short-circuit the permission lookup entirely.
    may_create = user.role == "admin" or has_permission(
        user.id, "workspace.models", request.app.state.config.USER_PERMISSIONS
    )
    if not may_create:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.UNAUTHORIZED,
        )

    existing = Models.get_model_by_id(form_data.id)
    if existing:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.MODEL_ID_TAKEN,
        )

    created = Models.insert_new_model(form_data, user.id)
    if not created:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.DEFAULT(),
        )
    return created
- ############################
- # ExportModels
- ############################
@router.get("/export", response_model=list[ModelModel])
async def export_models(user=Depends(get_admin_user)):
    """Return the result of ``Models.get_models()`` for export.

    The route depends on ``get_admin_user``; ``user`` itself is not read.
    """
    exported = Models.get_models()
    return exported
- ############################
- # SyncModels
- ############################
class SyncModelsForm(BaseModel):
    # Request body for the /sync endpoint.

    # Models to hand to Models.sync_models; defaults to an empty list when
    # the field is omitted from the request.
    models: list[ModelModel] = []
@router.post("/sync", response_model=list[ModelModel])
async def sync_models(
    request: Request,
    form_data: SyncModelsForm,
    user=Depends(get_admin_user),
):
    """Pass the submitted models to ``Models.sync_models`` and return its result.

    The call is made with the requesting admin's user id and
    ``form_data.models``. ``request`` is accepted but not read.
    """
    synced = Models.sync_models(user.id, form_data.models)
    return synced
- ###########################
- # GetModelById
- ###########################
- # Note: We're not using the typical url path param here, but instead using a query parameter to allow '/' in the id
@router.get("/model", response_model=Optional[ModelResponse])
async def get_model_by_id(id: str, user=Depends(get_verified_user)):
    """Fetch a single model by id.

    The model is returned when any of the following holds:

    * the caller is an admin and ``BYPASS_ADMIN_ACCESS_CONTROL`` is truthy,
    * the caller owns the model (``model.user_id == user.id``),
    * ``has_access(user.id, "read", model.access_control)`` is truthy.

    Every failure path raises explicitly, so the handler never falls off the
    end and answers ``200`` with a ``null`` body.

    Raises:
        HTTPException: 401 with ``ERROR_MESSAGES.NOT_FOUND`` when no model
            has this id; 401 with ``ERROR_MESSAGES.UNAUTHORIZED`` when the
            model exists but the caller may not read it.
    """
    model = Models.get_model_by_id(id)
    if not model:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    can_read = (
        (user.role == "admin" and BYPASS_ADMIN_ACCESS_CONTROL)
        or model.user_id == user.id
        or has_access(user.id, "read", model.access_control)
    )
    if not can_read:
        # Same status/detail that toggle_model_by_id uses for a denied caller.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.UNAUTHORIZED,
        )

    return model
- ###########################
- # GetModelProfileImage
- ###########################
def _decode_image_data_url(data_url: str) -> Optional[tuple[str, bytes]]:
    """Split a ``data:image...`` URL into ``(media_type, raw_bytes)``.

    Returns ``None`` when the URL has no ``,`` separator or its payload
    cannot be base64-decoded.
    """
    try:
        header, payload = data_url.split(",", 1)
        raw_bytes = base64.b64decode(payload)
    except ValueError:
        # Covers both the failed unpacking (no comma) and binascii.Error,
        # which is a ValueError subclass raised for invalid base64.
        return None

    # header looks like "data:image/jpeg;base64"; keep only the MIME type.
    declared = header[len("data:"):].split(";", 1)[0].strip().lower()
    # Only echo back well-known raster types. Anything else (notably
    # image/svg+xml, which can embed script) keeps the previous image/png.
    if declared in ("image/png", "image/jpeg", "image/gif", "image/webp"):
        return declared, raw_bytes
    return "image/png", raw_bytes


@router.get("/model/profile/image")
async def get_model_profile_image(id: str, user=Depends(get_verified_user)):
    """Serve the profile image of the model with the given id.

    * A ``profile_image_url`` starting with ``http`` is answered with a 302
      redirect to that URL.
    * A ``profile_image_url`` starting with ``data:image`` is base64-decoded
      and streamed inline, using the MIME type declared in the data URL when
      it is a common raster type.
    * In every other case (unknown id, no image URL, unrecognised scheme,
      undecodable data URL) the static ``favicon.png`` is returned.
    """
    model = Models.get_model_by_id(id)
    image_url = model.meta.profile_image_url if model else None

    if image_url:
        if image_url.startswith("http"):
            return Response(
                status_code=status.HTTP_302_FOUND,
                headers={"Location": image_url},
            )

        if image_url.startswith("data:image"):
            decoded = _decode_image_data_url(image_url)
            # A malformed data URL deliberately falls through to the favicon.
            if decoded is not None:
                media_type, image_data = decoded
                return StreamingResponse(
                    io.BytesIO(image_data),
                    media_type=media_type,
                    headers={"Content-Disposition": "inline; filename=image.png"},
                )

    return FileResponse(f"{STATIC_DIR}/favicon.png")
- ############################
- # ToggleModelById
- ############################
@router.post("/model/toggle", response_model=Optional[ModelResponse])
async def toggle_model_by_id(id: str, user=Depends(get_verified_user)):
    """Toggle the model with the given id via ``Models.toggle_model_by_id``.

    Allowed for admins, the model's owner, and callers for whom
    ``has_access(user.id, "write", model.access_control)`` is truthy.

    Raises:
        HTTPException: 401 with ``ERROR_MESSAGES.NOT_FOUND`` when no model
            has this id; 401 with ``ERROR_MESSAGES.UNAUTHORIZED`` when the
            caller may not modify it; 400 when the toggle itself returns a
            falsy value.
    """
    model = Models.get_model_by_id(id)
    if not model:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    can_write = (
        user.role == "admin"
        or model.user_id == user.id
        or has_access(user.id, "write", model.access_control)
    )
    if not can_write:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.UNAUTHORIZED,
        )

    toggled = Models.toggle_model_by_id(id)
    if not toggled:
        # The message names the model; the earlier text said "function".
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error updating model"),
        )
    return toggled
- ############################
- # UpdateModelById
- ############################
@router.post("/model/update", response_model=Optional[ModelModel])
async def update_model_by_id(
    id: str,
    form_data: ModelForm,
    user=Depends(get_verified_user),
):
    """Apply ``form_data`` to the model with the given id.

    Allowed for the model's owner, callers for whom
    ``has_access(user.id, "write", model.access_control)`` is truthy, and
    admins. Returns whatever ``Models.update_model_by_id`` returns.

    Raises:
        HTTPException: 401 with ``ERROR_MESSAGES.NOT_FOUND`` when no model
            has this id; 400 with ``ERROR_MESSAGES.ACCESS_PROHIBITED`` when
            the caller may not modify it.
    """
    existing = Models.get_model_by_id(id)
    if not existing:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # Checks run in the same order as before: owner, write grant, admin.
    may_edit = (
        existing.user_id == user.id
        or has_access(user.id, "write", existing.access_control)
        or user.role == "admin"
    )
    if not may_edit:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )

    return Models.update_model_by_id(id, form_data)
- ############################
- # DeleteModelById
- ############################
@router.delete("/model/delete", response_model=bool)
async def delete_model_by_id(id: str, user=Depends(get_verified_user)):
    """Delete the model with the given id.

    Allowed for admins, the model's owner, and callers for whom
    ``has_access(user.id, "write", model.access_control)`` is truthy.
    Returns whatever ``Models.delete_model_by_id`` returns.

    Raises:
        HTTPException: 401 with ``ERROR_MESSAGES.NOT_FOUND`` when no model
            has this id; 401 with ``ERROR_MESSAGES.UNAUTHORIZED`` when the
            caller may not delete it.
    """
    target = Models.get_model_by_id(id)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # Checks run in the same order as before: admin, owner, write grant.
    may_delete = (
        user.role == "admin"
        or target.user_id == user.id
        or has_access(user.id, "write", target.access_control)
    )
    if not may_delete:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.UNAUTHORIZED,
        )

    return Models.delete_model_by_id(id)
@router.delete("/delete/all", response_model=bool)
async def delete_all_models(user=Depends(get_admin_user)):
    """Return the result of ``Models.delete_all_models()``.

    The route depends on ``get_admin_user``; ``user`` itself is not read.
    """
    return Models.delete_all_models()
|