Skip to content

Tools

any_agent.tools

MCPClient

Bases: BaseModel

Unified MCP client that handles all transport types and frameworks.

Source code in src/any_agent/tools/mcp/mcp_client.py
class MCPClient(BaseModel):
    """Unified MCP client that handles all transport types and frameworks."""

    config: MCPParams
    framework: AgentFramework

    _session: ClientSession | None = PrivateAttr(default=None)
    _exit_stack: AsyncExitStack = PrivateAttr(default_factory=AsyncExitStack)
    _client: Any | None = PrivateAttr(default=None)
    _get_session_id_callback: Callable[[], str | None] | None = PrivateAttr(
        default=None
    )

    model_config = ConfigDict(arbitrary_types_allowed=True)

    def model_post_init(self, __context: Any, /) -> None:
        """Initialize the MCP client and check dependencies."""
        if missing_mcp_error:
            msg = "You need to `pip install 'any-agent[mcp]'` to use MCP."
            raise ImportError(msg) from missing_mcp_error

    async def connect(self) -> None:
        """Connect using the appropriate transport type."""
        if isinstance(self.config, MCPStdio):
            server_params = StdioServerParameters(
                command=self.config.command,
                args=list(self.config.args),
                env={**os.environ},
            )
            self._client = stdio_client(server_params)
            read, write = await self._exit_stack.enter_async_context(self._client)
        elif isinstance(self.config, MCPSse):
            self._client = sse_client(
                url=self.config.url,
                headers=dict(self.config.headers or {}),
            )
            read, write = await self._exit_stack.enter_async_context(self._client)
        elif isinstance(self.config, MCPStreamableHttp):
            self._client = streamablehttp_client(
                url=self.config.url,
                headers=dict(self.config.headers or {}),
            )
            transport = await self._exit_stack.enter_async_context(self._client)
            read, write, self._get_session_id_callback = transport
        else:
            msg = f"Unsupported MCP config type: {type(self.config)}"
            raise ValueError(msg)

        # Create and initialize session (common for all transports)
        timeout = (
            timedelta(seconds=self.config.client_session_timeout_seconds)
            if self.config.client_session_timeout_seconds
            else None
        )
        client_session = ClientSession(read, write, timeout)
        self._session = await self._exit_stack.enter_async_context(client_session)
        await self._session.initialize()

    async def list_raw_tools(self) -> list[MCPTool]:
        """Get raw MCP tools from the server."""
        if not self._session:
            msg = "Not connected to MCP server. Call connect() first."
            raise ValueError(msg)

        available_tools = await self._session.list_tools()
        return self._filter_tools(available_tools.tools)

    async def list_tools(self) -> list[Callable[..., Any]]:
        """Get tools converted to callable functions that work with any framework."""
        raw_tools = await self.list_raw_tools()
        return self._convert_tools_to_callables(raw_tools)

    def _filter_tools(self, tools: Sequence[MCPTool]) -> list[MCPTool]:
        """Filter tools based on config."""
        requested_tools = list(self.config.tools or [])
        if not requested_tools:
            return list(tools)

        name_to_tool = {tool.name: tool for tool in tools}
        missing_tools = [name for name in requested_tools if name not in name_to_tool]
        if missing_tools:
            error_message = dedent(
                f"""Could not find all requested tools in the MCP server:
                Requested ({len(requested_tools)}): {requested_tools}
                Available ({len(name_to_tool)}): {list(name_to_tool.keys())}
                Missing: {missing_tools}
                """
            )
            raise ValueError(error_message)

        return [name_to_tool[name] for name in requested_tools]

    def _convert_tools_to_callables(
        self, tools: list[MCPTool]
    ) -> list[Callable[..., Any]]:
        """Convert MCP tools to callable functions that work with any framework."""
        if not self._session:
            msg = "Session not available for tool conversion"
            raise ValueError(msg)
        tool_functions = []
        for tool in tools:
            tool_func = self._create_tool_function(tool)
            tool_functions.append(tool_func)
        return tool_functions

    def _create_tool_function(self, tool: MCPTool) -> Callable[..., Any]:
        """Create a properly typed function for an MCP tool."""
        name = tool.name
        description = tool.description or f"MCP tool: {name}"
        input_schema = tool.inputSchema

        # Extract parameters from schema
        parameters = []
        annotations = {}
        if input_schema and isinstance(input_schema, dict):
            properties = input_schema.get("properties", {})
            required = input_schema.get("required", [])

            for param_name, param_info in properties.items():
                # Use the improved schema conversion
                base_param_type = self._json_schema_to_python_type(param_info)

                if param_name not in required:
                    # For optional parameters, use Optional[T] for better framework compatibility
                    # Note: We use Optional instead of X | Y syntax because some frameworks
                    # (like Google) don't handle the union syntax properly
                    optional_param_type: Any = Optional[base_param_type]  # noqa: UP045
                    annotations[param_name] = optional_param_type
                    param = inspect.Parameter(
                        param_name,
                        inspect.Parameter.KEYWORD_ONLY,
                        default=None,
                        annotation=optional_param_type,
                    )
                else:
                    required_param_type: Any = base_param_type
                    annotations[param_name] = required_param_type
                    param = inspect.Parameter(
                        param_name,
                        inspect.Parameter.KEYWORD_ONLY,
                        annotation=required_param_type,
                    )
                parameters.append(param)

        # Create signature and enhanced docstring
        signature = inspect.Signature(parameters, return_annotation=str)
        enhanced_description = self._create_enhanced_description(
            description, input_schema
        )

        # Create the actual function
        async def mcp_tool_function(**kwargs: Any) -> str:
            """Dynamically created MCP tool function."""
            try:
                if not self._session:
                    return f"Error: MCP session not available for tool {name}"
                result = await self._session.call_tool(name, kwargs)
                if hasattr(result, "content") and result.content:
                    if hasattr(result.content[0], "text"):
                        return result.content[0].text
                    return str(result.content[0])
                return str(result)
            except Exception as e:
                return f"Error calling MCP tool {name}: {e!s}"

        # Set function metadata
        mcp_tool_function.__name__ = name
        mcp_tool_function.__doc__ = enhanced_description
        mcp_tool_function.__signature__ = signature  # type: ignore[attr-defined]
        mcp_tool_function.__annotations__ = {**annotations, "return": str}

        return mcp_tool_function

    TYPE_MAPPING: ClassVar[dict[str, type]] = {
        "string": str,
        "integer": int,
        "number": float,
        "boolean": bool,
        "array": list,
        "object": dict,
    }

    def _json_schema_to_python_type(self, schema: dict[str, Any]) -> type:
        """Convert JSON schema to Python type using robust conversion."""
        schema_type = schema.get("type", "string")
        return self.TYPE_MAPPING.get(schema_type, str)

    def _create_enhanced_description(self, description: str, input_schema: Any) -> str:
        """Create enhanced docstring with parameter descriptions."""
        enhanced_description = description
        if input_schema and isinstance(input_schema, dict):
            properties = input_schema.get("properties", {})
            if properties:
                param_descriptions = []
                for param_name, param_info in properties.items():
                    param_desc = param_info.get(
                        "description", f"Parameter {param_name}"
                    )
                    param_descriptions.append(f"    {param_name}: {param_desc}")

                if param_descriptions:
                    enhanced_description += "\n\nArgs:\n" + "\n".join(
                        param_descriptions
                    )
        return enhanced_description

    async def disconnect(self) -> None:
        """Clean up resources."""
        await self._exit_stack.aclose()
        self._session = None
        self._client = None
connect() async

Connect using the appropriate transport type.

Source code in src/any_agent/tools/mcp/mcp_client.py
async def connect(self) -> None:
    """Connect using the appropriate transport type."""
    if isinstance(self.config, MCPStdio):
        server_params = StdioServerParameters(
            command=self.config.command,
            args=list(self.config.args),
            env={**os.environ},
        )
        self._client = stdio_client(server_params)
        read, write = await self._exit_stack.enter_async_context(self._client)
    elif isinstance(self.config, MCPSse):
        self._client = sse_client(
            url=self.config.url,
            headers=dict(self.config.headers or {}),
        )
        read, write = await self._exit_stack.enter_async_context(self._client)
    elif isinstance(self.config, MCPStreamableHttp):
        self._client = streamablehttp_client(
            url=self.config.url,
            headers=dict(self.config.headers or {}),
        )
        transport = await self._exit_stack.enter_async_context(self._client)
        read, write, self._get_session_id_callback = transport
    else:
        msg = f"Unsupported MCP config type: {type(self.config)}"
        raise ValueError(msg)

    # Create and initialize session (common for all transports)
    timeout = (
        timedelta(seconds=self.config.client_session_timeout_seconds)
        if self.config.client_session_timeout_seconds
        else None
    )
    client_session = ClientSession(read, write, timeout)
    self._session = await self._exit_stack.enter_async_context(client_session)
    await self._session.initialize()
disconnect() async

Clean up resources.

Source code in src/any_agent/tools/mcp/mcp_client.py
async def disconnect(self) -> None:
    """Clean up resources."""
    await self._exit_stack.aclose()
    self._session = None
    self._client = None
list_raw_tools() async

Get raw MCP tools from the server.

Source code in src/any_agent/tools/mcp/mcp_client.py
async def list_raw_tools(self) -> list[MCPTool]:
    """Get raw MCP tools from the server."""
    if not self._session:
        msg = "Not connected to MCP server. Call connect() first."
        raise ValueError(msg)

    available_tools = await self._session.list_tools()
    return self._filter_tools(available_tools.tools)
list_tools() async

Get tools converted to callable functions that work with any framework.

Source code in src/any_agent/tools/mcp/mcp_client.py
async def list_tools(self) -> list[Callable[..., Any]]:
    """Get tools converted to callable functions that work with any framework."""
    raw_tools = await self.list_raw_tools()
    return self._convert_tools_to_callables(raw_tools)
model_post_init(__context)

Initialize the MCP client and check dependencies.

Source code in src/any_agent/tools/mcp/mcp_client.py
def model_post_init(self, __context: Any, /) -> None:
    """Initialize the MCP client and check dependencies."""
    if missing_mcp_error:
        msg = "You need to `pip install 'any-agent[mcp]'` to use MCP."
        raise ImportError(msg) from missing_mcp_error

a2a_tool(url, toolname=None, http_kwargs=None)

Perform a query using A2A to another agent (synchronous version).

Parameters:

Name Type Description Default
url str

The url in which the A2A agent is located.

required
toolname str

The name for the created tool. Defaults to call_{agent name in card}. Leading and trailing whitespace are removed. Whitespace in the middle is replaced by _.

None
http_kwargs dict

Additional kwargs to pass to the httpx client.

None

Returns:

Type Description
Callable[[str, Optional[str], Optional[str]], str]

A sync Callable that takes a query and returns the agent response.

Source code in src/any_agent/tools/a2a.py
def a2a_tool(
    url: str, toolname: Optional[str] = None, http_kwargs: dict[str, Any] | None = None
) -> Callable[[str, Optional[str], Optional[str]], str]:
    """Perform a query using A2A to another agent (synchronous version).

    Args:
        url (str): The url in which the A2A agent is located.
        toolname (str): The name for the created tool. Defaults to `call_{agent name in card}`.
            Leading and trailing whitespace are removed. Whitespace in the middle is replaced by `_`.
        http_kwargs (dict): Additional kwargs to pass to the httpx client.

    Returns:
        A sync `Callable` that takes a query and returns the agent response.

    """
    if not a2a_tool_available:
        msg = "You need to `pip install 'any-agent[a2a]'` to use this tool"
        raise ImportError(msg)

    # Fetch the async tool upfront to get proper name and documentation (otherwise the tool doesn't have the right name and documentation)
    async_tool = run_async_in_sync(a2a_tool_async(url, toolname, http_kwargs))

    def sync_wrapper(
        query: str, task_id: Optional[str] = None, context_id: Optional[str] = None
    ) -> Any:
        """Execute the A2A tool query synchronously."""
        return run_async_in_sync(async_tool(query, task_id, context_id))

    # Copy essential metadata from the async tool
    sync_wrapper.__name__ = async_tool.__name__
    sync_wrapper.__doc__ = async_tool.__doc__

    return sync_wrapper

a2a_tool_async(url, toolname=None, http_kwargs=None) async

Perform a query using A2A to another agent.

Parameters:

Name Type Description Default
url str

The url in which the A2A agent is located.

required
toolname str

The name for the created tool. Defaults to call_{agent name in card}. Leading and trailing whitespace are removed. Whitespace in the middle is replaced by _.

None
http_kwargs dict

Additional kwargs to pass to the httpx client.

None

Returns:

Type Description
Callable[[str, Optional[str], Optional[str]], Coroutine[Any, Any, dict[str, Any]]]

An async Callable that takes a query and returns the agent response.

Source code in src/any_agent/tools/a2a.py
async def a2a_tool_async(
    url: str, toolname: Optional[str] = None, http_kwargs: dict[str, Any] | None = None
) -> Callable[[str, Optional[str], Optional[str]], Coroutine[Any, Any, dict[str, Any]]]:
    """Perform a query using A2A to another agent.

    Args:
        url (str): The url in which the A2A agent is located.
        toolname (str): The name for the created tool. Defaults to `call_{agent name in card}`.
            Leading and trailing whitespace are removed. Whitespace in the middle is replaced by `_`.
        http_kwargs (dict): Additional kwargs to pass to the httpx client.

    Returns:
        An async `Callable` that takes a query and returns the agent response.

    """
    if not a2a_tool_available:
        msg = "You need to `pip install 'any-agent[a2a]'` to use this tool"
        raise ImportError(msg)

    if http_kwargs is None:
        http_kwargs = {}

    # Default timeout in httpx is 5 seconds. For an agent response, the default should be more lenient.
    if "timeout" not in http_kwargs:
        http_kwargs["timeout"] = 30.0

    async with httpx.AsyncClient(
        follow_redirects=True, **http_kwargs
    ) as resolver_client:
        a2a_agent_card: AgentCard = await (
            A2ACardResolver(httpx_client=resolver_client, base_url=url)
        ).get_agent_card()

    # NOTE: Use Optional[T] instead of T | None syntax throughout this module.
    # Google ADK's _parse_schema_from_parameter function has compatibility
    # with the traditional Optional[T] syntax for automatic function calling.
    # Using T | None syntax causes"Failed to parse the parameter ... for automatic function calling"
    async def _send_query(
        query: str, task_id: Optional[str] = None, context_id: Optional[str] = None
    ) -> dict[str, Any]:
        async with httpx.AsyncClient(follow_redirects=True) as query_client:
            client = A2AClient(httpx_client=query_client, agent_card=a2a_agent_card)
            send_message_payload = SendMessageRequest(
                id=str(uuid4()),
                params=MessageSendParams(
                    message=Message(
                        role=Role.user,
                        parts=[Part(root=TextPart(text=query))],
                        # the id is not currently tracked
                        message_id=str(uuid4().hex),
                        task_id=task_id,
                        context_id=context_id,
                    )
                ),
            )
            # TODO check how to capture exceptions and pass them on to the enclosing framework
            response = await client.send_message(
                send_message_payload, http_kwargs=http_kwargs
            )

            if not response.root:
                msg = (
                    "The A2A agent did not return a root. Are you using an A2A agent not managed by any-agent? "
                    "Please file an issue at https://github.com/mozilla-ai/any-agent/issues so we can help."
                )
                raise ValueError(msg)

            if isinstance(response.root, JSONRPCErrorResponse):
                response_dict = {
                    "error": response.root.error.message,
                    "code": response.root.error.code,
                    "data": response.root.error.data,
                }
            elif isinstance(response.root, SendMessageSuccessResponse):
                # Task
                if isinstance(response.root.result, Task):
                    task = response.root.result
                    response_dict = {
                        "timestamp": task.status.timestamp,
                        "status": task.status.state,
                    }
                    if task.status.message:
                        response_dict["task_id"] = task.status.message.task_id
                        response_dict["context_id"] = task.status.message.context_id
                        response_dict["message"] = {
                            " ".join(
                                [
                                    part.root.text
                                    for part in task.status.message.parts
                                    if isinstance(part.root, TextPart)
                                ]
                            )
                        }
                # Message
                else:
                    response_dict = {
                        "message": {
                            " ".join(
                                [
                                    part.root.text
                                    for part in response.root.result.parts
                                    if isinstance(part.root, TextPart)
                                ]
                            )
                        },
                        "task_id": response.root.result.task_id,
                    }
            else:
                msg = (
                    "The A2A agent did not return a error or a result. Are you using an A2A agent not managed by any-agent? "
                    "Please file an issue at https://github.com/mozilla-ai/any-agent/issues so we can help."
                )
                raise ValueError(msg)

            return response_dict

    new_name = toolname or a2a_agent_card.name
    new_name = re.sub(r"\s+", "_", new_name.strip())
    _send_query.__name__ = f"call_{new_name}"
    _send_query.__doc__ = f"""{a2a_agent_card.description}
        Send a query to the A2A hosted agent named {a2a_agent_card.name}.

        Agent description: {a2a_agent_card.description}

        Args:
            query (str): The query to send to the agent.
            task_id (str, optional): Task ID for continuing an incomplete task.
                If you want to start a new task, you should not provide a task_id (pass `task_id=None`).
                If you want to resume a task, use the same task_id from a previous response with TaskState.input_required.
            context_id (str, optional): Context ID for conversation continuity.
                If you want to start a new conversation, you should not provide a context_id (pass `context_id=None`).

        Returns:
            dict: Response from the A2A agent containing:
                - For successful responses: task_id, context_id, timestamp, status, and message
                - For errors: error message, code, and data

        Note:
            If TaskState is terminal (completed/failed), do not reuse the same task_id.
    """
    return _send_query

ask_user_verification(query)

Asks user to verify the given query.

Parameters:

Name Type Description Default
query str

The question that requires verification.

required
Source code in src/any_agent/tools/user_interaction.py
def ask_user_verification(query: str) -> str:
    """Asks user to verify the given `query`.

    Args:
        query: The question that requires verification.

    """
    return input(f"{query} => Type your answer here:")

prepare_final_output(output_type, instructions=None)

Prepare instructions and tools for structured output, returning the function directly.

Parameters:

Name Type Description Default
output_type type[BaseModel]

The Pydantic model type for structured output

required
instructions str | None

Original instructions to modify

None

Returns:

Type Description
tuple[str, Callable[[str], dict[str, str | bool | dict[str, Any] | list[Any]]]]

Tuple of (modified_instructions, final_output_function)

Source code in src/any_agent/tools/final_output.py
def prepare_final_output(
    output_type: type[BaseModel], instructions: str | None = None
) -> tuple[str, Callable[[str], dict[str, str | bool | dict[str, Any] | list[Any]]]]:
    """Prepare instructions and tools for structured output, returning the function directly.

    Args:
        output_type: The Pydantic model type for structured output
        instructions: Original instructions to modify

    Returns:
        Tuple of (modified_instructions, final_output_function)

    """
    tool_name = "final_output"
    modified_instructions = instructions or ""
    modified_instructions += (
        f"You must call the {tool_name} tool when finished."
        f"The 'answer' argument passed to the {tool_name} tool must be a JSON string that matches the following schema:\n"
        f"{output_type.model_json_schema()}"
    )

    def final_output_tool(
        answer: str,
    ) -> dict[str, str | bool | dict[str, Any] | list[Any]]:
        # First check if it's valid JSON
        try:
            parsed_answer = json.loads(answer)
        except json.JSONDecodeError as json_err:
            return {
                "success": False,
                "result": f"Invalid JSON format: {json_err}. Please fix the 'answer' parameter so that it is a valid JSON string and call this tool again.",
            }
        # Then validate against the Pydantic model
        try:
            output_type.model_validate_json(answer)
        except ValidationError as e:
            return {
                "success": False,
                "result": f"Please fix this validation error: {e}. The format must conform to {output_type.model_json_schema()}",
            }
        else:
            return {"success": True, "result": parsed_answer}

    # Set the function name and docstring
    final_output_tool.__name__ = tool_name
    final_output_tool.__doc__ = f"""This tool is used to validate the final output. It must be called when the final answer is ready in order to ensure that the output is valid.

    Args:
        answer: The final output that can be loaded as a Pydantic model. This must be a JSON compatible string that matches the following schema:
            {output_type.model_json_schema()}

    Returns:
        A dictionary with the following keys:
            - success: True if the output is valid, False otherwise.
            - result: The final output if success is True, otherwise an error message.

    """

    return modified_instructions, final_output_tool

search_tavily(query, include_images=False)

Perform a Tavily web search based on your query and return the top search results.

See https://blog.tavily.com/getting-started-with-the-tavily-search-api for more information.

Parameters:

Name Type Description Default
query str

The search query to perform.

required
include_images bool

Whether to include images in the results.

False

Returns:

Type Description
str

The top search results as a formatted string.

Source code in src/any_agent/tools/web_browsing.py
def search_tavily(query: str, include_images: bool = False) -> str:
    """Perform a Tavily web search based on your query and return the top search results.

    See https://blog.tavily.com/getting-started-with-the-tavily-search-api for more information.

    Args:
        query (str): The search query to perform.
        include_images (bool): Whether to include images in the results.

    Returns:
        The top search results as a formatted string.

    """
    try:
        from tavily.tavily import TavilyClient
    except ImportError as e:
        msg = "You need to `pip install 'tavily-python'` to use this tool"
        raise ImportError(msg) from e

    api_key = os.getenv("TAVILY_API_KEY")
    if not api_key:
        return "TAVILY_API_KEY environment variable not set."
    try:
        client = TavilyClient(api_key)
        response = client.search(query, include_images=include_images)
        results = response.get("results", [])
        output = []
        for result in results:
            output.append(
                f"[{result.get('title', 'No Title')}]({result.get('url', '#')})\n{result.get('content', '')}"
            )
        if include_images and "images" in response:
            output.append("\nImages:")
            for image in response["images"]:
                output.append(image)
        return "\n\n".join(output) if output else "No results found."
    except Exception as e:
        return f"Error performing Tavily search: {e!s}"

search_web(query)

Perform a duckduckgo web search based on your query (think a Google search) then returns the top search results.

Parameters:

Name Type Description Default
query str

The search query to perform.

required

Returns:

Type Description
str

The top search results.

Source code in src/any_agent/tools/web_browsing.py
def search_web(query: str) -> str:
    """Perform a duckduckgo web search based on your query (think a Google search) then returns the top search results.

    Args:
        query (str): The search query to perform.

    Returns:
        The top search results.

    """
    try:
        from duckduckgo_search import DDGS  # type: ignore[import-not-found]
    except ImportError as e:
        msg = "You need to `pip install 'duckduckgo_search'` to use this tool"
        raise ImportError(msg) from e

    ddgs = DDGS()
    results = ddgs.text(query, max_results=10)
    return "\n".join(
        f"[{result['title']}]({result['href']})\n{result['body']}" for result in results
    )

send_console_message(user, query)

Send the specified user a message via console and returns their response.

Parameters:

Name Type Description Default
query str

The question to ask the user.

required
user str

The user to ask the question to.

required

Returns:

Name Type Description
str str

The user's response.

Source code in src/any_agent/tools/user_interaction.py
def send_console_message(user: str, query: str) -> str:
    """Send the specified user a message via console and returns their response.

    Args:
        query: The question to ask the user.
        user: The user to ask the question to.

    Returns:
        str: The user's response.

    """
    return Prompt.ask(f"{query}\n{user}")

show_final_output(answer)

Show the final answer to the user.

Parameters:

Name Type Description Default
answer str

The final answer.

required
Source code in src/any_agent/tools/user_interaction.py
def show_final_output(answer: str) -> str:
    """Show the final answer to the user.

    Args:
        answer: The final answer.

    """
    logger.info(f"Final output: {answer}")
    return answer

show_plan(plan)

Show the current plan to the user.

Parameters:

Name Type Description Default
plan str

The current plan.

required
Source code in src/any_agent/tools/user_interaction.py
def show_plan(plan: str) -> str:
    """Show the current plan to the user.

    Args:
        plan: The current plan.

    """
    logger.info(f"Current plan: {plan}")
    return plan

visit_webpage(url, timeout=30, max_length=10000)

Visits a webpage at the given url and reads its content as a markdown string. Use this to browse webpages.

Parameters:

Name Type Description Default
url str

The url of the webpage to visit.

required
timeout int

The timeout in seconds for the request.

30
max_length int

The maximum number of characters of text that can be returned (default=10000). If max_length==-1, text is not truncated and the full webpage is returned.

10000
Source code in src/any_agent/tools/web_browsing.py
def visit_webpage(url: str, timeout: int = 30, max_length: int = 10000) -> str:
    """Visits a webpage at the given url and reads its content as a markdown string. Use this to browse webpages.

    Args:
        url: The url of the webpage to visit.
        timeout: The timeout in seconds for the request.
        max_length: The maximum number of characters of text that can be returned (default=10000).
                    If max_length==-1, text is not truncated and the full webpage is returned.

    """
    try:
        from markdownify import markdownify  # type: ignore[import-not-found]
    except ImportError as e:
        msg = "You need to `pip install 'markdownify'` to use this tool"
        raise ImportError(msg) from e

    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()

        markdown_content = markdownify(response.text).strip()

        markdown_content = re.sub(r"\n{2,}", "\n", markdown_content)

        if max_length == -1:
            return str(markdown_content)
        return _truncate_content(markdown_content, max_length)
    except RequestException as e:
        return f"Error fetching the webpage: {e!s}"
    except Exception as e:
        return f"An unexpected error occurred: {e!s}"