|
@@ -2,14 +2,14 @@ import logging
|
|
|
import time
|
|
|
import uuid
|
|
|
from typing import Optional
|
|
|
+import re
|
|
|
|
|
|
-from open_webui.internal.db import Base, get_db
|
|
|
-from open_webui.models.chats import Chats
|
|
|
|
|
|
-from open_webui.env import SRC_LOG_LEVELS
|
|
|
from pydantic import BaseModel, ConfigDict
|
|
|
-from sqlalchemy import BigInteger, Column, Text, JSON, Boolean
|
|
|
-from open_webui.utils.access_control import get_permissions
|
|
|
+from sqlalchemy import BigInteger, Column, Text, JSON, Boolean, func
|
|
|
+
|
|
|
+from open_webui.internal.db import Base, get_db
|
|
|
+from open_webui.env import SRC_LOG_LEVELS
|
|
|
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
@@ -106,7 +106,7 @@ class FolderTable:
|
|
|
|
|
|
def get_children_folders_by_id_and_user_id(
|
|
|
self, id: str, user_id: str
|
|
|
- ) -> Optional[FolderModel]:
|
|
|
+ ) -> Optional[list[FolderModel]]:
|
|
|
try:
|
|
|
with get_db() as db:
|
|
|
folders = []
|
|
@@ -283,5 +283,57 @@ class FolderTable:
|
|
|
log.error(f"delete_folder: {e}")
|
|
|
return []
|
|
|
|
|
|
+ def normalize_folder_name(self, name: str) -> str:
|
|
|
+ # Replace _ and space with a single space, lower case, collapse multiple spaces
|
|
|
+ name = re.sub(r"[\s_]+", " ", name)
|
|
|
+ return name.strip().lower()
|
|
|
+
|
|
|
+ def search_folders_by_names(
|
|
|
+ self, user_id: str, queries: list[str]
|
|
|
+ ) -> list[FolderModel]:
|
|
|
+ """
|
|
|
+ Search for folders for a user where the name matches any of the queries, treating _ and space as equivalent, case-insensitive.
|
|
|
+ """
|
|
|
+ normalized_queries = [self.normalize_folder_name(q) for q in queries]
|
|
|
+ if not normalized_queries:
|
|
|
+ return []
|
|
|
+
|
|
|
+ results = {}
|
|
|
+ with get_db() as db:
|
|
|
+ folders = db.query(Folder).filter_by(user_id=user_id).all()
|
|
|
+ for folder in folders:
|
|
|
+ if self.normalize_folder_name(folder.name) in normalized_queries:
|
|
|
+ results[folder.id] = FolderModel.model_validate(folder)
|
|
|
+
|
|
|
+ # get children folders
|
|
|
+ children = self.get_children_folders_by_id_and_user_id(
|
|
|
+ folder.id, user_id
|
|
|
+ )
|
|
|
+ for child in children:
|
|
|
+ results[child.id] = child
|
|
|
+
|
|
|
+ # Return the results as a list
|
|
|
+ if not results:
|
|
|
+ return []
|
|
|
+ else:
|
|
|
+ results = list(results.values())
|
|
|
+ return results
|
|
|
+
|
|
|
+ def search_folders_by_name_contains(
|
|
|
+ self, user_id: str, query: str
|
|
|
+ ) -> list[FolderModel]:
|
|
|
+ """
|
|
|
+ Partial match: normalized name contains (as substring) the normalized query.
|
|
|
+ """
|
|
|
+ normalized_query = self.normalize_folder_name(query)
|
|
|
+ results = []
|
|
|
+ with get_db() as db:
|
|
|
+ folders = db.query(Folder).filter_by(user_id=user_id).all()
|
|
|
+ for folder in folders:
|
|
|
+ norm_name = self.normalize_folder_name(folder.name)
|
|
|
+ if normalized_query in norm_name:
|
|
|
+ results.append(FolderModel.model_validate(folder))
|
|
|
+ return results
|
|
|
+
|
|
|
|
|
|
Folders = FolderTable()
|