|
@@ -11,19 +11,36 @@ log.setLevel(SRC_LOG_LEVELS["RAG"])
|
|
|
class S3VectorClient(VectorDBBase):
|
|
|
"""
|
|
|
AWS S3 Vector integration for Open WebUI Knowledge.
|
|
|
- Assumes AWS credentials are available via environment variables or IAM roles.
|
|
|
"""
|
|
|
+
|
|
|
def __init__(self):
|
|
|
self.bucket_name = S3_VECTOR_BUCKET_NAME
|
|
|
self.region = S3_VECTOR_REGION
|
|
|
- self.client = boto3.client("s3vectors", region_name=self.region)
|
|
|
+
|
|
|
+ # Simple validation - log warnings instead of raising exceptions
|
|
|
+ if not self.bucket_name:
|
|
|
+ log.warning("S3_VECTOR_BUCKET_NAME not set - S3Vector will not work")
|
|
|
+ if not self.region:
|
|
|
+ log.warning("S3_VECTOR_REGION not set - S3Vector will not work")
|
|
|
+
|
|
|
+ if self.bucket_name and self.region:
|
|
|
+ try:
|
|
|
+ self.client = boto3.client("s3vectors", region_name=self.region)
|
|
|
+ log.info(f"S3Vector client initialized for bucket '{self.bucket_name}' in region '{self.region}'")
|
|
|
+ except Exception as e:
|
|
|
+ log.error(f"Failed to initialize S3Vector client: {e}")
|
|
|
+ self.client = None
|
|
|
+ else:
|
|
|
+ self.client = None
|
|
|
|
|
|
- def _create_index(self, index_name: str, dimension: int, data_type: str = "float32", distance_metric: str = "cosine"):
|
|
|
+ def _create_index(self, index_name: str, dimension: int, data_type: str = "float32", distance_metric: str = "cosine") -> None:
|
|
|
"""
|
|
|
Create a new index in the S3 vector bucket for the given collection if it does not exist.
|
|
|
"""
|
|
|
if self.has_collection(index_name):
|
|
|
+ log.debug(f"Index '{index_name}' already exists, skipping creation")
|
|
|
return
|
|
|
+
|
|
|
try:
|
|
|
self.client.create_index(
|
|
|
vectorBucketName=self.bucket_name,
|
|
@@ -35,12 +52,11 @@ class S3VectorClient(VectorDBBase):
|
|
|
log.info(f"Created S3 index: {index_name} (dim={dimension}, type={data_type}, metric={distance_metric})")
|
|
|
except Exception as e:
|
|
|
log.error(f"Error creating S3 index '{index_name}': {e}")
|
|
|
+ raise
|
|
|
|
|
|
def _filter_metadata(self, metadata: Dict[str, Any], item_id: str) -> Dict[str, Any]:
|
|
|
"""
|
|
|
Filter vector metadata keys to comply with S3 Vector API limit of 10 keys maximum.
|
|
|
- If AWS S3 Vector feature starts supporting more than 10 keys, this should be adjusted, and preferably removed.
|
|
|
- Limitation is documented here: https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-vectors-limitations.html
|
|
|
"""
|
|
|
if not isinstance(metadata, dict) or len(metadata) <= 10:
|
|
|
return metadata
|
|
@@ -82,6 +98,7 @@ class S3VectorClient(VectorDBBase):
|
|
|
"""
|
|
|
Check if a vector index (collection) exists in the S3 vector bucket.
|
|
|
"""
|
|
|
+
|
|
|
try:
|
|
|
response = self.client.list_indexes(vectorBucketName=self.bucket_name)
|
|
|
indexes = response.get("indexes", [])
|
|
@@ -89,10 +106,12 @@ class S3VectorClient(VectorDBBase):
|
|
|
except Exception as e:
|
|
|
log.error(f"Error listing indexes: {e}")
|
|
|
return False
|
|
|
+
|
|
|
def delete_collection(self, collection_name: str) -> None:
|
|
|
"""
|
|
|
Delete an entire S3 Vector index/collection.
|
|
|
"""
|
|
|
+
|
|
|
if not self.has_collection(collection_name):
|
|
|
log.warning(f"Collection '{collection_name}' does not exist, nothing to delete")
|
|
|
return
|
|
@@ -108,11 +127,9 @@ class S3VectorClient(VectorDBBase):
|
|
|
log.error(f"Error deleting collection '{collection_name}': {e}")
|
|
|
raise
|
|
|
|
|
|
- def insert(self, collection_name: str, items: List[Dict[str, Any]]) -> None:
|
|
|
+ def insert(self, collection_name: str, items: List[VectorItem]) -> None:
|
|
|
"""
|
|
|
Insert vector items into the S3 Vector index. Create index if it does not exist.
|
|
|
-
|
|
|
- Supports both knowledge collection indexes and file-specific indexes (file-{file_id}).
|
|
|
"""
|
|
|
if not items:
|
|
|
log.warning("No items to insert")
|
|
@@ -143,10 +160,7 @@ class S3VectorClient(VectorDBBase):
|
|
|
metadata = item.get("metadata", {}).copy()
|
|
|
|
|
|
# Add the text field to metadata so it's available for retrieval
|
|
|
- if "text" in item:
|
|
|
- metadata["text"] = item["text"]
|
|
|
- else:
|
|
|
- log.warning(f"No 'text' field found in item with ID: {item.get('id')}")
|
|
|
+ metadata["text"] = item["text"]
|
|
|
|
|
|
# Filter metadata to comply with S3 Vector API limit of 10 keys
|
|
|
metadata = self._filter_metadata(metadata, item["id"])
|
|
@@ -169,7 +183,7 @@ class S3VectorClient(VectorDBBase):
|
|
|
log.error(f"Error inserting vectors: {e}")
|
|
|
raise
|
|
|
|
|
|
- def upsert(self, collection_name: str, items: List[Dict[str, Any]]) -> None:
|
|
|
+ def upsert(self, collection_name: str, items: List[VectorItem]) -> None:
|
|
|
"""
|
|
|
Insert or update vector items in the S3 Vector index. Create index if it does not exist.
|
|
|
"""
|
|
@@ -202,8 +216,7 @@ class S3VectorClient(VectorDBBase):
|
|
|
# Prepare metadata, ensuring the text field is preserved
|
|
|
metadata = item.get("metadata", {}).copy()
|
|
|
# Add the text field to metadata so it's available for retrieval
|
|
|
- if "text" in item:
|
|
|
- metadata["text"] = item["text"]
|
|
|
+ metadata["text"] = item["text"]
|
|
|
|
|
|
# Filter metadata to comply with S3 Vector API limit of 10 keys
|
|
|
metadata = self._filter_metadata(metadata, item["id"])
|
|
@@ -230,17 +243,8 @@ class S3VectorClient(VectorDBBase):
|
|
|
def search(self, collection_name: str, vectors: List[List[Union[float, int]]], limit: int) -> Optional[SearchResult]:
|
|
|
"""
|
|
|
Search for similar vectors in a collection using multiple query vectors.
|
|
|
-
|
|
|
- Uses S3 Vector's query_vectors API to perform similarity search.
|
|
|
-
|
|
|
- Args:
|
|
|
- collection_name: Name of the collection to search in
|
|
|
- vectors: List of query vectors to search with
|
|
|
- limit: Maximum number of results to return per query
|
|
|
-
|
|
|
- Returns:
|
|
|
- SearchResult containing IDs, documents, metadatas, and distances
|
|
|
"""
|
|
|
+
|
|
|
if not self.has_collection(collection_name):
|
|
|
log.warning(f"Collection '{collection_name}' does not exist")
|
|
|
return None
|
|
@@ -343,18 +347,8 @@ class S3VectorClient(VectorDBBase):
|
|
|
def query(self, collection_name: str, filter: Dict, limit: Optional[int] = None) -> Optional[GetResult]:
|
|
|
"""
|
|
|
Query vectors from a collection using metadata filter.
|
|
|
-
|
|
|
- For S3 Vector, this uses the list_vectors API with metadata filters.
|
|
|
- Note: S3 Vector supports metadata filtering, but the exact filter syntax may vary.
|
|
|
-
|
|
|
- Args:
|
|
|
- collection_name: Name of the collection to query
|
|
|
- filter: Dictionary containing metadata filter conditions
|
|
|
- limit: Maximum number of results to return (optional)
|
|
|
-
|
|
|
- Returns:
|
|
|
- GetResult containing IDs, documents, and metadatas
|
|
|
"""
|
|
|
+
|
|
|
if not self.has_collection(collection_name):
|
|
|
log.warning(f"Collection '{collection_name}' does not exist")
|
|
|
return GetResult(ids=[[]], documents=[[]], metadatas=[[]])
|
|
@@ -423,10 +417,8 @@ class S3VectorClient(VectorDBBase):
|
|
|
def get(self, collection_name: str) -> Optional[GetResult]:
|
|
|
"""
|
|
|
Retrieve all vectors from a collection.
|
|
|
-
|
|
|
- Uses S3 Vector's list_vectors API to get all vectors with their data and metadata.
|
|
|
- Handles pagination automatically to retrieve all vectors.
|
|
|
"""
|
|
|
+
|
|
|
if not self.has_collection(collection_name):
|
|
|
log.warning(f"Collection '{collection_name}' does not exist")
|
|
|
return GetResult(ids=[[]], documents=[[]], metadatas=[[]])
|
|
@@ -519,10 +511,8 @@ class S3VectorClient(VectorDBBase):
|
|
|
def delete(self, collection_name: str, ids: Optional[List[str]] = None, filter: Optional[Dict] = None) -> None:
|
|
|
"""
|
|
|
Delete vectors by ID or filter from a collection.
|
|
|
-
|
|
|
- For S3 Vector, we support deletion by IDs. Filter-based deletion requires querying first.
|
|
|
- For knowledge collections, also handles cleanup of related file-specific collections.
|
|
|
"""
|
|
|
+
|
|
|
if not self.has_collection(collection_name):
|
|
|
log.warning(f"Collection '{collection_name}' does not exist, nothing to delete")
|
|
|
return
|
|
@@ -578,9 +568,9 @@ class S3VectorClient(VectorDBBase):
|
|
|
|
|
|
def reset(self) -> None:
|
|
|
"""
|
|
|
- Reset/clear all vector data. For S3 Vector, this would mean deleting all indexes.
|
|
|
- Use with caution as this is destructive.
|
|
|
+ Reset/clear all vector data. For S3 Vector, this deletes all indexes.
|
|
|
"""
|
|
|
+
|
|
|
try:
|
|
|
log.warning("Reset called - this will delete all vector indexes in the S3 bucket")
|
|
|
|
|
@@ -616,14 +606,6 @@ class S3VectorClient(VectorDBBase):
|
|
|
def _matches_filter(self, metadata: Dict[str, Any], filter: Dict[str, Any]) -> bool:
|
|
|
"""
|
|
|
Check if metadata matches the given filter conditions.
|
|
|
- Supports basic equality matching and simple logical operations.
|
|
|
-
|
|
|
- Args:
|
|
|
- metadata: The metadata to check
|
|
|
- filter: The filter conditions to match against
|
|
|
-
|
|
|
- Returns:
|
|
|
- True if metadata matches all filter conditions, False otherwise
|
|
|
"""
|
|
|
if not isinstance(metadata, dict) or not isinstance(filter, dict):
|
|
|
return False
|