# s3vector.py — AWS S3 Vector database adapter for Open WebUI
  1. from open_webui.retrieval.vector.utils import process_metadata
  2. from open_webui.retrieval.vector.main import (
  3. VectorDBBase,
  4. VectorItem,
  5. GetResult,
  6. SearchResult,
  7. )
  8. from open_webui.config import S3_VECTOR_BUCKET_NAME, S3_VECTOR_REGION
  9. from open_webui.env import SRC_LOG_LEVELS
  10. from typing import List, Optional, Dict, Any, Union
  11. import logging
  12. import boto3
  13. log = logging.getLogger(__name__)
  14. log.setLevel(SRC_LOG_LEVELS["RAG"])
  15. class S3VectorClient(VectorDBBase):
  16. """
  17. AWS S3 Vector integration for Open WebUI Knowledge.
  18. """
  19. def __init__(self):
  20. self.bucket_name = S3_VECTOR_BUCKET_NAME
  21. self.region = S3_VECTOR_REGION
  22. # Simple validation - log warnings instead of raising exceptions
  23. if not self.bucket_name:
  24. log.warning("S3_VECTOR_BUCKET_NAME not set - S3Vector will not work")
  25. if not self.region:
  26. log.warning("S3_VECTOR_REGION not set - S3Vector will not work")
  27. if self.bucket_name and self.region:
  28. try:
  29. self.client = boto3.client("s3vectors", region_name=self.region)
  30. log.info(
  31. f"S3Vector client initialized for bucket '{self.bucket_name}' in region '{self.region}'"
  32. )
  33. except Exception as e:
  34. log.error(f"Failed to initialize S3Vector client: {e}")
  35. self.client = None
  36. else:
  37. self.client = None
  38. def _create_index(
  39. self,
  40. index_name: str,
  41. dimension: int,
  42. data_type: str = "float32",
  43. distance_metric: str = "cosine",
  44. ) -> None:
  45. """
  46. Create a new index in the S3 vector bucket for the given collection if it does not exist.
  47. """
  48. if self.has_collection(index_name):
  49. log.debug(f"Index '{index_name}' already exists, skipping creation")
  50. return
  51. try:
  52. self.client.create_index(
  53. vectorBucketName=self.bucket_name,
  54. indexName=index_name,
  55. dataType=data_type,
  56. dimension=dimension,
  57. distanceMetric=distance_metric,
  58. )
  59. log.info(
  60. f"Created S3 index: {index_name} (dim={dimension}, type={data_type}, metric={distance_metric})"
  61. )
  62. except Exception as e:
  63. log.error(f"Error creating S3 index '{index_name}': {e}")
  64. raise
  65. def _filter_metadata(
  66. self, metadata: Dict[str, Any], item_id: str
  67. ) -> Dict[str, Any]:
  68. """
  69. Filter vector metadata keys to comply with S3 Vector API limit of 10 keys maximum.
  70. """
  71. if not isinstance(metadata, dict) or len(metadata) <= 10:
  72. return metadata
  73. # Keep only the first 10 keys, prioritizing important ones based on actual Open WebUI metadata
  74. important_keys = [
  75. "text", # The actual document content
  76. "file_id", # File ID
  77. "source", # Document source file
  78. "title", # Document title
  79. "page", # Page number
  80. "total_pages", # Total pages in document
  81. "embedding_config", # Embedding configuration
  82. "created_by", # User who created it
  83. "name", # Document name
  84. "hash", # Content hash
  85. ]
  86. filtered_metadata = {}
  87. # First, add important keys if they exist
  88. for key in important_keys:
  89. if key in metadata:
  90. filtered_metadata[key] = metadata[key]
  91. if len(filtered_metadata) >= 10:
  92. break
  93. # If we still have room, add other keys
  94. if len(filtered_metadata) < 10:
  95. for key, value in metadata.items():
  96. if key not in filtered_metadata:
  97. filtered_metadata[key] = value
  98. if len(filtered_metadata) >= 10:
  99. break
  100. log.warning(
  101. f"Metadata for key '{item_id}' had {len(metadata)} keys, limited to 10 keys"
  102. )
  103. return filtered_metadata
  104. def has_collection(self, collection_name: str) -> bool:
  105. """
  106. Check if a vector index (collection) exists in the S3 vector bucket.
  107. """
  108. try:
  109. response = self.client.list_indexes(vectorBucketName=self.bucket_name)
  110. indexes = response.get("indexes", [])
  111. return any(idx.get("indexName") == collection_name for idx in indexes)
  112. except Exception as e:
  113. log.error(f"Error listing indexes: {e}")
  114. return False
  115. def delete_collection(self, collection_name: str) -> None:
  116. """
  117. Delete an entire S3 Vector index/collection.
  118. """
  119. if not self.has_collection(collection_name):
  120. log.warning(
  121. f"Collection '{collection_name}' does not exist, nothing to delete"
  122. )
  123. return
  124. try:
  125. log.info(f"Deleting collection '{collection_name}'")
  126. self.client.delete_index(
  127. vectorBucketName=self.bucket_name, indexName=collection_name
  128. )
  129. log.info(f"Successfully deleted collection '{collection_name}'")
  130. except Exception as e:
  131. log.error(f"Error deleting collection '{collection_name}': {e}")
  132. raise
  133. def insert(self, collection_name: str, items: List[VectorItem]) -> None:
  134. """
  135. Insert vector items into the S3 Vector index. Create index if it does not exist.
  136. """
  137. if not items:
  138. log.warning("No items to insert")
  139. return
  140. dimension = len(items[0]["vector"])
  141. try:
  142. if not self.has_collection(collection_name):
  143. log.info(f"Index '{collection_name}' does not exist. Creating index.")
  144. self._create_index(
  145. index_name=collection_name,
  146. dimension=dimension,
  147. data_type="float32",
  148. distance_metric="cosine",
  149. )
  150. # Prepare vectors for insertion
  151. vectors = []
  152. for item in items:
  153. # Ensure vector data is in the correct format for S3 Vector API
  154. vector_data = item["vector"]
  155. if isinstance(vector_data, list):
  156. # Convert list to float32 values as required by S3 Vector API
  157. vector_data = [float(x) for x in vector_data]
  158. # Prepare metadata, ensuring the text field is preserved
  159. metadata = item.get("metadata", {}).copy()
  160. # Add the text field to metadata so it's available for retrieval
  161. metadata["text"] = item["text"]
  162. # Convert metadata to string format for consistency
  163. metadata = process_metadata(metadata)
  164. # Filter metadata to comply with S3 Vector API limit of 10 keys
  165. metadata = self._filter_metadata(metadata, item["id"])
  166. vectors.append(
  167. {
  168. "key": item["id"],
  169. "data": {"float32": vector_data},
  170. "metadata": metadata,
  171. }
  172. )
  173. # Insert vectors in batches of 500 (S3 Vector API limit)
  174. batch_size = 500
  175. for i in range(0, len(vectors), batch_size):
  176. batch = vectors[i : i + batch_size]
  177. self.client.put_vectors(
  178. vectorBucketName=self.bucket_name,
  179. indexName=collection_name,
  180. vectors=batch,
  181. )
  182. log.info(
  183. f"Inserted batch {i//batch_size + 1}: {len(batch)} vectors into index '{collection_name}'."
  184. )
  185. log.info(
  186. f"Completed insertion of {len(vectors)} vectors into index '{collection_name}'."
  187. )
  188. except Exception as e:
  189. log.error(f"Error inserting vectors: {e}")
  190. raise
  191. def upsert(self, collection_name: str, items: List[VectorItem]) -> None:
  192. """
  193. Insert or update vector items in the S3 Vector index. Create index if it does not exist.
  194. """
  195. if not items:
  196. log.warning("No items to upsert")
  197. return
  198. dimension = len(items[0]["vector"])
  199. log.info(f"Upsert dimension: {dimension}")
  200. try:
  201. if not self.has_collection(collection_name):
  202. log.info(
  203. f"Index '{collection_name}' does not exist. Creating index for upsert."
  204. )
  205. self._create_index(
  206. index_name=collection_name,
  207. dimension=dimension,
  208. data_type="float32",
  209. distance_metric="cosine",
  210. )
  211. # Prepare vectors for upsert
  212. vectors = []
  213. for item in items:
  214. # Ensure vector data is in the correct format for S3 Vector API
  215. vector_data = item["vector"]
  216. if isinstance(vector_data, list):
  217. # Convert list to float32 values as required by S3 Vector API
  218. vector_data = [float(x) for x in vector_data]
  219. # Prepare metadata, ensuring the text field is preserved
  220. metadata = item.get("metadata", {}).copy()
  221. # Add the text field to metadata so it's available for retrieval
  222. metadata["text"] = item["text"]
  223. # Convert metadata to string format for consistency
  224. metadata = process_metadata(metadata)
  225. # Filter metadata to comply with S3 Vector API limit of 10 keys
  226. metadata = self._filter_metadata(metadata, item["id"])
  227. vectors.append(
  228. {
  229. "key": item["id"],
  230. "data": {"float32": vector_data},
  231. "metadata": metadata,
  232. }
  233. )
  234. # Upsert vectors in batches of 500 (S3 Vector API limit)
  235. batch_size = 500
  236. for i in range(0, len(vectors), batch_size):
  237. batch = vectors[i : i + batch_size]
  238. if i == 0: # Log sample info for first batch only
  239. log.info(
  240. f"Upserting batch 1: {len(batch)} vectors. First vector sample: key={batch[0]['key']}, data_type={type(batch[0]['data']['float32'])}, data_len={len(batch[0]['data']['float32'])}"
  241. )
  242. else:
  243. log.info(
  244. f"Upserting batch {i//batch_size + 1}: {len(batch)} vectors."
  245. )
  246. self.client.put_vectors(
  247. vectorBucketName=self.bucket_name,
  248. indexName=collection_name,
  249. vectors=batch,
  250. )
  251. log.info(
  252. f"Completed upsert of {len(vectors)} vectors into index '{collection_name}'."
  253. )
  254. except Exception as e:
  255. log.error(f"Error upserting vectors: {e}")
  256. raise
  257. def search(
  258. self, collection_name: str, vectors: List[List[Union[float, int]]], limit: int
  259. ) -> Optional[SearchResult]:
  260. """
  261. Search for similar vectors in a collection using multiple query vectors.
  262. """
  263. if not self.has_collection(collection_name):
  264. log.warning(f"Collection '{collection_name}' does not exist")
  265. return None
  266. if not vectors:
  267. log.warning("No query vectors provided")
  268. return None
  269. try:
  270. log.info(
  271. f"Searching collection '{collection_name}' with {len(vectors)} query vectors, limit={limit}"
  272. )
  273. # Initialize result lists
  274. all_ids = []
  275. all_documents = []
  276. all_metadatas = []
  277. all_distances = []
  278. # Process each query vector
  279. for i, query_vector in enumerate(vectors):
  280. log.debug(f"Processing query vector {i+1}/{len(vectors)}")
  281. # Prepare the query vector in S3 Vector format
  282. query_vector_dict = {"float32": [float(x) for x in query_vector]}
  283. # Call S3 Vector query API
  284. response = self.client.query_vectors(
  285. vectorBucketName=self.bucket_name,
  286. indexName=collection_name,
  287. topK=limit,
  288. queryVector=query_vector_dict,
  289. returnMetadata=True,
  290. returnDistance=True,
  291. )
  292. # Process results for this query
  293. query_ids = []
  294. query_documents = []
  295. query_metadatas = []
  296. query_distances = []
  297. result_vectors = response.get("vectors", [])
  298. for vector in result_vectors:
  299. vector_id = vector.get("key")
  300. vector_metadata = vector.get("metadata", {})
  301. vector_distance = vector.get("distance", 0.0)
  302. # Extract document text from metadata
  303. document_text = ""
  304. if isinstance(vector_metadata, dict):
  305. # Get the text field first (highest priority)
  306. document_text = vector_metadata.get("text")
  307. if not document_text:
  308. # Fallback to other possible text fields
  309. document_text = (
  310. vector_metadata.get("content")
  311. or vector_metadata.get("document")
  312. or vector_id
  313. )
  314. else:
  315. document_text = vector_id
  316. query_ids.append(vector_id)
  317. query_documents.append(document_text)
  318. query_metadatas.append(vector_metadata)
  319. query_distances.append(vector_distance)
  320. # Add this query's results to the overall results
  321. all_ids.append(query_ids)
  322. all_documents.append(query_documents)
  323. all_metadatas.append(query_metadatas)
  324. all_distances.append(query_distances)
  325. log.info(f"Search completed. Found results for {len(all_ids)} queries")
  326. # Return SearchResult format
  327. return SearchResult(
  328. ids=all_ids if all_ids else None,
  329. documents=all_documents if all_documents else None,
  330. metadatas=all_metadatas if all_metadatas else None,
  331. distances=all_distances if all_distances else None,
  332. )
  333. except Exception as e:
  334. log.error(f"Error searching collection '{collection_name}': {str(e)}")
  335. # Handle specific AWS exceptions
  336. if hasattr(e, "response") and "Error" in e.response:
  337. error_code = e.response["Error"]["Code"]
  338. if error_code == "NotFoundException":
  339. log.warning(f"Collection '{collection_name}' not found")
  340. return None
  341. elif error_code == "ValidationException":
  342. log.error(f"Invalid query vector dimensions or parameters")
  343. return None
  344. elif error_code == "AccessDeniedException":
  345. log.error(
  346. f"Access denied for collection '{collection_name}'. Check permissions."
  347. )
  348. return None
  349. raise
  350. def query(
  351. self, collection_name: str, filter: Dict, limit: Optional[int] = None
  352. ) -> Optional[GetResult]:
  353. """
  354. Query vectors from a collection using metadata filter.
  355. """
  356. if not self.has_collection(collection_name):
  357. log.warning(f"Collection '{collection_name}' does not exist")
  358. return GetResult(ids=[[]], documents=[[]], metadatas=[[]])
  359. if not filter:
  360. log.warning("No filter provided, returning all vectors")
  361. return self.get(collection_name)
  362. try:
  363. log.info(f"Querying collection '{collection_name}' with filter: {filter}")
  364. # For S3 Vector, we need to use list_vectors and then filter results
  365. # Since S3 Vector may not support complex server-side filtering,
  366. # we'll retrieve all vectors and filter client-side
  367. # Get all vectors first
  368. all_vectors_result = self.get(collection_name)
  369. if not all_vectors_result or not all_vectors_result.ids:
  370. log.warning("No vectors found in collection")
  371. return GetResult(ids=[[]], documents=[[]], metadatas=[[]])
  372. # Extract the lists from the result
  373. all_ids = all_vectors_result.ids[0] if all_vectors_result.ids else []
  374. all_documents = (
  375. all_vectors_result.documents[0] if all_vectors_result.documents else []
  376. )
  377. all_metadatas = (
  378. all_vectors_result.metadatas[0] if all_vectors_result.metadatas else []
  379. )
  380. # Apply client-side filtering
  381. filtered_ids = []
  382. filtered_documents = []
  383. filtered_metadatas = []
  384. for i, metadata in enumerate(all_metadatas):
  385. if self._matches_filter(metadata, filter):
  386. if i < len(all_ids):
  387. filtered_ids.append(all_ids[i])
  388. if i < len(all_documents):
  389. filtered_documents.append(all_documents[i])
  390. filtered_metadatas.append(metadata)
  391. # Apply limit if specified
  392. if limit and len(filtered_ids) >= limit:
  393. break
  394. log.info(
  395. f"Filter applied: {len(filtered_ids)} vectors match out of {len(all_ids)} total"
  396. )
  397. # Return GetResult format
  398. if filtered_ids:
  399. return GetResult(
  400. ids=[filtered_ids],
  401. documents=[filtered_documents],
  402. metadatas=[filtered_metadatas],
  403. )
  404. else:
  405. return GetResult(ids=[[]], documents=[[]], metadatas=[[]])
  406. except Exception as e:
  407. log.error(f"Error querying collection '{collection_name}': {str(e)}")
  408. # Handle specific AWS exceptions
  409. if hasattr(e, "response") and "Error" in e.response:
  410. error_code = e.response["Error"]["Code"]
  411. if error_code == "NotFoundException":
  412. log.warning(f"Collection '{collection_name}' not found")
  413. return GetResult(ids=[[]], documents=[[]], metadatas=[[]])
  414. elif error_code == "AccessDeniedException":
  415. log.error(
  416. f"Access denied for collection '{collection_name}'. Check permissions."
  417. )
  418. return GetResult(ids=[[]], documents=[[]], metadatas=[[]])
  419. raise
  420. def get(self, collection_name: str) -> Optional[GetResult]:
  421. """
  422. Retrieve all vectors from a collection.
  423. """
  424. if not self.has_collection(collection_name):
  425. log.warning(f"Collection '{collection_name}' does not exist")
  426. return GetResult(ids=[[]], documents=[[]], metadatas=[[]])
  427. try:
  428. log.info(f"Retrieving all vectors from collection '{collection_name}'")
  429. # Initialize result lists
  430. all_ids = []
  431. all_documents = []
  432. all_metadatas = []
  433. # Handle pagination
  434. next_token = None
  435. while True:
  436. # Prepare request parameters
  437. request_params = {
  438. "vectorBucketName": self.bucket_name,
  439. "indexName": collection_name,
  440. "returnData": False, # Don't include vector data (not needed for get)
  441. "returnMetadata": True, # Include metadata
  442. "maxResults": 500, # Use reasonable page size
  443. }
  444. if next_token:
  445. request_params["nextToken"] = next_token
  446. # Call S3 Vector API
  447. response = self.client.list_vectors(**request_params)
  448. # Process vectors in this page
  449. vectors = response.get("vectors", [])
  450. for vector in vectors:
  451. vector_id = vector.get("key")
  452. vector_data = vector.get("data", {})
  453. vector_metadata = vector.get("metadata", {})
  454. # Extract the actual vector array
  455. vector_array = vector_data.get("float32", [])
  456. # For documents, we try to extract text from metadata or use the vector ID
  457. document_text = ""
  458. if isinstance(vector_metadata, dict):
  459. # Get the text field first (highest priority)
  460. document_text = vector_metadata.get("text")
  461. if not document_text:
  462. # Fallback to other possible text fields
  463. document_text = (
  464. vector_metadata.get("content")
  465. or vector_metadata.get("document")
  466. or vector_id
  467. )
  468. # Log the actual content for debugging
  469. log.debug(
  470. f"Document text preview (first 200 chars): {str(document_text)[:200]}"
  471. )
  472. else:
  473. document_text = vector_id
  474. all_ids.append(vector_id)
  475. all_documents.append(document_text)
  476. all_metadatas.append(vector_metadata)
  477. # Check if there are more pages
  478. next_token = response.get("nextToken")
  479. if not next_token:
  480. break
  481. log.info(
  482. f"Retrieved {len(all_ids)} vectors from collection '{collection_name}'"
  483. )
  484. # Return in GetResult format
  485. # The Open WebUI GetResult expects lists of lists, so we wrap each list
  486. if all_ids:
  487. return GetResult(
  488. ids=[all_ids], documents=[all_documents], metadatas=[all_metadatas]
  489. )
  490. else:
  491. return GetResult(ids=[[]], documents=[[]], metadatas=[[]])
  492. except Exception as e:
  493. log.error(
  494. f"Error retrieving vectors from collection '{collection_name}': {str(e)}"
  495. )
  496. # Handle specific AWS exceptions
  497. if hasattr(e, "response") and "Error" in e.response:
  498. error_code = e.response["Error"]["Code"]
  499. if error_code == "NotFoundException":
  500. log.warning(f"Collection '{collection_name}' not found")
  501. return GetResult(ids=[[]], documents=[[]], metadatas=[[]])
  502. elif error_code == "AccessDeniedException":
  503. log.error(
  504. f"Access denied for collection '{collection_name}'. Check permissions."
  505. )
  506. return GetResult(ids=[[]], documents=[[]], metadatas=[[]])
  507. raise
  508. def delete(
  509. self,
  510. collection_name: str,
  511. ids: Optional[List[str]] = None,
  512. filter: Optional[Dict] = None,
  513. ) -> None:
  514. """
  515. Delete vectors by ID or filter from a collection.
  516. """
  517. if not self.has_collection(collection_name):
  518. log.warning(
  519. f"Collection '{collection_name}' does not exist, nothing to delete"
  520. )
  521. return
  522. # Check if this is a knowledge collection (not file-specific)
  523. is_knowledge_collection = not collection_name.startswith("file-")
  524. try:
  525. if ids:
  526. # Delete by specific vector IDs/keys
  527. log.info(
  528. f"Deleting {len(ids)} vectors by IDs from collection '{collection_name}'"
  529. )
  530. self.client.delete_vectors(
  531. vectorBucketName=self.bucket_name,
  532. indexName=collection_name,
  533. keys=ids,
  534. )
  535. log.info(f"Deleted {len(ids)} vectors from index '{collection_name}'")
  536. elif filter:
  537. # Handle filter-based deletion
  538. log.info(
  539. f"Deleting vectors by filter from collection '{collection_name}': {filter}"
  540. )
  541. # If this is a knowledge collection and we have a file_id filter,
  542. # also clean up the corresponding file-specific collection
  543. if is_knowledge_collection and "file_id" in filter:
  544. file_id = filter["file_id"]
  545. file_collection_name = f"file-{file_id}"
  546. if self.has_collection(file_collection_name):
  547. log.info(
  548. f"Found related file-specific collection '{file_collection_name}', deleting it to prevent duplicates"
  549. )
  550. self.delete_collection(file_collection_name)
  551. # For the main collection, implement query-then-delete
  552. # First, query to get IDs matching the filter
  553. query_result = self.query(collection_name, filter)
  554. if query_result and query_result.ids and query_result.ids[0]:
  555. matching_ids = query_result.ids[0]
  556. log.info(
  557. f"Found {len(matching_ids)} vectors matching filter, deleting them"
  558. )
  559. # Delete the matching vectors by ID
  560. self.client.delete_vectors(
  561. vectorBucketName=self.bucket_name,
  562. indexName=collection_name,
  563. keys=matching_ids,
  564. )
  565. log.info(
  566. f"Deleted {len(matching_ids)} vectors from index '{collection_name}' using filter"
  567. )
  568. else:
  569. log.warning("No vectors found matching the filter criteria")
  570. else:
  571. log.warning("No IDs or filter provided for deletion")
  572. except Exception as e:
  573. log.error(
  574. f"Error deleting vectors from collection '{collection_name}': {e}"
  575. )
  576. raise
  577. def reset(self) -> None:
  578. """
  579. Reset/clear all vector data. For S3 Vector, this deletes all indexes.
  580. """
  581. try:
  582. log.warning(
  583. "Reset called - this will delete all vector indexes in the S3 bucket"
  584. )
  585. # List all indexes
  586. response = self.client.list_indexes(vectorBucketName=self.bucket_name)
  587. indexes = response.get("indexes", [])
  588. if not indexes:
  589. log.warning("No indexes found to delete")
  590. return
  591. # Delete all indexes
  592. deleted_count = 0
  593. for index in indexes:
  594. index_name = index.get("indexName")
  595. if index_name:
  596. try:
  597. self.client.delete_index(
  598. vectorBucketName=self.bucket_name, indexName=index_name
  599. )
  600. deleted_count += 1
  601. log.info(f"Deleted index: {index_name}")
  602. except Exception as e:
  603. log.error(f"Error deleting index '{index_name}': {e}")
  604. log.info(f"Reset completed: deleted {deleted_count} indexes")
  605. except Exception as e:
  606. log.error(f"Error during reset: {e}")
  607. raise
  608. def _matches_filter(self, metadata: Dict[str, Any], filter: Dict[str, Any]) -> bool:
  609. """
  610. Check if metadata matches the given filter conditions.
  611. """
  612. if not isinstance(metadata, dict) or not isinstance(filter, dict):
  613. return False
  614. # Check each filter condition
  615. for key, expected_value in filter.items():
  616. # Handle special operators
  617. if key.startswith("$"):
  618. if key == "$and":
  619. # All conditions must match
  620. if not isinstance(expected_value, list):
  621. continue
  622. for condition in expected_value:
  623. if not self._matches_filter(metadata, condition):
  624. return False
  625. elif key == "$or":
  626. # At least one condition must match
  627. if not isinstance(expected_value, list):
  628. continue
  629. any_match = False
  630. for condition in expected_value:
  631. if self._matches_filter(metadata, condition):
  632. any_match = True
  633. break
  634. if not any_match:
  635. return False
  636. continue
  637. # Get the actual value from metadata
  638. actual_value = metadata.get(key)
  639. # Handle different types of expected values
  640. if isinstance(expected_value, dict):
  641. # Handle comparison operators
  642. for op, op_value in expected_value.items():
  643. if op == "$eq":
  644. if actual_value != op_value:
  645. return False
  646. elif op == "$ne":
  647. if actual_value == op_value:
  648. return False
  649. elif op == "$in":
  650. if (
  651. not isinstance(op_value, list)
  652. or actual_value not in op_value
  653. ):
  654. return False
  655. elif op == "$nin":
  656. if isinstance(op_value, list) and actual_value in op_value:
  657. return False
  658. elif op == "$exists":
  659. if bool(op_value) != (key in metadata):
  660. return False
  661. # Add more operators as needed
  662. else:
  663. # Simple equality check
  664. if actual_value != expected_value:
  665. return False
  666. return True