123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- import asyncio
- from typing import Optional
- from contextlib import AsyncExitStack
- from mcp import ClientSession
- from mcp.client.auth import OAuthClientProvider, TokenStorage
- from mcp.client.streamable_http import streamablehttp_client
- from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
class MCPClient:
    """Thin async wrapper around an MCP ``ClientSession``.

    The transport (opened with ``streamablehttp_client``) and the session are
    both registered on one ``AsyncExitStack`` so they are torn down together
    by ``disconnect()`` or on leaving an ``async with`` block.
    """

    def __init__(self):
        # None whenever the client is not connected; the request methods
        # check this before talking to the server.
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        # Context-manager objects created by connect(); kept for reference only.
        self._streams_context = None
        self._session_context = None

    def _require_session(self) -> "ClientSession":
        """Return the active session or raise if the client is not connected."""
        if not self.session:
            raise RuntimeError("MCP client is not connected.")
        return self.session

    async def connect(self, url: str, headers: Optional[dict] = None):
        """Open the transport to ``url``, start a session and initialize it.

        Args:
            url: Endpoint passed to ``streamablehttp_client``.
            headers: Optional headers passed through to ``streamablehttp_client``.

        Raises:
            Exception: Whatever the transport, session setup or
                ``initialize()`` raised; everything entered so far is closed
                via ``disconnect()`` before the error is re-raised.
        """
        try:
            self._streams_context = streamablehttp_client(url, headers=headers)
            transport = await self.exit_stack.enter_async_context(
                self._streams_context
            )
            # The transport yields three items; the third is not used here.
            read_stream, write_stream, _ = transport
            self._session_context = ClientSession(read_stream, write_stream)
            self.session = await self.exit_stack.enter_async_context(
                self._session_context
            )
            await self.session.initialize()
        except Exception:
            # Do not leave a half-open transport/session behind.
            await self.disconnect()
            raise

    async def list_tool_specs(self) -> list[dict]:
        """Return one spec dict per tool reported by the server.

        Each dict has the keys ``name``, ``description`` and ``parameters``
        (the tool's ``inputSchema``).

        Raises:
            RuntimeError: If the client is not connected.
        """
        session = self._require_session()
        result = await session.list_tools()
        # TODO: handle outputSchema if needed
        return [
            {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.inputSchema,
            }
            for tool in result.tools
        ]

    async def call_tool(self, function_name: str, function_args: dict) -> list:
        """Call a tool and return the ``content`` of its JSON-mode dump.

        Args:
            function_name: Name of the tool to invoke.
            function_args: Arguments forwarded to the tool.

        Raises:
            RuntimeError: If the client is not connected.
            Exception: If no result came back, or if the result is flagged
                ``isError`` (the exception then carries the content).
        """
        session = self._require_session()
        result = await session.call_tool(function_name, function_args)
        if not result:
            raise Exception("No result returned from MCP tool call.")
        result_content = result.model_dump(mode="json").get("content", [])
        if result.isError:
            raise Exception(result_content)
        return result_content

    async def list_resources(self, cursor: Optional[str] = None) -> list:
        """Return the ``resources`` entry of the server's resource listing.

        Args:
            cursor: Optional cursor forwarded to ``session.list_resources``.

        Raises:
            RuntimeError: If the client is not connected.
            Exception: If no result came back.
        """
        session = self._require_session()
        result = await session.list_resources(cursor=cursor)
        if not result:
            raise Exception("No result returned from MCP list_resources call.")
        # NOTE(review): dumped without mode="json", unlike call_tool --
        # confirm callers do not need JSON-safe values here.
        return result.model_dump().get("resources", [])

    async def read_resource(self, uri: str) -> dict:
        """Read the resource at ``uri`` and return the dumped result.

        Raises:
            RuntimeError: If the client is not connected.
            Exception: If no result came back.
        """
        session = self._require_session()
        result = await session.read_resource(uri)
        if not result:
            raise Exception("No result returned from MCP read_resource call.")
        return result.model_dump()

    async def disconnect(self):
        """Close everything on the exit stack and mark the client disconnected."""
        try:
            await self.exit_stack.aclose()
        finally:
            # Drop the reference even if teardown fails, so later calls report
            # "not connected" instead of using a closed session.
            self.session = None

    async def __aenter__(self):
        """Enter the underlying exit stack and return this client."""
        await self.exit_stack.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Unwind the exit stack with the exception details, then disconnect.

        Returns None, so an exception raised in the ``async with`` body is
        not suppressed by this method.
        """
        try:
            await self.exit_stack.__aexit__(exc_type, exc_value, traceback)
        finally:
            await self.disconnect()
|