12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713 |
- import logging
- from typing import Optional, Tuple
- from urllib.parse import urlparse
- import grpc
- from open_webui.config import (
- QDRANT_API_KEY,
- QDRANT_GRPC_PORT,
- QDRANT_ON_DISK,
- QDRANT_PREFER_GRPC,
- QDRANT_URI,
- QDRANT_COLLECTION_PREFIX,
- )
- from open_webui.env import SRC_LOG_LEVELS
- from open_webui.retrieval.vector.main import (
- GetResult,
- SearchResult,
- VectorDBBase,
- VectorItem,
- )
- from qdrant_client import QdrantClient as Qclient
- from qdrant_client.http.exceptions import UnexpectedResponse
- from qdrant_client.http.models import PointStruct
- from qdrant_client.models import models
# Stand-in for "no limit": used as the `limit` argument wherever every matching
# point should be returned.
NO_LIMIT = 999_999_999

# Module logger; its level comes from the "RAG" entry of the source log levels.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["RAG"])
class QdrantClient(VectorDBBase):
    """Multi-tenant Qdrant backend for the vector-DB interface.

    Logical collection names used by callers are mapped onto a fixed set of
    physical Qdrant collections (memories, knowledge, files, web-search and
    hash-based). Every stored point carries a ``tenant_id`` payload field
    holding the original logical collection name, and every read/delete is
    filtered on that field.

    When ``QDRANT_URI`` is empty ``self.client`` is ``None`` and every public
    method is a no-op returning ``None`` (``False`` for ``has_collection``).
    """

    def __init__(self):
        """Read the Qdrant settings and build the underlying client."""
        self.collection_prefix = QDRANT_COLLECTION_PREFIX
        self.QDRANT_URI = QDRANT_URI
        self.QDRANT_API_KEY = QDRANT_API_KEY
        self.QDRANT_ON_DISK = QDRANT_ON_DISK
        self.PREFER_GRPC = QDRANT_PREFER_GRPC
        self.GRPC_PORT = QDRANT_GRPC_PORT

        # Physical collections used for multi-tenancy. Assigned before the
        # early return below so the attributes exist even without a client.
        self.MEMORY_COLLECTION = f"{self.collection_prefix}_memories"
        self.KNOWLEDGE_COLLECTION = f"{self.collection_prefix}_knowledge"
        self.FILE_COLLECTION = f"{self.collection_prefix}_files"
        self.WEB_SEARCH_COLLECTION = f"{self.collection_prefix}_web-search"
        self.HASH_BASED_COLLECTION = f"{self.collection_prefix}_hash-based"

        if not self.QDRANT_URI:
            self.client = None
            return

        # Unified handling for either scheme
        parsed = urlparse(self.QDRANT_URI)
        host = parsed.hostname or self.QDRANT_URI
        http_port = parsed.port or 6333  # default REST port

        if self.PREFER_GRPC:
            # NOTE(review): the URI scheme is not forwarded in this branch --
            # confirm the resulting TLS behaviour is the intended one.
            self.client = Qclient(
                host=host,
                port=http_port,
                grpc_port=self.GRPC_PORT,
                prefer_grpc=self.PREFER_GRPC,
                api_key=self.QDRANT_API_KEY,
            )
        else:
            self.client = Qclient(url=self.QDRANT_URI, api_key=self.QDRANT_API_KEY)

    @staticmethod
    def _tenant_filter(tenant_id: str):
        """Build the payload condition restricting an operation to one tenant."""
        return models.FieldCondition(
            key="tenant_id", match=models.MatchValue(value=tenant_id)
        )

    @staticmethod
    def _fit_vector(vector, size: int) -> list:
        """Return ``vector`` as a list of exactly ``size`` items.

        Longer vectors are truncated; shorter ones are padded with zeros.
        """
        fitted = list(vector[:size])
        if len(fitted) < size:
            fitted.extend([0.0] * (size - len(fitted)))
        return fitted

    def _write_points(self, operation_name, mt_collection, points):
        """Run the write named by ``operation_name`` against ``mt_collection``.

        ``"insert"`` uploads the points and returns ``None``; any other value
        is treated as an upsert and returns the client's upsert result.
        """
        if operation_name == "insert":
            self.client.upload_points(mt_collection, points)
            return None
        return self.client.upsert(mt_collection, points)

    def _result_to_get_result(self, points) -> GetResult:
        """Convert Qdrant points into a ``GetResult`` holding one result row.

        Each point's payload must contain the ``"text"`` and ``"metadata"``
        keys (a missing key raises ``KeyError``).
        """
        ids = [point.id for point in points]
        documents = [point.payload["text"] for point in points]
        metadatas = [point.payload["metadata"] for point in points]
        # Each field is wrapped in an outer list: a single row of results.
        return GetResult(ids=[ids], documents=[documents], metadatas=[metadatas])

    def _get_collection_and_tenant_id(self, collection_name: str) -> Tuple[str, str]:
        """
        Maps the traditional collection name to multi-tenant collection and tenant ID.

        Returns:
            tuple: (collection_name, tenant_id)
        """
        # The logical collection name itself is the tenant ID.
        tenant_id = collection_name

        # User memory collections
        if collection_name.startswith("user-memory-"):
            return self.MEMORY_COLLECTION, tenant_id
        # File collections
        if collection_name.startswith("file-"):
            return self.FILE_COLLECTION, tenant_id
        # Web search collections
        if collection_name.startswith("web-search-"):
            return self.WEB_SEARCH_COLLECTION, tenant_id
        # Hash-based collections (YouTube and web URLs): exactly 63 lowercase
        # hex characters -- presumably a truncated hex digest; confirm against
        # the code that generates these names.
        if len(collection_name) == 63 and all(
            c in "0123456789abcdef" for c in collection_name
        ):
            return self.HASH_BASED_COLLECTION, tenant_id
        # Everything else is a knowledge collection.
        return self.KNOWLEDGE_COLLECTION, tenant_id

    def _extract_error_message(self, exception) -> Tuple[Optional[int], str]:
        """
        Extract error message from either HTTP or gRPC exceptions

        Returns:
            tuple: (status_code, error_message); ``status_code`` is ``None``
            when it cannot be determined.
        """
        # HTTP exception
        if isinstance(exception, UnexpectedResponse):
            try:
                error_data = exception.structured()
                error_msg = error_data.get("status", {}).get("error", "")
                return exception.status_code, error_msg
            except Exception as inner_e:
                # Unparseable body: fall back to the exception's own text.
                log.error(f"Failed to parse HTTP error: {inner_e}")
                return exception.status_code, str(exception)

        # gRPC exception
        if isinstance(exception, grpc.RpcError):
            status_code = None
            if hasattr(exception, "code") and callable(exception.code):
                status_code = exception.code().value[0]

            error_msg = str(exception)
            if "details =" in error_msg:
                # The "details = ..." line carries the actual error message.
                try:
                    details_line = [
                        line.strip()
                        for line in error_msg.split("\n")
                        if "details =" in line
                    ][0]
                    error_msg = details_line.split("details =")[1].strip(' "')
                except (IndexError, AttributeError):
                    # Fall back to the full message if parsing fails
                    pass
            return status_code, error_msg

        # Any other type of exception
        return None, str(exception)

    def _is_collection_not_found_error(self, exception) -> bool:
        """
        Check if the exception is due to collection not found, supporting both HTTP and gRPC
        """
        status_code, error_msg = self._extract_error_message(exception)

        # HTTP error (404)
        if (
            status_code == 404
            and "Collection" in error_msg
            and "doesn't exist" in error_msg
        ):
            return True

        # gRPC error (NOT_FOUND status)
        return (
            isinstance(exception, grpc.RpcError)
            and exception.code() == grpc.StatusCode.NOT_FOUND
        )

    def _is_dimension_mismatch_error(self, exception) -> bool:
        """
        Check if the exception is due to dimension mismatch, supporting both HTTP and gRPC
        """
        _, error_msg = self._extract_error_message(exception)
        # Message patterns checked for both HTTP and gRPC errors
        return (
            "Vector dimension error" in error_msg
            or "dimensions mismatch" in error_msg
            or "invalid vector size" in error_msg
        )

    def _create_multi_tenant_collection_if_not_exists(
        self, mt_collection_name: str, dimension: int = 384
    ):
        """
        Creates a collection with multi-tenancy configuration if it doesn't exist.

        Default dimension is set to 384 which corresponds to 'sentence-transformers/all-MiniLM-L6-v2'.
        When creating collections dynamically (insert/upsert), the actual vector dimensions will be used.

        An "already exists" failure is logged at debug level and swallowed;
        any other Qdrant error is re-raised.
        """
        try:
            # Try to create the collection directly - will fail if it already exists
            self.client.create_collection(
                collection_name=mt_collection_name,
                vectors_config=models.VectorParams(
                    size=dimension,
                    distance=models.Distance.COSINE,
                    on_disk=self.QDRANT_ON_DISK,
                ),
                hnsw_config=models.HnswConfigDiff(
                    payload_m=16,  # Enable per-tenant indexing
                    m=0,
                    on_disk=self.QDRANT_ON_DISK,
                ),
            )

            # Create tenant ID payload index
            self.client.create_payload_index(
                collection_name=mt_collection_name,
                field_name="tenant_id",
                field_schema=models.KeywordIndexParams(
                    type=models.KeywordIndexType.KEYWORD,
                    is_tenant=True,
                    on_disk=self.QDRANT_ON_DISK,
                ),
                wait=True,
            )

            log.info(
                f"Multi-tenant collection {mt_collection_name} created with dimension {dimension}!"
            )
        except (UnexpectedResponse, grpc.RpcError) as e:
            status_code, error_msg = self._extract_error_message(e)

            # HTTP status code 409 or gRPC ALREADY_EXISTS
            is_conflict = (isinstance(e, UnexpectedResponse) and status_code == 409) or (
                isinstance(e, grpc.RpcError)
                and e.code() == grpc.StatusCode.ALREADY_EXISTS
            )
            if is_conflict and "already exists" in error_msg:
                log.debug(f"Collection {mt_collection_name} already exists")
                return
            # Not an "already exists" error: propagate with the original traceback
            raise

    def _create_points(self, items: list[VectorItem], tenant_id: str):
        """
        Create point structs from vector items with tenant ID.
        """
        return [
            PointStruct(
                id=item["id"],
                vector=item["vector"],
                payload={
                    "text": item["text"],
                    "metadata": item["metadata"],
                    "tenant_id": tenant_id,
                },
            )
            for item in items
        ]

    def has_collection(self, collection_name: str) -> bool:
        """
        Check if a logical collection exists by checking for any points with the tenant ID.

        Returns ``False`` when there is no client, when the physical collection
        is missing, and on any error (errors are logged, never raised).
        """
        if not self.client:
            return False

        # Map to multi-tenant collection and tenant ID
        mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)

        try:
            # Try directly querying - most of the time collection should exist
            response = self.client.query_points(
                collection_name=mt_collection,
                query_filter=models.Filter(must=[self._tenant_filter(tenant_id)]),
                limit=1,
            )
            # The logical collection exists if at least one point has this tenant ID
            return len(response.points) > 0
        except (UnexpectedResponse, grpc.RpcError) as e:
            if self._is_collection_not_found_error(e):
                log.debug(f"Collection {mt_collection} doesn't exist")
            else:
                # Other API errors are logged and treated as "not found"
                _, error_msg = self._extract_error_message(e)
                log.warning(f"Unexpected Qdrant error: {error_msg}")
            return False
        except Exception as e:
            # Any other error is logged and treated as "not found"
            log.debug(f"Error checking collection {mt_collection}: {e}")
            return False

    def delete(
        self,
        collection_name: str,
        ids: Optional[list[str]] = None,
        filter: Optional[dict] = None,
    ):
        """
        Delete vectors by ID or filter from a collection with tenant isolation.

        ``ids`` are matched against the ``metadata.id`` payload field (any of
        them); ``filter`` entries are matched against ``metadata.<key>`` (all
        of them) and are only used when ``ids`` is empty. With neither, every
        point of the tenant is selected.

        Returns the client's delete result, or ``None`` when there is no
        client or the physical collection does not exist. Other Qdrant errors
        are logged and re-raised.
        """
        if not self.client:
            return None

        # Map to multi-tenant collection and tenant ID
        mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)

        must_conditions = [self._tenant_filter(tenant_id)]
        should_conditions = []

        if ids:
            should_conditions = [
                models.FieldCondition(
                    key="metadata.id",
                    match=models.MatchValue(value=id_value),
                )
                for id_value in ids
            ]
        elif filter:
            must_conditions.extend(
                models.FieldCondition(
                    key=f"metadata.{key}",
                    match=models.MatchValue(value=value),
                )
                for key, value in filter.items()
            )

        try:
            # Try to delete directly - most of the time collection should exist
            return self.client.delete(
                collection_name=mt_collection,
                points_selector=models.FilterSelector(
                    filter=models.Filter(must=must_conditions, should=should_conditions)
                ),
            )
        except (UnexpectedResponse, grpc.RpcError) as e:
            if self._is_collection_not_found_error(e):
                log.debug(
                    f"Collection {mt_collection} doesn't exist, nothing to delete"
                )
                return None
            # For other API errors, log and re-raise
            _, error_msg = self._extract_error_message(e)
            log.warning(f"Unexpected Qdrant error: {error_msg}")
            raise

    def search(
        self, collection_name: str, vectors: list[list[float | int]], limit: int
    ) -> Optional[SearchResult]:
        """
        Search for the nearest neighbor items based on the vectors with tenant isolation.

        Only the first vector in ``vectors`` is used as the query. If its
        length differs from the collection's vector size it is truncated or
        zero-padded to fit.

        Returns ``None`` when there is no client, ``vectors`` is empty, the
        physical collection does not exist, or a non-Qdrant error occurs
        (logged). Other Qdrant errors are logged and re-raised.
        """
        if not self.client or not vectors:
            # Nothing to query with; skip the round-trip to the server.
            return None

        # Map to multi-tenant collection and tenant ID
        mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)

        try:
            # Try the search operation directly - most of the time collection should exist
            tenant_filter = self._tenant_filter(tenant_id)

            # Ensure the query vector's dimension matches the collection
            collection_dim = self.client.get_collection(
                mt_collection
            ).config.params.vectors.size
            query_vector = vectors[0]
            if len(query_vector) != collection_dim:
                query_vector = self._fit_vector(query_vector, collection_dim)

            # Search with tenant filter
            prefetch_query = models.Prefetch(
                filter=models.Filter(must=[tenant_filter]),
                limit=NO_LIMIT,
            )
            query_response = self.client.query_points(
                collection_name=mt_collection,
                query=query_vector,
                prefetch=prefetch_query,
                limit=limit,
            )

            hits = query_response.points
            get_result = self._result_to_get_result(hits)
            return SearchResult(
                ids=get_result.ids,
                documents=get_result.documents,
                metadatas=get_result.metadatas,
                # qdrant distance is [-1, 1], normalize to [0, 1]
                distances=[[(point.score + 1.0) / 2.0 for point in hits]],
            )
        except (UnexpectedResponse, grpc.RpcError) as e:
            if self._is_collection_not_found_error(e):
                log.debug(
                    f"Collection {mt_collection} doesn't exist, search returns None"
                )
                return None
            # For other API errors, log and re-raise
            _, error_msg = self._extract_error_message(e)
            log.warning(f"Unexpected Qdrant error during search: {error_msg}")
            raise
        except Exception as e:
            # For non-Qdrant exceptions, log and return None
            log.exception(f"Error searching collection '{collection_name}': {e}")
            return None

    def query(self, collection_name: str, filter: dict, limit: Optional[int] = None):
        """
        Query points with filters and tenant isolation.

        Each ``filter`` entry is matched against ``metadata.<key>``; a falsy
        ``filter`` selects every point of the tenant. ``limit`` defaults to
        ``NO_LIMIT``.

        Returns a ``GetResult``, or ``None`` when there is no client, the
        physical collection does not exist, or a non-Qdrant error occurs
        (logged). Other Qdrant errors are logged and re-raised.
        """
        if not self.client:
            return None

        # Map to multi-tenant collection and tenant ID
        mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)

        # Set default limit if not provided
        if limit is None:
            limit = NO_LIMIT

        # Metadata filters; tolerate a missing filter instead of crashing on None
        field_conditions = [
            models.FieldCondition(
                key=f"metadata.{key}", match=models.MatchValue(value=value)
            )
            for key, value in (filter or {}).items()
        ]

        # Combine tenant filter with metadata filters
        combined_filter = models.Filter(
            must=[self._tenant_filter(tenant_id), *field_conditions]
        )

        try:
            # Try the query directly - most of the time collection should exist
            points = self.client.query_points(
                collection_name=mt_collection,
                query_filter=combined_filter,
                limit=limit,
            )
            return self._result_to_get_result(points.points)
        except (UnexpectedResponse, grpc.RpcError) as e:
            if self._is_collection_not_found_error(e):
                log.debug(
                    f"Collection {mt_collection} doesn't exist, query returns None"
                )
                return None
            # For other API errors, log and re-raise
            _, error_msg = self._extract_error_message(e)
            log.warning(f"Unexpected Qdrant error during query: {error_msg}")
            raise
        except Exception as e:
            # For non-Qdrant exceptions, log and return None
            log.exception(f"Error querying collection '{collection_name}': {e}")
            return None

    def get(self, collection_name: str) -> Optional[GetResult]:
        """
        Get all items in a collection with tenant isolation.

        Returns ``None`` when there is no client, the physical collection does
        not exist, or a non-Qdrant error occurs (logged). Other Qdrant errors
        are logged and re-raised.
        """
        if not self.client:
            return None

        # Map to multi-tenant collection and tenant ID
        mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)

        try:
            # Try to get points directly - most of the time collection should exist
            points = self.client.query_points(
                collection_name=mt_collection,
                query_filter=models.Filter(must=[self._tenant_filter(tenant_id)]),
                limit=NO_LIMIT,
            )
            return self._result_to_get_result(points.points)
        except (UnexpectedResponse, grpc.RpcError) as e:
            if self._is_collection_not_found_error(e):
                log.debug(f"Collection {mt_collection} doesn't exist, get returns None")
                return None
            # For other API errors, log and re-raise
            _, error_msg = self._extract_error_message(e)
            log.warning(f"Unexpected Qdrant error during get: {error_msg}")
            raise
        except Exception as e:
            # For non-Qdrant exceptions, log and return None
            log.exception(f"Error getting collection '{collection_name}': {e}")
            return None

    def _handle_operation_with_error_retry(
        self, operation_name, mt_collection, points, dimension
    ):
        """
        Private helper to handle common error cases for insert and upsert operations.

        The write is attempted once. If it fails because the collection is
        missing, the collection is created with ``dimension`` and the write is
        retried. If it fails because of a dimension mismatch, every vector is
        truncated or zero-padded to the collection's size and the write is
        retried. Any other Qdrant error is logged and re-raised; errors from
        the retry itself propagate.

        Args:
            operation_name: 'insert' or 'upsert'
            mt_collection: The multi-tenant collection name
            points: The vector points to insert/upsert
            dimension: The dimension of the vectors

        Returns:
            The operation result (for upsert) or None (for insert)
        """
        try:
            return self._write_points(operation_name, mt_collection, points)
        except (UnexpectedResponse, grpc.RpcError) as e:
            # Handle collection not found
            if self._is_collection_not_found_error(e):
                log.info(
                    f"Collection {mt_collection} doesn't exist. Creating it with dimension {dimension}."
                )
                # Create collection with correct dimensions from our vectors
                self._create_multi_tenant_collection_if_not_exists(
                    mt_collection_name=mt_collection, dimension=dimension
                )
                # No dimension adjustment needed: the collection was just
                # created with the dimension of these vectors.
                return self._write_points(operation_name, mt_collection, points)

            # Handle dimension mismatch
            if self._is_dimension_mismatch_error(e):
                # For dimension errors, the collection must exist, so get its configuration
                mt_collection_info = self.client.get_collection(mt_collection)
                existing_size = mt_collection_info.config.params.vectors.size

                log.info(
                    f"Dimension mismatch: Collection {mt_collection} expects {existing_size}, got {dimension}"
                )
                if existing_size < dimension:
                    log.info(
                        f"Truncating vectors from {dimension} to {existing_size} dimensions"
                    )
                elif existing_size > dimension:
                    log.info(
                        f"Padding vectors from {dimension} to {existing_size} dimensions with zeros"
                    )

                # Fit every vector by its own length so mixed-length batches
                # also end up at the collection's size.
                points = [
                    PointStruct(
                        id=point.id,
                        vector=self._fit_vector(point.vector, existing_size),
                        payload=point.payload,
                    )
                    for point in points
                ]
                # Try operation again with adjusted dimensions
                return self._write_points(operation_name, mt_collection, points)

            # Not a known error we can handle, log and re-raise
            _, error_msg = self._extract_error_message(e)
            log.warning(f"Unhandled Qdrant error: {error_msg}")
            raise

    def insert(self, collection_name: str, items: list[VectorItem]):
        """
        Insert items with tenant ID.

        Returns ``None``; does nothing when there is no client or ``items`` is
        empty.
        """
        if not self.client or not items:
            return None

        # Map to multi-tenant collection and tenant ID
        mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)

        # Dimension of the actual vectors (items is non-empty here)
        dimension = len(items[0]["vector"])

        # Create points with tenant ID
        points = self._create_points(items, tenant_id)

        # Handle the operation with error retry
        return self._handle_operation_with_error_retry(
            "insert", mt_collection, points, dimension
        )

    def upsert(self, collection_name: str, items: list[VectorItem]):
        """
        Upsert items with tenant ID.

        Returns the client's upsert result, or ``None`` when there is no
        client or ``items`` is empty.
        """
        if not self.client or not items:
            return None

        # Map to multi-tenant collection and tenant ID
        mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)

        # Dimension of the actual vectors (items is non-empty here)
        dimension = len(items[0]["vector"])

        # Create points with tenant ID
        points = self._create_points(items, tenant_id)

        # Handle the operation with error retry
        return self._handle_operation_with_error_retry(
            "upsert", mt_collection, points, dimension
        )

    def reset(self):
        """
        Reset the database by deleting all collections.

        Only collections whose name starts with ``self.collection_prefix`` are
        deleted.
        """
        if not self.client:
            return None

        for collection in self.client.get_collections().collections:
            if collection.name.startswith(self.collection_prefix):
                self.client.delete_collection(collection_name=collection.name)

    def delete_collection(self, collection_name: str):
        """
        Delete a collection.

        Removes every point belonging to the logical collection (tenant). If
        the shared physical collection holds no points afterwards, it is
        dropped as well.

        Returns the client's delete result, or ``None`` when there is no
        client or the physical collection does not exist. Other Qdrant errors
        are logged and re-raised.
        """
        if not self.client:
            return None

        # Map to multi-tenant collection and tenant ID
        mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)

        try:
            update_result = self.client.delete(
                collection_name=mt_collection,
                points_selector=models.FilterSelector(
                    filter=models.Filter(must=[self._tenant_filter(tenant_id)])
                ),
            )

            # The physical collection is shared between tenants, so only drop
            # it on an exact zero count; the collection-info point count is
            # not relied on here because it may be approximate.
            remaining = self.client.count(
                collection_name=mt_collection, exact=True
            ).count
            if remaining == 0:
                self.client.delete_collection(mt_collection)

            return update_result
        except (UnexpectedResponse, grpc.RpcError) as e:
            if self._is_collection_not_found_error(e):
                log.debug(
                    f"Collection {mt_collection} doesn't exist, nothing to delete"
                )
                return None
            # For other API errors, log and re-raise
            _, error_msg = self._extract_error_message(e)
            log.warning(f"Unexpected Qdrant error: {error_msg}")
            raise
|