# client.py

import asyncio
from contextlib import AsyncExitStack
from typing import Any, Optional

from mcp import ClientSession
from mcp.client.auth import OAuthClientProvider, TokenStorage
from mcp.client.streamable_http import streamablehttp_client
from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken


  8. class MCPClient:
  9. def __init__(self):
  10. self.session: Optional[ClientSession] = None
  11. self.exit_stack = AsyncExitStack()
  12. async def connect(
  13. self, url: str, headers: Optional[dict] = None, auth: Optional[any] = None
  14. ):
  15. self._streams_context = streamablehttp_client(url, headers=headers, auth=auth)
  16. read_stream, write_stream, _ = (
  17. await self._streams_context.__aenter__()
  18. ) # pylint: disable=E1101
  19. self._session_context = ClientSession(
  20. read_stream, write_stream
  21. ) # pylint: disable=W0201
  22. self.session: ClientSession = (
  23. await self._session_context.__aenter__()
  24. ) # pylint: disable=C2801
  25. await self.session.initialize()
  26. async def list_tool_specs(self) -> Optional[dict]:
  27. if not self.session:
  28. raise RuntimeError("MCP client is not connected.")
  29. result = await self.session.list_tools()
  30. tools = result.tools
  31. tool_specs = []
  32. for tool in tools:
  33. name = tool.name
  34. description = tool.description
  35. inputSchema = tool.inputSchema
  36. # TODO: handle outputSchema if needed
  37. outputSchema = getattr(tool, "outputSchema", None)
  38. tool_specs.append(
  39. {"name": name, "description": description, "parameters": inputSchema}
  40. )
  41. return tool_specs
  42. async def call_tool(
  43. self, function_name: str, function_args: dict
  44. ) -> Optional[dict]:
  45. if not self.session:
  46. raise RuntimeError("MCP client is not connected.")
  47. result = await self.session.call_tool(function_name, function_args)
  48. if not result:
  49. raise Exception("No result returned from MCP tool call.")
  50. result_dict = result.model_dump()
  51. result_content = result_dict.get("content", {})
  52. if result.isError:
  53. raise Exception(result_content)
  54. else:
  55. return result_content
  56. async def disconnect(self):
  57. # Clean up and close the session
  58. if self.session:
  59. await self._session_context.__aexit__(
  60. None, None, None
  61. ) # pylint: disable=E1101
  62. if self._streams_context:
  63. await self._streams_context.__aexit__(
  64. None, None, None
  65. ) # pylint: disable=E1101
  66. self.session = None
  67. async def __aenter__(self):
  68. await self.exit_stack.__aenter__()
  69. return self
  70. async def __aexit__(self, exc_type, exc_value, traceback):
  71. await self.exit_stack.__aexit__(exc_type, exc_value, traceback)
  72. await self.disconnect()