folders.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. import logging
  2. import time
  3. import uuid
  4. from typing import Optional
  5. import re
  6. from pydantic import BaseModel, ConfigDict
  7. from sqlalchemy import BigInteger, Column, Text, JSON, Boolean, func
  8. from open_webui.internal.db import Base, get_db
  9. from open_webui.env import SRC_LOG_LEVELS
  10. log = logging.getLogger(__name__)
  11. log.setLevel(SRC_LOG_LEVELS["MODELS"])
  12. ####################
  13. # Folder DB Schema
  14. ####################
  15. class Folder(Base):
  16. __tablename__ = "folder"
  17. id = Column(Text, primary_key=True)
  18. parent_id = Column(Text, nullable=True)
  19. user_id = Column(Text)
  20. name = Column(Text)
  21. items = Column(JSON, nullable=True)
  22. meta = Column(JSON, nullable=True)
  23. data = Column(JSON, nullable=True)
  24. is_expanded = Column(Boolean, default=False)
  25. created_at = Column(BigInteger)
  26. updated_at = Column(BigInteger)
  27. class FolderModel(BaseModel):
  28. id: str
  29. parent_id: Optional[str] = None
  30. user_id: str
  31. name: str
  32. items: Optional[dict] = None
  33. meta: Optional[dict] = None
  34. data: Optional[dict] = None
  35. is_expanded: bool = False
  36. created_at: int
  37. updated_at: int
  38. model_config = ConfigDict(from_attributes=True)
  39. ####################
  40. # Forms
  41. ####################
  42. class FolderForm(BaseModel):
  43. name: str
  44. data: Optional[dict] = None
  45. meta: Optional[dict] = None
  46. model_config = ConfigDict(extra="allow")
  47. class FolderUpdateForm(BaseModel):
  48. name: Optional[str] = None
  49. data: Optional[dict] = None
  50. meta: Optional[dict] = None
  51. model_config = ConfigDict(extra="allow")
  52. class FolderTable:
  53. def insert_new_folder(
  54. self, user_id: str, form_data: FolderForm, parent_id: Optional[str] = None
  55. ) -> Optional[FolderModel]:
  56. with get_db() as db:
  57. id = str(uuid.uuid4())
  58. folder = FolderModel(
  59. **{
  60. "id": id,
  61. "user_id": user_id,
  62. **(form_data.model_dump(exclude_unset=True) or {}),
  63. "parent_id": parent_id,
  64. "created_at": int(time.time()),
  65. "updated_at": int(time.time()),
  66. }
  67. )
  68. try:
  69. result = Folder(**folder.model_dump())
  70. db.add(result)
  71. db.commit()
  72. db.refresh(result)
  73. if result:
  74. return FolderModel.model_validate(result)
  75. else:
  76. return None
  77. except Exception as e:
  78. log.exception(f"Error inserting a new folder: {e}")
  79. return None
  80. def get_folder_by_id_and_user_id(
  81. self, id: str, user_id: str
  82. ) -> Optional[FolderModel]:
  83. try:
  84. with get_db() as db:
  85. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  86. if not folder:
  87. return None
  88. return FolderModel.model_validate(folder)
  89. except Exception:
  90. return None
  91. def get_children_folders_by_id_and_user_id(
  92. self, id: str, user_id: str
  93. ) -> Optional[list[FolderModel]]:
  94. try:
  95. with get_db() as db:
  96. folders = []
  97. def get_children(folder):
  98. children = self.get_folders_by_parent_id_and_user_id(
  99. folder.id, user_id
  100. )
  101. for child in children:
  102. get_children(child)
  103. folders.append(child)
  104. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  105. if not folder:
  106. return None
  107. get_children(folder)
  108. return folders
  109. except Exception:
  110. return None
  111. def get_folders_by_user_id(self, user_id: str) -> list[FolderModel]:
  112. with get_db() as db:
  113. return [
  114. FolderModel.model_validate(folder)
  115. for folder in db.query(Folder).filter_by(user_id=user_id).all()
  116. ]
  117. def get_folder_by_parent_id_and_user_id_and_name(
  118. self, parent_id: Optional[str], user_id: str, name: str
  119. ) -> Optional[FolderModel]:
  120. try:
  121. with get_db() as db:
  122. # Check if folder exists
  123. folder = (
  124. db.query(Folder)
  125. .filter_by(parent_id=parent_id, user_id=user_id)
  126. .filter(Folder.name.ilike(name))
  127. .first()
  128. )
  129. if not folder:
  130. return None
  131. return FolderModel.model_validate(folder)
  132. except Exception as e:
  133. log.error(f"get_folder_by_parent_id_and_user_id_and_name: {e}")
  134. return None
  135. def get_folders_by_parent_id_and_user_id(
  136. self, parent_id: Optional[str], user_id: str
  137. ) -> list[FolderModel]:
  138. with get_db() as db:
  139. return [
  140. FolderModel.model_validate(folder)
  141. for folder in db.query(Folder)
  142. .filter_by(parent_id=parent_id, user_id=user_id)
  143. .all()
  144. ]
  145. def update_folder_parent_id_by_id_and_user_id(
  146. self,
  147. id: str,
  148. user_id: str,
  149. parent_id: str,
  150. ) -> Optional[FolderModel]:
  151. try:
  152. with get_db() as db:
  153. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  154. if not folder:
  155. return None
  156. folder.parent_id = parent_id
  157. folder.updated_at = int(time.time())
  158. db.commit()
  159. return FolderModel.model_validate(folder)
  160. except Exception as e:
  161. log.error(f"update_folder: {e}")
  162. return
  163. def update_folder_by_id_and_user_id(
  164. self, id: str, user_id: str, form_data: FolderUpdateForm
  165. ) -> Optional[FolderModel]:
  166. try:
  167. with get_db() as db:
  168. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  169. if not folder:
  170. return None
  171. form_data = form_data.model_dump(exclude_unset=True)
  172. existing_folder = (
  173. db.query(Folder)
  174. .filter_by(
  175. name=form_data.get("name"),
  176. parent_id=folder.parent_id,
  177. user_id=user_id,
  178. )
  179. .first()
  180. )
  181. if existing_folder and existing_folder.id != id:
  182. return None
  183. folder.name = form_data.get("name", folder.name)
  184. if "data" in form_data:
  185. folder.data = {
  186. **(folder.data or {}),
  187. **form_data["data"],
  188. }
  189. if "meta" in form_data:
  190. folder.meta = {
  191. **(folder.meta or {}),
  192. **form_data["meta"],
  193. }
  194. folder.updated_at = int(time.time())
  195. db.commit()
  196. return FolderModel.model_validate(folder)
  197. except Exception as e:
  198. log.error(f"update_folder: {e}")
  199. return
  200. def update_folder_is_expanded_by_id_and_user_id(
  201. self, id: str, user_id: str, is_expanded: bool
  202. ) -> Optional[FolderModel]:
  203. try:
  204. with get_db() as db:
  205. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  206. if not folder:
  207. return None
  208. folder.is_expanded = is_expanded
  209. folder.updated_at = int(time.time())
  210. db.commit()
  211. return FolderModel.model_validate(folder)
  212. except Exception as e:
  213. log.error(f"update_folder: {e}")
  214. return
  215. def delete_folder_by_id_and_user_id(self, id: str, user_id: str) -> list[str]:
  216. try:
  217. folder_ids = []
  218. with get_db() as db:
  219. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  220. if not folder:
  221. return folder_ids
  222. folder_ids.append(folder.id)
  223. # Delete all children folders
  224. def delete_children(folder):
  225. folder_children = self.get_folders_by_parent_id_and_user_id(
  226. folder.id, user_id
  227. )
  228. for folder_child in folder_children:
  229. delete_children(folder_child)
  230. folder_ids.append(folder_child.id)
  231. folder = db.query(Folder).filter_by(id=folder_child.id).first()
  232. db.delete(folder)
  233. db.commit()
  234. delete_children(folder)
  235. db.delete(folder)
  236. db.commit()
  237. return folder_ids
  238. except Exception as e:
  239. log.error(f"delete_folder: {e}")
  240. return []
  241. def normalize_folder_name(self, name: str) -> str:
  242. # Replace _ and space with a single space, lower case, collapse multiple spaces
  243. name = re.sub(r"[\s_]+", " ", name)
  244. return name.strip().lower()
  245. def search_folders_by_names(
  246. self, user_id: str, queries: list[str]
  247. ) -> list[FolderModel]:
  248. """
  249. Search for folders for a user where the name matches any of the queries, treating _ and space as equivalent, case-insensitive.
  250. """
  251. normalized_queries = [self.normalize_folder_name(q) for q in queries]
  252. if not normalized_queries:
  253. return []
  254. results = {}
  255. with get_db() as db:
  256. folders = db.query(Folder).filter_by(user_id=user_id).all()
  257. for folder in folders:
  258. if self.normalize_folder_name(folder.name) in normalized_queries:
  259. results[folder.id] = FolderModel.model_validate(folder)
  260. # get children folders
  261. children = self.get_children_folders_by_id_and_user_id(
  262. folder.id, user_id
  263. )
  264. for child in children:
  265. results[child.id] = child
  266. # Return the results as a list
  267. if not results:
  268. return []
  269. else:
  270. results = list(results.values())
  271. return results
  272. def search_folders_by_name_contains(
  273. self, user_id: str, query: str
  274. ) -> list[FolderModel]:
  275. """
  276. Partial match: normalized name contains (as substring) the normalized query.
  277. """
  278. normalized_query = self.normalize_folder_name(query)
  279. results = []
  280. with get_db() as db:
  281. folders = db.query(Folder).filter_by(user_id=user_id).all()
  282. for folder in folders:
  283. norm_name = self.normalize_folder_name(folder.name)
  284. if normalized_query in norm_name:
  285. results.append(FolderModel.model_validate(folder))
  286. return results
  287. Folders = FolderTable()