Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/apify/storage_clients/_apify/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class RequestQueueStats(BaseModel):
"""The number of request queue reads."""

storage_bytes: Annotated[int, Field(alias='storageBytes', default=0)]
"""Storage size in Bytes."""
"""Storage size in bytes."""

write_count: Annotated[int, Field(alias='writeCount', default=0)]
"""The number of request queue writes."""
Expand Down
39 changes: 19 additions & 20 deletions src/apify/storage_clients/_apify/_storage_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ class ApifyStorageClient(StorageClient):
should be used when multiple consumers need to process requests from the same queue simultaneously.
"""

_LSP_ERROR_MSG = 'Expected "configuration" to be an instance of "apify.Configuration", but got {} instead.'
"""This class (intentionally) violates the Liskov Substitution Principle.

It requires a specialized `Configuration` instance compared to its parent class.
"""

def __init__(self, *, request_queue_access: Literal['single', 'shared'] = 'single') -> None:
"""Initialize a new instance.

Expand All @@ -68,23 +74,6 @@ def __init__(self, *, request_queue_access: Literal['single', 'shared'] = 'singl
"""
self._request_queue_access = request_queue_access

# This class breaches Liskov Substitution Principle. It requires specialized Configuration compared to its parent.
_lsp_violation_error_message_template = (
'Expected "configuration" to be an instance of "apify.Configuration", but got {} instead.'
)

@override
def get_storage_client_cache_key(self, configuration: CrawleeConfiguration) -> Hashable:
if isinstance(configuration, ApifyConfiguration):
# It is not supported to open exactly same queue with 'single' and 'shared' client at the same time.
# Whichever client variation gets used first, wins.
return super().get_storage_client_cache_key(configuration), hash_api_base_url_and_token(configuration)

config_class = type(configuration)
raise TypeError(
self._lsp_violation_error_message_template.format(f'{config_class.__module__}.{config_class.__name__}')
)

@override
async def create_dataset_client(
self,
Expand All @@ -98,7 +87,7 @@ async def create_dataset_client(
if isinstance(configuration, ApifyConfiguration):
return await ApifyDatasetClient.open(id=id, name=name, alias=alias, configuration=configuration)

raise TypeError(self._lsp_violation_error_message_template.format(type(configuration).__name__))
raise TypeError(self._LSP_ERROR_MSG.format(type(configuration).__name__))

@override
async def create_kvs_client(
Expand All @@ -113,7 +102,7 @@ async def create_kvs_client(
if isinstance(configuration, ApifyConfiguration):
return await ApifyKeyValueStoreClient.open(id=id, name=name, alias=alias, configuration=configuration)

raise TypeError(self._lsp_violation_error_message_template.format(type(configuration).__name__))
raise TypeError(self._LSP_ERROR_MSG.format(type(configuration).__name__))

@override
async def create_rq_client(
Expand All @@ -130,4 +119,14 @@ async def create_rq_client(
id=id, name=name, alias=alias, configuration=configuration, access=self._request_queue_access
)

raise TypeError(self._lsp_violation_error_message_template.format(type(configuration).__name__))
raise TypeError(self._LSP_ERROR_MSG.format(type(configuration).__name__))

@override
def get_storage_client_cache_key(self, configuration: CrawleeConfiguration) -> Hashable:
if isinstance(configuration, ApifyConfiguration):
# It is not supported to open exactly same queue with 'single' and 'shared' client at the same time.
# Whichever client variation gets used first, wins.
return super().get_storage_client_cache_key(configuration), hash_api_base_url_and_token(configuration)

config_class = type(configuration)
raise TypeError(self._LSP_ERROR_MSG.format(f'{config_class.__module__}.{config_class.__name__}'))
49 changes: 24 additions & 25 deletions src/apify/storage_clients/_smart_apify/_storage_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@

from apify._configuration import Configuration as ApifyConfiguration
from apify._utils import docs_group
from apify.storage_clients import ApifyStorageClient
from apify.storage_clients._file_system import ApifyFileSystemStorageClient
from apify.storage_clients import ApifyStorageClient, FileSystemStorageClient

if TYPE_CHECKING:
from collections.abc import Hashable
Expand All @@ -36,7 +35,7 @@ class SmartApifyStorageClient(StorageClient):
def __init__(
self,
*,
cloud_storage_client: ApifyStorageClient | None = None,
cloud_storage_client: StorageClient | None = None,
local_storage_client: StorageClient | None = None,
) -> None:
"""Initialize a new instance.
Expand All @@ -47,35 +46,15 @@ def __init__(
local_storage_client: Storage client used when an Actor is not running on the Apify platform and when
`force_cloud` flag is not set. Defaults to `FileSystemStorageClient`.
"""
self._cloud_storage_client = cloud_storage_client or ApifyStorageClient(request_queue_access='single')
self._local_storage_client = local_storage_client or ApifyFileSystemStorageClient()
self._cloud_storage_client = cloud_storage_client or ApifyStorageClient()
self._local_storage_client = local_storage_client or FileSystemStorageClient()

def __str__(self) -> str:
return (
f'{self.__class__.__name__}(cloud_storage_client={self._cloud_storage_client.__class__.__name__},'
f' local_storage_client={self._local_storage_client.__class__.__name__})'
)

def get_suitable_storage_client(self, *, force_cloud: bool = False) -> StorageClient:
"""Get a suitable storage client based on the global configuration and the value of the force_cloud flag.

Args:
force_cloud: If True, return `cloud_storage_client`.
"""
if ApifyConfiguration.get_global_configuration().is_at_home:
return self._cloud_storage_client

configuration = ApifyConfiguration.get_global_configuration()
if force_cloud:
if configuration.token is None:
raise RuntimeError(
'In order to use the Apify cloud storage from your computer, '
'you need to provide an Apify token using the APIFY_TOKEN environment variable.'
)
return self._cloud_storage_client

return self._local_storage_client

@override
def get_storage_client_cache_key(self, configuration: CrawleeConfiguration) -> Hashable:
if ApifyConfiguration.get_global_configuration().is_at_home:
Expand Down Expand Up @@ -123,3 +102,23 @@ async def create_rq_client(
return await self.get_suitable_storage_client().create_rq_client(
id=id, name=id, alias=alias, configuration=configuration
)

def get_suitable_storage_client(self, *, force_cloud: bool = False) -> StorageClient:
"""Get a suitable storage client based on the global configuration and the value of the force_cloud flag.

Args:
force_cloud: If True, return `cloud_storage_client`.
"""
if ApifyConfiguration.get_global_configuration().is_at_home:
return self._cloud_storage_client

configuration = ApifyConfiguration.get_global_configuration()
if force_cloud:
if configuration.token is None:
raise RuntimeError(
'In order to use the Apify cloud storage from your computer, '
'you need to provide an Apify token using the APIFY_TOKEN environment variable.'
)
return self._cloud_storage_client

return self._local_storage_client