Procházet zdrojové kódy

fix: batch S3 vectors in groups of 500 to comply with API limitations

0xThresh.eth před 1 měsícem
rodič
revize
7fcc545672
1 změněný soubor s 32 přidáními a 17 odebráními
  1. 32 17
      backend/open_webui/retrieval/vector/dbs/s3vector.py

+ 32 - 17
backend/open_webui/retrieval/vector/dbs/s3vector.py

@@ -193,13 +193,19 @@ class S3VectorClient(VectorDBBase):
                         "metadata": metadata,
                     }
                 )
-            # Insert vectors
-            self.client.put_vectors(
-                vectorBucketName=self.bucket_name,
-                indexName=collection_name,
-                vectors=vectors,
-            )
-            log.info(f"Inserted {len(vectors)} vectors into index '{collection_name}'.")
+            
+            # Insert vectors in batches of 500 (S3 Vector API limit)
+            batch_size = 500
+            for i in range(0, len(vectors), batch_size):
+                batch = vectors[i:i + batch_size]
+                self.client.put_vectors(
+                    vectorBucketName=self.bucket_name,
+                    indexName=collection_name,
+                    vectors=batch,
+                )
+                log.info(f"Inserted batch {i//batch_size + 1}: {len(batch)} vectors into index '{collection_name}'.")
+            
+            log.info(f"Completed insertion of {len(vectors)} vectors into index '{collection_name}'.")
         except Exception as e:
             log.error(f"Error inserting vectors: {e}")
             raise
@@ -251,16 +257,25 @@ class S3VectorClient(VectorDBBase):
                         "metadata": metadata,
                     }
                 )
-            # Upsert vectors (using put_vectors for upsert semantics)
-            log.info(
-                f"Upserting {len(vectors)} vectors. First vector sample: key={vectors[0]['key']}, data_type={type(vectors[0]['data']['float32'])}, data_len={len(vectors[0]['data']['float32'])}"
-            )
-            self.client.put_vectors(
-                vectorBucketName=self.bucket_name,
-                indexName=collection_name,
-                vectors=vectors,
-            )
-            log.info(f"Upserted {len(vectors)} vectors into index '{collection_name}'.")
+            
+            # Upsert vectors in batches of 500 (S3 Vector API limit)
+            batch_size = 500
+            for i in range(0, len(vectors), batch_size):
+                batch = vectors[i:i + batch_size]
+                if i == 0:  # Log sample info for first batch only
+                    log.info(
+                        f"Upserting batch 1: {len(batch)} vectors. First vector sample: key={batch[0]['key']}, data_type={type(batch[0]['data']['float32'])}, data_len={len(batch[0]['data']['float32'])}"
+                    )
+                else:
+                    log.info(f"Upserting batch {i//batch_size + 1}: {len(batch)} vectors.")
+                
+                self.client.put_vectors(
+                    vectorBucketName=self.bucket_name,
+                    indexName=collection_name,
+                    vectors=batch,
+                )
+            
+            log.info(f"Completed upsert of {len(vectors)} vectors into index '{collection_name}'.")
         except Exception as e:
             log.error(f"Error upserting vectors: {e}")
             raise