feat(Client): Introduce client side interceptors#796
feat(Client): Introduce client side interceptors#796guglielmo-san wants to merge 13 commits intoa2aproject:1.0-devfrom
Conversation
…k_client_interceptors
…hods for request and response processing.
…g and subscribe methods
|
/gemini review |
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly refactors the client-side interceptor architecture. The primary goal is to centralize the application of interceptors within the Highlights
Changelog
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request significantly refactors the client-side interceptor architecture, centralizing logic in BaseClient and introducing strongly-typed generic interfaces for hooks, which improves maintainability and type safety. However, the refactor is incomplete as it breaks the existing AuthInterceptor by not updating it to the new interface, rendering authentication non-functional. Additionally, a type mismatch in handling early returns for streaming methods (send_message and subscribe) could cause runtime errors. There are also a few minor areas for code simplification and a small bug in the public API exports.
|
|
||
|
|
||
| class AuthInterceptor(ClientCallInterceptor): | ||
| class AuthInterceptor: |
There was a problem hiding this comment.
The AuthInterceptor class has been modified, removing its inheritance from ClientCallInterceptor (line 11) and not updating it to the new before and after methods required by the refactored interceptor architecture in BaseClient. This oversight breaks the primary authentication mechanism for the client SDK, as any attempt to use it with the new BaseClient will result in an AttributeError. To resolve this, AuthInterceptor must be updated to implement the new ClientCallInterceptor interface from a2a.client.interceptors. This involves implementing async def before(self, args: UnionBeforeArgs) -> None: to adapt the existing authentication logic to modify args.context.service_parameters and a pass-through async def after(self, args: UnionAfterArgs) -> None: method.
from a2a.client.interceptors import ClientCallInterceptor, UnionBeforeArgs, UnionAfterArgs
class AuthInterceptor(ClientCallInterceptor):
# ... __init__ is the same ...
async def before(self, args: UnionBeforeArgs) -> None:
agent_card = args.agent_card
context = args.context
# ... existing logic to get credentials ...
# Create context and service_parameters if they don't exist
if context is None:
context = ClientCallContext()
args.context = context
if context.service_parameters is None:
context.service_parameters = {}
# Add headers to service_parameters
# e.g., context.service_parameters['Authorization'] = f'Bearer {credential}'
# ... rest of the logic ...
async def after(self, args: UnionAfterArgs) -> None:
pass
src/a2a/client/base_client.py
Outdated
| ) | ||
| yield after_args.result.value | ||
| return | ||
|
|
There was a problem hiding this comment.
In the send_message method, when an interceptor triggers an early return for a streaming request, the code yields after_args.result.value (line 137). However, for streaming requests, the interceptor's expected result type is StreamResponse, while the send_message method is expected to yield ClientEvent (a tuple of StreamResponse and Task). Yielding a single StreamResponse instead of a tuple will cause a TypeError or unexpected behavior in the calling code that consumes the iterator, potentially leading to a denial of service for applications using the SDK.
There was a problem hiding this comment.
Code Review
This pull request refactors the client-side interceptor (formerly middleware) system. It introduces a new interceptors.py module defining a more granular ClientCallInterceptor interface with before and after methods, replacing the single intercept method. The ClientCallContext class is moved to client.py. The BaseClient is updated to centralize interceptor execution logic, including handling early returns for both streaming and non-streaming calls, and all transport classes are modified to no longer directly manage interceptors. Review comments highlight a typo in the __all__ export list, suggest refactoring duplicated interceptor logic in streaming methods, and point out redundant checks in interceptor handling.
src/a2a/client/base_client.py
Outdated
| before_args: BeforeArgs[ | ||
| Literal['send_message_streaming'], | ||
| SendMessageRequest, | ||
| StreamResponse, | ||
| ] = BeforeArgs( | ||
| input=ClientCallInput( | ||
| method='send_message_streaming', value=request | ||
| ), | ||
| agent_card=self._card, | ||
| context=context, | ||
| ) | ||
| before_result = await self._intercept_before(before_args) | ||
|
|
||
| if before_result is not None: | ||
| after_args = AfterArgs( | ||
| result=ClientCallResult( | ||
| method=before_args.input.method, | ||
| value=before_result['early_return'].value, | ||
| ), | ||
| agent_card=self._card, | ||
| context=before_args.context, | ||
| ) | ||
| await self._intercept_after( | ||
| cast('UnionAfterArgs', after_args), before_result['executed'] | ||
| ) | ||
| yield after_args.result.value | ||
| return |
There was a problem hiding this comment.
This block of code for handling before interceptors and early returns is nearly identical to the block in the subscribe method (lines 390-412). This duplication makes the code harder to maintain and prone to inconsistencies if one block is updated but the other is not.
To improve maintainability, consider refactoring this logic into a private helper method or an async context manager. This would centralize the streaming interception logic, similar to how _execute_with_interceptors centralizes it for non-streaming calls.
…plify default list assignments in the `Client` constructor.
…xecute_stream_with_interceptors` and add `_format_stream_event`.
Description
This PR refactors the client interceptors architecture, centralizing their execution within the BaseClient rather than delegating them to the underlying transport implementations. It also introduces strict generic type definitions to strongly type the inputs and results for interceptor hooks across all supported client methods.