1
0

folders.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. import logging
  2. import time
  3. import uuid
  4. from typing import Optional
  5. from open_webui.internal.db import Base, get_db
  6. from open_webui.models.chats import Chats
  7. from open_webui.env import SRC_LOG_LEVELS
  8. from pydantic import BaseModel, ConfigDict
  9. from sqlalchemy import BigInteger, Column, Text, JSON, Boolean
  10. from open_webui.utils.access_control import get_permissions
# Module-level logger; its level is read from the "MODELS" entry of SRC_LOG_LEVELS.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MODELS"])
  13. ####################
  14. # Folder DB Schema
  15. ####################
class Folder(Base):
    """SQLAlchemy ORM mapping for the ``folder`` table."""

    __tablename__ = "folder"

    # Primary key; FolderTable.insert_new_folder stores a UUID4 string here.
    id = Column(Text, primary_key=True)
    # Id of the parent folder. Nullable — presumably NULL marks a top-level
    # folder (TODO confirm against callers).
    parent_id = Column(Text, nullable=True)
    # Owner of the folder; every FolderTable query filters on this column.
    user_id = Column(Text)
    name = Column(Text)
    # Free-form JSON payloads; their schema is not defined in this module.
    items = Column(JSON, nullable=True)
    meta = Column(JSON, nullable=True)
    # JSON object merged key-by-key by FolderTable.update_folder_by_id_and_user_id.
    data = Column(JSON, nullable=True)
    # Presumably UI expand/collapse state (TODO confirm); defaults to False.
    is_expanded = Column(Boolean, default=False)
    # Both timestamps are written as int(time.time()), i.e. Unix epoch seconds.
    created_at = Column(BigInteger)
    updated_at = Column(BigInteger)
class FolderModel(BaseModel):
    """Pydantic representation of a ``Folder`` row.

    Field names mirror the ``Folder`` columns; FolderTable builds instances
    with ``FolderModel.model_validate(orm_row)``.
    """

    id: str
    parent_id: Optional[str] = None
    user_id: str
    name: str
    # JSON payloads; their schema is not defined in this module.
    items: Optional[dict] = None
    meta: Optional[dict] = None
    data: Optional[dict] = None
    is_expanded: bool = False
    # Unix epoch seconds (FolderTable writes int(time.time())).
    created_at: int
    updated_at: int
    # from_attributes=True allows validation directly from ORM object attributes.
    model_config = ConfigDict(from_attributes=True)
  40. ####################
  41. # Forms
  42. ####################
class FolderForm(BaseModel):
    """Input payload used to create or update a folder."""

    # Folder display name (required).
    name: str
    # Optional JSON object; on update it is merged into the stored ``data``.
    data: Optional[dict] = None
    # extra="allow": fields not declared here are accepted and kept on the model.
    model_config = ConfigDict(extra="allow")
  47. class FolderTable:
  48. def insert_new_folder(
  49. self, user_id: str, form_data: FolderForm, parent_id: Optional[str] = None
  50. ) -> Optional[FolderModel]:
  51. with get_db() as db:
  52. id = str(uuid.uuid4())
  53. folder = FolderModel(
  54. **{
  55. "id": id,
  56. "user_id": user_id,
  57. **(form_data.model_dump(exclude_unset=True) or {}),
  58. "parent_id": parent_id,
  59. "created_at": int(time.time()),
  60. "updated_at": int(time.time()),
  61. }
  62. )
  63. try:
  64. result = Folder(**folder.model_dump())
  65. db.add(result)
  66. db.commit()
  67. db.refresh(result)
  68. if result:
  69. return FolderModel.model_validate(result)
  70. else:
  71. return None
  72. except Exception as e:
  73. log.exception(f"Error inserting a new folder: {e}")
  74. return None
  75. def get_folder_by_id_and_user_id(
  76. self, id: str, user_id: str
  77. ) -> Optional[FolderModel]:
  78. try:
  79. with get_db() as db:
  80. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  81. if not folder:
  82. return None
  83. return FolderModel.model_validate(folder)
  84. except Exception:
  85. return None
  86. def get_children_folders_by_id_and_user_id(
  87. self, id: str, user_id: str
  88. ) -> Optional[FolderModel]:
  89. try:
  90. with get_db() as db:
  91. folders = []
  92. def get_children(folder):
  93. children = self.get_folders_by_parent_id_and_user_id(
  94. folder.id, user_id
  95. )
  96. for child in children:
  97. get_children(child)
  98. folders.append(child)
  99. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  100. if not folder:
  101. return None
  102. get_children(folder)
  103. return folders
  104. except Exception:
  105. return None
  106. def get_folders_by_user_id(self, user_id: str) -> list[FolderModel]:
  107. with get_db() as db:
  108. return [
  109. FolderModel.model_validate(folder)
  110. for folder in db.query(Folder).filter_by(user_id=user_id).all()
  111. ]
  112. def get_folder_by_parent_id_and_user_id_and_name(
  113. self, parent_id: Optional[str], user_id: str, name: str
  114. ) -> Optional[FolderModel]:
  115. try:
  116. with get_db() as db:
  117. # Check if folder exists
  118. folder = (
  119. db.query(Folder)
  120. .filter_by(parent_id=parent_id, user_id=user_id)
  121. .filter(Folder.name.ilike(name))
  122. .first()
  123. )
  124. if not folder:
  125. return None
  126. return FolderModel.model_validate(folder)
  127. except Exception as e:
  128. log.error(f"get_folder_by_parent_id_and_user_id_and_name: {e}")
  129. return None
  130. def get_folders_by_parent_id_and_user_id(
  131. self, parent_id: Optional[str], user_id: str
  132. ) -> list[FolderModel]:
  133. with get_db() as db:
  134. return [
  135. FolderModel.model_validate(folder)
  136. for folder in db.query(Folder)
  137. .filter_by(parent_id=parent_id, user_id=user_id)
  138. .all()
  139. ]
  140. def update_folder_parent_id_by_id_and_user_id(
  141. self,
  142. id: str,
  143. user_id: str,
  144. parent_id: str,
  145. ) -> Optional[FolderModel]:
  146. try:
  147. with get_db() as db:
  148. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  149. if not folder:
  150. return None
  151. folder.parent_id = parent_id
  152. folder.updated_at = int(time.time())
  153. db.commit()
  154. return FolderModel.model_validate(folder)
  155. except Exception as e:
  156. log.error(f"update_folder: {e}")
  157. return
  158. def update_folder_by_id_and_user_id(
  159. self, id: str, user_id: str, form_data: FolderForm
  160. ) -> Optional[FolderModel]:
  161. try:
  162. with get_db() as db:
  163. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  164. if not folder:
  165. return None
  166. form_data = form_data.model_dump(exclude_unset=True)
  167. existing_folder = (
  168. db.query(Folder)
  169. .filter_by(
  170. name=form_data.get("name"),
  171. parent_id=folder.parent_id,
  172. user_id=user_id,
  173. )
  174. .first()
  175. )
  176. if existing_folder and existing_folder.id != id:
  177. return None
  178. folder.name = form_data.get("name", folder.name)
  179. if "data" in form_data:
  180. folder.data = {
  181. **(folder.data or {}),
  182. **form_data["data"],
  183. }
  184. folder.updated_at = int(time.time())
  185. db.commit()
  186. return FolderModel.model_validate(folder)
  187. except Exception as e:
  188. log.error(f"update_folder: {e}")
  189. return
  190. def update_folder_is_expanded_by_id_and_user_id(
  191. self, id: str, user_id: str, is_expanded: bool
  192. ) -> Optional[FolderModel]:
  193. try:
  194. with get_db() as db:
  195. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  196. if not folder:
  197. return None
  198. folder.is_expanded = is_expanded
  199. folder.updated_at = int(time.time())
  200. db.commit()
  201. return FolderModel.model_validate(folder)
  202. except Exception as e:
  203. log.error(f"update_folder: {e}")
  204. return
  205. def delete_folder_by_id_and_user_id(self, id: str, user_id: str) -> list[str]:
  206. try:
  207. folder_ids = []
  208. with get_db() as db:
  209. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  210. if not folder:
  211. return folder_ids
  212. folder_ids.append(folder.id)
  213. # Delete all children folders
  214. def delete_children(folder):
  215. folder_children = self.get_folders_by_parent_id_and_user_id(
  216. folder.id, user_id
  217. )
  218. for folder_child in folder_children:
  219. delete_children(folder_child)
  220. folder_ids.append(folder_child.id)
  221. folder = db.query(Folder).filter_by(id=folder_child.id).first()
  222. db.delete(folder)
  223. db.commit()
  224. delete_children(folder)
  225. db.delete(folder)
  226. db.commit()
  227. return folder_ids
  228. except Exception as e:
  229. log.error(f"delete_folder: {e}")
  230. return []
# Shared module-level FolderTable instance.
Folders = FolderTable()