Skip to content

Agent

Agent

any_agent.AnyAgent

Bases: ABC

Base abstract class for all agent implementations.

This provides a unified interface for different agent frameworks.

Source code in src/any_agent/frameworks/any_agent.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
class AnyAgent(ABC):
    """Base abstract class for all agent implementations.

    This provides a unified interface for different agent frameworks.
    """

    def __init__(self, config: AgentConfig):
        self.config = config

        self._mcp_servers: list[_MCPServerBase[Any]] = []
        self._tools: list[Any] = []

        self._instrumentor = _get_instrumentor_by_framework(self.framework)
        self._tracer: Tracer = otel_trace.get_tracer(SCOPE_NAME)

        self._lock = asyncio.Lock()
        self._running_traces: dict[int, AgentTrace] = {}

    @staticmethod
    def _get_agent_type_by_framework(
        framework_raw: AgentFramework | str,
    ) -> type[AnyAgent]:
        framework = AgentFramework.from_string(framework_raw)

        if framework is AgentFramework.SMOLAGENTS:
            from any_agent.frameworks.smolagents import SmolagentsAgent

            return SmolagentsAgent

        if framework is AgentFramework.LANGCHAIN:
            from any_agent.frameworks.langchain import LangchainAgent

            return LangchainAgent

        if framework is AgentFramework.OPENAI:
            from any_agent.frameworks.openai import OpenAIAgent

            return OpenAIAgent

        if framework is AgentFramework.LLAMA_INDEX:
            from any_agent.frameworks.llama_index import LlamaIndexAgent

            return LlamaIndexAgent

        if framework is AgentFramework.GOOGLE:
            from any_agent.frameworks.google import GoogleAgent

            return GoogleAgent

        if framework is AgentFramework.AGNO:
            from any_agent.frameworks.agno import AgnoAgent

            return AgnoAgent

        if framework is AgentFramework.TINYAGENT:
            from any_agent.frameworks.tinyagent import TinyAgent

            return TinyAgent

        assert_never(framework)

    @classmethod
    def create(
        cls,
        agent_framework: AgentFramework | str,
        agent_config: AgentConfig,
    ) -> AnyAgent:
        """Create an agent using the given framework and config."""
        return run_async_in_sync(
            cls.create_async(
                agent_framework=agent_framework,
                agent_config=agent_config,
            )
        )

    @classmethod
    async def create_async(
        cls,
        agent_framework: AgentFramework | str,
        agent_config: AgentConfig,
    ) -> AnyAgent:
        """Create an agent using the given framework and config."""
        agent_cls = cls._get_agent_type_by_framework(agent_framework)
        agent = agent_cls(agent_config)
        await agent._load_agent()
        return agent

    async def _load_tools(
        self, tools: Sequence[Tool]
    ) -> tuple[list[Any], list[_MCPServerBase[Any]]]:
        tools, mcp_servers = await _wrap_tools(tools, self.framework)
        # Add to agent so that it doesn't get garbage collected
        self._mcp_servers.extend(mcp_servers)
        for mcp_server in mcp_servers:
            tools.extend(mcp_server.tools)
        return tools, mcp_servers

    def run(self, prompt: str, **kwargs: Any) -> AgentTrace:
        """Run the agent with the given prompt."""
        return run_async_in_sync(self.run_async(prompt, **kwargs))

    async def run_async(
        self, prompt: str, instrument: bool = True, **kwargs: Any
    ) -> AgentTrace:
        """Run the agent asynchronously with the given prompt.

        Args:
            prompt: The user prompt to be passed to the agent.
            instrument: Whether to instrument the underlying framework
                to generate LLM Calls and Tool Execution Spans.

                If `False` the returned `AgentTrace` will only
                contain a single `invoke_agent` span.

            kwargs: Will be passed to the underlying runner used
                by the framework.

        Returns:
            The `AgentTrace` containing information about the
                steps taken by the agent.

        """
        trace = AgentTrace()
        trace_id: int
        instrumentation_enabled = instrument and self._instrumentor is not None

        # This design is so that we only catch exceptions thrown by _run_async. All other exceptions will not be caught.
        try:
            with self._tracer.start_as_current_span(
                f"invoke_agent [{self.config.name}]"
            ) as invoke_span:
                if instrumentation_enabled:
                    trace_id = invoke_span.get_span_context().trace_id
                    async with self._lock:
                        # We check the locked `_running_traces` inside `instrument`.
                        # If there is more than 1 entry in `running_traces`, it means that the agent has
                        # already being instrumented so we won't instrument it again.
                        self._running_traces[trace_id] = AgentTrace()
                        self._instrumentor.instrument(
                            agent=self,  # type: ignore[arg-type]
                        )

                invoke_span.set_attributes(
                    {
                        "gen_ai.operation.name": "invoke_agent",
                        "gen_ai.agent.name": self.config.name,
                        "gen_ai.agent.description": self.config.description
                        or "No description.",
                        "gen_ai.request.model": self.config.model_id,
                    }
                )
                final_output = await self._run_async(prompt, **kwargs)
        except Exception as e:
            # Clean up instrumentation if it was enabled
            if instrumentation_enabled:
                async with self._lock:
                    self._instrumentor.uninstrument(self)  # type: ignore[arg-type]
                    # Get the instrumented trace if available, otherwise use the original trace
                    instrumented_trace = self._running_traces.pop(trace_id)
                    if instrumented_trace is not None:
                        trace = instrumented_trace
            trace.add_span(invoke_span)
            raise AgentRunError(trace, e) from e

        if instrumentation_enabled:
            async with self._lock:
                self._instrumentor.uninstrument(self)  # type: ignore[arg-type]
                trace = self._running_traces.pop(trace_id)

        trace.add_span(invoke_span)
        trace.final_output = final_output
        return trace

    def _serve_a2a(self, serving_config: A2AServingConfig | None) -> None:
        from any_agent.serving import A2AServingConfig, _get_a2a_app, serve_a2a

        if serving_config is None:
            serving_config = A2AServingConfig()

        app = _get_a2a_app(self, serving_config=serving_config)

        serve_a2a(
            app,
            host=serving_config.host,
            port=serving_config.port,
            endpoint=serving_config.endpoint,
            log_level=serving_config.log_level,
        )

    def _serve_mcp(self, serving_config: MCPServingConfig) -> None:
        from any_agent.serving import (
            serve_mcp,
        )

        serve_mcp(
            self,
            host=serving_config.host,
            port=serving_config.port,
            endpoint=serving_config.endpoint,
            log_level=serving_config.log_level,
        )

    @overload
    def serve(self, serving_config: MCPServingConfig) -> None: ...

    @overload
    def serve(self, serving_config: A2AServingConfig | None = None) -> None: ...

    def serve(
        self, serving_config: MCPServingConfig | A2AServingConfig | None = None
    ) -> None:
        """Serve this agent using the protocol defined in the serving_config.

        Args:
            serving_config: Configuration for serving the agent. If None, uses default A2AServingConfig.
                          Must be an instance of A2AServingConfig or MCPServingConfig.

        Raises:
            ImportError: If the `a2a` dependencies are not installed and an `A2AServingConfig` is used.

        Example:
            ```
            agent = AnyAgent.create("tinyagent", AgentConfig(...))
            config = A2AServingConfig(port=8080, endpoint="/my-agent")
            agent.serve(config)
            ```

        """
        from any_agent.serving import MCPServingConfig

        if isinstance(serving_config, MCPServingConfig):
            self._serve_mcp(serving_config)
        else:
            self._serve_a2a(serving_config)

    async def _serve_a2a_async(
        self, serving_config: A2AServingConfig | None
    ) -> tuple[asyncio.Task[Any], uvicorn.Server]:
        from any_agent.serving import (
            A2AServingConfig,
            _get_a2a_app_async,
            serve_a2a_async,
        )

        if serving_config is None:
            serving_config = A2AServingConfig()

        app = await _get_a2a_app_async(self, serving_config=serving_config)

        return await serve_a2a_async(
            app,
            host=serving_config.host,
            port=serving_config.port,
            endpoint=serving_config.endpoint,
            log_level=serving_config.log_level,
        )

    async def _serve_mcp_async(
        self, serving_config: MCPServingConfig
    ) -> tuple[asyncio.Task[Any], uvicorn.Server]:
        from any_agent.serving import serve_mcp_async

        return await serve_mcp_async(
            self,
            host=serving_config.host,
            port=serving_config.port,
            endpoint=serving_config.endpoint,
            log_level=serving_config.log_level,
        )

    @overload
    async def serve_async(
        self, serving_config: MCPServingConfig
    ) -> tuple[asyncio.Task[Any], uvicorn.Server]: ...

    @overload
    async def serve_async(
        self, serving_config: A2AServingConfig | None = None
    ) -> tuple[asyncio.Task[Any], uvicorn.Server]: ...

    async def serve_async(
        self, serving_config: MCPServingConfig | A2AServingConfig | None = None
    ) -> tuple[asyncio.Task[Any], uvicorn.Server]:
        """Serve this agent asynchronously using the protocol defined in the serving_config.

        Args:
            serving_config: Configuration for serving the agent. If None, uses default A2AServingConfig.
                          Must be an instance of A2AServingConfig or MCPServingConfig.

        Raises:
            ImportError: If the `a2a` dependencies are not installed and an `A2AServingConfig` is used.

        Example:
            ```
            agent = await AnyAgent.create_async("tinyagent", AgentConfig(...))
            config = MCPServingConfig(port=8080)
            task, server = await agent.serve_async(config)
            try:
                # Server is running
                await asyncio.sleep(10)
            finally:
                server.should_exit = True
                await task
            ```

        """
        from any_agent.serving import MCPServingConfig

        if isinstance(serving_config, MCPServingConfig):
            return await self._serve_mcp_async(serving_config)
        return await self._serve_a2a_async(serving_config)

    def _recreate_with_config(self, new_config: AgentConfig) -> AnyAgent:
        """Create a new agent instance with the given config, preserving MCP servers and tools.

        This method creates a new agent with the modified configuration while transferring
        the MCP servers and tools from the current agent to avoid recreating them, but only
        if the tools configuration hasn't changed.

        Args:
            new_config: The new configuration to use for the recreated agent.

        Returns:
            A new agent instance with the modified config and transferred state (if tools unchanged)
            or a completely new agent (if tools changed).

        """
        return run_async_in_sync(self._recreate_with_config_async(new_config))

    async def _recreate_with_config_async(self, new_config: AgentConfig) -> AnyAgent:
        """Async version of _recreate_with_config.

        This method creates a new agent with the modified configuration while transferring
        the MCP servers and tools from the current agent to avoid recreating them, but only
        if the tools configuration hasn't changed.

        Args:
            new_config: The new configuration to use for the recreated agent.

        Returns:
            A new agent instance with the modified config and transferred state (if tools unchanged)
            or a completely new agent (if tools changed).

        """
        # Check if tools configuration has changed
        if self.config.tools != new_config.tools:
            # Tools have changed, so we need to recreate everything from scratch
            logger.info(
                "Tools have changed, so we need to recreate everything from scratch"
            )
            return await self.create_async(self.framework, new_config)

        # Tools haven't changed, so we can safely preserve MCP servers and tools
        # Create the new agent with the modified config
        # Don't use AnyAgent.create_async(), because it will recreate the MCP servers and tools
        new_agent = self.__class__(new_config)

        # Transfer MCP servers and tools from the original agent to avoid recreating them
        new_agent._mcp_servers = self._mcp_servers
        new_agent._tools = self._tools

        # Load the agent with the existing MCP servers and tools
        await new_agent._load_agent()

        return new_agent

    @abstractmethod
    async def _load_agent(self) -> None:
        """Load the agent instance."""

    @abstractmethod
    async def _run_async(self, prompt: str, **kwargs: Any) -> str | BaseModel:
        """To be implemented by each framework."""

    @property
    @abstractmethod
    def framework(self) -> AgentFramework:
        """The Agent Framework used."""

    @property
    def agent(self) -> Any:
        """The underlying agent implementation from the framework.

        This property is intentionally restricted to maintain framework abstraction
        and prevent direct dependency on specific agent implementations.

        If you need functionality that relies on accessing the underlying agent:
        1. Consider if the functionality can be added to the AnyAgent interface
        2. Submit a GitHub issue describing your use case
        3. Contribute a PR implementing the needed functionality

        Raises:
            NotImplementedError: Always raised when this property is accessed

        """
        msg = "Cannot access the 'agent' property of AnyAgent, if you need to use functionality that relies on the underlying agent framework, please file a Github Issue or we welcome a PR to add the functionality to the AnyAgent class"
        raise NotImplementedError(msg)

agent property

The underlying agent implementation from the framework.

This property is intentionally restricted to maintain framework abstraction and prevent direct dependency on specific agent implementations.

If you need functionality that relies on accessing the underlying agent: 1. Consider if the functionality can be added to the AnyAgent interface 2. Submit a GitHub issue describing your use case 3. Contribute a PR implementing the needed functionality

Raises:

Type Description
NotImplementedError

Always raised when this property is accessed

framework abstractmethod property

The Agent Framework used.

create(agent_framework, agent_config) classmethod

Create an agent using the given framework and config.

Source code in src/any_agent/frameworks/any_agent.py
@classmethod
def create(
    cls,
    agent_framework: AgentFramework | str,
    agent_config: AgentConfig,
) -> AnyAgent:
    """Create an agent using the given framework and config."""
    return run_async_in_sync(
        cls.create_async(
            agent_framework=agent_framework,
            agent_config=agent_config,
        )
    )

create_async(agent_framework, agent_config) async classmethod

Create an agent using the given framework and config.

Source code in src/any_agent/frameworks/any_agent.py
@classmethod
async def create_async(
    cls,
    agent_framework: AgentFramework | str,
    agent_config: AgentConfig,
) -> AnyAgent:
    """Create an agent using the given framework and config."""
    agent_cls = cls._get_agent_type_by_framework(agent_framework)
    agent = agent_cls(agent_config)
    await agent._load_agent()
    return agent

run(prompt, **kwargs)

Run the agent with the given prompt.

Source code in src/any_agent/frameworks/any_agent.py
def run(self, prompt: str, **kwargs: Any) -> AgentTrace:
    """Run the agent with the given prompt."""
    return run_async_in_sync(self.run_async(prompt, **kwargs))

run_async(prompt, instrument=True, **kwargs) async

Run the agent asynchronously with the given prompt.

Parameters:

Name Type Description Default
prompt str

The user prompt to be passed to the agent.

required
instrument bool

Whether to instrument the underlying framework to generate LLM Calls and Tool Execution Spans.

If False the returned AgentTrace will only contain a single invoke_agent span.

True
kwargs Any

Will be passed to the underlying runner used by the framework.

{}

Returns:

Type Description
AgentTrace

The AgentTrace containing information about the steps taken by the agent.

Source code in src/any_agent/frameworks/any_agent.py
async def run_async(
    self, prompt: str, instrument: bool = True, **kwargs: Any
) -> AgentTrace:
    """Run the agent asynchronously with the given prompt.

    Args:
        prompt: The user prompt to be passed to the agent.
        instrument: Whether to instrument the underlying framework
            to generate LLM Calls and Tool Execution Spans.

            If `False` the returned `AgentTrace` will only
            contain a single `invoke_agent` span.

        kwargs: Will be passed to the underlying runner used
            by the framework.

    Returns:
        The `AgentTrace` containing information about the
            steps taken by the agent.

    """
    trace = AgentTrace()
    trace_id: int
    instrumentation_enabled = instrument and self._instrumentor is not None

    # This design is so that we only catch exceptions thrown by _run_async. All other exceptions will not be caught.
    try:
        with self._tracer.start_as_current_span(
            f"invoke_agent [{self.config.name}]"
        ) as invoke_span:
            if instrumentation_enabled:
                trace_id = invoke_span.get_span_context().trace_id
                async with self._lock:
                    # We check the locked `_running_traces` inside `instrument`.
                    # If there is more than 1 entry in `running_traces`, it means that the agent has
                    # already being instrumented so we won't instrument it again.
                    self._running_traces[trace_id] = AgentTrace()
                    self._instrumentor.instrument(
                        agent=self,  # type: ignore[arg-type]
                    )

            invoke_span.set_attributes(
                {
                    "gen_ai.operation.name": "invoke_agent",
                    "gen_ai.agent.name": self.config.name,
                    "gen_ai.agent.description": self.config.description
                    or "No description.",
                    "gen_ai.request.model": self.config.model_id,
                }
            )
            final_output = await self._run_async(prompt, **kwargs)
    except Exception as e:
        # Clean up instrumentation if it was enabled
        if instrumentation_enabled:
            async with self._lock:
                self._instrumentor.uninstrument(self)  # type: ignore[arg-type]
                # Get the instrumented trace if available, otherwise use the original trace
                instrumented_trace = self._running_traces.pop(trace_id)
                if instrumented_trace is not None:
                    trace = instrumented_trace
        trace.add_span(invoke_span)
        raise AgentRunError(trace, e) from e

    if instrumentation_enabled:
        async with self._lock:
            self._instrumentor.uninstrument(self)  # type: ignore[arg-type]
            trace = self._running_traces.pop(trace_id)

    trace.add_span(invoke_span)
    trace.final_output = final_output
    return trace

serve(serving_config=None)

serve(serving_config: MCPServingConfig) -> None
serve(serving_config: A2AServingConfig | None = None) -> None

Serve this agent using the protocol defined in the serving_config.

Parameters:

Name Type Description Default
serving_config MCPServingConfig | A2AServingConfig | None

Configuration for serving the agent. If None, uses default A2AServingConfig. Must be an instance of A2AServingConfig or MCPServingConfig.

None

Raises:

Type Description
ImportError

If the a2a dependencies are not installed and an A2AServingConfig is used.

Example
agent = AnyAgent.create("tinyagent", AgentConfig(...))
config = A2AServingConfig(port=8080, endpoint="/my-agent")
agent.serve(config)
Source code in src/any_agent/frameworks/any_agent.py
def serve(
    self, serving_config: MCPServingConfig | A2AServingConfig | None = None
) -> None:
    """Serve this agent using the protocol defined in the serving_config.

    Args:
        serving_config: Configuration for serving the agent. If None, uses default A2AServingConfig.
                      Must be an instance of A2AServingConfig or MCPServingConfig.

    Raises:
        ImportError: If the `a2a` dependencies are not installed and an `A2AServingConfig` is used.

    Example:
        ```
        agent = AnyAgent.create("tinyagent", AgentConfig(...))
        config = A2AServingConfig(port=8080, endpoint="/my-agent")
        agent.serve(config)
        ```

    """
    from any_agent.serving import MCPServingConfig

    if isinstance(serving_config, MCPServingConfig):
        self._serve_mcp(serving_config)
    else:
        self._serve_a2a(serving_config)

serve_async(serving_config=None) async

serve_async(serving_config: MCPServingConfig) -> tuple[asyncio.Task[Any], uvicorn.Server]
serve_async(serving_config: A2AServingConfig | None = None) -> tuple[asyncio.Task[Any], uvicorn.Server]

Serve this agent asynchronously using the protocol defined in the serving_config.

Parameters:

Name Type Description Default
serving_config MCPServingConfig | A2AServingConfig | None

Configuration for serving the agent. If None, uses default A2AServingConfig. Must be an instance of A2AServingConfig or MCPServingConfig.

None

Raises:

Type Description
ImportError

If the a2a dependencies are not installed and an A2AServingConfig is used.

Example
agent = await AnyAgent.create_async("tinyagent", AgentConfig(...))
config = MCPServingConfig(port=8080)
task, server = await agent.serve_async(config)
try:
    # Server is running
    await asyncio.sleep(10)
finally:
    server.should_exit = True
    await task
Source code in src/any_agent/frameworks/any_agent.py
async def serve_async(
    self, serving_config: MCPServingConfig | A2AServingConfig | None = None
) -> tuple[asyncio.Task[Any], uvicorn.Server]:
    """Serve this agent asynchronously using the protocol defined in the serving_config.

    Args:
        serving_config: Configuration for serving the agent. If None, uses default A2AServingConfig.
                      Must be an instance of A2AServingConfig or MCPServingConfig.

    Raises:
        ImportError: If the `a2a` dependencies are not installed and an `A2AServingConfig` is used.

    Example:
        ```
        agent = await AnyAgent.create_async("tinyagent", AgentConfig(...))
        config = MCPServingConfig(port=8080)
        task, server = await agent.serve_async(config)
        try:
            # Server is running
            await asyncio.sleep(10)
        finally:
            server.should_exit = True
            await task
        ```

    """
    from any_agent.serving import MCPServingConfig

    if isinstance(serving_config, MCPServingConfig):
        return await self._serve_mcp_async(serving_config)
    return await self._serve_a2a_async(serving_config)

any_agent.AgentRunError

Bases: Exception

Error that wraps underlying framework specific errors and carries spans.

Source code in src/any_agent/frameworks/any_agent.py
class AgentRunError(Exception):
    """Error that wraps underlying framework specific errors and carries spans."""

    _trace: AgentTrace
    _original_exception: Exception

    def __init__(self, trace: AgentTrace, original_exception: Exception):
        self._trace = trace
        self._original_exception = original_exception
        # Set the exception message to be the original exception's message
        super().__init__(str(original_exception))

    @property
    def trace(self) -> AgentTrace:
        return self._trace

    @property
    def original_exception(self) -> Exception:
        return self._original_exception

    def __str__(self) -> str:
        """Return the string representation of the original exception."""
        return str(self._original_exception)

    def __repr__(self) -> str:
        """Return the detailed representation of the AgentRunError."""
        return f"AgentRunError({self._original_exception!r})"

__repr__()

Return the detailed representation of the AgentRunError.

Source code in src/any_agent/frameworks/any_agent.py
def __repr__(self) -> str:
    """Return the detailed representation of the AgentRunError."""
    return f"AgentRunError({self._original_exception!r})"

__str__()

Return the string representation of the original exception.

Source code in src/any_agent/frameworks/any_agent.py
def __str__(self) -> str:
    """Return the string representation of the original exception."""
    return str(self._original_exception)