Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
321 changes: 148 additions & 173 deletions agentplatform/_genai/_agent_engines_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,30 +111,18 @@


try:
from a2a.types import (
AgentCard,
TransportProtocol,
Message,
TaskIdParams,
TaskQueryParams,
)
from a2a.client import ClientConfig, ClientFactory

AgentCard = AgentCard
TransportProtocol = TransportProtocol
Message = Message
ClientConfig = ClientConfig
ClientFactory = ClientFactory
TaskIdParams = TaskIdParams
TaskQueryParams = TaskQueryParams
from a2a.types import AgentCard
from a2a.client import ClientConfig, ClientFactory
from a2a.utils.constants import TransportProtocol
except (ImportError, AttributeError):
AgentCard = None
TransportProtocol = None
Message = None
ClientConfig = None
ClientFactory = None
TaskIdParams = None
TaskQueryParams = None
AgentCard = None
TransportProtocol = None
ClientConfig = None
ClientFactory = None
SendMessageRequest = None
GetTaskRequest = None
CancelTaskRequest = None
GetExtendedAgentCardRequest = None
try:
from autogen.agentchat import chat

Expand Down Expand Up @@ -1760,162 +1748,149 @@ def _method(self: genai_types.AgentEngine, **kwargs) -> Iterator[Any]: # type:
def _wrap_async_stream_query_operation(
*, method_name: str
) -> Callable[..., AsyncIterator[Any]]:
"""Wraps an Agent Engine method, creating an async callable for `stream_query` API.

This function creates a callable object that executes the specified
Agent Engine method using the `stream_query` API. It handles the
creation of the API request and the processing of the API response.

The reserved keyword argument `http_options` is consumed by this
wrapper (rather than being forwarded to the deployed agent as part of
`input`) and is propagated to the underlying HTTP call.
"""Wraps an Agent Engine method, creating an async callable for `stream_query` API.

This function creates a callable object that executes the specified
Agent Engine method using the `stream_query` API. It handles the
creation of the API request and the processing of the API response.

The reserved keyword argument `http_options` is consumed by this
wrapper (rather than being forwarded to the deployed agent as part of
`input`) and is propagated to the underlying HTTP call.

Args:
method_name: The name of the Agent Engine method to call.
doc: Documentation string for the method.

Returns:
A callable object that executes the method on the Agent Engine via
the `stream_query` API.
"""

async def _method(self: genai_types.AgentEngine, **kwargs) -> AsyncIterator[Any]: # type: ignore[no-untyped-def]
if not self.api_client:
raise ValueError("api_client is not initialized.")
if not self.api_resource:
raise ValueError("api_resource is not initialized.")
http_options = kwargs.pop("http_options", None)
async for http_response in self.api_client._async_stream_query(
name=self.api_resource.name,
config={
"class_method": method_name,
"input": kwargs,
"include_all_fields": True,
"http_options": http_options,
},
):
for line in _yield_parsed_json(http_response=http_response):
if line is not None:
yield line

return _method


def _wrap_a2a_operation(
method_name: str, agent_card: str
) -> Callable[..., list[Any]]:
"""Wraps an Agent Engine method, creating a callable for A2A API.

Args:
method_name: The name of the Agent Engine method to call.
agent_card: The agent card to use for the A2A API call.
Example: { 'name': 'Sample Agent', 'description': ( 'A helpful
assistant agent that can answer questions.' ),
'supportedInterfaces': [{ 'url': 'http://localhost:8080/a2a/rest/',
'protocolBinding': 'HTTP+JSON', 'protocolVersion': '1.0', }],
'version': '1.0.0', 'capabilities': { 'streaming': True,
'pushNotifications': False, 'extendedAgentCard': True, },
'defaultInputModes': ['text'], 'defaultOutputModes': ['text'],
'skills': [{ 'id': 'question_answer', 'name': 'Q&A Agent',
'description': ( 'A helpful assistant agent that can answer
questions.' ), 'tags': ['Question-Answer'], 'examples': [ 'Who is
leading 2025 F1 Standings?', 'Where can i find an active volcano?',
], 'inputModes': ['text'], 'outputModes': ['text'], }], }

Returns:
A callable object that executes the method on the Agent Engine via
the A2A API.
"""

async def _method(self, **kwargs) -> Any: # type: ignore[no-untyped-def]
if not self.api_client:
raise ValueError("api_client is not initialized.")
if not self.api_resource:
raise ValueError("api_resource is not initialized.")

a2a_agent_card = AgentCard()
json_format.ParseDict(
json.loads(agent_card), a2a_agent_card, ignore_unknown_fields=True
)

Args:
method_name: The name of the Agent Engine method to call.
doc: Documentation string for the method.
if a2a_agent_card.supported_interfaces:
interface = a2a_agent_card.supported_interfaces[0]
if interface.protocol_binding != TransportProtocol.HTTP_JSON:
raise ValueError(
"Only HTTP+JSON is supported for preferred transport on agent card"
)
else:
raise ValueError("Agent card does not define any supported interfaces.")

Returns:
A callable object that executes the method on the Agent Engine via
the `stream_query` API.
"""
base_url = self.api_client._api_client._http_options.base_url.rstrip("/")
api_version = self.api_client._api_client._http_options.api_version
a2a_agent_card.supported_interfaces[0].url = (
f"{base_url}/{api_version}/{self.api_resource.name}/a2a"
)

async def _method(self: genai_types.AgentEngine, **kwargs) -> AsyncIterator[Any]: # type: ignore[no-untyped-def]
if not self.api_client:
raise ValueError("api_client is not initialized.")
if not self.api_resource:
raise ValueError("api_resource is not initialized.")
http_options = kwargs.pop("http_options", None)
async for http_response in self.api_client._async_stream_query(
name=self.api_resource.name,
config={
"class_method": method_name,
"input": kwargs,
"include_all_fields": True,
"http_options": http_options,
config = ClientConfig(
supported_protocol_bindings=[
TransportProtocol.HTTP_JSON,
],
use_client_preference=True,
httpx_client=httpx.AsyncClient(
headers={
"Authorization": (
f"Bearer {self.api_client._api_client._credentials.token}"
)
},
):
for line in _yield_parsed_json(http_response=http_response):
if line is not None:
yield line

return _method


def _wrap_a2a_operation(method_name: str, agent_card: str) -> Callable[..., list[Any]]:
"""Wraps an Agent Engine method, creating a callable for A2A API.

Args:
method_name: The name of the Agent Engine method to call.
agent_card: The agent card to use for the A2A API call.
Example:
{'additionalInterfaces': None,
'capabilities': {'extensions': None,
'pushNotifications': None,
'stateTransitionHistory': None,
'streaming': False},
'defaultInputModes': ['text'],
'defaultOutputModes': ['text'],
'description': (
'A helpful assistant agent that can answer questions.'
),
'documentationUrl': None,
'iconUrl': None,
'name': 'Q&A Agent',
'preferredTransport': 'JSONRPC',
'protocolVersion': '0.3.0',
'provider': None,
'security': None,
'securitySchemes': None,
'signatures': None,
'skills': [{
'description': (
'A helpful assistant agent that can answer questions.'
),
'examples': ['Who is leading 2025 F1 Standings?',
'Where can i find an active volcano?'],
'id': 'question_answer',
'inputModes': None,
'name': 'Q&A Agent',
'outputModes': None,
'security': None,
'tags': ['Question-Answer']}],
'supportsAuthenticatedExtendedCard': True,
'url': 'http://localhost:8080/',
'version': '1.0.0'}
Returns:
A callable object that executes the method on the Agent Engine via
the A2A API.
"""

async def _method(self, **kwargs) -> Any: # type: ignore[no-untyped-def]
"""Wraps an Agent Engine method, creating a callable for A2A API."""
if not self.api_client:
raise ValueError("api_client is not initialized.")
if not self.api_resource:
raise ValueError("api_resource is not initialized.")
a2a_agent_card = AgentCard(**json.loads(agent_card))
# A2A + AE integration currently only supports Rest API.
if (
a2a_agent_card.preferred_transport
and a2a_agent_card.preferred_transport != TransportProtocol.http_json
):
raise ValueError(
"Only HTTP+JSON is supported for preferred transport on agent card "
)

# Set preferred transport to HTTP+JSON if not set.
if not hasattr(a2a_agent_card, "preferred_transport"):
a2a_agent_card.preferred_transport = TransportProtocol.http_json

if not hasattr(a2a_agent_card.capabilities, "streaming"):
a2a_agent_card.capabilities.streaming = False

# agent_card is set on the class_methods before set_up is invoked.
# Ensure that the agent_card url is set correctly before the client is created.
base_url = self.api_client._api_client._http_options.base_url.rstrip("/")
api_version = self.api_client._api_client._http_options.api_version
a2a_agent_card.url = f"{base_url}/{api_version}/{self.api_resource.name}/a2a"

# Using a2a client, inject the auth token from the global config.
config = ClientConfig(
supported_transports=[
TransportProtocol.http_json,
],
use_client_preference=True,
httpx_client=httpx.AsyncClient(
headers={
"Authorization": (
f"Bearer {self.api_client._api_client._credentials.token}"
)
},
timeout=(
self.api_client._api_client._http_options.timeout / 1000.0
if self.api_client._api_client._http_options.timeout
else None
),
timeout=(
self.api_client._api_client._http_options.timeout / 1000.0
if self.api_client._api_client._http_options.timeout
else None
),
)
factory = ClientFactory(config)
client = factory.create(a2a_agent_card)

if method_name == "on_message_send":
response = client.send_message(Message(**kwargs))
chunks = []
async for chunk in response:
chunks.append(chunk)
return chunks
elif method_name == "on_get_task":
response = await client.get_task(TaskQueryParams(**kwargs))
elif method_name == "on_cancel_task":
response = await client.cancel_task(TaskIdParams(**kwargs))
elif method_name == "handle_authenticated_agent_card":
response = await client.get_card()
else:
raise ValueError(f"Unknown method name: {method_name}")

return response
),
)
factory = ClientFactory(config)
client = factory.create(a2a_agent_card)

context = kwargs.pop("context", None)
if context is not None:
from a2a.client.client import ClientCallContext

if not isinstance(context, ClientCallContext):
actual_context = ClientCallContext()
if hasattr(context, "state"):
actual_context.state = context.state
elif isinstance(context, dict):
actual_context.state = context
context = actual_context

req = kwargs["request"]
if method_name == "on_message_send":
response = client.send_message(req, context=context)
chunks = []
async for chunk in response:
chunks.append(chunk)
return chunks
elif method_name == "on_get_task":
return await client.get_task(req, context=context)
elif method_name == "on_cancel_task":
return await client.cancel_task(req, context=context)
elif method_name == "on_get_extended_agent_card":
return await client.get_extended_agent_card(req, context=context)
else:
raise ValueError(f"Unknown method name: {method_name}")

return _method # type: ignore[return-value]
return _method # type: ignore[return-value]


def _yield_parsed_json(http_response: google_genai_types.HttpResponse) -> Iterator[Any]:
Expand Down
Loading
Loading