folders.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. import logging
  2. import time
  3. import uuid
  4. from typing import Optional
  5. import re
  6. from pydantic import BaseModel, ConfigDict
  7. from sqlalchemy import BigInteger, Column, Text, JSON, Boolean, func
  8. from open_webui.internal.db import Base, get_db
  9. from open_webui.env import SRC_LOG_LEVELS
  10. log = logging.getLogger(__name__)
  11. log.setLevel(SRC_LOG_LEVELS["MODELS"])
  12. ####################
  13. # Folder DB Schema
  14. ####################
  15. class Folder(Base):
  16. __tablename__ = "folder"
  17. id = Column(Text, primary_key=True)
  18. parent_id = Column(Text, nullable=True)
  19. user_id = Column(Text)
  20. name = Column(Text)
  21. items = Column(JSON, nullable=True)
  22. meta = Column(JSON, nullable=True)
  23. data = Column(JSON, nullable=True)
  24. is_expanded = Column(Boolean, default=False)
  25. created_at = Column(BigInteger)
  26. updated_at = Column(BigInteger)
  27. class FolderModel(BaseModel):
  28. id: str
  29. parent_id: Optional[str] = None
  30. user_id: str
  31. name: str
  32. items: Optional[dict] = None
  33. meta: Optional[dict] = None
  34. data: Optional[dict] = None
  35. is_expanded: bool = False
  36. created_at: int
  37. updated_at: int
  38. model_config = ConfigDict(from_attributes=True)
  39. class FolderMetadataResponse(BaseModel):
  40. icon: Optional[str] = None
  41. class FolderNameIdResponse(BaseModel):
  42. id: str
  43. name: str
  44. meta: Optional[FolderMetadataResponse] = None
  45. parent_id: Optional[str] = None
  46. is_expanded: bool = False
  47. created_at: int
  48. updated_at: int
  49. ####################
  50. # Forms
  51. ####################
  52. class FolderForm(BaseModel):
  53. name: str
  54. data: Optional[dict] = None
  55. meta: Optional[dict] = None
  56. model_config = ConfigDict(extra="allow")
  57. class FolderUpdateForm(BaseModel):
  58. name: Optional[str] = None
  59. data: Optional[dict] = None
  60. meta: Optional[dict] = None
  61. model_config = ConfigDict(extra="allow")
  62. class FolderTable:
  63. def insert_new_folder(
  64. self, user_id: str, form_data: FolderForm, parent_id: Optional[str] = None
  65. ) -> Optional[FolderModel]:
  66. with get_db() as db:
  67. id = str(uuid.uuid4())
  68. folder = FolderModel(
  69. **{
  70. "id": id,
  71. "user_id": user_id,
  72. **(form_data.model_dump(exclude_unset=True) or {}),
  73. "parent_id": parent_id,
  74. "created_at": int(time.time()),
  75. "updated_at": int(time.time()),
  76. }
  77. )
  78. try:
  79. result = Folder(**folder.model_dump())
  80. db.add(result)
  81. db.commit()
  82. db.refresh(result)
  83. if result:
  84. return FolderModel.model_validate(result)
  85. else:
  86. return None
  87. except Exception as e:
  88. log.exception(f"Error inserting a new folder: {e}")
  89. return None
  90. def get_folder_by_id_and_user_id(
  91. self, id: str, user_id: str
  92. ) -> Optional[FolderModel]:
  93. try:
  94. with get_db() as db:
  95. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  96. if not folder:
  97. return None
  98. return FolderModel.model_validate(folder)
  99. except Exception:
  100. return None
  101. def get_children_folders_by_id_and_user_id(
  102. self, id: str, user_id: str
  103. ) -> Optional[list[FolderModel]]:
  104. try:
  105. with get_db() as db:
  106. folders = []
  107. def get_children(folder):
  108. children = self.get_folders_by_parent_id_and_user_id(
  109. folder.id, user_id
  110. )
  111. for child in children:
  112. get_children(child)
  113. folders.append(child)
  114. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  115. if not folder:
  116. return None
  117. get_children(folder)
  118. return folders
  119. except Exception:
  120. return None
  121. def get_folders_by_user_id(self, user_id: str) -> list[FolderModel]:
  122. with get_db() as db:
  123. return [
  124. FolderModel.model_validate(folder)
  125. for folder in db.query(Folder).filter_by(user_id=user_id).all()
  126. ]
  127. def get_folder_by_parent_id_and_user_id_and_name(
  128. self, parent_id: Optional[str], user_id: str, name: str
  129. ) -> Optional[FolderModel]:
  130. try:
  131. with get_db() as db:
  132. # Check if folder exists
  133. folder = (
  134. db.query(Folder)
  135. .filter_by(parent_id=parent_id, user_id=user_id)
  136. .filter(Folder.name.ilike(name))
  137. .first()
  138. )
  139. if not folder:
  140. return None
  141. return FolderModel.model_validate(folder)
  142. except Exception as e:
  143. log.error(f"get_folder_by_parent_id_and_user_id_and_name: {e}")
  144. return None
  145. def get_folders_by_parent_id_and_user_id(
  146. self, parent_id: Optional[str], user_id: str
  147. ) -> list[FolderModel]:
  148. with get_db() as db:
  149. return [
  150. FolderModel.model_validate(folder)
  151. for folder in db.query(Folder)
  152. .filter_by(parent_id=parent_id, user_id=user_id)
  153. .all()
  154. ]
  155. def update_folder_parent_id_by_id_and_user_id(
  156. self,
  157. id: str,
  158. user_id: str,
  159. parent_id: str,
  160. ) -> Optional[FolderModel]:
  161. try:
  162. with get_db() as db:
  163. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  164. if not folder:
  165. return None
  166. folder.parent_id = parent_id
  167. folder.updated_at = int(time.time())
  168. db.commit()
  169. return FolderModel.model_validate(folder)
  170. except Exception as e:
  171. log.error(f"update_folder: {e}")
  172. return
  173. def update_folder_by_id_and_user_id(
  174. self, id: str, user_id: str, form_data: FolderUpdateForm
  175. ) -> Optional[FolderModel]:
  176. try:
  177. with get_db() as db:
  178. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  179. if not folder:
  180. return None
  181. form_data = form_data.model_dump(exclude_unset=True)
  182. existing_folder = (
  183. db.query(Folder)
  184. .filter_by(
  185. name=form_data.get("name"),
  186. parent_id=folder.parent_id,
  187. user_id=user_id,
  188. )
  189. .first()
  190. )
  191. if existing_folder and existing_folder.id != id:
  192. return None
  193. folder.name = form_data.get("name", folder.name)
  194. if "data" in form_data:
  195. folder.data = {
  196. **(folder.data or {}),
  197. **form_data["data"],
  198. }
  199. if "meta" in form_data:
  200. folder.meta = {
  201. **(folder.meta or {}),
  202. **form_data["meta"],
  203. }
  204. folder.updated_at = int(time.time())
  205. db.commit()
  206. return FolderModel.model_validate(folder)
  207. except Exception as e:
  208. log.error(f"update_folder: {e}")
  209. return
  210. def update_folder_is_expanded_by_id_and_user_id(
  211. self, id: str, user_id: str, is_expanded: bool
  212. ) -> Optional[FolderModel]:
  213. try:
  214. with get_db() as db:
  215. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  216. if not folder:
  217. return None
  218. folder.is_expanded = is_expanded
  219. folder.updated_at = int(time.time())
  220. db.commit()
  221. return FolderModel.model_validate(folder)
  222. except Exception as e:
  223. log.error(f"update_folder: {e}")
  224. return
  225. def delete_folder_by_id_and_user_id(self, id: str, user_id: str) -> list[str]:
  226. try:
  227. folder_ids = []
  228. with get_db() as db:
  229. folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
  230. if not folder:
  231. return folder_ids
  232. folder_ids.append(folder.id)
  233. # Delete all children folders
  234. def delete_children(folder):
  235. folder_children = self.get_folders_by_parent_id_and_user_id(
  236. folder.id, user_id
  237. )
  238. for folder_child in folder_children:
  239. delete_children(folder_child)
  240. folder_ids.append(folder_child.id)
  241. folder = db.query(Folder).filter_by(id=folder_child.id).first()
  242. db.delete(folder)
  243. db.commit()
  244. delete_children(folder)
  245. db.delete(folder)
  246. db.commit()
  247. return folder_ids
  248. except Exception as e:
  249. log.error(f"delete_folder: {e}")
  250. return []
  251. def normalize_folder_name(self, name: str) -> str:
  252. # Replace _ and space with a single space, lower case, collapse multiple spaces
  253. name = re.sub(r"[\s_]+", " ", name)
  254. return name.strip().lower()
  255. def search_folders_by_names(
  256. self, user_id: str, queries: list[str]
  257. ) -> list[FolderModel]:
  258. """
  259. Search for folders for a user where the name matches any of the queries, treating _ and space as equivalent, case-insensitive.
  260. """
  261. normalized_queries = [self.normalize_folder_name(q) for q in queries]
  262. if not normalized_queries:
  263. return []
  264. results = {}
  265. with get_db() as db:
  266. folders = db.query(Folder).filter_by(user_id=user_id).all()
  267. for folder in folders:
  268. if self.normalize_folder_name(folder.name) in normalized_queries:
  269. results[folder.id] = FolderModel.model_validate(folder)
  270. # get children folders
  271. children = self.get_children_folders_by_id_and_user_id(
  272. folder.id, user_id
  273. )
  274. for child in children:
  275. results[child.id] = child
  276. # Return the results as a list
  277. if not results:
  278. return []
  279. else:
  280. results = list(results.values())
  281. return results
  282. def search_folders_by_name_contains(
  283. self, user_id: str, query: str
  284. ) -> list[FolderModel]:
  285. """
  286. Partial match: normalized name contains (as substring) the normalized query.
  287. """
  288. normalized_query = self.normalize_folder_name(query)
  289. results = []
  290. with get_db() as db:
  291. folders = db.query(Folder).filter_by(user_id=user_id).all()
  292. for folder in folders:
  293. norm_name = self.normalize_folder_name(folder.name)
  294. if normalized_query in norm_name:
  295. results.append(FolderModel.model_validate(folder))
  296. return results
  297. Folders = FolderTable()