s3vector.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665
  1. from open_webui.retrieval.vector.main import VectorDBBase, VectorItem, GetResult, SearchResult
  2. from open_webui.config import S3_VECTOR_BUCKET_NAME, S3_VECTOR_REGION
  3. from open_webui.env import SRC_LOG_LEVELS
  4. from typing import List, Optional, Dict, Any, Union
  5. import logging
  6. import boto3
# Module-level logger; verbosity follows the configured "RAG" log level.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["RAG"])
class S3VectorClient(VectorDBBase):
    """
    AWS S3 Vector integration for Open WebUI Knowledge.

    Wraps the boto3 "s3vectors" service client; each Open WebUI collection is
    stored as one vector index inside a single configured vector bucket.
    """

    def __init__(self):
        # Bucket name and region come from Open WebUI config; both are
        # required for a usable client.
        self.bucket_name = S3_VECTOR_BUCKET_NAME
        self.region = S3_VECTOR_REGION
        # Simple validation - log warnings instead of raising exceptions
        if not self.bucket_name:
            log.warning("S3_VECTOR_BUCKET_NAME not set - S3Vector will not work")
        if not self.region:
            log.warning("S3_VECTOR_REGION not set - S3Vector will not work")
        if self.bucket_name and self.region:
            try:
                self.client = boto3.client("s3vectors", region_name=self.region)
                log.info(f"S3Vector client initialized for bucket '{self.bucket_name}' in region '{self.region}'")
            except Exception as e:
                log.error(f"Failed to initialize S3Vector client: {e}")
                # NOTE(review): client stays None on failure; subsequent API
                # calls will raise AttributeError rather than a clearer error.
                self.client = None
        else:
            self.client = None
  30. def _create_index(self, index_name: str, dimension: int, data_type: str = "float32", distance_metric: str = "cosine") -> None:
  31. """
  32. Create a new index in the S3 vector bucket for the given collection if it does not exist.
  33. """
  34. if self.has_collection(index_name):
  35. log.debug(f"Index '{index_name}' already exists, skipping creation")
  36. return
  37. try:
  38. self.client.create_index(
  39. vectorBucketName=self.bucket_name,
  40. indexName=index_name,
  41. dataType=data_type,
  42. dimension=dimension,
  43. distanceMetric=distance_metric,
  44. )
  45. log.info(f"Created S3 index: {index_name} (dim={dimension}, type={data_type}, metric={distance_metric})")
  46. except Exception as e:
  47. log.error(f"Error creating S3 index '{index_name}': {e}")
  48. raise
  49. def _filter_metadata(self, metadata: Dict[str, Any], item_id: str) -> Dict[str, Any]:
  50. """
  51. Filter vector metadata keys to comply with S3 Vector API limit of 10 keys maximum.
  52. """
  53. if not isinstance(metadata, dict) or len(metadata) <= 10:
  54. return metadata
  55. # Keep only the first 10 keys, prioritizing important ones based on actual Open WebUI metadata
  56. important_keys = [
  57. 'text', # The actual document content
  58. 'file_id', # File ID
  59. 'source', # Document source file
  60. 'title', # Document title
  61. 'page', # Page number
  62. 'total_pages', # Total pages in document
  63. 'embedding_config', # Embedding configuration
  64. 'created_by', # User who created it
  65. 'name', # Document name
  66. 'hash', # Content hash
  67. ]
  68. filtered_metadata = {}
  69. # First, add important keys if they exist
  70. for key in important_keys:
  71. if key in metadata:
  72. filtered_metadata[key] = metadata[key]
  73. if len(filtered_metadata) >= 10:
  74. break
  75. # If we still have room, add other keys
  76. if len(filtered_metadata) < 10:
  77. for key, value in metadata.items():
  78. if key not in filtered_metadata:
  79. filtered_metadata[key] = value
  80. if len(filtered_metadata) >= 10:
  81. break
  82. log.warning(f"Metadata for key '{item_id}' had {len(metadata)} keys, limited to 10 keys")
  83. return filtered_metadata
  84. def has_collection(self, collection_name: str) -> bool:
  85. """
  86. Check if a vector index (collection) exists in the S3 vector bucket.
  87. """
  88. try:
  89. response = self.client.list_indexes(vectorBucketName=self.bucket_name)
  90. indexes = response.get("indexes", [])
  91. return any(idx.get("indexName") == collection_name for idx in indexes)
  92. except Exception as e:
  93. log.error(f"Error listing indexes: {e}")
  94. return False
  95. def delete_collection(self, collection_name: str) -> None:
  96. """
  97. Delete an entire S3 Vector index/collection.
  98. """
  99. if not self.has_collection(collection_name):
  100. log.warning(f"Collection '{collection_name}' does not exist, nothing to delete")
  101. return
  102. try:
  103. log.info(f"Deleting collection '{collection_name}'")
  104. self.client.delete_index(
  105. vectorBucketName=self.bucket_name,
  106. indexName=collection_name
  107. )
  108. log.info(f"Successfully deleted collection '{collection_name}'")
  109. except Exception as e:
  110. log.error(f"Error deleting collection '{collection_name}': {e}")
  111. raise
  112. def insert(self, collection_name: str, items: List[VectorItem]) -> None:
  113. """
  114. Insert vector items into the S3 Vector index. Create index if it does not exist.
  115. """
  116. if not items:
  117. log.warning("No items to insert")
  118. return
  119. dimension = len(items[0]["vector"])
  120. try:
  121. if not self.has_collection(collection_name):
  122. log.info(f"Index '{collection_name}' does not exist. Creating index.")
  123. self._create_index(
  124. index_name=collection_name,
  125. dimension=dimension,
  126. data_type="float32",
  127. distance_metric="cosine",
  128. )
  129. # Prepare vectors for insertion
  130. vectors = []
  131. for item in items:
  132. # Ensure vector data is in the correct format for S3 Vector API
  133. vector_data = item["vector"]
  134. if isinstance(vector_data, list):
  135. # Convert list to float32 values as required by S3 Vector API
  136. vector_data = [float(x) for x in vector_data]
  137. # Prepare metadata, ensuring the text field is preserved
  138. metadata = item.get("metadata", {}).copy()
  139. # Add the text field to metadata so it's available for retrieval
  140. metadata["text"] = item["text"]
  141. # Filter metadata to comply with S3 Vector API limit of 10 keys
  142. metadata = self._filter_metadata(metadata, item["id"])
  143. vectors.append({
  144. "key": item["id"],
  145. "data": {
  146. "float32": vector_data
  147. },
  148. "metadata": metadata
  149. })
  150. # Insert vectors
  151. self.client.put_vectors(
  152. vectorBucketName=self.bucket_name,
  153. indexName=collection_name,
  154. vectors=vectors
  155. )
  156. log.info(f"Inserted {len(vectors)} vectors into index '{collection_name}'.")
  157. except Exception as e:
  158. log.error(f"Error inserting vectors: {e}")
  159. raise
  160. def upsert(self, collection_name: str, items: List[VectorItem]) -> None:
  161. """
  162. Insert or update vector items in the S3 Vector index. Create index if it does not exist.
  163. """
  164. if not items:
  165. log.warning("No items to upsert")
  166. return
  167. dimension = len(items[0]["vector"])
  168. log.info(f"Upsert dimension: {dimension}")
  169. try:
  170. if not self.has_collection(collection_name):
  171. log.info(f"Index '{collection_name}' does not exist. Creating index for upsert.")
  172. self._create_index(
  173. index_name=collection_name,
  174. dimension=dimension,
  175. data_type="float32",
  176. distance_metric="cosine",
  177. )
  178. # Prepare vectors for upsert
  179. vectors = []
  180. for item in items:
  181. # Ensure vector data is in the correct format for S3 Vector API
  182. vector_data = item["vector"]
  183. if isinstance(vector_data, list):
  184. # Convert list to float32 values as required by S3 Vector API
  185. vector_data = [float(x) for x in vector_data]
  186. # Prepare metadata, ensuring the text field is preserved
  187. metadata = item.get("metadata", {}).copy()
  188. # Add the text field to metadata so it's available for retrieval
  189. metadata["text"] = item["text"]
  190. # Filter metadata to comply with S3 Vector API limit of 10 keys
  191. metadata = self._filter_metadata(metadata, item["id"])
  192. vectors.append({
  193. "key": item["id"],
  194. "data": {
  195. "float32": vector_data
  196. },
  197. "metadata": metadata
  198. })
  199. # Upsert vectors (using put_vectors for upsert semantics)
  200. log.info(f"Upserting {len(vectors)} vectors. First vector sample: key={vectors[0]['key']}, data_type={type(vectors[0]['data']['float32'])}, data_len={len(vectors[0]['data']['float32'])}")
  201. self.client.put_vectors(
  202. vectorBucketName=self.bucket_name,
  203. indexName=collection_name,
  204. vectors=vectors
  205. )
  206. log.info(f"Upserted {len(vectors)} vectors into index '{collection_name}'.")
  207. except Exception as e:
  208. log.error(f"Error upserting vectors: {e}")
  209. raise
  210. def search(self, collection_name: str, vectors: List[List[Union[float, int]]], limit: int) -> Optional[SearchResult]:
  211. """
  212. Search for similar vectors in a collection using multiple query vectors.
  213. """
  214. if not self.has_collection(collection_name):
  215. log.warning(f"Collection '{collection_name}' does not exist")
  216. return None
  217. if not vectors:
  218. log.warning("No query vectors provided")
  219. return None
  220. try:
  221. log.info(f"Searching collection '{collection_name}' with {len(vectors)} query vectors, limit={limit}")
  222. # Initialize result lists
  223. all_ids = []
  224. all_documents = []
  225. all_metadatas = []
  226. all_distances = []
  227. # Process each query vector
  228. for i, query_vector in enumerate(vectors):
  229. log.debug(f"Processing query vector {i+1}/{len(vectors)}")
  230. # Prepare the query vector in S3 Vector format
  231. query_vector_dict = {
  232. 'float32': [float(x) for x in query_vector]
  233. }
  234. # Call S3 Vector query API
  235. response = self.client.query_vectors(
  236. vectorBucketName=self.bucket_name,
  237. indexName=collection_name,
  238. topK=limit,
  239. queryVector=query_vector_dict,
  240. returnMetadata=True,
  241. returnDistance=True
  242. )
  243. # Process results for this query
  244. query_ids = []
  245. query_documents = []
  246. query_metadatas = []
  247. query_distances = []
  248. result_vectors = response.get('vectors', [])
  249. for vector in result_vectors:
  250. vector_id = vector.get('key')
  251. vector_metadata = vector.get('metadata', {})
  252. vector_distance = vector.get('distance', 0.0)
  253. # Extract document text from metadata
  254. document_text = ""
  255. if isinstance(vector_metadata, dict):
  256. # Get the text field first (highest priority)
  257. document_text = vector_metadata.get('text')
  258. if not document_text:
  259. # Fallback to other possible text fields
  260. document_text = (vector_metadata.get('content') or
  261. vector_metadata.get('document') or
  262. vector_id)
  263. else:
  264. document_text = vector_id
  265. query_ids.append(vector_id)
  266. query_documents.append(document_text)
  267. query_metadatas.append(vector_metadata)
  268. query_distances.append(vector_distance)
  269. # Add this query's results to the overall results
  270. all_ids.append(query_ids)
  271. all_documents.append(query_documents)
  272. all_metadatas.append(query_metadatas)
  273. all_distances.append(query_distances)
  274. log.info(f"Search completed. Found results for {len(all_ids)} queries")
  275. # Return SearchResult format
  276. return SearchResult(
  277. ids=all_ids if all_ids else None,
  278. documents=all_documents if all_documents else None,
  279. metadatas=all_metadatas if all_metadatas else None,
  280. distances=all_distances if all_distances else None
  281. )
  282. except Exception as e:
  283. log.error(f"Error searching collection '{collection_name}': {str(e)}")
  284. # Handle specific AWS exceptions
  285. if hasattr(e, 'response') and 'Error' in e.response:
  286. error_code = e.response['Error']['Code']
  287. if error_code == 'NotFoundException':
  288. log.warning(f"Collection '{collection_name}' not found")
  289. return None
  290. elif error_code == 'ValidationException':
  291. log.error(f"Invalid query vector dimensions or parameters")
  292. return None
  293. elif error_code == 'AccessDeniedException':
  294. log.error(f"Access denied for collection '{collection_name}'. Check permissions.")
  295. return None
  296. raise
  297. def query(self, collection_name: str, filter: Dict, limit: Optional[int] = None) -> Optional[GetResult]:
  298. """
  299. Query vectors from a collection using metadata filter.
  300. """
  301. if not self.has_collection(collection_name):
  302. log.warning(f"Collection '{collection_name}' does not exist")
  303. return GetResult(ids=[[]], documents=[[]], metadatas=[[]])
  304. if not filter:
  305. log.warning("No filter provided, returning all vectors")
  306. return self.get(collection_name)
  307. try:
  308. log.info(f"Querying collection '{collection_name}' with filter: {filter}")
  309. # For S3 Vector, we need to use list_vectors and then filter results
  310. # Since S3 Vector may not support complex server-side filtering,
  311. # we'll retrieve all vectors and filter client-side
  312. # Get all vectors first
  313. all_vectors_result = self.get(collection_name)
  314. if not all_vectors_result or not all_vectors_result.ids:
  315. log.warning("No vectors found in collection")
  316. return GetResult(ids=[[]], documents=[[]], metadatas=[[]])
  317. # Extract the lists from the result
  318. all_ids = all_vectors_result.ids[0] if all_vectors_result.ids else []
  319. all_documents = all_vectors_result.documents[0] if all_vectors_result.documents else []
  320. all_metadatas = all_vectors_result.metadatas[0] if all_vectors_result.metadatas else []
  321. # Apply client-side filtering
  322. filtered_ids = []
  323. filtered_documents = []
  324. filtered_metadatas = []
  325. for i, metadata in enumerate(all_metadatas):
  326. if self._matches_filter(metadata, filter):
  327. if i < len(all_ids):
  328. filtered_ids.append(all_ids[i])
  329. if i < len(all_documents):
  330. filtered_documents.append(all_documents[i])
  331. filtered_metadatas.append(metadata)
  332. # Apply limit if specified
  333. if limit and len(filtered_ids) >= limit:
  334. break
  335. log.info(f"Filter applied: {len(filtered_ids)} vectors match out of {len(all_ids)} total")
  336. # Return GetResult format
  337. if filtered_ids:
  338. return GetResult(ids=[filtered_ids], documents=[filtered_documents], metadatas=[filtered_metadatas])
  339. else:
  340. return GetResult(ids=[[]], documents=[[]], metadatas=[[]])
  341. except Exception as e:
  342. log.error(f"Error querying collection '{collection_name}': {str(e)}")
  343. # Handle specific AWS exceptions
  344. if hasattr(e, 'response') and 'Error' in e.response:
  345. error_code = e.response['Error']['Code']
  346. if error_code == 'NotFoundException':
  347. log.warning(f"Collection '{collection_name}' not found")
  348. return GetResult(ids=[[]], documents=[[]], metadatas=[[]])
  349. elif error_code == 'AccessDeniedException':
  350. log.error(f"Access denied for collection '{collection_name}'. Check permissions.")
  351. return GetResult(ids=[[]], documents=[[]], metadatas=[[]])
  352. raise
    def get(self, collection_name: str) -> Optional[GetResult]:
        """
        Retrieve all vectors from a collection.

        Pages through list_vectors (keys + metadata only, no vector data) and
        returns a GetResult whose ids/documents/metadatas are each wrapped in
        a single outer list, matching the Open WebUI GetResult convention.
        Missing/empty collections and known AWS errors yield an empty result.
        """
        if not self.has_collection(collection_name):
            log.warning(f"Collection '{collection_name}' does not exist")
            return GetResult(ids=[[]], documents=[[]], metadatas=[[]])
        try:
            log.info(f"Retrieving all vectors from collection '{collection_name}'")
            # Initialize result lists
            all_ids = []
            all_documents = []
            all_metadatas = []
            # Handle pagination
            next_token = None
            while True:
                # Prepare request parameters
                request_params = {
                    'vectorBucketName': self.bucket_name,
                    'indexName': collection_name,
                    'returnData': False,  # Don't include vector data (not needed for get)
                    'returnMetadata': True,  # Include metadata
                    'maxResults': 500  # Use reasonable page size
                }
                if next_token:
                    request_params['nextToken'] = next_token
                # Call S3 Vector API
                response = self.client.list_vectors(**request_params)
                # Process vectors in this page
                vectors = response.get('vectors', [])
                for vector in vectors:
                    vector_id = vector.get('key')
                    vector_data = vector.get('data', {})
                    vector_metadata = vector.get('metadata', {})
                    # Extract the actual vector array
                    # NOTE(review): unused since returnData=False always
                    # yields an empty payload here.
                    vector_array = vector_data.get('float32', [])
                    # For documents, we try to extract text from metadata or use the vector ID
                    document_text = ""
                    if isinstance(vector_metadata, dict):
                        # Get the text field first (highest priority)
                        document_text = vector_metadata.get('text')
                        if not document_text:
                            # Fallback to other possible text fields
                            document_text = (vector_metadata.get('content') or
                                             vector_metadata.get('document') or
                                             vector_id)
                        # Log the actual content for debugging
                        log.debug(f"Document text preview (first 200 chars): {str(document_text)[:200]}")
                    else:
                        document_text = vector_id
                    all_ids.append(vector_id)
                    all_documents.append(document_text)
                    all_metadatas.append(vector_metadata)
                # Check if there are more pages
                next_token = response.get('nextToken')
                if not next_token:
                    break
            log.info(f"Retrieved {len(all_ids)} vectors from collection '{collection_name}'")
            # Return in GetResult format
            # The Open WebUI GetResult expects lists of lists, so we wrap each list
            if all_ids:
                return GetResult(ids=[all_ids], documents=[all_documents], metadatas=[all_metadatas])
            else:
                return GetResult(ids=[[]], documents=[[]], metadatas=[[]])
        except Exception as e:
            log.error(f"Error retrieving vectors from collection '{collection_name}': {str(e)}")
            # Handle specific AWS exceptions
            if hasattr(e, 'response') and 'Error' in e.response:
                error_code = e.response['Error']['Code']
                if error_code == 'NotFoundException':
                    log.warning(f"Collection '{collection_name}' not found")
                    return GetResult(ids=[[]], documents=[[]], metadatas=[[]])
                elif error_code == 'AccessDeniedException':
                    log.error(f"Access denied for collection '{collection_name}'. Check permissions.")
                    return GetResult(ids=[[]], documents=[[]], metadatas=[[]])
            raise
  429. def delete(self, collection_name: str, ids: Optional[List[str]] = None, filter: Optional[Dict] = None) -> None:
  430. """
  431. Delete vectors by ID or filter from a collection.
  432. """
  433. if not self.has_collection(collection_name):
  434. log.warning(f"Collection '{collection_name}' does not exist, nothing to delete")
  435. return
  436. # Check if this is a knowledge collection (not file-specific)
  437. is_knowledge_collection = not collection_name.startswith("file-")
  438. try:
  439. if ids:
  440. # Delete by specific vector IDs/keys
  441. log.info(f"Deleting {len(ids)} vectors by IDs from collection '{collection_name}'")
  442. self.client.delete_vectors(
  443. vectorBucketName=self.bucket_name,
  444. indexName=collection_name,
  445. keys=ids
  446. )
  447. log.info(f"Deleted {len(ids)} vectors from index '{collection_name}'")
  448. elif filter:
  449. # Handle filter-based deletion
  450. log.info(f"Deleting vectors by filter from collection '{collection_name}': {filter}")
  451. # If this is a knowledge collection and we have a file_id filter,
  452. # also clean up the corresponding file-specific collection
  453. if is_knowledge_collection and "file_id" in filter:
  454. file_id = filter["file_id"]
  455. file_collection_name = f"file-{file_id}"
  456. if self.has_collection(file_collection_name):
  457. log.info(f"Found related file-specific collection '{file_collection_name}', deleting it to prevent duplicates")
  458. self.delete_collection(file_collection_name)
  459. # For the main collection, implement query-then-delete
  460. # First, query to get IDs matching the filter
  461. query_result = self.query(collection_name, filter)
  462. if query_result and query_result.ids and query_result.ids[0]:
  463. matching_ids = query_result.ids[0]
  464. log.info(f"Found {len(matching_ids)} vectors matching filter, deleting them")
  465. # Delete the matching vectors by ID
  466. self.client.delete_vectors(
  467. vectorBucketName=self.bucket_name,
  468. indexName=collection_name,
  469. keys=matching_ids
  470. )
  471. log.info(f"Deleted {len(matching_ids)} vectors from index '{collection_name}' using filter")
  472. else:
  473. log.warning("No vectors found matching the filter criteria")
  474. else:
  475. log.warning("No IDs or filter provided for deletion")
  476. except Exception as e:
  477. log.error(f"Error deleting vectors from collection '{collection_name}': {e}")
  478. raise
  479. def reset(self) -> None:
  480. """
  481. Reset/clear all vector data. For S3 Vector, this deletes all indexes.
  482. """
  483. try:
  484. log.warning("Reset called - this will delete all vector indexes in the S3 bucket")
  485. # List all indexes
  486. response = self.client.list_indexes(vectorBucketName=self.bucket_name)
  487. indexes = response.get("indexes", [])
  488. if not indexes:
  489. log.warning("No indexes found to delete")
  490. return
  491. # Delete all indexes
  492. deleted_count = 0
  493. for index in indexes:
  494. index_name = index.get("indexName")
  495. if index_name:
  496. try:
  497. self.client.delete_index(
  498. vectorBucketName=self.bucket_name,
  499. indexName=index_name
  500. )
  501. deleted_count += 1
  502. log.info(f"Deleted index: {index_name}")
  503. except Exception as e:
  504. log.error(f"Error deleting index '{index_name}': {e}")
  505. log.info(f"Reset completed: deleted {deleted_count} indexes")
  506. except Exception as e:
  507. log.error(f"Error during reset: {e}")
  508. raise
  509. def _matches_filter(self, metadata: Dict[str, Any], filter: Dict[str, Any]) -> bool:
  510. """
  511. Check if metadata matches the given filter conditions.
  512. """
  513. if not isinstance(metadata, dict) or not isinstance(filter, dict):
  514. return False
  515. # Check each filter condition
  516. for key, expected_value in filter.items():
  517. # Handle special operators
  518. if key.startswith('$'):
  519. if key == '$and':
  520. # All conditions must match
  521. if not isinstance(expected_value, list):
  522. continue
  523. for condition in expected_value:
  524. if not self._matches_filter(metadata, condition):
  525. return False
  526. elif key == '$or':
  527. # At least one condition must match
  528. if not isinstance(expected_value, list):
  529. continue
  530. any_match = False
  531. for condition in expected_value:
  532. if self._matches_filter(metadata, condition):
  533. any_match = True
  534. break
  535. if not any_match:
  536. return False
  537. continue
  538. # Get the actual value from metadata
  539. actual_value = metadata.get(key)
  540. # Handle different types of expected values
  541. if isinstance(expected_value, dict):
  542. # Handle comparison operators
  543. for op, op_value in expected_value.items():
  544. if op == '$eq':
  545. if actual_value != op_value:
  546. return False
  547. elif op == '$ne':
  548. if actual_value == op_value:
  549. return False
  550. elif op == '$in':
  551. if not isinstance(op_value, list) or actual_value not in op_value:
  552. return False
  553. elif op == '$nin':
  554. if isinstance(op_value, list) and actual_value in op_value:
  555. return False
  556. elif op == '$exists':
  557. if bool(op_value) != (key in metadata):
  558. return False
  559. # Add more operators as needed
  560. else:
  561. # Simple equality check
  562. if actual_value != expected_value:
  563. return False
  564. return True