Browse Source

Update pinecone.py

May 2025 Latest Pinecone Best Practices
PVBLIC Foundation 4 months ago
parent
commit
4ecf2a8685
1 changed files with 52 additions and 2 deletions
  1. 52 2
      backend/open_webui/retrieval/vector/dbs/pinecone.py

+ 52 - 2
backend/open_webui/retrieval/vector/dbs/pinecone.py

@@ -3,10 +3,18 @@ import logging
 import time  # for measuring elapsed time
 from pinecone import Pinecone, ServerlessSpec
 
+# Add gRPC support for better performance (Pinecone best practice)
+try:
+    from pinecone.grpc import PineconeGRPC
+    GRPC_AVAILABLE = True
+except ImportError:
+    GRPC_AVAILABLE = False
+
 import asyncio  # for async upserts
 import functools  # for partial binding in async tasks
 
 import concurrent.futures  # for parallel batch upserts
+import random  # for jitter in retry backoff
 
 from open_webui.retrieval.vector.main import (
     VectorDBBase,
@@ -47,7 +55,24 @@ class PineconeClient(VectorDBBase):
         self.cloud = PINECONE_CLOUD
 
         # Initialize Pinecone client for improved performance
-        self.client = Pinecone(api_key=self.api_key)
+        if GRPC_AVAILABLE:
+            # Use gRPC client for better performance (Pinecone recommendation)
+            self.client = PineconeGRPC(
+                api_key=self.api_key,
+                pool_threads=20,  # Improved connection pool size
+                timeout=30        # Reasonable timeout for operations
+            )
+            self.using_grpc = True
+            log.info("Using Pinecone gRPC client for optimal performance")
+        else:
+            # Fallback to HTTP client with enhanced connection pooling
+            self.client = Pinecone(
+                api_key=self.api_key,
+                pool_threads=20,  # Improved connection pool size
+                timeout=30        # Reasonable timeout for operations
+            )
+            self.using_grpc = False
+            log.info("Using Pinecone HTTP client (gRPC not available)")
 
         # Persistent executor for batch operations
         self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
@@ -91,12 +116,37 @@ class PineconeClient(VectorDBBase):
                 log.info(f"Using existing Pinecone index '{self.index_name}'")
 
             # Connect to the index
-            self.index = self.client.Index(self.index_name)
+            self.index = self.client.Index(
+                self.index_name,
+                pool_threads=20,  # Enhanced connection pool for index operations
+            )
 
         except Exception as e:
             log.error(f"Failed to initialize Pinecone index: {e}")
             raise RuntimeError(f"Failed to initialize Pinecone index: {e}")
 
+    def _retry_pinecone_operation(self, operation_func, max_retries=3):
+        """Retry Pinecone operations with exponential backoff for rate limits and network issues."""
+        for attempt in range(max_retries):
+            try:
+                return operation_func()
+            except Exception as e:
+                error_str = str(e).lower()
+                # Check if it's a retryable error (rate limits, network issues, timeouts)
+                is_retryable = any(keyword in error_str for keyword in [
+                    'rate limit', 'quota', 'timeout', 'network', 'connection', 
+                    'unavailable', 'internal error', '429', '500', '502', '503', '504'
+                ])
+                
+                if not is_retryable or attempt == max_retries - 1:
+                    # Don't retry for non-retryable errors or on final attempt
+                    raise
+                
+                # Exponential backoff with jitter
+                delay = (2 ** attempt) + random.uniform(0, 1)
+                log.warning(f"Pinecone operation failed (attempt {attempt + 1}/{max_retries}), retrying in {delay:.2f}s: {e}")
+                time.sleep(delay)
+
     def _create_points(
         self, items: List[VectorItem], collection_name_with_prefix: str
     ) -> List[Dict[str, Any]]: