Bladeren bron

feat: Implement Qdrant multi-tenancy support with collection management and tenant isolation

LoiTra 4 maanden geleden
bovenliggende
commit
184d8dfd7e

+ 1 - 0
backend/open_webui/config.py

@@ -1751,6 +1751,7 @@ QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY", None)
 QDRANT_ON_DISK = os.environ.get("QDRANT_ON_DISK", "false").lower() == "true"
 QDRANT_PREFER_GRPC = os.environ.get("QDRANT_PREFER_GRPC", "False").lower() == "true"
 QDRANT_GRPC_PORT = int(os.environ.get("QDRANT_GRPC_PORT", "6334"))
+QDRANT_MULTI_TENANCY = os.environ.get("QDRANT_MULTI_TENANCY", "false").lower() == "true"
 
 # OpenSearch
 OPENSEARCH_URI = os.environ.get("OPENSEARCH_URI", "https://localhost:9200")

+ 712 - 0
backend/open_webui/retrieval/vector/dbs/qdrant_multitenancy.py

@@ -0,0 +1,712 @@
+import logging
+from typing import Optional, Tuple
+from urllib.parse import urlparse
+
+import grpc
+from open_webui.config import (
+    QDRANT_API_KEY,
+    QDRANT_GRPC_PORT,
+    QDRANT_ON_DISK,
+    QDRANT_PREFER_GRPC,
+    QDRANT_URI,
+)
+from open_webui.env import SRC_LOG_LEVELS
+from open_webui.retrieval.vector.main import (
+    GetResult,
+    SearchResult,
+    VectorDBBase,
+    VectorItem,
+)
+from qdrant_client import QdrantClient as Qclient
+from qdrant_client.http.exceptions import UnexpectedResponse
+from qdrant_client.http.models import PointStruct
+from qdrant_client.models import models
+
+NO_LIMIT = 999999999
+
+log = logging.getLogger(__name__)
+log.setLevel(SRC_LOG_LEVELS["RAG"])
+
+
+class QdrantClient(VectorDBBase):
+    def __init__(self):
+        self.collection_prefix = "open-webui"
+        self.QDRANT_URI = QDRANT_URI
+        self.QDRANT_API_KEY = QDRANT_API_KEY
+        self.QDRANT_ON_DISK = QDRANT_ON_DISK
+        self.PREFER_GRPC = QDRANT_PREFER_GRPC
+        self.GRPC_PORT = QDRANT_GRPC_PORT
+
+        if not self.QDRANT_URI:
+            self.client = None
+            return
+
+        # Unified handling for either scheme
+        parsed = urlparse(self.QDRANT_URI)
+        host = parsed.hostname or self.QDRANT_URI
+        http_port = parsed.port or 6333  # default REST port
+
+        if self.PREFER_GRPC:
+            self.client = Qclient(
+                host=host,
+                port=http_port,
+                grpc_port=self.GRPC_PORT,
+                prefer_grpc=self.PREFER_GRPC,
+                api_key=self.QDRANT_API_KEY,
+            )
+        else:
+            self.client = Qclient(url=self.QDRANT_URI, api_key=self.QDRANT_API_KEY)
+
+        # Main collection types for multi-tenancy
+        self.MEMORY_COLLECTION = f"{self.collection_prefix}_memories"
+        self.KNOWLEDGE_COLLECTION = f"{self.collection_prefix}_knowledge"
+        self.FILE_COLLECTION = f"{self.collection_prefix}_files"
+        self.WEB_SEARCH_COLLECTION = f"{self.collection_prefix}_web-search"
+        self.HASH_BASED_COLLECTION = f"{self.collection_prefix}_hash-based"
+
+    def _result_to_get_result(self, points) -> GetResult:
+        ids = []
+        documents = []
+        metadatas = []
+
+        for point in points:
+            payload = point.payload
+            ids.append(point.id)
+            documents.append(payload["text"])
+            metadatas.append(payload["metadata"])
+
+        return GetResult(
+            **{
+                "ids": [ids],
+                "documents": [documents],
+                "metadatas": [metadatas],
+            }
+        )
+
+    def _get_collection_and_tenant_id(self, collection_name: str) -> Tuple[str, str]:
+        """
+        Maps the traditional collection name to multi-tenant collection and tenant ID.
+
+        Returns:
+            tuple: (collection_name, tenant_id)
+        """
+        # Check for user memory collections
+        tenant_id = collection_name
+
+        if collection_name.startswith("user-memory-"):
+            return self.MEMORY_COLLECTION, tenant_id
+
+        # Check for file collections
+        elif collection_name.startswith("file-"):
+            return self.FILE_COLLECTION, tenant_id
+
+        # Check for web search collections
+        elif collection_name.startswith("web-search-"):
+            return self.WEB_SEARCH_COLLECTION, tenant_id
+
+        # Handle hash-based collections (YouTube and web URLs)
+        elif len(collection_name) == 63 and all(
+            c in "0123456789abcdef" for c in collection_name
+        ):
+            return self.HASH_BASED_COLLECTION, tenant_id
+
+        else:
+            return self.KNOWLEDGE_COLLECTION, tenant_id
+
+    def _extract_error_message(self, exception):
+        """
+        Extract error message from either HTTP or gRPC exceptions
+
+        Returns:
+            tuple: (status_code, error_message)
+        """
+        # Check if it's an HTTP exception
+        if isinstance(exception, UnexpectedResponse):
+            try:
+                error_data = exception.structured()
+                error_msg = error_data.get("status", {}).get("error", "")
+                return exception.status_code, error_msg
+            except Exception as inner_e:
+                log.error(f"Failed to parse HTTP error: {inner_e}")
+                return exception.status_code, str(exception)
+
+        # Check if it's a gRPC exception
+        elif isinstance(exception, grpc.RpcError):
+            # Extract status code from gRPC error
+            status_code = None
+            if hasattr(exception, "code") and callable(exception.code):
+                status_code = exception.code().value[0]
+
+            # Extract error message
+            error_msg = str(exception)
+            if "details =" in error_msg:
+                # Parse the details line which contains the actual error message
+                try:
+                    details_line = [
+                        line.strip()
+                        for line in error_msg.split("\n")
+                        if "details =" in line
+                    ][0]
+                    error_msg = details_line.split("details =")[1].strip(' "')
+                except (IndexError, AttributeError):
+                    # Fall back to full message if parsing fails
+                    pass
+
+            return status_code, error_msg
+
+        # For any other type of exception
+        return None, str(exception)
+
+    def _is_collection_not_found_error(self, exception):
+        """
+        Check if the exception is due to collection not found, supporting both HTTP and gRPC
+        """
+        status_code, error_msg = self._extract_error_message(exception)
+
+        # HTTP error (404)
+        if (
+            status_code == 404
+            and "Collection" in error_msg
+            and "doesn't exist" in error_msg
+        ):
+            return True
+
+        # gRPC error (NOT_FOUND status)
+        if (
+            isinstance(exception, grpc.RpcError)
+            and exception.code() == grpc.StatusCode.NOT_FOUND
+        ):
+            return True
+
+        return False
+
+    def _is_dimension_mismatch_error(self, exception):
+        """
+        Check if the exception is due to dimension mismatch, supporting both HTTP and gRPC
+        """
+        status_code, error_msg = self._extract_error_message(exception)
+
+        # Common patterns in both HTTP and gRPC
+        return (
+            "Vector dimension error" in error_msg
+            or "dimensions mismatch" in error_msg
+            or "invalid vector size" in error_msg
+        )
+
+    def _create_multi_tenant_collection_if_not_exists(
+        self, mt_collection_name: str, dimension: int = 384
+    ):
+        """
+        Creates a collection with multi-tenancy configuration if it doesn't exist.
+        Default dimension is set to 384 which corresponds to 'sentence-transformers/all-MiniLM-L6-v2'.
+        When creating collections dynamically (insert/upsert), the actual vector dimensions will be used.
+        """
+        try:
+            # Try to create the collection directly - will fail if it already exists
+            self.client.create_collection(
+                collection_name=mt_collection_name,
+                vectors_config=models.VectorParams(
+                    size=dimension,
+                    distance=models.Distance.COSINE,
+                    on_disk=self.QDRANT_ON_DISK,
+                ),
+                hnsw_config=models.HnswConfigDiff(
+                    payload_m=16,  # Enable per-tenant indexing
+                    m=0,
+                    on_disk=self.QDRANT_ON_DISK,
+                ),
+            )
+
+            # Create tenant ID payload index
+            self.client.create_payload_index(
+                collection_name=mt_collection_name,
+                field_name="tenant_id",
+                field_schema=models.KeywordIndexParams(
+                    type=models.KeywordIndexType.KEYWORD,
+                    is_tenant=True,
+                    on_disk=self.QDRANT_ON_DISK,
+                ),
+                wait=True,
+            )
+
+            log.info(
+                f"Multi-tenant collection {mt_collection_name} created with dimension {dimension}!"
+            )
+        except (UnexpectedResponse, grpc.RpcError) as e:
+            # Check for the specific error indicating collection already exists
+            status_code, error_msg = self._extract_error_message(e)
+
+            # HTTP status code 409 or gRPC ALREADY_EXISTS
+            if (isinstance(e, UnexpectedResponse) and status_code == 409) or (
+                isinstance(e, grpc.RpcError)
+                and e.code() == grpc.StatusCode.ALREADY_EXISTS
+            ):
+                if "already exists" in error_msg:
+                    log.debug(f"Collection {mt_collection_name} already exists")
+                    return
+            # If it's not an already exists error, re-raise
+            raise e
+        except Exception as e:
+            raise e
+
+    def _create_points(self, items: list[VectorItem], tenant_id: str):
+        """
+        Create point structs from vector items with tenant ID.
+        """
+        return [
+            PointStruct(
+                id=item["id"],
+                vector=item["vector"],
+                payload={
+                    "text": item["text"],
+                    "metadata": item["metadata"],
+                    "tenant_id": tenant_id,
+                },
+            )
+            for item in items
+        ]
+
+    def has_collection(self, collection_name: str) -> bool:
+        """
+        Check if a logical collection exists by checking for any points with the tenant ID.
+        """
+        if not self.client:
+            return False
+
+        # Map to multi-tenant collection and tenant ID
+        mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
+
+        # Create tenant filter
+        tenant_filter = models.FieldCondition(
+            key="tenant_id", match=models.MatchValue(value=tenant_id)
+        )
+
+        try:
+            # Try directly querying - most of the time collection should exist
+            response = self.client.query_points(
+                collection_name=mt_collection,
+                query_filter=models.Filter(must=[tenant_filter]),
+                limit=1,
+            )
+
+            # Collection exists with this tenant ID if there are points
+            return len(response.points) > 0
+        except (UnexpectedResponse, grpc.RpcError) as e:
+            if self._is_collection_not_found_error(e):
+                log.debug(f"Collection {mt_collection} doesn't exist")
+                return False
+            else:
+                # For other API errors, log and return False
+                _, error_msg = self._extract_error_message(e)
+                log.warning(f"Unexpected Qdrant error: {error_msg}")
+                return False
+        except Exception as e:
+            # For any other errors, log and return False
+            log.debug(f"Error checking collection {mt_collection}: {e}")
+            return False
+
+    def delete(
+        self,
+        collection_name: str,
+        ids: Optional[list[str]] = None,
+        filter: Optional[dict] = None,
+    ):
+        """
+        Delete vectors by ID or filter from a collection with tenant isolation.
+        """
+        if not self.client:
+            return None
+
+        # Map to multi-tenant collection and tenant ID
+        mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
+
+        # Create tenant filter
+        tenant_filter = models.FieldCondition(
+            key="tenant_id", match=models.MatchValue(value=tenant_id)
+        )
+
+        must_conditions = [tenant_filter]
+        should_conditions = []
+
+        if ids:
+            for id_value in ids:
+                should_conditions.append(
+                    models.FieldCondition(
+                        key="metadata.id",
+                        match=models.MatchValue(value=id_value),
+                    ),
+                )
+        elif filter:
+            for key, value in filter.items():
+                must_conditions.append(
+                    models.FieldCondition(
+                        key=f"metadata.{key}",
+                        match=models.MatchValue(value=value),
+                    ),
+                )
+
+        try:
+            # Try to delete directly - most of the time collection should exist
+            update_result = self.client.delete(
+                collection_name=mt_collection,
+                points_selector=models.FilterSelector(
+                    filter=models.Filter(must=must_conditions, should=should_conditions)
+                ),
+            )
+
+            return update_result
+        except (UnexpectedResponse, grpc.RpcError) as e:
+            if self._is_collection_not_found_error(e):
+                log.debug(
+                    f"Collection {mt_collection} doesn't exist, nothing to delete"
+                )
+                return None
+            else:
+                # For other API errors, log and re-raise
+                _, error_msg = self._extract_error_message(e)
+                log.warning(f"Unexpected Qdrant error: {error_msg}")
+                raise
+        except Exception as e:
+            # For non-Qdrant exceptions, re-raise
+            raise
+
+    def search(
+        self, collection_name: str, vectors: list[list[float | int]], limit: int
+    ) -> Optional[SearchResult]:
+        """
+        Search for the nearest neighbor items based on the vectors with tenant isolation.
+        """
+        if not self.client:
+            return None
+
+        # Map to multi-tenant collection and tenant ID
+        mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
+
+        # Get the vector dimension from the query vector
+        dimension = len(vectors[0]) if vectors and len(vectors) > 0 else None
+
+        try:
+            # Try the search operation directly - most of the time collection should exist
+
+            # Create tenant filter
+            tenant_filter = models.FieldCondition(
+                key="tenant_id", match=models.MatchValue(value=tenant_id)
+            )
+
+            # Ensure vector dimensions match the collection
+            collection_dim = self.client.get_collection(
+                mt_collection
+            ).config.params.vectors.size
+
+            if collection_dim != dimension:
+                if collection_dim < dimension:
+                    vectors = [vector[:collection_dim] for vector in vectors]
+                else:
+                    vectors = [
+                        vector + [0] * (collection_dim - dimension)
+                        for vector in vectors
+                    ]
+
+            # Search with tenant filter
+            prefetch_query = models.Prefetch(
+                filter=models.Filter(must=[tenant_filter]),
+                limit=NO_LIMIT,
+            )
+            query_response = self.client.query_points(
+                collection_name=mt_collection,
+                query=vectors[0],
+                prefetch=prefetch_query,
+                limit=limit,
+            )
+
+            get_result = self._result_to_get_result(query_response.points)
+            return SearchResult(
+                ids=get_result.ids,
+                documents=get_result.documents,
+                metadatas=get_result.metadatas,
+                # qdrant distance is [-1, 1], normalize to [0, 1]
+                distances=[
+                    [(point.score + 1.0) / 2.0 for point in query_response.points]
+                ],
+            )
+        except (UnexpectedResponse, grpc.RpcError) as e:
+            if self._is_collection_not_found_error(e):
+                log.debug(
+                    f"Collection {mt_collection} doesn't exist, search returns None"
+                )
+                return None
+            else:
+                # For other API errors, log and re-raise
+                _, error_msg = self._extract_error_message(e)
+                log.warning(f"Unexpected Qdrant error during search: {error_msg}")
+                raise
+        except Exception as e:
+            # For non-Qdrant exceptions, log and return None
+            log.exception(f"Error searching collection '{collection_name}': {e}")
+            return None
+
+    def query(self, collection_name: str, filter: dict, limit: Optional[int] = None):
+        """
+        Query points with filters and tenant isolation.
+        """
+        if not self.client:
+            return None
+
+        # Map to multi-tenant collection and tenant ID
+        mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
+
+        # Set default limit if not provided
+        if limit is None:
+            limit = NO_LIMIT
+
+        # Create tenant filter
+        tenant_filter = models.FieldCondition(
+            key="tenant_id", match=models.MatchValue(value=tenant_id)
+        )
+
+        # Create metadata filters
+        field_conditions = []
+        for key, value in filter.items():
+            field_conditions.append(
+                models.FieldCondition(
+                    key=f"metadata.{key}", match=models.MatchValue(value=value)
+                )
+            )
+
+        # Combine tenant filter with metadata filters
+        combined_filter = models.Filter(must=[tenant_filter, *field_conditions])
+
+        try:
+            # Try the query directly - most of the time collection should exist
+            points = self.client.query_points(
+                collection_name=mt_collection,
+                query_filter=combined_filter,
+                limit=limit,
+            )
+
+            return self._result_to_get_result(points.points)
+        except (UnexpectedResponse, grpc.RpcError) as e:
+            if self._is_collection_not_found_error(e):
+                log.debug(
+                    f"Collection {mt_collection} doesn't exist, query returns None"
+                )
+                return None
+            else:
+                # For other API errors, log and re-raise
+                _, error_msg = self._extract_error_message(e)
+                log.warning(f"Unexpected Qdrant error during query: {error_msg}")
+                raise
+        except Exception as e:
+            # For non-Qdrant exceptions, log and re-raise
+            log.exception(f"Error querying collection '{collection_name}': {e}")
+            return None
+
+    def get(self, collection_name: str) -> Optional[GetResult]:
+        """
+        Get all items in a collection with tenant isolation.
+        """
+        if not self.client:
+            return None
+
+        # Map to multi-tenant collection and tenant ID
+        mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
+
+        # Create tenant filter
+        tenant_filter = models.FieldCondition(
+            key="tenant_id", match=models.MatchValue(value=tenant_id)
+        )
+
+        try:
+            # Try to get points directly - most of the time collection should exist
+            points = self.client.query_points(
+                collection_name=mt_collection,
+                query_filter=models.Filter(must=[tenant_filter]),
+                limit=NO_LIMIT,
+            )
+
+            return self._result_to_get_result(points.points)
+        except (UnexpectedResponse, grpc.RpcError) as e:
+            if self._is_collection_not_found_error(e):
+                log.debug(f"Collection {mt_collection} doesn't exist, get returns None")
+                return None
+            else:
+                # For other API errors, log and re-raise
+                _, error_msg = self._extract_error_message(e)
+                log.warning(f"Unexpected Qdrant error during get: {error_msg}")
+                raise
+        except Exception as e:
+            # For non-Qdrant exceptions, log and return None
+            log.exception(f"Error getting collection '{collection_name}': {e}")
+            return None
+
+    def _handle_operation_with_error_retry(
+        self, operation_name, mt_collection, points, dimension
+    ):
+        """
+        Private helper to handle common error cases for insert and upsert operations.
+
+        Args:
+            operation_name: 'insert' or 'upsert'
+            mt_collection: The multi-tenant collection name
+            points: The vector points to insert/upsert
+            dimension: The dimension of the vectors
+
+        Returns:
+            The operation result (for upsert) or None (for insert)
+        """
+        try:
+            if operation_name == "insert":
+                self.client.upload_points(mt_collection, points)
+                return None
+            else:  # upsert
+                return self.client.upsert(mt_collection, points)
+        except (UnexpectedResponse, grpc.RpcError) as e:
+            # Handle collection not found
+            if self._is_collection_not_found_error(e):
+                log.info(
+                    f"Collection {mt_collection} doesn't exist. Creating it with dimension {dimension}."
+                )
+                # Create collection with correct dimensions from our vectors
+                self._create_multi_tenant_collection_if_not_exists(
+                    mt_collection_name=mt_collection, dimension=dimension
+                )
+                # Try operation again - no need for dimension adjustment since we just created with correct dimensions
+                if operation_name == "insert":
+                    self.client.upload_points(mt_collection, points)
+                    return None
+                else:  # upsert
+                    return self.client.upsert(mt_collection, points)
+
+            # Handle dimension mismatch
+            elif self._is_dimension_mismatch_error(e):
+                # For dimension errors, the collection must exist, so get its configuration
+                mt_collection_info = self.client.get_collection(mt_collection)
+                existing_size = mt_collection_info.config.params.vectors.size
+
+                log.info(
+                    f"Dimension mismatch: Collection {mt_collection} expects {existing_size}, got {dimension}"
+                )
+
+                if existing_size < dimension:
+                    # Truncate vectors to fit
+                    log.info(
+                        f"Truncating vectors from {dimension} to {existing_size} dimensions"
+                    )
+                    points = [
+                        PointStruct(
+                            id=point.id,
+                            vector=point.vector[:existing_size],
+                            payload=point.payload,
+                        )
+                        for point in points
+                    ]
+                elif existing_size > dimension:
+                    # Pad vectors with zeros
+                    log.info(
+                        f"Padding vectors from {dimension} to {existing_size} dimensions with zeros"
+                    )
+                    points = [
+                        PointStruct(
+                            id=point.id,
+                            vector=point.vector
+                            + [0] * (existing_size - len(point.vector)),
+                            payload=point.payload,
+                        )
+                        for point in points
+                    ]
+                # Try operation again with adjusted dimensions
+                if operation_name == "insert":
+                    self.client.upload_points(mt_collection, points)
+                    return None
+                else:  # upsert
+                    return self.client.upsert(mt_collection, points)
+            else:
+                # Not a known error we can handle, log and re-raise
+                _, error_msg = self._extract_error_message(e)
+                log.warning(f"Unhandled Qdrant error: {error_msg}")
+                raise
+        except Exception as e:
+            # For non-Qdrant exceptions, re-raise
+            raise
+
+    def insert(self, collection_name: str, items: list[VectorItem]):
+        """
+        Insert items with tenant ID.
+        """
+        if not self.client or not items:
+            return None
+
+        # Map to multi-tenant collection and tenant ID
+        mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
+
+        # Get dimensions from the actual vectors
+        dimension = len(items[0]["vector"]) if items else None
+
+        # Create points with tenant ID
+        points = self._create_points(items, tenant_id)
+
+        # Handle the operation with error retry
+        return self._handle_operation_with_error_retry(
+            "insert", mt_collection, points, dimension
+        )
+
+    def upsert(self, collection_name: str, items: list[VectorItem]):
+        """
+        Upsert items with tenant ID.
+        """
+        if not self.client or not items:
+            return None
+
+        # Map to multi-tenant collection and tenant ID
+        mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
+
+        # Get dimensions from the actual vectors
+        dimension = len(items[0]["vector"]) if items else None
+
+        # Create points with tenant ID
+        points = self._create_points(items, tenant_id)
+
+        # Handle the operation with error retry
+        return self._handle_operation_with_error_retry(
+            "upsert", mt_collection, points, dimension
+        )
+
+    def reset(self):
+        """
+        Reset the database by deleting all collections.
+        """
+        if not self.client:
+            return None
+
+        collection_names = self.client.get_collections().collections
+        for collection_name in collection_names:
+            if collection_name.name.startswith(self.collection_prefix):
+                self.client.delete_collection(collection_name=collection_name.name)
+
+    def delete_collection(self, collection_name: str):
+        """
+        Delete a collection.
+        """
+        if not self.client:
+            return None
+
+        # Map to multi-tenant collection and tenant ID
+        mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)
+
+        tenant_filter = models.FieldCondition(
+            key="tenant_id", match=models.MatchValue(value=tenant_id)
+        )
+
+        field_conditions = [tenant_filter]
+
+        update_result = self.client.delete(
+            collection_name=mt_collection,
+            points_selector=models.FilterSelector(
+                filter=models.Filter(must=field_conditions)
+            ),
+        )
+
+        if self.client.get_collection(mt_collection).points_count == 0:
+            self.client.delete_collection(mt_collection)
+
+        return update_result

+ 8 - 3
backend/open_webui/retrieval/vector/factory.py

@@ -1,6 +1,6 @@
 from open_webui.retrieval.vector.main import VectorDBBase
 from open_webui.retrieval.vector.type import VectorType
-from open_webui.config import VECTOR_DB
+from open_webui.config import VECTOR_DB, QDRANT_MULTI_TENANCY
 
 
 class Vector:
@@ -16,9 +16,14 @@ class Vector:
 
                 return MilvusClient()
             case VectorType.QDRANT:
-                from open_webui.retrieval.vector.dbs.qdrant import QdrantClient
+                if QDRANT_MULTI_TENANCY:
+                    from open_webui.retrieval.vector.dbs.qdrant_multitenancy import QdrantClient
 
-                return QdrantClient()
+                    return QdrantClient()
+                else:
+                    from open_webui.retrieval.vector.dbs.qdrant import QdrantClient
+
+                    return QdrantClient()
             case VectorType.PINECONE:
                 from open_webui.retrieval.vector.dbs.pinecone import PineconeClient