123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339 |
- import logging
- import time
- import uuid
- from typing import Optional
- import re
- from pydantic import BaseModel, ConfigDict
- from sqlalchemy import BigInteger, Column, Text, JSON, Boolean, func
- from open_webui.internal.db import Base, get_db
- from open_webui.env import SRC_LOG_LEVELS
- log = logging.getLogger(__name__)
- log.setLevel(SRC_LOG_LEVELS["MODELS"])
- ####################
- # Folder DB Schema
- ####################
class Folder(Base):
    """ORM mapping for one row of the ``folder`` table."""

    __tablename__ = "folder"

    # Identity and ownership.
    id = Column(Text, primary_key=True)
    parent_id = Column(Text, nullable=True)  # NULL when the folder has no parent
    user_id = Column(Text)
    name = Column(Text)

    # Free-form JSON payloads; every one of them may be NULL.
    items = Column(JSON, nullable=True)
    meta = Column(JSON, nullable=True)
    data = Column(JSON, nullable=True)

    # Expansion flag; rows start out collapsed.
    is_expanded = Column(Boolean, default=False)

    # Integer timestamps (this module writes ``int(time.time())`` into them).
    created_at = Column(BigInteger)
    updated_at = Column(BigInteger)
class FolderModel(BaseModel):
    """Pydantic view of a ``Folder`` row, buildable directly from the ORM object."""

    # ``from_attributes`` lets ``model_validate`` read values off ORM instances.
    model_config = ConfigDict(from_attributes=True)

    id: str
    parent_id: Optional[str] = None
    user_id: str
    name: str

    # Optional JSON payloads mirrored from the table.
    items: Optional[dict] = None
    meta: Optional[dict] = None
    data: Optional[dict] = None

    is_expanded: bool = False

    # Integer timestamps.
    created_at: int
    updated_at: int
- ####################
- # Forms
- ####################
class FolderForm(BaseModel):
    """Request payload used when creating or updating a folder."""

    # Unknown keys are accepted and kept on the instance instead of rejected.
    model_config = ConfigDict(extra="allow")

    name: str
    data: Optional[dict] = None
class FolderTable:
    """Query and mutation helpers for the ``folder`` table.

    Every lookup and mutation is filtered by ``user_id`` in addition to the
    folder id, so a caller can only reach folders owned by the given user.
    """

    def insert_new_folder(
        self, user_id: str, form_data: FolderForm, parent_id: Optional[str] = None
    ) -> Optional[FolderModel]:
        """Create a folder for ``user_id`` and return it.

        Args:
            user_id: Owner of the new folder.
            form_data: Submitted form; only fields that were explicitly set
                are copied onto the new folder.
            parent_id: Id of the parent folder, or ``None`` for no parent.

        Returns:
            The stored folder, or ``None`` if the database write failed.
        """
        with get_db() as db:
            now = int(time.time())
            folder = FolderModel(
                **{
                    # The form goes FIRST: FolderForm accepts extra fields, so
                    # a submitted "id" or "user_id" must not be able to
                    # override the server-controlled values below.
                    **(form_data.model_dump(exclude_unset=True) or {}),
                    "id": str(uuid.uuid4()),
                    "user_id": user_id,
                    "parent_id": parent_id,
                    "created_at": now,
                    "updated_at": now,
                }
            )
            try:
                result = Folder(**folder.model_dump())
                db.add(result)
                db.commit()
                db.refresh(result)
                return FolderModel.model_validate(result)
            except Exception as e:
                log.exception(f"Error inserting a new folder: {e}")
                return None

    def get_folder_by_id_and_user_id(
        self, id: str, user_id: str
    ) -> Optional[FolderModel]:
        """Return the folder with this id owned by ``user_id``.

        Returns ``None`` when no such folder exists or the lookup fails.
        """
        try:
            with get_db() as db:
                folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
                if not folder:
                    return None
                return FolderModel.model_validate(folder)
        except Exception as e:
            log.exception(f"get_folder_by_id_and_user_id: {e}")
            return None

    def _collect_descendants(self, folder_id: str, user_id: str) -> list[FolderModel]:
        """Return every descendant of ``folder_id`` owned by ``user_id``.

        The list is in post-order: each folder appears after all of its own
        descendants. Folders already visited are skipped, so a corrupted
        parent/child cycle cannot cause unbounded recursion.
        """
        descendants: list[FolderModel] = []
        seen = {folder_id}

        def walk(parent_id: str) -> None:
            for child in self.get_folders_by_parent_id_and_user_id(
                parent_id, user_id
            ):
                if child.id in seen:
                    continue
                seen.add(child.id)
                walk(child.id)
                descendants.append(child)

        walk(folder_id)
        return descendants

    def get_children_folders_by_id_and_user_id(
        self, id: str, user_id: str
    ) -> Optional[list[FolderModel]]:
        """Return all descendants (children, grandchildren, ...) of a folder.

        Returns:
            The descendants, deepest first within each branch, or ``None``
            when the folder does not exist for this user or the lookup fails.
        """
        try:
            with get_db() as db:
                folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
                if not folder:
                    return None
                return self._collect_descendants(folder.id, user_id)
        except Exception as e:
            log.exception(f"get_children_folders_by_id_and_user_id: {e}")
            return None

    def get_folders_by_user_id(self, user_id: str) -> list[FolderModel]:
        """Return every folder owned by ``user_id``."""
        with get_db() as db:
            return [
                FolderModel.model_validate(folder)
                for folder in db.query(Folder).filter_by(user_id=user_id).all()
            ]

    def get_folder_by_parent_id_and_user_id_and_name(
        self, parent_id: Optional[str], user_id: str, name: str
    ) -> Optional[FolderModel]:
        """Find a folder by case-insensitive name under a given parent.

        Returns ``None`` when nothing matches or the lookup fails.
        """
        try:
            with get_db() as db:
                # Compare lowered values for equality rather than using
                # ILIKE: with ILIKE, "_" and "%" inside a folder name act as
                # wildcards and produce false matches.
                folder = (
                    db.query(Folder)
                    .filter_by(parent_id=parent_id, user_id=user_id)
                    .filter(func.lower(Folder.name) == func.lower(name))
                    .first()
                )
                if not folder:
                    return None
                return FolderModel.model_validate(folder)
        except Exception as e:
            log.error(f"get_folder_by_parent_id_and_user_id_and_name: {e}")
            return None

    def get_folders_by_parent_id_and_user_id(
        self, parent_id: Optional[str], user_id: str
    ) -> list[FolderModel]:
        """Return the direct children of ``parent_id`` owned by ``user_id``."""
        with get_db() as db:
            return [
                FolderModel.model_validate(folder)
                for folder in db.query(Folder)
                .filter_by(parent_id=parent_id, user_id=user_id)
                .all()
            ]

    def update_folder_parent_id_by_id_and_user_id(
        self,
        id: str,
        user_id: str,
        parent_id: Optional[str],
    ) -> Optional[FolderModel]:
        """Move a folder under ``parent_id`` (``None`` clears the parent).

        Returns the updated folder, or ``None`` when the folder does not
        exist for this user or the update fails.
        """
        try:
            with get_db() as db:
                folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
                if not folder:
                    return None
                folder.parent_id = parent_id
                folder.updated_at = int(time.time())
                db.commit()
                return FolderModel.model_validate(folder)
        except Exception as e:
            log.error(f"update_folder: {e}")
            return None

    def update_folder_by_id_and_user_id(
        self, id: str, user_id: str, form_data: FolderForm
    ) -> Optional[FolderModel]:
        """Rename a folder and/or merge new keys into its ``data`` payload.

        Returns:
            The updated folder, or ``None`` when the folder does not exist,
            another folder under the same parent already has the requested
            name, or the update fails.
        """
        try:
            with get_db() as db:
                folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
                if not folder:
                    return None

                updates = form_data.model_dump(exclude_unset=True)

                if "name" in updates:
                    # Refuse a rename that would collide with a sibling.
                    sibling = (
                        db.query(Folder)
                        .filter_by(
                            name=updates["name"],
                            parent_id=folder.parent_id,
                            user_id=user_id,
                        )
                        .first()
                    )
                    if sibling and sibling.id != id:
                        return None
                    folder.name = updates["name"]

                if "data" in updates:
                    # ``data`` is Optional on the form; an explicit None is
                    # treated as "nothing to merge" instead of raising.
                    folder.data = {
                        **(folder.data or {}),
                        **(updates["data"] or {}),
                    }

                folder.updated_at = int(time.time())
                db.commit()
                return FolderModel.model_validate(folder)
        except Exception as e:
            log.error(f"update_folder: {e}")
            return None

    def update_folder_is_expanded_by_id_and_user_id(
        self, id: str, user_id: str, is_expanded: bool
    ) -> Optional[FolderModel]:
        """Set the ``is_expanded`` flag of a folder.

        Returns the updated folder, or ``None`` when the folder does not
        exist for this user or the update fails.
        """
        try:
            with get_db() as db:
                folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
                if not folder:
                    return None
                folder.is_expanded = is_expanded
                folder.updated_at = int(time.time())
                db.commit()
                return FolderModel.model_validate(folder)
        except Exception as e:
            log.error(f"update_folder: {e}")
            return None

    def delete_folder_by_id_and_user_id(self, id: str, user_id: str) -> list[str]:
        """Delete a folder together with all of its descendants.

        Returns:
            The ids of the deleted folders (the folder itself first, then its
            descendants), or an empty list when the folder does not exist
            for this user or the deletion fails.
        """
        try:
            with get_db() as db:
                folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
                if not folder:
                    return []

                descendants = self._collect_descendants(folder.id, user_id)
                folder_ids = [folder.id, *(child.id for child in descendants)]

                # Stage every delete and commit once, so the returned ids
                # always correspond to what was actually committed.
                for child in descendants:
                    row = (
                        db.query(Folder)
                        .filter_by(id=child.id, user_id=user_id)
                        .first()
                    )
                    if row is not None:
                        db.delete(row)
                db.delete(folder)
                db.commit()
                return folder_ids
        except Exception as e:
            log.error(f"delete_folder: {e}")
            return []

    def normalize_folder_name(self, name: str) -> str:
        """Return ``name`` in canonical form for comparisons.

        Runs of whitespace and underscores collapse to a single space, the
        result is stripped of surrounding whitespace and lower-cased.
        """
        name = re.sub(r"[\s_]+", " ", name)
        return name.strip().lower()

    def search_folders_by_names(
        self, user_id: str, queries: list[str]
    ) -> list[FolderModel]:
        """Return folders whose normalized name equals any normalized query.

        Underscores and spaces are treated as equivalent and matching is
        case-insensitive. All descendants of each matching folder are
        included as well; every folder appears at most once.
        """
        wanted = {self.normalize_folder_name(query) for query in queries}
        if not wanted:
            return []

        matches: dict[str, FolderModel] = {}
        with get_db() as db:
            for folder in db.query(Folder).filter_by(user_id=user_id).all():
                if self.normalize_folder_name(folder.name) not in wanted:
                    continue
                matches[folder.id] = FolderModel.model_validate(folder)

                # The descendant lookup returns None on failure; treat that
                # as "no descendants" instead of crashing the whole search.
                children = self.get_children_folders_by_id_and_user_id(
                    folder.id, user_id
                )
                for child in children or []:
                    matches[child.id] = child

        return list(matches.values())

    def search_folders_by_name_contains(
        self, user_id: str, query: str
    ) -> list[FolderModel]:
        """Return folders whose normalized name contains the normalized query."""
        normalized_query = self.normalize_folder_name(query)
        with get_db() as db:
            return [
                FolderModel.model_validate(folder)
                for folder in db.query(Folder).filter_by(user_id=user_id).all()
                if normalized_query in self.normalize_folder_name(folder.name)
            ]


# Shared module-level instance.
Folders = FolderTable()
|