Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ keywords = [
"scraping",
]
dependencies = [
"apify-client>=2.3.0,<3.0.0",
"apify-client @ git+https://github.com/apify/apify-client-python.git@typed-clients",
"apify-shared>=2.0.0,<3.0.0",
"crawlee>=1.0.4,<2.0.0",
"cachetools>=5.5.0",
Expand Down
6 changes: 3 additions & 3 deletions src/apify/storage_clients/_apify/_alias_resolving.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from collections.abc import Callable
from types import TracebackType

from apify_client.clients import (
from apify_client._resource_clients import (
DatasetClientAsync,
DatasetCollectionClientAsync,
KeyValueStoreClientAsync,
Expand Down Expand Up @@ -105,8 +105,8 @@ async def open_by_alias(
# Create new unnamed storage and store alias mapping
raw_metadata = await collection_client.get_or_create()

await alias_resolver.store_mapping(storage_id=raw_metadata['id'])
return get_resource_client_by_id(raw_metadata['id'])
await alias_resolver.store_mapping(storage_id=raw_metadata.id)
return get_resource_client_by_id(raw_metadata.id)


class AliasResolver:
Expand Down
6 changes: 3 additions & 3 deletions src/apify/storage_clients/_apify/_api_client_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from apify.storage_clients._apify._alias_resolving import open_by_alias

if TYPE_CHECKING:
from apify_client.clients import DatasetClientAsync, KeyValueStoreClientAsync, RequestQueueClientAsync
from apify_client._resource_clients import DatasetClientAsync, KeyValueStoreClientAsync, RequestQueueClientAsync

from apify._configuration import Configuration

Expand Down Expand Up @@ -137,13 +137,13 @@ def get_resource_client(storage_id: str) -> DatasetClientAsync:
# Default storage does not exist. Create a new one.
if not raw_metadata:
raw_metadata = await collection_client.get_or_create()
resource_client = get_resource_client(raw_metadata['id'])
resource_client = get_resource_client(raw_metadata.id)
return resource_client

# Open by name.
case (None, str(), None, _):
raw_metadata = await collection_client.get_or_create(name=name)
return get_resource_client(raw_metadata['id'])
return get_resource_client(raw_metadata.id)

# Open by ID.
case (None, None, str(), _):
Expand Down
2 changes: 1 addition & 1 deletion src/apify/storage_clients/_apify/_dataset_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
if TYPE_CHECKING:
from collections.abc import AsyncIterator

from apify_client.clients import DatasetClientAsync
from apify_client._resource_clients import DatasetClientAsync
from crawlee._types import JsonSerializable

from apify import Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
if TYPE_CHECKING:
from collections.abc import AsyncIterator

from apify_client.clients import KeyValueStoreClientAsync
from apify_client._resource_clients import KeyValueStoreClientAsync

from apify import Configuration

Expand Down
35 changes: 22 additions & 13 deletions src/apify/storage_clients/_apify/_request_queue_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from datetime import datetime
from logging import getLogger
from typing import TYPE_CHECKING, Final, Literal

Expand All @@ -15,7 +16,7 @@
if TYPE_CHECKING:
from collections.abc import Sequence

from apify_client.clients import RequestQueueClientAsync
from apify_client._resource_clients import RequestQueueClientAsync
from crawlee import Request
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata

Expand Down Expand Up @@ -82,21 +83,29 @@ async def get_metadata(self) -> ApifyRequestQueueMetadata:
if response is None:
raise ValueError('Failed to fetch request queue metadata from the API.')

total_request_count = int(response.total_request_count)
handled_request_count = int(response.handled_request_count)
pending_request_count = int(response.pending_request_count)
created_at = datetime.fromisoformat(response.created_at.replace('Z', '+00:00'))
modified_at = datetime.fromisoformat(response.modified_at.replace('Z', '+00:00'))
accessed_at = datetime.fromisoformat(response.accessed_at.replace('Z', '+00:00'))

# Enhance API response with local estimations to account for propagation delays (API data can be delayed
# by a few seconds, while local estimates are immediately accurate).
return ApifyRequestQueueMetadata(
id=response['id'],
name=response['name'],
total_request_count=max(response['totalRequestCount'], self._implementation.metadata.total_request_count),
handled_request_count=max(
response['handledRequestCount'], self._implementation.metadata.handled_request_count
id=response.id,
name=response.name,
total_request_count=max(total_request_count, self._implementation.metadata.total_request_count),
handled_request_count=max(handled_request_count, self._implementation.metadata.handled_request_count),
pending_request_count=pending_request_count,
created_at=min(created_at, self._implementation.metadata.created_at),
modified_at=max(modified_at, self._implementation.metadata.modified_at),
accessed_at=max(accessed_at, self._implementation.metadata.accessed_at),
had_multiple_clients=response.had_multiple_clients or self._implementation.metadata.had_multiple_clients,
stats=RequestQueueStats.model_validate(
response.stats.model_dump(by_alias=True) if response.stats else {},
by_alias=True,
),
pending_request_count=response['pendingRequestCount'],
created_at=min(response['createdAt'], self._implementation.metadata.created_at),
modified_at=max(response['modifiedAt'], self._implementation.metadata.modified_at),
accessed_at=max(response['accessedAt'], self._implementation.metadata.accessed_at),
had_multiple_clients=response['hadMultipleClients'] or self._implementation.metadata.had_multiple_clients,
stats=RequestQueueStats.model_validate(response['stats'], by_alias=True),
)

@classmethod
Expand Down Expand Up @@ -145,7 +154,7 @@ async def open(
raw_metadata = await api_client.get()
if raw_metadata is None:
raise ValueError('Failed to retrieve request queue metadata from the API.')
metadata = ApifyRequestQueueMetadata.model_validate(raw_metadata)
metadata = ApifyRequestQueueMetadata.model_validate(raw_metadata.model_dump(by_alias=True))

return cls(
api_client=api_client,
Expand Down
16 changes: 8 additions & 8 deletions src/apify/storage_clients/_apify/_request_queue_shared_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
if TYPE_CHECKING:
from collections.abc import Callable, Coroutine, Sequence

from apify_client.clients import RequestQueueClientAsync
from apify_client._resource_clients import RequestQueueClientAsync

logger = getLogger(__name__)

Expand Down Expand Up @@ -388,7 +388,7 @@ async def _update_request(
)

return ProcessedRequest.model_validate(
{'uniqueKey': request.unique_key} | response,
{'uniqueKey': request.unique_key} | response.model_dump(by_alias=True),
)

async def _list_head(
Expand Down Expand Up @@ -431,19 +431,19 @@ async def _list_head(
self._should_check_for_forefront_requests = False

# Otherwise fetch from API
response = await self._api_client.list_and_lock_head(
list_and_lost_data = await self._api_client.list_and_lock_head(
lock_secs=int(self._DEFAULT_LOCK_TIME.total_seconds()),
limit=limit,
)

# Update the queue head cache
self._queue_has_locked_requests = response.get('queueHasLockedRequests', False)
self._queue_has_locked_requests = list_and_lost_data.queue_has_locked_requests
# Check if there is another client working with the RequestQueue
self.metadata.had_multiple_clients = response.get('hadMultipleClients', False)
self.metadata.had_multiple_clients = list_and_lost_data.had_multiple_clients

for request_data in response.get('items', []):
for request_data in list_and_lost_data.items:
request = Request.model_validate(request_data)
request_id = request_data.get('id')
request_id = request_data.id

# Skip requests without ID or unique key
if not request.unique_key or not request_id:
Expand Down Expand Up @@ -473,7 +473,7 @@ async def _list_head(
# After adding new requests to the forefront, any existing leftover locked request is kept at the end.
self._queue_head.append(leftover_id)

return RequestQueueHead.model_validate(response)
return RequestQueueHead.model_validate(list_and_lost_data)

def _cache_request(
self,
Expand Down
17 changes: 9 additions & 8 deletions src/apify/storage_clients/_apify/_request_queue_single_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
if TYPE_CHECKING:
from collections.abc import Sequence

from apify_client.clients import RequestQueueClientAsync
from apify_client._resource_clients import RequestQueueClientAsync

logger = getLogger(__name__)

Expand Down Expand Up @@ -288,16 +288,17 @@ async def _list_head(self) -> None:

# Update metadata
# Check if there is another client working with the RequestQueue
self.metadata.had_multiple_clients = response.get('hadMultipleClients', False)
self.metadata.had_multiple_clients = response.had_multiple_clients
# Should warn once? This might be outside the expected context if other consumers consume at the same time.

if modified_at := response.get('queueModifiedAt'):
if response.queue_modified_at:
modified_at = datetime.fromisoformat(response.queue_modified_at.replace('Z', '+00:00'))
self.metadata.modified_at = max(self.metadata.modified_at, modified_at)

# Update the cached data
for request_data in response.get('items', []):
for request_data in response.items:
request = Request.model_validate(request_data)
request_id = request_data['id']
request_id = request_data.id

if request_id in self._requests_in_progress:
# Ignore requests that are already in progress, we will not process them again.
Expand Down Expand Up @@ -365,7 +366,7 @@ async def _update_request(
)

return ProcessedRequest.model_validate(
{'uniqueKey': request.unique_key} | response,
{'uniqueKey': request.unique_key} | response.model_dump(by_alias=True),
)

async def _init_caches(self) -> None:
Expand All @@ -378,9 +379,9 @@ async def _init_caches(self) -> None:
Local deduplication is cheaper, it takes 1 API call for whole cache and 1 read operation per request.
"""
response = await self._api_client.list_requests(limit=10_000)
for request_data in response.get('items', []):
for request_data in response.items:
request = Request.model_validate(request_data)
request_id = request_data['id']
request_id = request_data.id

if request.was_already_handled:
# Cache just id for deduplication
Expand Down
21 changes: 8 additions & 13 deletions tests/integration/actor/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from collections.abc import Awaitable, Callable, Coroutine, Iterator, Mapping
from decimal import Decimal

from apify_client.clients.resource_clients import ActorClientAsync
from apify_client._resource_clients import ActorClientAsync

_TOKEN_ENV_VAR = 'APIFY_TEST_USER_API_TOKEN'
_API_URL_ENV_VAR = 'APIFY_INTEGRATION_TESTS_API_URL'
Expand Down Expand Up @@ -236,19 +236,19 @@ async def _make_actor(
],
)

actor_client = client.actor(created_actor['id'])
actor_client = client.actor(created_actor.id)

print(f'Building Actor {actor_name}...')
build_result = await actor_client.build(version_number='0.0')
build_client = client.build(build_result['id'])
build_client = client.build(build_result.id)
build_client_result = await build_client.wait_for_finish(wait_secs=600)

assert build_client_result is not None
assert build_client_result['status'] == ActorJobStatus.SUCCEEDED
assert build_client_result.status == ActorJobStatus.SUCCEEDED

# We only mark the client for cleanup if the build succeeded, so that if something goes wrong here,
# you have a chance to check the error.
actors_for_cleanup.append(created_actor['id'])
actors_for_cleanup.append(created_actor.id)

return actor_client

Expand All @@ -259,14 +259,9 @@ async def _make_actor(
actor_client = ApifyClient(token=apify_token, api_url=os.getenv(_API_URL_ENV_VAR)).actor(actor_id)

if (actor := actor_client.get()) is not None:
actor_client.update(
pricing_infos=[
*actor.get('pricingInfos', []),
{
'pricingModel': 'FREE',
},
]
)
assert actor.pricing_infos is not None
new_pricing_infos = [*actor.pricing_infos, {'pricingModel': 'FREE'}]
actor_client.update(pricing_infos=new_pricing_infos)

actor_client.delete()

Expand Down
13 changes: 7 additions & 6 deletions tests/integration/actor/test_actor_api_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ async def main_outer() -> None:

inner_run_status = await Actor.apify_client.actor(inner_actor_id).last_run().get()
assert inner_run_status is not None
assert inner_run_status.get('status') in ['READY', 'RUNNING']
assert inner_run_status.status in ['READY', 'RUNNING']

inner_actor = await make_actor(label='start-inner', main_func=main_inner)
outer_actor = await make_actor(label='start-outer', main_func=main_outer)
Expand Down Expand Up @@ -175,7 +175,7 @@ async def main_outer() -> None:

inner_run_status = await Actor.apify_client.actor(inner_actor_id).last_run().get()
assert inner_run_status is not None
assert inner_run_status.get('status') == 'SUCCEEDED'
assert inner_run_status.status == 'SUCCEEDED'

inner_actor = await make_actor(label='call-inner', main_func=main_inner)
outer_actor = await make_actor(label='call-outer', main_func=main_outer)
Expand Down Expand Up @@ -221,7 +221,7 @@ async def main_outer() -> None:

inner_run_status = await Actor.apify_client.task(inner_task_id).last_run().get()
assert inner_run_status is not None
assert inner_run_status.get('status') == 'SUCCEEDED'
assert inner_run_status.status == 'SUCCEEDED'

inner_actor = await make_actor(label='call-task-inner', main_func=main_inner)
outer_actor = await make_actor(label='call-task-outer', main_func=main_outer)
Expand All @@ -237,7 +237,7 @@ async def main_outer() -> None:

run_result_outer = await run_actor(
outer_actor,
run_input={'test_value': test_value, 'inner_task_id': task['id']},
run_input={'test_value': test_value, 'inner_task_id': task.id},
)

assert run_result_outer.status == 'SUCCEEDED'
Expand All @@ -248,7 +248,7 @@ async def main_outer() -> None:
assert inner_output_record is not None
assert inner_output_record['value'] == f'{test_value}_XXX_{test_value}'

await apify_client_async.task(task['id']).delete()
await apify_client_async.task(task.id).delete()


@pytest.mark.skip(reason='Requires Actor permissions beyond limited permissions, see #715.')
Expand All @@ -274,7 +274,8 @@ async def main_outer() -> None:
inner_actor = await make_actor(label='abort-inner', main_func=main_inner)
outer_actor = await make_actor(label='abort-outer', main_func=main_outer)

inner_run_id = (await inner_actor.start())['id']
actor_run = await inner_actor.start()
inner_run_id = actor_run.id

run_result_outer = await run_actor(
outer_actor,
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/actor/test_actor_charge.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from collections.abc import Iterable

from apify_client import ApifyClientAsync
from apify_client.clients import ActorClientAsync
from apify_client._resource_clients import ActorClientAsync

from .conftest import MakeActorFunction, RunActorFunction

Expand Down Expand Up @@ -54,7 +54,7 @@ async def main() -> None:
actor = await actor_client.get()

assert actor is not None
return str(actor['id'])
return str(actor.id)


@pytest_asyncio.fixture(scope='function', loop_scope='module')
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/actor/test_actor_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async def test_force_cloud(
try:
dataset_details = await dataset_client.get()
assert dataset_details is not None
assert dataset_details.get('name') == dataset_name
assert dataset_details.name == dataset_name

dataset_items = await dataset_client.list_items()
assert dataset_items.items == [dataset_item]
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/actor/test_actor_key_value_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async def test_force_cloud(
try:
key_value_store_details = await key_value_store_client.get()
assert key_value_store_details is not None
assert key_value_store_details.get('name') == key_value_store_name
assert key_value_store_details.name == key_value_store_name

key_value_store_record = await key_value_store_client.get_record('foo')
assert key_value_store_record is not None
Expand Down Expand Up @@ -141,7 +141,7 @@ async def main_get() -> None:
default_kvs_info = await actor_set.last_run().key_value_store().get()
assert default_kvs_info is not None

run_result_get = await run_actor(actor_get, run_input={'kvs-id': default_kvs_info['id']})
run_result_get = await run_actor(actor_get, run_input={'kvs-id': default_kvs_info.id})

assert run_result_get.status == 'SUCCEEDED'

Expand Down
Loading
Loading