Explorar el Código

Merge pull request #14532 from PVBLIC-F/refac/pinecone

perf pinecone.py Improve Performance and Maintainability Using Current Best Practices
Tim Jaeryang Baek hace 4 meses
padre
commit
0ebe35c571
Se han modificado 1 ficheros con 81 adiciones y 8 borrados
  1. 81 8
      backend/open_webui/retrieval/vector/dbs/pinecone.py

+ 81 - 8
backend/open_webui/retrieval/vector/dbs/pinecone.py

@@ -3,10 +3,19 @@ import logging
 import time  # for measuring elapsed time
 from pinecone import Pinecone, ServerlessSpec
 
+# Add gRPC support for better performance (Pinecone best practice)
+try:
+    from pinecone.grpc import PineconeGRPC
+
+    GRPC_AVAILABLE = True
+except ImportError:
+    GRPC_AVAILABLE = False
+
 import asyncio  # for async upserts
 import functools  # for partial binding in async tasks
 
 import concurrent.futures  # for parallel batch upserts
+import random  # for jitter in retry backoff
 
 from open_webui.retrieval.vector.main import (
     VectorDBBase,
@@ -47,7 +56,24 @@ class PineconeClient(VectorDBBase):
         self.cloud = PINECONE_CLOUD
 
         # Initialize Pinecone client for improved performance
-        self.client = Pinecone(api_key=self.api_key)
+        if GRPC_AVAILABLE:
+            # Use gRPC client for better performance (Pinecone recommendation)
+            self.client = PineconeGRPC(
+                api_key=self.api_key,
+                pool_threads=20,  # Improved connection pool size
+                timeout=30,  # Reasonable timeout for operations
+            )
+            self.using_grpc = True
+            log.info("Using Pinecone gRPC client for optimal performance")
+        else:
+            # Fallback to HTTP client with enhanced connection pooling
+            self.client = Pinecone(
+                api_key=self.api_key,
+                pool_threads=20,  # Improved connection pool size
+                timeout=30,  # Reasonable timeout for operations
+            )
+            self.using_grpc = False
+            log.info("Using Pinecone HTTP client (gRPC not available)")
 
         # Persistent executor for batch operations
         self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
@@ -91,12 +117,53 @@ class PineconeClient(VectorDBBase):
                 log.info(f"Using existing Pinecone index '{self.index_name}'")
 
             # Connect to the index
-            self.index = self.client.Index(self.index_name)
+            self.index = self.client.Index(
+                self.index_name,
+                pool_threads=20,  # Enhanced connection pool for index operations
+            )
 
         except Exception as e:
             log.error(f"Failed to initialize Pinecone index: {e}")
             raise RuntimeError(f"Failed to initialize Pinecone index: {e}")
 
+    def _retry_pinecone_operation(self, operation_func, max_retries=3):
+        """Retry Pinecone operations with exponential backoff for rate limits and network issues."""
+        for attempt in range(max_retries):
+            try:
+                return operation_func()
+            except Exception as e:
+                error_str = str(e).lower()
+                # Check if it's a retryable error (rate limits, network issues, timeouts)
+                is_retryable = any(
+                    keyword in error_str
+                    for keyword in [
+                        "rate limit",
+                        "quota",
+                        "timeout",
+                        "network",
+                        "connection",
+                        "unavailable",
+                        "internal error",
+                        "429",
+                        "500",
+                        "502",
+                        "503",
+                        "504",
+                    ]
+                )
+
+                if not is_retryable or attempt == max_retries - 1:
+                    # Don't retry for non-retryable errors or on final attempt
+                    raise
+
+                # Exponential backoff with jitter
+                delay = (2**attempt) + random.uniform(0, 1)
+                log.warning(
+                    f"Pinecone operation failed (attempt {attempt + 1}/{max_retries}), "
+                    f"retrying in {delay:.2f}s: {e}"
+                )
+                time.sleep(delay)
+
     def _create_points(
         self, items: List[VectorItem], collection_name_with_prefix: str
     ) -> List[Dict[str, Any]]:
@@ -223,7 +290,8 @@ class PineconeClient(VectorDBBase):
         elapsed = time.time() - start_time
         log.debug(f"Insert of {len(points)} vectors took {elapsed:.2f} seconds")
         log.info(
-            f"Successfully inserted {len(points)} vectors in parallel batches into '{collection_name_with_prefix}'"
+            f"Successfully inserted {len(points)} vectors in parallel batches "
+            f"into '{collection_name_with_prefix}'"
         )
 
     def upsert(self, collection_name: str, items: List[VectorItem]) -> None:
@@ -254,7 +322,8 @@ class PineconeClient(VectorDBBase):
         elapsed = time.time() - start_time
         log.debug(f"Upsert of {len(points)} vectors took {elapsed:.2f} seconds")
         log.info(
-            f"Successfully upserted {len(points)} vectors in parallel batches into '{collection_name_with_prefix}'"
+            f"Successfully upserted {len(points)} vectors in parallel batches "
+            f"into '{collection_name_with_prefix}'"
         )
 
     async def insert_async(self, collection_name: str, items: List[VectorItem]) -> None:
@@ -285,7 +354,8 @@ class PineconeClient(VectorDBBase):
                 log.error(f"Error in async insert batch: {result}")
                 raise result
         log.info(
-            f"Successfully async inserted {len(points)} vectors in batches into '{collection_name_with_prefix}'"
+            f"Successfully async inserted {len(points)} vectors in batches "
+            f"into '{collection_name_with_prefix}'"
         )
 
     async def upsert_async(self, collection_name: str, items: List[VectorItem]) -> None:
@@ -316,7 +386,8 @@ class PineconeClient(VectorDBBase):
                 log.error(f"Error in async upsert batch: {result}")
                 raise result
         log.info(
-            f"Successfully async upserted {len(points)} vectors in batches into '{collection_name_with_prefix}'"
+            f"Successfully async upserted {len(points)} vectors in batches "
+            f"into '{collection_name_with_prefix}'"
         )
 
     def search(
@@ -457,10 +528,12 @@ class PineconeClient(VectorDBBase):
                     # This is a limitation of Pinecone - be careful with ID uniqueness
                     self.index.delete(ids=batch_ids)
                     log.debug(
-                        f"Deleted batch of {len(batch_ids)} vectors by ID from '{collection_name_with_prefix}'"
+                        f"Deleted batch of {len(batch_ids)} vectors by ID "
+                        f"from '{collection_name_with_prefix}'"
                     )
                 log.info(
-                    f"Successfully deleted {len(ids)} vectors by ID from '{collection_name_with_prefix}'"
+                    f"Successfully deleted {len(ids)} vectors by ID "
+                    f"from '{collection_name_with_prefix}'"
                 )
 
             elif filter: