qdrant_multitenancy.py 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732
import logging
import uuid
from typing import Optional, Tuple
from urllib.parse import urlparse

import grpc
from qdrant_client import QdrantClient as Qclient
from qdrant_client.http.exceptions import UnexpectedResponse
from qdrant_client.http.models import PointStruct
from qdrant_client.models import models

from open_webui.config import (
    QDRANT_API_KEY,
    QDRANT_GRPC_PORT,
    QDRANT_ON_DISK,
    QDRANT_PREFER_GRPC,
    QDRANT_URI,
    QDRANT_COLLECTION_PREFIX,
)
from open_webui.env import SRC_LOG_LEVELS
from open_webui.retrieval.vector.main import (
    GetResult,
    SearchResult,
    VectorDBBase,
    VectorItem,
)
  24. NO_LIMIT = 999999999
  25. log = logging.getLogger(__name__)
  26. log.setLevel(SRC_LOG_LEVELS["RAG"])
  27. class QdrantClient(VectorDBBase):
  28. def __init__(self):
  29. self.collection_prefix = QDRANT_COLLECTION_PREFIX
  30. self.QDRANT_URI = QDRANT_URI
  31. self.QDRANT_API_KEY = QDRANT_API_KEY
  32. self.QDRANT_ON_DISK = QDRANT_ON_DISK
  33. self.PREFER_GRPC = QDRANT_PREFER_GRPC
  34. self.GRPC_PORT = QDRANT_GRPC_PORT
  35. if not self.QDRANT_URI:
  36. self.client = None
  37. return
  38. # Unified handling for either scheme
  39. parsed = urlparse(self.QDRANT_URI)
  40. host = parsed.hostname or self.QDRANT_URI
  41. http_port = parsed.port or 6333 # default REST port
  42. if self.PREFER_GRPC:
  43. self.client = Qclient(
  44. host=host,
  45. port=http_port,
  46. grpc_port=self.GRPC_PORT,
  47. prefer_grpc=self.PREFER_GRPC,
  48. api_key=self.QDRANT_API_KEY,
  49. )
  50. else:
  51. self.client = Qclient(url=self.QDRANT_URI, api_key=self.QDRANT_API_KEY)
  52. # Main collection types for multi-tenancy
  53. self.MEMORY_COLLECTION = f"{self.collection_prefix}_memories"
  54. self.KNOWLEDGE_COLLECTION = f"{self.collection_prefix}_knowledge"
  55. self.FILE_COLLECTION = f"{self.collection_prefix}_files"
  56. self.WEB_SEARCH_COLLECTION = f"{self.collection_prefix}_web-search"
  57. self.HASH_BASED_COLLECTION = f"{self.collection_prefix}_hash-based"
  58. def _result_to_get_result(self, points) -> GetResult:
  59. ids = []
  60. documents = []
  61. metadatas = []
  62. for point in points:
  63. payload = point.payload
  64. ids.append(point.id)
  65. documents.append(payload["text"])
  66. metadatas.append(payload["metadata"])
  67. return GetResult(
  68. **{
  69. "ids": [ids],
  70. "documents": [documents],
  71. "metadatas": [metadatas],
  72. }
  73. )
  74. def _get_collection_and_tenant_id(self, collection_name: str) -> Tuple[str, str]:
  75. """
  76. Maps the traditional collection name to multi-tenant collection and tenant ID.
  77. Returns:
  78. tuple: (collection_name, tenant_id)
  79. """
  80. # Check for user memory collections
  81. tenant_id = collection_name
  82. if collection_name.startswith("user-memory-"):
  83. return self.MEMORY_COLLECTION, tenant_id
  84. # Check for file collections
  85. elif collection_name.startswith("file-"):
  86. return self.FILE_COLLECTION, tenant_id
  87. # Check for web search collections
  88. elif collection_name.startswith("web-search-"):
  89. return self.WEB_SEARCH_COLLECTION, tenant_id
  90. # Handle hash-based collections (YouTube and web URLs)
  91. elif len(collection_name) == 63 and all(
  92. c in "0123456789abcdef" for c in collection_name
  93. ):
  94. return self.HASH_BASED_COLLECTION, tenant_id
  95. else:
  96. return self.KNOWLEDGE_COLLECTION, tenant_id
  97. def _extract_error_message(self, exception):
  98. """
  99. Extract error message from either HTTP or gRPC exceptions
  100. Returns:
  101. tuple: (status_code, error_message)
  102. """
  103. # Check if it's an HTTP exception
  104. if isinstance(exception, UnexpectedResponse):
  105. try:
  106. error_data = exception.structured()
  107. error_msg = error_data.get("status", {}).get("error", "")
  108. return exception.status_code, error_msg
  109. except Exception as inner_e:
  110. log.error(f"Failed to parse HTTP error: {inner_e}")
  111. return exception.status_code, str(exception)
  112. # Check if it's a gRPC exception
  113. elif isinstance(exception, grpc.RpcError):
  114. # Extract status code from gRPC error
  115. status_code = None
  116. if hasattr(exception, "code") and callable(exception.code):
  117. status_code = exception.code().value[0]
  118. # Extract error message
  119. error_msg = str(exception)
  120. if "details =" in error_msg:
  121. # Parse the details line which contains the actual error message
  122. try:
  123. details_line = [
  124. line.strip()
  125. for line in error_msg.split("\n")
  126. if "details =" in line
  127. ][0]
  128. error_msg = details_line.split("details =")[1].strip(' "')
  129. except (IndexError, AttributeError):
  130. # Fall back to full message if parsing fails
  131. pass
  132. return status_code, error_msg
  133. # For any other type of exception
  134. return None, str(exception)
  135. def _is_collection_not_found_error(self, exception):
  136. """
  137. Check if the exception is due to collection not found, supporting both HTTP and gRPC
  138. """
  139. status_code, error_msg = self._extract_error_message(exception)
  140. # HTTP error (404)
  141. if (
  142. status_code == 404
  143. and "Collection" in error_msg
  144. and "doesn't exist" in error_msg
  145. ):
  146. return True
  147. # gRPC error (NOT_FOUND status)
  148. if (
  149. isinstance(exception, grpc.RpcError)
  150. and exception.code() == grpc.StatusCode.NOT_FOUND
  151. ):
  152. return True
  153. return False
  154. def _is_dimension_mismatch_error(self, exception):
  155. """
  156. Check if the exception is due to dimension mismatch, supporting both HTTP and gRPC
  157. """
  158. status_code, error_msg = self._extract_error_message(exception)
  159. # Common patterns in both HTTP and gRPC
  160. return (
  161. "Vector dimension error" in error_msg
  162. or "dimensions mismatch" in error_msg
  163. or "invalid vector size" in error_msg
  164. )
  165. def _create_multi_tenant_collection_if_not_exists(
  166. self, mt_collection_name: str, dimension: int = 384
  167. ):
  168. """
  169. Creates a collection with multi-tenancy configuration if it doesn't exist.
  170. Default dimension is set to 384 which corresponds to 'sentence-transformers/all-MiniLM-L6-v2'.
  171. When creating collections dynamically (insert/upsert), the actual vector dimensions will be used.
  172. """
  173. try:
  174. # Try to create the collection directly - will fail if it already exists
  175. self.client.create_collection(
  176. collection_name=mt_collection_name,
  177. vectors_config=models.VectorParams(
  178. size=dimension,
  179. distance=models.Distance.COSINE,
  180. on_disk=self.QDRANT_ON_DISK,
  181. ),
  182. hnsw_config=models.HnswConfigDiff(
  183. payload_m=16, # Enable per-tenant indexing
  184. m=0,
  185. on_disk=self.QDRANT_ON_DISK,
  186. ),
  187. )
  188. # Create tenant ID payload index
  189. self.client.create_payload_index(
  190. collection_name=mt_collection_name,
  191. field_name="tenant_id",
  192. field_schema=models.KeywordIndexParams(
  193. type=models.KeywordIndexType.KEYWORD,
  194. is_tenant=True,
  195. on_disk=self.QDRANT_ON_DISK,
  196. ),
  197. wait=True,
  198. )
  199. # Create payload indexes for efficient filtering on metadata.hash and metadata.file_id
  200. self.client.create_payload_index(
  201. collection_name=mt_collection_name,
  202. field_name="metadata.hash",
  203. field_schema=models.KeywordIndexParams(
  204. type=models.KeywordIndexType.KEYWORD,
  205. is_tenant=False,
  206. on_disk=self.QDRANT_ON_DISK,
  207. ),
  208. )
  209. self.client.create_payload_index(
  210. collection_name=mt_collection_name,
  211. field_name="metadata.file_id",
  212. field_schema=models.KeywordIndexParams(
  213. type=models.KeywordIndexType.KEYWORD,
  214. is_tenant=False,
  215. on_disk=self.QDRANT_ON_DISK,
  216. ),
  217. )
  218. log.info(
  219. f"Multi-tenant collection {mt_collection_name} created with dimension {dimension}!"
  220. )
  221. except (UnexpectedResponse, grpc.RpcError) as e:
  222. # Check for the specific error indicating collection already exists
  223. status_code, error_msg = self._extract_error_message(e)
  224. # HTTP status code 409 or gRPC ALREADY_EXISTS
  225. if (isinstance(e, UnexpectedResponse) and status_code == 409) or (
  226. isinstance(e, grpc.RpcError)
  227. and e.code() == grpc.StatusCode.ALREADY_EXISTS
  228. ):
  229. if "already exists" in error_msg:
  230. log.debug(f"Collection {mt_collection_name} already exists")
  231. return
  232. # If it's not an already exists error, re-raise
  233. raise e
  234. except Exception as e:
  235. raise e
  236. def _create_points(self, items: list[VectorItem], tenant_id: str):
  237. """
  238. Create point structs from vector items with tenant ID.
  239. """
  240. return [
  241. PointStruct(
  242. id=item["id"],
  243. vector=item["vector"],
  244. payload={
  245. "text": item["text"],
  246. "metadata": item["metadata"],
  247. "tenant_id": tenant_id,
  248. },
  249. )
  250. for item in items
  251. ]
  252. def has_collection(self, collection_name: str) -> bool:
  253. """
  254. Check if a logical collection exists by checking for any points with the tenant ID.
  255. """
  256. if not self.client:
  257. return False
  258. # Map to multi-tenant collection and tenant ID
  259. mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
  260. # Create tenant filter
  261. tenant_filter = models.FieldCondition(
  262. key="tenant_id", match=models.MatchValue(value=tenant_id)
  263. )
  264. try:
  265. # Try directly querying - most of the time collection should exist
  266. response = self.client.query_points(
  267. collection_name=mt_collection,
  268. query_filter=models.Filter(must=[tenant_filter]),
  269. limit=1,
  270. )
  271. # Collection exists with this tenant ID if there are points
  272. return len(response.points) > 0
  273. except (UnexpectedResponse, grpc.RpcError) as e:
  274. if self._is_collection_not_found_error(e):
  275. log.debug(f"Collection {mt_collection} doesn't exist")
  276. return False
  277. else:
  278. # For other API errors, log and return False
  279. _, error_msg = self._extract_error_message(e)
  280. log.warning(f"Unexpected Qdrant error: {error_msg}")
  281. return False
  282. except Exception as e:
  283. # For any other errors, log and return False
  284. log.debug(f"Error checking collection {mt_collection}: {e}")
  285. return False
  286. def delete(
  287. self,
  288. collection_name: str,
  289. ids: Optional[list[str]] = None,
  290. filter: Optional[dict] = None,
  291. ):
  292. """
  293. Delete vectors by ID or filter from a collection with tenant isolation.
  294. """
  295. if not self.client:
  296. return None
  297. # Map to multi-tenant collection and tenant ID
  298. mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
  299. # Create tenant filter
  300. tenant_filter = models.FieldCondition(
  301. key="tenant_id", match=models.MatchValue(value=tenant_id)
  302. )
  303. must_conditions = [tenant_filter]
  304. should_conditions = []
  305. if ids:
  306. for id_value in ids:
  307. should_conditions.append(
  308. models.FieldCondition(
  309. key="metadata.id",
  310. match=models.MatchValue(value=id_value),
  311. ),
  312. )
  313. elif filter:
  314. for key, value in filter.items():
  315. must_conditions.append(
  316. models.FieldCondition(
  317. key=f"metadata.{key}",
  318. match=models.MatchValue(value=value),
  319. ),
  320. )
  321. try:
  322. # Try to delete directly - most of the time collection should exist
  323. update_result = self.client.delete(
  324. collection_name=mt_collection,
  325. points_selector=models.FilterSelector(
  326. filter=models.Filter(must=must_conditions, should=should_conditions)
  327. ),
  328. )
  329. return update_result
  330. except (UnexpectedResponse, grpc.RpcError) as e:
  331. if self._is_collection_not_found_error(e):
  332. log.debug(
  333. f"Collection {mt_collection} doesn't exist, nothing to delete"
  334. )
  335. return None
  336. else:
  337. # For other API errors, log and re-raise
  338. _, error_msg = self._extract_error_message(e)
  339. log.warning(f"Unexpected Qdrant error: {error_msg}")
  340. raise
  341. except Exception as e:
  342. # For non-Qdrant exceptions, re-raise
  343. raise
  344. def search(
  345. self, collection_name: str, vectors: list[list[float | int]], limit: int
  346. ) -> Optional[SearchResult]:
  347. """
  348. Search for the nearest neighbor items based on the vectors with tenant isolation.
  349. """
  350. if not self.client:
  351. return None
  352. # Map to multi-tenant collection and tenant ID
  353. mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
  354. # Get the vector dimension from the query vector
  355. dimension = len(vectors[0]) if vectors and len(vectors) > 0 else None
  356. try:
  357. # Try the search operation directly - most of the time collection should exist
  358. # Create tenant filter
  359. tenant_filter = models.FieldCondition(
  360. key="tenant_id", match=models.MatchValue(value=tenant_id)
  361. )
  362. # Ensure vector dimensions match the collection
  363. collection_dim = self.client.get_collection(
  364. mt_collection
  365. ).config.params.vectors.size
  366. if collection_dim != dimension:
  367. if collection_dim < dimension:
  368. vectors = [vector[:collection_dim] for vector in vectors]
  369. else:
  370. vectors = [
  371. vector + [0] * (collection_dim - dimension)
  372. for vector in vectors
  373. ]
  374. # Search with tenant filter
  375. prefetch_query = models.Prefetch(
  376. filter=models.Filter(must=[tenant_filter]),
  377. limit=NO_LIMIT,
  378. )
  379. query_response = self.client.query_points(
  380. collection_name=mt_collection,
  381. query=vectors[0],
  382. prefetch=prefetch_query,
  383. limit=limit,
  384. )
  385. get_result = self._result_to_get_result(query_response.points)
  386. return SearchResult(
  387. ids=get_result.ids,
  388. documents=get_result.documents,
  389. metadatas=get_result.metadatas,
  390. # qdrant distance is [-1, 1], normalize to [0, 1]
  391. distances=[
  392. [(point.score + 1.0) / 2.0 for point in query_response.points]
  393. ],
  394. )
  395. except (UnexpectedResponse, grpc.RpcError) as e:
  396. if self._is_collection_not_found_error(e):
  397. log.debug(
  398. f"Collection {mt_collection} doesn't exist, search returns None"
  399. )
  400. return None
  401. else:
  402. # For other API errors, log and re-raise
  403. _, error_msg = self._extract_error_message(e)
  404. log.warning(f"Unexpected Qdrant error during search: {error_msg}")
  405. raise
  406. except Exception as e:
  407. # For non-Qdrant exceptions, log and return None
  408. log.exception(f"Error searching collection '{collection_name}': {e}")
  409. return None
  410. def query(self, collection_name: str, filter: dict, limit: Optional[int] = None):
  411. """
  412. Query points with filters and tenant isolation.
  413. """
  414. if not self.client:
  415. return None
  416. # Map to multi-tenant collection and tenant ID
  417. mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
  418. # Set default limit if not provided
  419. if limit is None:
  420. limit = NO_LIMIT
  421. # Create tenant filter
  422. tenant_filter = models.FieldCondition(
  423. key="tenant_id", match=models.MatchValue(value=tenant_id)
  424. )
  425. # Create metadata filters
  426. field_conditions = []
  427. for key, value in filter.items():
  428. field_conditions.append(
  429. models.FieldCondition(
  430. key=f"metadata.{key}", match=models.MatchValue(value=value)
  431. )
  432. )
  433. # Combine tenant filter with metadata filters
  434. combined_filter = models.Filter(must=[tenant_filter, *field_conditions])
  435. try:
  436. # Try the query directly - most of the time collection should exist
  437. points = self.client.query_points(
  438. collection_name=mt_collection,
  439. query_filter=combined_filter,
  440. limit=limit,
  441. )
  442. return self._result_to_get_result(points.points)
  443. except (UnexpectedResponse, grpc.RpcError) as e:
  444. if self._is_collection_not_found_error(e):
  445. log.debug(
  446. f"Collection {mt_collection} doesn't exist, query returns None"
  447. )
  448. return None
  449. else:
  450. # For other API errors, log and re-raise
  451. _, error_msg = self._extract_error_message(e)
  452. log.warning(f"Unexpected Qdrant error during query: {error_msg}")
  453. raise
  454. except Exception as e:
  455. # For non-Qdrant exceptions, log and re-raise
  456. log.exception(f"Error querying collection '{collection_name}': {e}")
  457. return None
  458. def get(self, collection_name: str) -> Optional[GetResult]:
  459. """
  460. Get all items in a collection with tenant isolation.
  461. """
  462. if not self.client:
  463. return None
  464. # Map to multi-tenant collection and tenant ID
  465. mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
  466. # Create tenant filter
  467. tenant_filter = models.FieldCondition(
  468. key="tenant_id", match=models.MatchValue(value=tenant_id)
  469. )
  470. try:
  471. # Try to get points directly - most of the time collection should exist
  472. points = self.client.query_points(
  473. collection_name=mt_collection,
  474. query_filter=models.Filter(must=[tenant_filter]),
  475. limit=NO_LIMIT,
  476. )
  477. return self._result_to_get_result(points.points)
  478. except (UnexpectedResponse, grpc.RpcError) as e:
  479. if self._is_collection_not_found_error(e):
  480. log.debug(f"Collection {mt_collection} doesn't exist, get returns None")
  481. return None
  482. else:
  483. # For other API errors, log and re-raise
  484. _, error_msg = self._extract_error_message(e)
  485. log.warning(f"Unexpected Qdrant error during get: {error_msg}")
  486. raise
  487. except Exception as e:
  488. # For non-Qdrant exceptions, log and return None
  489. log.exception(f"Error getting collection '{collection_name}': {e}")
  490. return None
  491. def _handle_operation_with_error_retry(
  492. self, operation_name, mt_collection, points, dimension
  493. ):
  494. """
  495. Private helper to handle common error cases for insert and upsert operations.
  496. Args:
  497. operation_name: 'insert' or 'upsert'
  498. mt_collection: The multi-tenant collection name
  499. points: The vector points to insert/upsert
  500. dimension: The dimension of the vectors
  501. Returns:
  502. The operation result (for upsert) or None (for insert)
  503. """
  504. try:
  505. if operation_name == "insert":
  506. self.client.upload_points(mt_collection, points)
  507. return None
  508. else: # upsert
  509. return self.client.upsert(mt_collection, points)
  510. except (UnexpectedResponse, grpc.RpcError) as e:
  511. # Handle collection not found
  512. if self._is_collection_not_found_error(e):
  513. log.info(
  514. f"Collection {mt_collection} doesn't exist. Creating it with dimension {dimension}."
  515. )
  516. # Create collection with correct dimensions from our vectors
  517. self._create_multi_tenant_collection_if_not_exists(
  518. mt_collection_name=mt_collection, dimension=dimension
  519. )
  520. # Try operation again - no need for dimension adjustment since we just created with correct dimensions
  521. if operation_name == "insert":
  522. self.client.upload_points(mt_collection, points)
  523. return None
  524. else: # upsert
  525. return self.client.upsert(mt_collection, points)
  526. # Handle dimension mismatch
  527. elif self._is_dimension_mismatch_error(e):
  528. # For dimension errors, the collection must exist, so get its configuration
  529. mt_collection_info = self.client.get_collection(mt_collection)
  530. existing_size = mt_collection_info.config.params.vectors.size
  531. log.info(
  532. f"Dimension mismatch: Collection {mt_collection} expects {existing_size}, got {dimension}"
  533. )
  534. if existing_size < dimension:
  535. # Truncate vectors to fit
  536. log.info(
  537. f"Truncating vectors from {dimension} to {existing_size} dimensions"
  538. )
  539. points = [
  540. PointStruct(
  541. id=point.id,
  542. vector=point.vector[:existing_size],
  543. payload=point.payload,
  544. )
  545. for point in points
  546. ]
  547. elif existing_size > dimension:
  548. # Pad vectors with zeros
  549. log.info(
  550. f"Padding vectors from {dimension} to {existing_size} dimensions with zeros"
  551. )
  552. points = [
  553. PointStruct(
  554. id=point.id,
  555. vector=point.vector
  556. + [0] * (existing_size - len(point.vector)),
  557. payload=point.payload,
  558. )
  559. for point in points
  560. ]
  561. # Try operation again with adjusted dimensions
  562. if operation_name == "insert":
  563. self.client.upload_points(mt_collection, points)
  564. return None
  565. else: # upsert
  566. return self.client.upsert(mt_collection, points)
  567. else:
  568. # Not a known error we can handle, log and re-raise
  569. _, error_msg = self._extract_error_message(e)
  570. log.warning(f"Unhandled Qdrant error: {error_msg}")
  571. raise
  572. except Exception as e:
  573. # For non-Qdrant exceptions, re-raise
  574. raise
  575. def insert(self, collection_name: str, items: list[VectorItem]):
  576. """
  577. Insert items with tenant ID.
  578. """
  579. if not self.client or not items:
  580. return None
  581. # Map to multi-tenant collection and tenant ID
  582. mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
  583. # Get dimensions from the actual vectors
  584. dimension = len(items[0]["vector"]) if items else None
  585. # Create points with tenant ID
  586. points = self._create_points(items, tenant_id)
  587. # Handle the operation with error retry
  588. return self._handle_operation_with_error_retry(
  589. "insert", mt_collection, points, dimension
  590. )
  591. def upsert(self, collection_name: str, items: list[VectorItem]):
  592. """
  593. Upsert items with tenant ID.
  594. """
  595. if not self.client or not items:
  596. return None
  597. # Map to multi-tenant collection and tenant ID
  598. mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
  599. # Get dimensions from the actual vectors
  600. dimension = len(items[0]["vector"]) if items else None
  601. # Create points with tenant ID
  602. points = self._create_points(items, tenant_id)
  603. # Handle the operation with error retry
  604. return self._handle_operation_with_error_retry(
  605. "upsert", mt_collection, points, dimension
  606. )
  607. def reset(self):
  608. """
  609. Reset the database by deleting all collections.
  610. """
  611. if not self.client:
  612. return None
  613. collection_names = self.client.get_collections().collections
  614. for collection_name in collection_names:
  615. if collection_name.name.startswith(self.collection_prefix):
  616. self.client.delete_collection(collection_name=collection_name.name)
  617. def delete_collection(self, collection_name: str):
  618. """
  619. Delete a collection.
  620. """
  621. if not self.client:
  622. return None
  623. # Map to multi-tenant collection and tenant ID
  624. mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
  625. tenant_filter = models.FieldCondition(
  626. key="tenant_id", match=models.MatchValue(value=tenant_id)
  627. )
  628. field_conditions = [tenant_filter]
  629. update_result = self.client.delete(
  630. collection_name=mt_collection,
  631. points_selector=models.FilterSelector(
  632. filter=models.Filter(must=field_conditions)
  633. ),
  634. )
  635. if self.client.get_collection(mt_collection).points_count == 0:
  636. self.client.delete_collection(mt_collection)
  637. return update_result