folders.py 11 KB


  1. import logging
  2. import time
  3. import uuid
  4. from typing import Optional
  5. import re
  6. from pydantic import BaseModel, ConfigDict
  7. from sqlalchemy import BigInteger, Column, Text, JSON, Boolean, func
  8. from open_webui.internal.db import Base, get_db
  9. from open_webui.env import SRC_LOG_LEVELS
  10. log = logging.getLogger(__name__)
  11. log.setLevel(SRC_LOG_LEVELS["MODELS"])
  12. ####################
  13. # Folder DB Schema
  14. ####################
  15. class Folder(Base):
  16. __tablename__ = "folder"
  17. id = Column(Text, primary_key=True)
  18. parent_id = Column(Text, nullable=True)
  19. user_id = Column(Text)
  20. name = Column(Text)
  21. items = Column(JSON, nullable=True)
  22. meta = Column(JSON, nullable=True)
  23. data = Column(JSON, nullable=True)
  24. is_expanded = Column(Boolean, default=False)
  25. created_at = Column(BigInteger)
  26. updated_at = Column(BigInteger)
  27. class FolderModel(BaseModel):
  28. id: str
  29. parent_id: Optional[str] = None
  30. user_id: str
  31. name: str
  32. items: Optional[dict] = None
  33. meta: Optional[dict] = None
  34. data: Optional[dict] = None
  35. is_expanded: bool = False
  36. created_at: int
  37. updated_at: int
  38. model_config = ConfigDict(from_attributes=True)
  39. ####################
  40. # Forms
  41. ####################
  42. class FolderForm(BaseModel):
  43. name: str
  44. data: Optional[dict] = None
  45. model_config = ConfigDict(extra="allow")
  46. class FolderTable:
  47. def insert_new_folder(
  48. self, user_id: str, form_data: FolderForm, parent_id: Optional[str] = None
  49. ) -> Optional[FolderModel]:
  50. with get_db() as db:
  51. id = str(uuid.uuid4())
  52. folder = FolderModel(
  53. **{
  54. "id": id,
  55. "user_id": user_id,
  56. **(form_data.model_dump(exclude_unset=True) or {}),
  57. "parent_id": parent_id,
  58. "created_at": int(time.time()),
  59. "updated_at": int(time.time()),
  60. }
  61. )
  62. try:
  63. result = Folder(**folder.model_dump())
  64. db.add(result)
  65. db.commit()
  66. db.refresh(result)
  67. if result:
  68. return FolderModel.model_validate(result)
  69. else:
  70. return None
  71. except Exception as e:
  72. log.exception(f"Error inserting a new folder: {e}")
  73. return None
  74. def get_folder_by_id_and_user_id(
  75. self, id: str, user_id: str
  76. ) -> Optional[FolderModel]:
  77. try:
  78. with get_db() as db:
  79. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  80. if not folder:
  81. return None
  82. return FolderModel.model_validate(folder)
  83. except Exception:
  84. return None
  85. def get_children_folders_by_id_and_user_id(
  86. self, id: str, user_id: str
  87. ) -> Optional[list[FolderModel]]:
  88. try:
  89. with get_db() as db:
  90. folders = []
  91. def get_children(folder):
  92. children = self.get_folders_by_parent_id_and_user_id(
  93. folder.id, user_id
  94. )
  95. for child in children:
  96. get_children(child)
  97. folders.append(child)
  98. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  99. if not folder:
  100. return None
  101. get_children(folder)
  102. return folders
  103. except Exception:
  104. return None
  105. def get_folders_by_user_id(self, user_id: str) -> list[FolderModel]:
  106. with get_db() as db:
  107. return [
  108. FolderModel.model_validate(folder)
  109. for folder in db.query(Folder).filter_by(user_id=user_id).all()
  110. ]
  111. def get_folder_by_parent_id_and_user_id_and_name(
  112. self, parent_id: Optional[str], user_id: str, name: str
  113. ) -> Optional[FolderModel]:
  114. try:
  115. with get_db() as db:
  116. # Check if folder exists
  117. folder = (
  118. db.query(Folder)
  119. .filter_by(parent_id=parent_id, user_id=user_id)
  120. .filter(Folder.name.ilike(name))
  121. .first()
  122. )
  123. if not folder:
  124. return None
  125. return FolderModel.model_validate(folder)
  126. except Exception as e:
  127. log.error(f"get_folder_by_parent_id_and_user_id_and_name: {e}")
  128. return None
  129. def get_folders_by_parent_id_and_user_id(
  130. self, parent_id: Optional[str], user_id: str
  131. ) -> list[FolderModel]:
  132. with get_db() as db:
  133. return [
  134. FolderModel.model_validate(folder)
  135. for folder in db.query(Folder)
  136. .filter_by(parent_id=parent_id, user_id=user_id)
  137. .all()
  138. ]
  139. def update_folder_parent_id_by_id_and_user_id(
  140. self,
  141. id: str,
  142. user_id: str,
  143. parent_id: str,
  144. ) -> Optional[FolderModel]:
  145. try:
  146. with get_db() as db:
  147. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  148. if not folder:
  149. return None
  150. folder.parent_id = parent_id
  151. folder.updated_at = int(time.time())
  152. db.commit()
  153. return FolderModel.model_validate(folder)
  154. except Exception as e:
  155. log.error(f"update_folder: {e}")
  156. return
  157. def update_folder_by_id_and_user_id(
  158. self, id: str, user_id: str, form_data: FolderForm
  159. ) -> Optional[FolderModel]:
  160. try:
  161. with get_db() as db:
  162. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  163. if not folder:
  164. return None
  165. form_data = form_data.model_dump(exclude_unset=True)
  166. existing_folder = (
  167. db.query(Folder)
  168. .filter_by(
  169. name=form_data.get("name"),
  170. parent_id=folder.parent_id,
  171. user_id=user_id,
  172. )
  173. .first()
  174. )
  175. if existing_folder and existing_folder.id != id:
  176. return None
  177. folder.name = form_data.get("name", folder.name)
  178. if "data" in form_data:
  179. folder.data = {
  180. **(folder.data or {}),
  181. **form_data["data"],
  182. }
  183. folder.updated_at = int(time.time())
  184. db.commit()
  185. return FolderModel.model_validate(folder)
  186. except Exception as e:
  187. log.error(f"update_folder: {e}")
  188. return
  189. def update_folder_is_expanded_by_id_and_user_id(
  190. self, id: str, user_id: str, is_expanded: bool
  191. ) -> Optional[FolderModel]:
  192. try:
  193. with get_db() as db:
  194. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  195. if not folder:
  196. return None
  197. folder.is_expanded = is_expanded
  198. folder.updated_at = int(time.time())
  199. db.commit()
  200. return FolderModel.model_validate(folder)
  201. except Exception as e:
  202. log.error(f"update_folder: {e}")
  203. return
  204. def delete_folder_by_id_and_user_id(self, id: str, user_id: str) -> list[str]:
  205. try:
  206. folder_ids = []
  207. with get_db() as db:
  208. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  209. if not folder:
  210. return folder_ids
  211. folder_ids.append(folder.id)
  212. # Delete all children folders
  213. def delete_children(folder):
  214. folder_children = self.get_folders_by_parent_id_and_user_id(
  215. folder.id, user_id
  216. )
  217. for folder_child in folder_children:
  218. delete_children(folder_child)
  219. folder_ids.append(folder_child.id)
  220. folder = db.query(Folder).filter_by(id=folder_child.id).first()
  221. db.delete(folder)
  222. db.commit()
  223. delete_children(folder)
  224. db.delete(folder)
  225. db.commit()
  226. return folder_ids
  227. except Exception as e:
  228. log.error(f"delete_folder: {e}")
  229. return []
  230. def normalize_folder_name(self, name: str) -> str:
  231. # Replace _ and space with a single space, lower case, collapse multiple spaces
  232. name = re.sub(r"[\s_]+", " ", name)
  233. return name.strip().lower()
  234. def search_folders_by_names(
  235. self, user_id: str, queries: list[str]
  236. ) -> list[FolderModel]:
  237. """
  238. Search for folders for a user where the name matches any of the queries, treating _ and space as equivalent, case-insensitive.
  239. """
  240. normalized_queries = [self.normalize_folder_name(q) for q in queries]
  241. if not normalized_queries:
  242. return []
  243. results = {}
  244. with get_db() as db:
  245. folders = db.query(Folder).filter_by(user_id=user_id).all()
  246. for folder in folders:
  247. if self.normalize_folder_name(folder.name) in normalized_queries:
  248. results[folder.id] = FolderModel.model_validate(folder)
  249. # get children folders
  250. children = self.get_children_folders_by_id_and_user_id(
  251. folder.id, user_id
  252. )
  253. for child in children:
  254. results[child.id] = child
  255. # Return the results as a list
  256. if not results:
  257. return []
  258. else:
  259. results = list(results.values())
  260. return results
  261. def search_folders_by_name_contains(
  262. self, user_id: str, query: str
  263. ) -> list[FolderModel]:
  264. """
  265. Partial match: normalized name contains (as substring) the normalized query.
  266. """
  267. normalized_query = self.normalize_folder_name(query)
  268. results = []
  269. with get_db() as db:
  270. folders = db.query(Folder).filter_by(user_id=user_id).all()
  271. for folder in folders:
  272. norm_name = self.normalize_folder_name(folder.name)
  273. if normalized_query in norm_name:
  274. results.append(FolderModel.model_validate(folder))
  275. return results
  276. Folders = FolderTable()