# client.py (3.6 KB)
import asyncio
from contextlib import AsyncExitStack
from typing import Any, Dict, List, Optional

from mcp import ClientSession
from mcp.client.auth import OAuthClientProvider, TokenStorage
from mcp.client.streamable_http import streamablehttp_client
from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
  8. class MCPClient:
  9. def __init__(self):
  10. self.session: Optional[ClientSession] = None
  11. self.exit_stack = AsyncExitStack()
  12. async def connect(self, url: str, headers: Optional[dict] = None):
  13. try:
  14. self._streams_context = streamablehttp_client(url, headers=headers)
  15. transport = await self.exit_stack.enter_async_context(self._streams_context)
  16. read_stream, write_stream, _ = transport
  17. self._session_context = ClientSession(
  18. read_stream, write_stream
  19. ) # pylint: disable=W0201
  20. self.session = await self.exit_stack.enter_async_context(
  21. self._session_context
  22. )
  23. await self.session.initialize()
  24. except Exception as e:
  25. await self.disconnect()
  26. raise e
  27. async def list_tool_specs(self) -> Optional[dict]:
  28. if not self.session:
  29. raise RuntimeError("MCP client is not connected.")
  30. result = await self.session.list_tools()
  31. tools = result.tools
  32. tool_specs = []
  33. for tool in tools:
  34. name = tool.name
  35. description = tool.description
  36. inputSchema = tool.inputSchema
  37. # TODO: handle outputSchema if needed
  38. outputSchema = getattr(tool, "outputSchema", None)
  39. tool_specs.append(
  40. {"name": name, "description": description, "parameters": inputSchema}
  41. )
  42. return tool_specs
  43. async def call_tool(
  44. self, function_name: str, function_args: dict
  45. ) -> Optional[dict]:
  46. if not self.session:
  47. raise RuntimeError("MCP client is not connected.")
  48. result = await self.session.call_tool(function_name, function_args)
  49. if not result:
  50. raise Exception("No result returned from MCP tool call.")
  51. result_dict = result.model_dump(mode="json")
  52. result_content = result_dict.get("content", {})
  53. if result.isError:
  54. raise Exception(result_content)
  55. else:
  56. return result_content
  57. async def list_resources(self, cursor: Optional[str] = None) -> Optional[dict]:
  58. if not self.session:
  59. raise RuntimeError("MCP client is not connected.")
  60. result = await self.session.list_resources(cursor=cursor)
  61. if not result:
  62. raise Exception("No result returned from MCP list_resources call.")
  63. result_dict = result.model_dump()
  64. resources = result_dict.get("resources", [])
  65. return resources
  66. async def read_resource(self, uri: str) -> Optional[dict]:
  67. if not self.session:
  68. raise RuntimeError("MCP client is not connected.")
  69. result = await self.session.read_resource(uri)
  70. if not result:
  71. raise Exception("No result returned from MCP read_resource call.")
  72. result_dict = result.model_dump()
  73. return result_dict
  74. async def disconnect(self):
  75. # Clean up and close the session
  76. await self.exit_stack.aclose()
  77. async def __aenter__(self):
  78. await self.exit_stack.__aenter__()
  79. return self
  80. async def __aexit__(self, exc_type, exc_value, traceback):
  81. await self.exit_stack.__aexit__(exc_type, exc_value, traceback)
  82. await self.disconnect()