123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372 |
import logging
import os
import shutil
from typing import Optional

import requests
from fastapi import (
    APIRouter,
    Depends,
    FastAPI,
    File,
    Form,
    HTTPException,
    Request,
    UploadFile,
    status,
)
from pydantic import BaseModel
from starlette.concurrency import run_in_threadpool
from starlette.responses import FileResponse

from open_webui.config import CACHE_DIR
from open_webui.constants import ERROR_MESSAGES
from open_webui.env import SRC_LOG_LEVELS
from open_webui.routers.openai import get_all_models_responses
from open_webui.utils.auth import get_admin_user
# Module-level logger; its level is taken from the "MAIN" entry of SRC_LOG_LEVELS.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MAIN"])


##################################
#
# Pipelines Endpoints
#
##################################

# Router that the endpoint functions below register themselves on.
router = APIRouter()
@router.get("/api/pipelines/list")
async def get_pipelines_list(request: Request, user=Depends(get_admin_user)):
    # Lists the configured OpenAI-compatible base URLs whose models response
    # contains a "pipelines" key, together with their index in the config list.
    responses = await get_all_models_responses(request)
    log.debug(f"get_pipelines_list: get_openai_models_responses returned {responses}")

    # First pass: remember the position of every response that is present
    # and carries a "pipelines" entry.
    pipeline_positions = []
    for position, payload in enumerate(responses):
        if payload is None:
            continue
        if "pipelines" in payload:
            pipeline_positions.append(position)

    # Second pass: pair each remembered position with its configured base URL.
    entries = []
    for position in pipeline_positions:
        base_url = request.app.state.config.OPENAI_API_BASE_URLS[position]
        entries.append({"url": base_url, "idx": position})

    return {"data": entries}
@router.post("/api/pipelines/upload")
async def upload_pipeline(
    request: Request,
    urlIdx: int = Form(...),
    file: UploadFile = File(...),
    user=Depends(get_admin_user),
):
    """Forward an uploaded Python file to the pipelines server at ``urlIdx``.

    The upload is staged under ``CACHE_DIR/pipelines``, posted to
    ``{base_url}/pipelines/upload`` with the matching API key as a bearer
    token, and the staged copy is removed afterwards whether or not the
    forward succeeded.

    Args:
        request: Incoming request; provides ``app.state.config``.
        urlIdx: Index into ``OPENAI_API_BASE_URLS`` / ``OPENAI_API_KEYS``.
        file: The uploaded file; its name must end with ``.py``.
        user: Result of the ``get_admin_user`` dependency.

    Returns:
        The JSON object returned by the pipelines server.

    Raises:
        HTTPException: 400 when the file name does not end with ``.py``.
            For any other failure, the upstream error status (404 when no
            upstream error status is available) with the upstream
            ``detail`` when one is provided.
    """
    log.info(f"upload_pipeline: urlIdx={urlIdx}, filename={file.filename}")

    # Keep only the final path component of the client-supplied name so a
    # value such as "../../x.py" cannot write outside the staging folder.
    filename = os.path.basename(file.filename) if file.filename else ""

    # Check if the uploaded file is a python file
    if not filename.endswith(".py"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Only Python (.py) files are allowed.",
        )

    upload_folder = f"{CACHE_DIR}/pipelines"
    os.makedirs(upload_folder, exist_ok=True)
    file_path = os.path.join(upload_folder, filename)

    r = None
    try:
        # Save the uploaded file
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
        key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

        with open(file_path, "rb") as f:
            # `requests` is blocking; run it in the thread pool so the event
            # loop keeps serving other requests while the upload is in flight.
            r = await run_in_threadpool(
                requests.post,
                f"{url}/pipelines/upload",
                headers={"Authorization": f"Bearer {key}"},
                files={"file": f},
            )

        r.raise_for_status()
        data = r.json()

        return {**data}
    except Exception as e:
        log.exception(f"Connection error: {e}")

        detail = None
        status_code = status.HTTP_404_NOT_FOUND
        if r is not None:
            # Only propagate an upstream *error* status; a 2xx response with
            # an unusable body must not be re-raised as a "successful" error.
            if r.status_code >= 400:
                status_code = r.status_code
            try:
                res = r.json()
            except ValueError:
                # Upstream body was not JSON; fall back to the generic detail.
                res = None
            if isinstance(res, dict) and "detail" in res:
                detail = res["detail"]

        raise HTTPException(
            status_code=status_code,
            detail=detail if detail else "Pipeline not found",
        ) from e
    finally:
        # Ensure the file is deleted after the upload is completed or on failure
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass
# Request body for POST /api/pipelines/add.
class AddPipelineForm(BaseModel):
    # Forwarded to the pipelines server as the "url" field of the JSON payload.
    url: str
    # Index into the configured OPENAI_API_BASE_URLS / OPENAI_API_KEYS lists.
    urlIdx: int
@router.post("/api/pipelines/add")
async def add_pipeline(
    request: Request, form_data: AddPipelineForm, user=Depends(get_admin_user)
):
    """Ask the pipelines server at ``form_data.urlIdx`` to add a pipeline.

    Posts ``{"url": form_data.url}`` to ``{base_url}/pipelines/add`` using the
    matching API key as a bearer token.

    Args:
        request: Incoming request; provides ``app.state.config``.
        form_data: Pipeline URL and the index of the target server.
        user: Result of the ``get_admin_user`` dependency.

    Returns:
        The JSON object returned by the pipelines server.

    Raises:
        HTTPException: With the upstream error status (404 when no upstream
            error status is available) and the upstream ``detail`` when one
            is provided, otherwise "Pipeline not found".
    """
    r = None
    try:
        urlIdx = form_data.urlIdx

        url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
        key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

        # `requests` is blocking; run it in the thread pool so the event loop
        # is not stalled while the pipelines server processes the request.
        r = await run_in_threadpool(
            requests.post,
            f"{url}/pipelines/add",
            headers={"Authorization": f"Bearer {key}"},
            json={"url": form_data.url},
        )

        r.raise_for_status()
        data = r.json()

        return {**data}
    except Exception as e:
        log.exception(f"Connection error: {e}")

        detail = None
        status_code = status.HTTP_404_NOT_FOUND
        if r is not None:
            # Only propagate an upstream *error* status; a 2xx response with
            # an unusable body must not be re-raised as a "successful" error.
            if r.status_code >= 400:
                status_code = r.status_code
            try:
                res = r.json()
            except ValueError:
                # Upstream body was not JSON; fall back to the generic detail.
                res = None
            if isinstance(res, dict) and "detail" in res:
                detail = res["detail"]

        raise HTTPException(
            status_code=status_code,
            detail=detail if detail else "Pipeline not found",
        ) from e
# Request body for DELETE /api/pipelines/delete.
class DeletePipelineForm(BaseModel):
    # Forwarded to the pipelines server as the "id" field of the JSON payload.
    id: str
    # Index into the configured OPENAI_API_BASE_URLS / OPENAI_API_KEYS lists.
    urlIdx: int
@router.delete("/api/pipelines/delete")
async def delete_pipeline(
    request: Request, form_data: DeletePipelineForm, user=Depends(get_admin_user)
):
    """Ask the pipelines server at ``form_data.urlIdx`` to delete a pipeline.

    Sends a DELETE with ``{"id": form_data.id}`` to
    ``{base_url}/pipelines/delete`` using the matching API key as a bearer
    token.

    Args:
        request: Incoming request; provides ``app.state.config``.
        form_data: Pipeline id and the index of the target server.
        user: Result of the ``get_admin_user`` dependency.

    Returns:
        The JSON object returned by the pipelines server.

    Raises:
        HTTPException: With the upstream error status (404 when no upstream
            error status is available) and the upstream ``detail`` when one
            is provided, otherwise "Pipeline not found".
    """
    r = None
    try:
        urlIdx = form_data.urlIdx

        url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
        key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

        # `requests` is blocking; run it in the thread pool so the event loop
        # is not stalled while waiting for the pipelines server.
        r = await run_in_threadpool(
            requests.delete,
            f"{url}/pipelines/delete",
            headers={"Authorization": f"Bearer {key}"},
            json={"id": form_data.id},
        )

        r.raise_for_status()
        data = r.json()

        return {**data}
    except Exception as e:
        log.exception(f"Connection error: {e}")

        detail = None
        status_code = status.HTTP_404_NOT_FOUND
        if r is not None:
            # Only propagate an upstream *error* status; a 2xx response with
            # an unusable body must not be re-raised as a "successful" error.
            if r.status_code >= 400:
                status_code = r.status_code
            try:
                res = r.json()
            except ValueError:
                # Upstream body was not JSON; fall back to the generic detail.
                res = None
            if isinstance(res, dict) and "detail" in res:
                detail = res["detail"]

        raise HTTPException(
            status_code=status_code,
            detail=detail if detail else "Pipeline not found",
        ) from e
@router.get("/api/pipelines")
async def get_pipelines(
    request: Request, urlIdx: Optional[int] = None, user=Depends(get_admin_user)
):
    """Fetch the pipelines reported by the server at ``urlIdx``.

    Issues a GET to ``{base_url}/pipelines`` using the matching API key as a
    bearer token.

    Args:
        request: Incoming request; provides ``app.state.config``.
        urlIdx: Index into ``OPENAI_API_BASE_URLS`` / ``OPENAI_API_KEYS``.
            An index that cannot be resolved ends up in the generic error
            path below.
        user: Result of the ``get_admin_user`` dependency.

    Returns:
        The JSON object returned by the pipelines server.

    Raises:
        HTTPException: With the upstream error status (404 when no upstream
            error status is available) and the upstream ``detail`` when one
            is provided, otherwise "Pipeline not found".
    """
    r = None
    try:
        url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
        key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

        # `requests` is blocking; run it in the thread pool so the event loop
        # is not stalled while waiting for the pipelines server.
        r = await run_in_threadpool(
            requests.get,
            f"{url}/pipelines",
            headers={"Authorization": f"Bearer {key}"},
        )

        r.raise_for_status()
        data = r.json()

        return {**data}
    except Exception as e:
        log.exception(f"Connection error: {e}")

        detail = None
        status_code = status.HTTP_404_NOT_FOUND
        if r is not None:
            # Only propagate an upstream *error* status; a 2xx response with
            # an unusable body must not be re-raised as a "successful" error.
            if r.status_code >= 400:
                status_code = r.status_code
            try:
                res = r.json()
            except ValueError:
                # Upstream body was not JSON; fall back to the generic detail.
                res = None
            if isinstance(res, dict) and "detail" in res:
                detail = res["detail"]

        raise HTTPException(
            status_code=status_code,
            detail=detail if detail else "Pipeline not found",
        ) from e
@router.get("/api/pipelines/{pipeline_id}/valves")
async def get_pipeline_valves(
    request: Request,
    urlIdx: Optional[int],
    pipeline_id: str,
    user=Depends(get_admin_user),
):
    """Fetch the valves of ``pipeline_id`` from the server at ``urlIdx``.

    Issues a GET to ``{base_url}/{pipeline_id}/valves`` using the matching API
    key as a bearer token.

    Args:
        request: Incoming request; provides ``app.state.config``.
        urlIdx: Index into ``OPENAI_API_BASE_URLS`` / ``OPENAI_API_KEYS``.
        pipeline_id: Identifier taken from the request path.
        user: Result of the ``get_admin_user`` dependency.

    Returns:
        The JSON object returned by the pipelines server.

    Raises:
        HTTPException: With the upstream error status (404 when no upstream
            error status is available) and the upstream ``detail`` when one
            is provided, otherwise "Pipeline not found".
    """
    r = None
    try:
        url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
        key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

        # `requests` is blocking; run it in the thread pool so the event loop
        # is not stalled while waiting for the pipelines server.
        r = await run_in_threadpool(
            requests.get,
            f"{url}/{pipeline_id}/valves",
            headers={"Authorization": f"Bearer {key}"},
        )

        r.raise_for_status()
        data = r.json()

        return {**data}
    except Exception as e:
        log.exception(f"Connection error: {e}")

        detail = None
        status_code = status.HTTP_404_NOT_FOUND
        if r is not None:
            # Only propagate an upstream *error* status; a 2xx response with
            # an unusable body must not be re-raised as a "successful" error.
            if r.status_code >= 400:
                status_code = r.status_code
            try:
                res = r.json()
            except ValueError:
                # Upstream body was not JSON; fall back to the generic detail.
                res = None
            if isinstance(res, dict) and "detail" in res:
                detail = res["detail"]

        raise HTTPException(
            status_code=status_code,
            detail=detail if detail else "Pipeline not found",
        ) from e
@router.get("/api/pipelines/{pipeline_id}/valves/spec")
async def get_pipeline_valves_spec(
    request: Request,
    urlIdx: Optional[int],
    pipeline_id: str,
    user=Depends(get_admin_user),
):
    """Fetch the valves spec of ``pipeline_id`` from the server at ``urlIdx``.

    Issues a GET to ``{base_url}/{pipeline_id}/valves/spec`` using the
    matching API key as a bearer token.

    Args:
        request: Incoming request; provides ``app.state.config``.
        urlIdx: Index into ``OPENAI_API_BASE_URLS`` / ``OPENAI_API_KEYS``.
        pipeline_id: Identifier taken from the request path.
        user: Result of the ``get_admin_user`` dependency.

    Returns:
        The JSON object returned by the pipelines server.

    Raises:
        HTTPException: With the upstream error status (404 when no upstream
            error status is available) and the upstream ``detail`` when one
            is provided, otherwise "Pipeline not found".
    """
    r = None
    try:
        url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
        key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

        # `requests` is blocking; run it in the thread pool so the event loop
        # is not stalled while waiting for the pipelines server.
        r = await run_in_threadpool(
            requests.get,
            f"{url}/{pipeline_id}/valves/spec",
            headers={"Authorization": f"Bearer {key}"},
        )

        r.raise_for_status()
        data = r.json()

        return {**data}
    except Exception as e:
        log.exception(f"Connection error: {e}")

        detail = None
        status_code = status.HTTP_404_NOT_FOUND
        if r is not None:
            # Only propagate an upstream *error* status; a 2xx response with
            # an unusable body must not be re-raised as a "successful" error.
            if r.status_code >= 400:
                status_code = r.status_code
            try:
                res = r.json()
            except ValueError:
                # Upstream body was not JSON; fall back to the generic detail.
                res = None
            if isinstance(res, dict) and "detail" in res:
                detail = res["detail"]

        raise HTTPException(
            status_code=status_code,
            detail=detail if detail else "Pipeline not found",
        ) from e
@router.post("/api/pipelines/{pipeline_id}/valves/update")
async def update_pipeline_valves(
    request: Request,
    urlIdx: Optional[int],
    pipeline_id: str,
    form_data: dict,
    user=Depends(get_admin_user),
):
    """Send new valve values for ``pipeline_id`` to the server at ``urlIdx``.

    Posts a copy of ``form_data`` as JSON to
    ``{base_url}/{pipeline_id}/valves/update`` using the matching API key as a
    bearer token.

    Args:
        request: Incoming request; provides ``app.state.config``.
        urlIdx: Index into ``OPENAI_API_BASE_URLS`` / ``OPENAI_API_KEYS``.
        pipeline_id: Identifier taken from the request path.
        form_data: Request body, forwarded as the JSON payload.
        user: Result of the ``get_admin_user`` dependency.

    Returns:
        The JSON object returned by the pipelines server.

    Raises:
        HTTPException: With the upstream error status (404 when no upstream
            error status is available) and the upstream ``detail`` when one
            is provided, otherwise "Pipeline not found".
    """
    r = None
    try:
        url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
        key = request.app.state.config.OPENAI_API_KEYS[urlIdx]

        # `requests` is blocking; run it in the thread pool so the event loop
        # is not stalled while waiting for the pipelines server.
        r = await run_in_threadpool(
            requests.post,
            f"{url}/{pipeline_id}/valves/update",
            headers={"Authorization": f"Bearer {key}"},
            json={**form_data},
        )

        r.raise_for_status()
        data = r.json()

        return {**data}
    except Exception as e:
        log.exception(f"Connection error: {e}")

        detail = None
        status_code = status.HTTP_404_NOT_FOUND
        if r is not None:
            # Only propagate an upstream *error* status; a 2xx response with
            # an unusable body must not be re-raised as a "successful" error.
            if r.status_code >= 400:
                status_code = r.status_code
            try:
                res = r.json()
            except ValueError:
                # Upstream body was not JSON; fall back to the generic detail.
                res = None
            if isinstance(res, dict) and "detail" in res:
                detail = res["detail"]

        raise HTTPException(
            status_code=status_code,
            detail=detail if detail else "Pipeline not found",
        ) from e
|