"""MCPServer for CrewAI."""

from __future__ import annotations

from collections.abc import Callable
import logging
from typing import TYPE_CHECKING, Any

from crewai.tools import BaseTool
from crewai.utilities.pydantic_schema_utils import create_model_from_schema
from crewai.utilities.string_utils import sanitize_tool_name
from pydantic import BaseModel

from crewai_tools.adapters.tool_collection import ToolCollection


if TYPE_CHECKING:
    from mcp import StdioServerParameters
    from mcp.types import CallToolResult, TextContent, Tool
    from mcpadapt.core import MCPAdapt, ToolAdapter


logger = logging.getLogger(__name__)


try:
    from mcp import StdioServerParameters
    from mcp.types import CallToolResult, TextContent, Tool
    from mcpadapt.core import MCPAdapt, ToolAdapter

    class CrewAIToolAdapter(ToolAdapter):
        """Adapter that creates CrewAI tools with properly normalized JSON schemas.

        This adapter bypasses mcpadapt's model creation which adds invalid null values
        to field schemas, instead using CrewAI's own schema utilities.
        """

        def adapt(
            self,
            func: Callable[[dict[str, Any] | None], CallToolResult],
            mcp_tool: Tool,
        ) -> BaseTool:
            """Adapt a MCP tool to a CrewAI tool.

            Args:
                func: The function to call when the tool is invoked.
                mcp_tool: The MCP tool definition to adapt.

            Returns:
                A CrewAI BaseTool instance.
            """
            tool_name = sanitize_tool_name(mcp_tool.name)
            tool_description = mcp_tool.description or ""
            args_model = create_model_from_schema(mcp_tool.inputSchema)

            class CrewAIMCPTool(BaseTool):
                name: str = tool_name
                description: str = tool_description
                args_schema: type[BaseModel] = args_model

                def _run(self, **kwargs: Any) -> Any:
                    result = func(kwargs)
                    if len(result.content) == 1:
                        first_content = result.content[0]
                        if isinstance(first_content, TextContent):
                            return first_content.text
                        return str(first_content)
                    return str(
                        [
                            content.text
                            for content in result.content
                            if isinstance(content, TextContent)
                        ]
                    )

                def _generate_description(self) -> None:
                    schema = self.args_schema.model_json_schema()
                    schema.pop("$defs", None)
                    self.description = (
                        f"Tool Name: {self.name}\n"
                        f"Tool Arguments: {schema}\n"
                        f"Tool Description: {self.description}"
                    )

            return CrewAIMCPTool()

        async def async_adapt(self, afunc: Any, mcp_tool: Tool) -> Any:
            """Async adaptation is not supported by CrewAI."""
            raise NotImplementedError("async is not supported by the CrewAI framework.")

    MCP_AVAILABLE = True
except ImportError as e:
    logger.debug(f"MCP packages not available: {e}")
    MCP_AVAILABLE = False


class MCPServerAdapter:
    """Manages the lifecycle of an MCP server and make its tools available to CrewAI.

    Note: tools can only be accessed after the server has been started with the
        `start()` method.

    Usage:
        # context manager + stdio
        with MCPServerAdapter(...) as tools:
            # tools is now available

        # context manager + sse
        with MCPServerAdapter({"url": "http://localhost:8000/sse"}) as tools:
            # tools is now available

        # context manager with filtered tools
        with MCPServerAdapter(..., "tool1", "tool2") as filtered_tools:
            # only tool1 and tool2 are available

        # context manager with custom connect timeout (60 seconds)
        with MCPServerAdapter(..., connect_timeout=60) as tools:
            # tools is now available with longer timeout

        # manually stop mcp server
        try:
            mcp_server = MCPServerAdapter(...)
            tools = mcp_server.tools  # all tools

            # or with filtered tools and custom timeout
            mcp_server = MCPServerAdapter(..., "tool1", "tool2", connect_timeout=45)
            filtered_tools = mcp_server.tools  # only tool1 and tool2
            ...
        finally:
            mcp_server.stop()

        # Best practice is ensure cleanup is done after use.
        mcp_server.stop() # run after crew().kickoff()
    """

    def __init__(
        self,
        serverparams: StdioServerParameters | dict[str, Any],
        *tool_names: str,
        connect_timeout: int = 30,
    ) -> None:
        """Initialize the MCP Server.

        Args:
            serverparams: The parameters for the MCP server it supports either a
                `StdioServerParameters` or a `dict` respectively for STDIO and SSE.
            *tool_names: Optional names of tools to filter. If provided, only tools with
                matching names will be available.
            connect_timeout: Connection timeout in seconds to the MCP server (default is 30s).

        """
        super().__init__()
        self._adapter = None
        self._tools = None
        self._tool_names = (
            [sanitize_tool_name(name) for name in tool_names] if tool_names else None
        )

        if not MCP_AVAILABLE:
            import click

            if click.confirm(
                "You are missing the 'mcp' package. Would you like to install it?"
            ):
                import subprocess

                try:
                    subprocess.run(["uv", "add", "mcp crewai-tools'[mcp]'"], check=True)  # noqa: S607

                except subprocess.CalledProcessError as e:
                    raise ImportError("Failed to install mcp package") from e
            else:
                raise ImportError(
                    "`mcp` package not found, please run `uv add crewai-tools[mcp]`"
                )

        try:
            self._serverparams = serverparams
            self._adapter = MCPAdapt(
                self._serverparams, CrewAIToolAdapter(), connect_timeout
            )
            self.start()

        except Exception as e:
            if self._adapter is not None:
                try:
                    self.stop()
                except Exception as stop_e:
                    logger.error(f"Error during stop cleanup: {stop_e}")
            raise RuntimeError(f"Failed to initialize MCP Adapter: {e}") from e

    def start(self) -> None:
        """Start the MCP server and initialize the tools."""
        self._tools = self._adapter.__enter__()  # type: ignore[union-attr]

    def stop(self) -> None:
        """Stop the MCP server."""
        self._adapter.__exit__(None, None, None)  # type: ignore[union-attr]

    @property
    def tools(self) -> ToolCollection[BaseTool]:
        """The CrewAI tools available from the MCP server.

        Raises:
            ValueError: If the MCP server is not started.

        Returns:
            The CrewAI tools available from the MCP server.
        """
        if self._tools is None:
            raise ValueError(
                "MCP server not started, run `mcp_server.start()` first before accessing `tools`"
            )

        tools_collection = ToolCollection(self._tools)
        if self._tool_names:
            return tools_collection.filter_by_names(self._tool_names)
        return tools_collection

    def __enter__(self) -> ToolCollection[BaseTool]:
        """Enter the context manager.

        Note that `__init__()` already starts the MCP server,
        so tools should already be available.
        """
        return self.tools

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: Any,
    ) -> None:
        """Exit the context manager."""
        self._adapter.__exit__(exc_type, exc_value, traceback)  # type: ignore[union-attr]
