X Close
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions src/apify_client/_resource_clients/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,11 @@
RunOrigin,
RunResponse,
UpdateActorRequest,
WebhookCreate,
)
from apify_client._resource_clients._resource_client import ResourceClient, ResourceClientAsync
from apify_client._utils import (
encode_key_value_store_record_value,
encode_webhook_list_to_base64,
response_to_dict,
to_seconds,
)
from apify_client._types import WebhookRepresentationList
from apify_client._utils import encode_key_value_store_record_value, response_to_dict, to_seconds

if TYPE_CHECKING:
from datetime import timedelta
Expand Down Expand Up @@ -232,7 +229,7 @@ def start(
run_timeout: timedelta | None = None,
force_permission_level: ActorPermissionLevel | None = None,
wait_for_finish: int | None = None,
webhooks: list[dict] | None = None,
webhooks: list[dict | WebhookCreate] | None = None,
timeout: Timeout = 'long',
) -> Run:
"""Start the Actor and immediately return the Run object.
Expand Down Expand Up @@ -271,6 +268,10 @@ def start(
"""
run_input, content_type = encode_key_value_store_record_value(run_input, content_type)

validated_webhooks = (
[WebhookCreate.model_validate(w) if isinstance(w, dict) else w for w in webhooks] if webhooks else []
)
Comment on lines +271 to +273
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

webhooks docstring describes ad-hoc webhook dicts with only event_types, request_url, and optional templates, but dict inputs are validated as WebhookCreate, which requires condition and other fields. This is a backward-incompatible tightening and will reject previously valid inputs. Consider validating dict inputs against the minimal ad-hoc webhook shape (e.g., WebhookRepresentation) or making the conversion logic accept the minimal dict and only validate/serialize the fields actually sent to the API.

Copilot uses AI. Check for mistakes.

request_params = self._build_params(
build=build,
maxItems=max_items,
Expand All @@ -280,7 +281,7 @@ def start(
timeout=to_seconds(run_timeout, as_int=True),
waitForFinish=wait_for_finish,
forcePermissionLevel=force_permission_level.value if force_permission_level is not None else None,
webhooks=encode_webhook_list_to_base64(webhooks) if webhooks is not None else None,
webhooks=WebhookRepresentationList.from_webhooks(validated_webhooks).to_base64(),
)

response = self._http_client.call(
Expand All @@ -306,7 +307,7 @@ def call(
restart_on_error: bool | None = None,
memory_mbytes: int | None = None,
run_timeout: timedelta | None = None,
webhooks: list[dict] | None = None,
webhooks: list[dict | WebhookCreate] | None = None,
force_permission_level: ActorPermissionLevel | None = None,
wait_duration: timedelta | None = None,
logger: Logger | None | Literal['default'] = 'default',
Expand Down Expand Up @@ -728,7 +729,7 @@ async def start(
run_timeout: timedelta | None = None,
force_permission_level: ActorPermissionLevel | None = None,
wait_for_finish: int | None = None,
webhooks: list[dict] | None = None,
webhooks: list[dict | WebhookCreate] | None = None,
timeout: Timeout = 'long',
) -> Run:
"""Start the Actor and immediately return the Run object.
Expand Down Expand Up @@ -767,6 +768,10 @@ async def start(
"""
run_input, content_type = encode_key_value_store_record_value(run_input, content_type)

validated_webhooks = (
[WebhookCreate.model_validate(w) if isinstance(w, dict) else w for w in webhooks] if webhooks else []
)

request_params = self._build_params(
build=build,
maxItems=max_items,
Expand All @@ -776,7 +781,7 @@ async def start(
timeout=to_seconds(run_timeout, as_int=True),
waitForFinish=wait_for_finish,
forcePermissionLevel=force_permission_level.value if force_permission_level is not None else None,
webhooks=encode_webhook_list_to_base64(webhooks) if webhooks is not None else None,
webhooks=WebhookRepresentationList.from_webhooks(validated_webhooks).to_base64(),
)

response = await self._http_client.call(
Expand All @@ -802,7 +807,7 @@ async def call(
restart_on_error: bool | None = None,
memory_mbytes: int | None = None,
run_timeout: timedelta | None = None,
webhooks: list[dict] | None = None,
webhooks: list[dict | WebhookCreate] | None = None,
force_permission_level: ActorPermissionLevel | None = None,
wait_duration: timedelta | None = None,
logger: Logger | None | Literal['default'] = 'default',
Expand Down
73 changes: 53 additions & 20 deletions src/apify_client/_resource_clients/request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
UnlockRequestsResult,
)
from apify_client._resource_clients._resource_client import ResourceClient, ResourceClientAsync
from apify_client._types import RequestDeleteInput, RequestInput
from apify_client._utils import catch_not_found_or_throw, response_to_dict, to_seconds
from apify_client.errors import ApifyApiError

Expand Down Expand Up @@ -189,7 +190,7 @@ def list_and_lock_head(

def add_request(
self,
request: dict,
request: dict | RequestInput,
*,
forefront: bool | None = None,
timeout: Timeout = 'short',
Expand All @@ -206,12 +207,15 @@ def add_request(
Returns:
The added request.
"""
if isinstance(request, dict):
request = RequestInput.model_validate(request)

request_params = self._build_params(forefront=forefront, clientKey=self.client_key)

response = self._http_client.call(
url=self._build_url('requests'),
method='POST',
json=request,
json=request.model_dump(by_alias=True, exclude_none=True),
params=request_params,
timeout=timeout,
)
Expand Down Expand Up @@ -248,7 +252,7 @@ def get_request(self, request_id: str, *, timeout: Timeout = 'short') -> Request

def update_request(
self,
request: dict,
request: dict | Request,
*,
forefront: bool | None = None,
timeout: Timeout = 'medium',
Expand All @@ -265,14 +269,15 @@ def update_request(
Returns:
The updated request.
"""
request_id = request['id']
if isinstance(request, dict):
request = Request.model_validate(request)

request_params = self._build_params(forefront=forefront, clientKey=self.client_key)

response = self._http_client.call(
url=self._build_url(f'requests/{request_id}'),
url=self._build_url(f'requests/{request.id}'),
method='PUT',
json=request,
json=request.model_dump(by_alias=True, exclude_none=True),
params=request_params,
timeout=timeout,
)
Expand Down Expand Up @@ -361,7 +366,7 @@ def delete_request_lock(

def batch_add_requests(
self,
requests: list[dict],
requests: list[dict | RequestInput],
*,
forefront: bool = False,
max_parallel: int = 1,
Expand Down Expand Up @@ -396,14 +401,19 @@ def batch_add_requests(
if max_parallel != 1:
raise NotImplementedError('max_parallel is only supported in async client')

requests_as_dicts = [
(RequestInput.model_validate(r) if isinstance(r, dict) else r).model_dump(by_alias=True, exclude_none=True)
for r in requests
]

request_params = self._build_params(clientKey=self.client_key, forefront=forefront)

# Compute the payload size limit to ensure it doesn't exceed the maximum allowed size.
payload_size_limit_bytes = _MAX_PAYLOAD_SIZE_BYTES - math.ceil(_MAX_PAYLOAD_SIZE_BYTES * _SAFETY_BUFFER_PERCENT)

# Split the requests into batches, constrained by the max payload size and max requests per batch.
batches = constrained_batches(
requests,
requests_as_dicts,
max_size=payload_size_limit_bytes,
max_count=_RQ_MAX_REQUESTS_PER_BATCH,
)
Expand Down Expand Up @@ -444,7 +454,7 @@ def batch_add_requests(

def batch_delete_requests(
self,
requests: list[dict],
requests: list[dict | RequestDeleteInput],
*,
timeout: Timeout = 'short',
) -> BatchDeleteResult:
Expand All @@ -456,13 +466,20 @@ def batch_delete_requests(
requests: List of the requests to delete.
timeout: Timeout for the API HTTP request.
"""
requests_as_dicts = [
(RequestDeleteInput.model_validate(r) if isinstance(r, dict) else r).model_dump(
by_alias=True, exclude_none=True
)
for r in requests
]

request_params = self._build_params(clientKey=self.client_key)

response = self._http_client.call(
url=self._build_url('requests/batch'),
method='DELETE',
params=request_params,
json=requests,
json=requests_as_dicts,
timeout=timeout,
)

Expand Down Expand Up @@ -658,7 +675,7 @@ async def list_and_lock_head(

async def add_request(
self,
request: dict,
request: dict | RequestInput,
*,
forefront: bool | None = None,
timeout: Timeout = 'short',
Expand All @@ -675,12 +692,15 @@ async def add_request(
Returns:
The added request.
"""
if isinstance(request, dict):
request = RequestInput.model_validate(request)

request_params = self._build_params(forefront=forefront, clientKey=self.client_key)

response = await self._http_client.call(
url=self._build_url('requests'),
method='POST',
json=request,
json=request.model_dump(by_alias=True, exclude_none=True),
params=request_params,
timeout=timeout,
)
Expand Down Expand Up @@ -715,7 +735,7 @@ async def get_request(self, request_id: str, *, timeout: Timeout = 'short') -> R

async def update_request(
self,
request: dict,
request: dict | Request,
*,
forefront: bool | None = None,
timeout: Timeout = 'medium',
Expand All @@ -732,14 +752,15 @@ async def update_request(
Returns:
The updated request.
"""
request_id = request['id']
if isinstance(request, dict):
request = Request.model_validate(request)

request_params = self._build_params(forefront=forefront, clientKey=self.client_key)

response = await self._http_client.call(
url=self._build_url(f'requests/{request_id}'),
url=self._build_url(f'requests/{request.id}'),
method='PUT',
json=request,
json=request.model_dump(by_alias=True, exclude_none=True),
params=request_params,
timeout=timeout,
)
Expand Down Expand Up @@ -874,7 +895,7 @@ async def _batch_add_requests_worker(

async def batch_add_requests(
self,
requests: list[dict],
requests: list[dict | RequestInput],
*,
forefront: bool = False,
max_parallel: int = 5,
Expand Down Expand Up @@ -906,6 +927,11 @@ async def batch_add_requests(
if min_delay_between_unprocessed_requests_retries:
logger.warning('`min_delay_between_unprocessed_requests_retries` is deprecated and not used anymore.')

requests_as_dicts = [
(RequestInput.model_validate(r) if isinstance(r, dict) else r).model_dump(by_alias=True, exclude_none=True)
for r in requests
]

asyncio_queue: asyncio.Queue[Iterable[dict]] = asyncio.Queue()
request_params = self._build_params(clientKey=self.client_key, forefront=forefront)

Expand All @@ -914,7 +940,7 @@ async def batch_add_requests(

# Split the requests into batches, constrained by the max payload size and max requests per batch.
batches = constrained_batches(
requests,
requests_as_dicts,
max_size=payload_size_limit_bytes,
max_count=_RQ_MAX_REQUESTS_PER_BATCH,
)
Expand Down Expand Up @@ -959,7 +985,7 @@ async def batch_add_requests(

async def batch_delete_requests(
self,
requests: list[dict],
requests: list[dict | RequestDeleteInput],
*,
timeout: Timeout = 'short',
) -> BatchDeleteResult:
Expand All @@ -971,13 +997,20 @@ async def batch_delete_requests(
requests: List of the requests to delete.
timeout: Timeout for the API HTTP request.
"""
requests_as_dicts = [
(RequestDeleteInput.model_validate(r) if isinstance(r, dict) else r).model_dump(
by_alias=True, exclude_none=True
)
for r in requests
]

request_params = self._build_params(clientKey=self.client_key)

response = await self._http_client.call(
url=self._build_url('requests/batch'),
method='DELETE',
params=request_params,
json=requests,
json=requests_as_dicts,
timeout=timeout,
)
result = response_to_dict(response)
Expand Down
7 changes: 1 addition & 6 deletions src/apify_client/_resource_clients/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,7 @@
from apify_client._resource_clients._resource_client import ResourceClient, ResourceClientAsync
from apify_client._status_message_watcher import StatusMessageWatcher, StatusMessageWatcherAsync
from apify_client._streamed_log import StreamedLog, StreamedLogAsync
from apify_client._utils import (
encode_key_value_store_record_value,
response_to_dict,
to_safe_id,
to_seconds,
)
from apify_client._utils import encode_key_value_store_record_value, response_to_dict, to_safe_id, to_seconds

if TYPE_CHECKING:
import logging
Expand Down
Loading
X Close