# pipelines.py
  1. from fastapi import (
  2. Depends,
  3. FastAPI,
  4. File,
  5. Form,
  6. HTTPException,
  7. Request,
  8. UploadFile,
  9. status,
  10. APIRouter,
  11. )
  12. import os
  13. import logging
  14. import shutil
  15. import requests
  16. from pydantic import BaseModel
  17. from starlette.responses import FileResponse
  18. from typing import Optional
  19. from open_webui.env import SRC_LOG_LEVELS
  20. from open_webui.config import CACHE_DIR
  21. from open_webui.constants import ERROR_MESSAGES
  22. from open_webui.routers.openai import get_all_models_responses
  23. from open_webui.utils.auth import get_admin_user
# Module-level logger; its level is taken from the "MAIN" entry of SRC_LOG_LEVELS.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MAIN"])
##################################
#
# Pipelines Endpoints
#
##################################
# Router on which every endpoint defined below in this module is registered.
router = APIRouter()
  32. @router.get("/api/pipelines/list")
  33. async def get_pipelines_list(request: Request, user=Depends(get_admin_user)):
  34. responses = await get_all_models_responses(request)
  35. log.debug(f"get_pipelines_list: get_openai_models_responses returned {responses}")
  36. urlIdxs = [
  37. idx
  38. for idx, response in enumerate(responses)
  39. if response is not None and "pipelines" in response
  40. ]
  41. return {
  42. "data": [
  43. {
  44. "url": request.app.state.config.OPENAI_API_BASE_URLS[urlIdx],
  45. "idx": urlIdx,
  46. }
  47. for urlIdx in urlIdxs
  48. ]
  49. }
  50. @router.post("/api/pipelines/upload")
  51. async def upload_pipeline(
  52. request: Request,
  53. urlIdx: int = Form(...),
  54. file: UploadFile = File(...),
  55. user=Depends(get_admin_user),
  56. ):
  57. print("upload_pipeline", urlIdx, file.filename)
  58. # Check if the uploaded file is a python file
  59. if not (file.filename and file.filename.endswith(".py")):
  60. raise HTTPException(
  61. status_code=status.HTTP_400_BAD_REQUEST,
  62. detail="Only Python (.py) files are allowed.",
  63. )
  64. upload_folder = f"{CACHE_DIR}/pipelines"
  65. os.makedirs(upload_folder, exist_ok=True)
  66. file_path = os.path.join(upload_folder, file.filename)
  67. r = None
  68. try:
  69. # Save the uploaded file
  70. with open(file_path, "wb") as buffer:
  71. shutil.copyfileobj(file.file, buffer)
  72. url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
  73. key = request.app.state.config.OPENAI_API_KEYS[urlIdx]
  74. with open(file_path, "rb") as f:
  75. files = {"file": f}
  76. r = requests.post(
  77. f"{url}/pipelines/upload",
  78. headers={"Authorization": f"Bearer {key}"},
  79. files=files,
  80. )
  81. r.raise_for_status()
  82. data = r.json()
  83. return {**data}
  84. except Exception as e:
  85. # Handle connection error here
  86. print(f"Connection error: {e}")
  87. detail = None
  88. status_code = status.HTTP_404_NOT_FOUND
  89. if r is not None:
  90. status_code = r.status_code
  91. try:
  92. res = r.json()
  93. if "detail" in res:
  94. detail = res["detail"]
  95. except Exception:
  96. pass
  97. raise HTTPException(
  98. status_code=status_code,
  99. detail=detail if detail else "Pipeline not found",
  100. )
  101. finally:
  102. # Ensure the file is deleted after the upload is completed or on failure
  103. if os.path.exists(file_path):
  104. os.remove(file_path)
class AddPipelineForm(BaseModel):
    """Request body for ``POST /api/pipelines/add``."""

    # Sent to the backend as the "url" field of the JSON payload.
    url: str
    # Index into OPENAI_API_BASE_URLS / OPENAI_API_KEYS selecting the backend.
    urlIdx: int
  108. @router.post("/api/pipelines/add")
  109. async def add_pipeline(
  110. request: Request, form_data: AddPipelineForm, user=Depends(get_admin_user)
  111. ):
  112. r = None
  113. try:
  114. urlIdx = form_data.urlIdx
  115. url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
  116. key = request.app.state.config.OPENAI_API_KEYS[urlIdx]
  117. r = requests.post(
  118. f"{url}/pipelines/add",
  119. headers={"Authorization": f"Bearer {key}"},
  120. json={"url": form_data.url},
  121. )
  122. r.raise_for_status()
  123. data = r.json()
  124. return {**data}
  125. except Exception as e:
  126. # Handle connection error here
  127. print(f"Connection error: {e}")
  128. detail = None
  129. if r is not None:
  130. try:
  131. res = r.json()
  132. if "detail" in res:
  133. detail = res["detail"]
  134. except Exception:
  135. pass
  136. raise HTTPException(
  137. status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND),
  138. detail=detail if detail else "Pipeline not found",
  139. )
class DeletePipelineForm(BaseModel):
    """Request body for ``DELETE /api/pipelines/delete``."""

    # Sent to the backend as the "id" field of the JSON payload.
    id: str
    # Index into OPENAI_API_BASE_URLS / OPENAI_API_KEYS selecting the backend.
    urlIdx: int
  143. @router.delete("/api/pipelines/delete")
  144. async def delete_pipeline(
  145. request: Request, form_data: DeletePipelineForm, user=Depends(get_admin_user)
  146. ):
  147. r = None
  148. try:
  149. urlIdx = form_data.urlIdx
  150. url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
  151. key = request.app.state.config.OPENAI_API_KEYS[urlIdx]
  152. r = requests.delete(
  153. f"{url}/pipelines/delete",
  154. headers={"Authorization": f"Bearer {key}"},
  155. json={"id": form_data.id},
  156. )
  157. r.raise_for_status()
  158. data = r.json()
  159. return {**data}
  160. except Exception as e:
  161. # Handle connection error here
  162. print(f"Connection error: {e}")
  163. detail = None
  164. if r is not None:
  165. try:
  166. res = r.json()
  167. if "detail" in res:
  168. detail = res["detail"]
  169. except Exception:
  170. pass
  171. raise HTTPException(
  172. status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND),
  173. detail=detail if detail else "Pipeline not found",
  174. )
  175. @router.get("/api/pipelines")
  176. async def get_pipelines(
  177. request: Request, urlIdx: Optional[int] = None, user=Depends(get_admin_user)
  178. ):
  179. r = None
  180. try:
  181. url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
  182. key = request.app.state.config.OPENAI_API_KEYS[urlIdx]
  183. r = requests.get(f"{url}/pipelines", headers={"Authorization": f"Bearer {key}"})
  184. r.raise_for_status()
  185. data = r.json()
  186. return {**data}
  187. except Exception as e:
  188. # Handle connection error here
  189. print(f"Connection error: {e}")
  190. detail = None
  191. if r is not None:
  192. try:
  193. res = r.json()
  194. if "detail" in res:
  195. detail = res["detail"]
  196. except Exception:
  197. pass
  198. raise HTTPException(
  199. status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND),
  200. detail=detail if detail else "Pipeline not found",
  201. )
  202. @router.get("/api/pipelines/{pipeline_id}/valves")
  203. async def get_pipeline_valves(
  204. request: Request,
  205. urlIdx: Optional[int],
  206. pipeline_id: str,
  207. user=Depends(get_admin_user),
  208. ):
  209. r = None
  210. try:
  211. url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
  212. key = request.app.state.config.OPENAI_API_KEYS[urlIdx]
  213. r = requests.get(
  214. f"{url}/{pipeline_id}/valves", headers={"Authorization": f"Bearer {key}"}
  215. )
  216. r.raise_for_status()
  217. data = r.json()
  218. return {**data}
  219. except Exception as e:
  220. # Handle connection error here
  221. print(f"Connection error: {e}")
  222. detail = None
  223. if r is not None:
  224. try:
  225. res = r.json()
  226. if "detail" in res:
  227. detail = res["detail"]
  228. except Exception:
  229. pass
  230. raise HTTPException(
  231. status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND),
  232. detail=detail if detail else "Pipeline not found",
  233. )
  234. @router.get("/api/pipelines/{pipeline_id}/valves/spec")
  235. async def get_pipeline_valves_spec(
  236. request: Request,
  237. urlIdx: Optional[int],
  238. pipeline_id: str,
  239. user=Depends(get_admin_user),
  240. ):
  241. r = None
  242. try:
  243. url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
  244. key = request.app.state.config.OPENAI_API_KEYS[urlIdx]
  245. r = requests.get(
  246. f"{url}/{pipeline_id}/valves/spec",
  247. headers={"Authorization": f"Bearer {key}"},
  248. )
  249. r.raise_for_status()
  250. data = r.json()
  251. return {**data}
  252. except Exception as e:
  253. # Handle connection error here
  254. print(f"Connection error: {e}")
  255. detail = None
  256. if r is not None:
  257. try:
  258. res = r.json()
  259. if "detail" in res:
  260. detail = res["detail"]
  261. except Exception:
  262. pass
  263. raise HTTPException(
  264. status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND),
  265. detail=detail if detail else "Pipeline not found",
  266. )
  267. @router.post("/api/pipelines/{pipeline_id}/valves/update")
  268. async def update_pipeline_valves(
  269. request: Request,
  270. urlIdx: Optional[int],
  271. pipeline_id: str,
  272. form_data: dict,
  273. user=Depends(get_admin_user),
  274. ):
  275. r = None
  276. try:
  277. url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
  278. key = request.app.state.config.OPENAI_API_KEYS[urlIdx]
  279. r = requests.post(
  280. f"{url}/{pipeline_id}/valves/update",
  281. headers={"Authorization": f"Bearer {key}"},
  282. json={**form_data},
  283. )
  284. r.raise_for_status()
  285. data = r.json()
  286. return {**data}
  287. except Exception as e:
  288. # Handle connection error here
  289. print(f"Connection error: {e}")
  290. detail = None
  291. if r is not None:
  292. try:
  293. res = r.json()
  294. if "detail" in res:
  295. detail = res["detail"]
  296. except Exception:
  297. pass
  298. raise HTTPException(
  299. status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND),
  300. detail=detail if detail else "Pipeline not found",
  301. )