From 9e1b2529d698773e0358645b4e3078232f035f54 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Fri, 12 Jun 2026 17:37:27 +0200 Subject: [PATCH 1/6] refactor: Remove dead code and tidy storage-client internals --- pyproject.toml | 2 +- src/apify/_actor.py | 1 - src/apify/_consts.py | 3 -- src/apify/_crypto.py | 2 - src/apify/_proxy_configuration.py | 20 ---------- src/apify/_utils.py | 8 ---- src/apify/events/py.typed | 0 src/apify/request_loaders/py.typed | 0 src/apify/scrapy/_logging_config.py | 2 +- src/apify/scrapy/middlewares/py.typed | 0 src/apify/scrapy/pipelines/py.typed | 0 src/apify/scrapy/py.typed | 0 .../_apify/_alias_resolving.py | 37 ++++++++++++------- .../storage_clients/_apify/_dataset_client.py | 2 + .../_apify/_request_queue_shared_client.py | 10 +---- .../storage_clients/_apify/_storage_client.py | 6 ++- src/apify/storage_clients/_apify/_utils.py | 2 +- src/apify/storage_clients/_apify/py.typed | 0 src/apify/storage_clients/py.typed | 0 src/apify/storages/py.typed | 0 tests/unit/conftest.py | 1 + .../storage_clients/test_alias_resolver.py | 1 - tests/unit/test_proxy_configuration.py | 33 +---------------- tests/unit/test_utils.py | 21 +---------- uv.lock | 5 +-- 25 files changed, 39 insertions(+), 117 deletions(-) delete mode 100644 src/apify/events/py.typed delete mode 100644 src/apify/request_loaders/py.typed delete mode 100644 src/apify/scrapy/middlewares/py.typed delete mode 100644 src/apify/scrapy/pipelines/py.typed delete mode 100644 src/apify/scrapy/py.typed delete mode 100644 src/apify/storage_clients/_apify/py.typed delete mode 100644 src/apify/storage_clients/py.typed delete mode 100644 src/apify/storages/py.typed diff --git a/pyproject.toml b/pyproject.toml index bc05d589d..c3add790c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ classifiers = [ "Programming Language :: Python :: 3.13", "Programming Language :: Python :: 3.14", "Topic :: Software Development :: Libraries", + "Typing :: Typed", ] keywords = [ "apify", @@ -239,7 +240,6 @@ exclude-newer = "24 hours" [tool.uv.exclude-newer-package] # Allow internal Apify packages to install immediately. apify-client = false -apify-shared = false apify_fingerprint_datapoints = false crawlee = false diff --git a/src/apify/_actor.py b/src/apify/_actor.py index 234232969..f3ecad831 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -186,7 +186,6 @@ async def __aenter__(self) -> Self: # Update the global Actor proxy to refer to this instance. cast('Proxy', Actor).__wrapped__ = self # ty: ignore[invalid-assignment] self._is_exiting = False - self._was_final_persist_state_emitted = False # Initialize the storage client and register it in the service locator. _ = self._storage_client diff --git a/src/apify/_consts.py b/src/apify/_consts.py index 2d2b62acd..7bbee0065 100644 --- a/src/apify/_consts.py +++ b/src/apify/_consts.py @@ -21,9 +21,6 @@ ) """Regex matching encrypted input values with base64-encoded components.""" -EXIT_CODE_SUCCESS = 0 -"""Exit code indicating that the Actor finished successfully.""" - EXIT_CODE_ERROR_USER_FUNCTION_THREW = 91 """Exit code indicating that the Actor's main function raised an exception.""" diff --git a/src/apify/_crypto.py b/src/apify/_crypto.py index c7c2bc160..6065c45cb 100644 --- a/src/apify/_crypto.py +++ b/src/apify/_crypto.py @@ -109,8 +109,6 @@ def private_decrypt( decipher_bytes = decryptor.update(encrypted_data_bytes) + decryptor.finalize() except InvalidTagException as exc: raise ValueError('Decryption failed, malformed encrypted value or password.') from exc - except Exception: - raise return decipher_bytes.decode('utf-8') diff --git a/src/apify/_proxy_configuration.py b/src/apify/_proxy_configuration.py index 023f25e38..fec4d4ddf 100644 --- a/src/apify/_proxy_configuration.py +++ b/src/apify/_proxy_configuration.py @@ -1,12 +1,10 @@ from __future__ import annotations -import ipaddress import json import re from dataclasses import dataclass, field from re import Pattern from typing import TYPE_CHECKING, Any -from urllib.parse import urljoin, urlparse import impit from yarl import URL @@ -38,24 +36,6 @@ SESSION_ID_MAX_LENGTH = 50 -def is_url(url: str) -> bool: - """Check if the given string is a valid URL.""" - try: - parsed_url = urlparse(urljoin(url, '/')) - has_all_parts = all([parsed_url.scheme, parsed_url.netloc, parsed_url.path]) - is_domain = '.' in parsed_url.netloc - is_localhost = parsed_url.netloc == 'localhost' - try: - ipaddress.ip_address(parsed_url.netloc) - is_ip_address = True - except Exception: - is_ip_address = False - - return has_all_parts and any([is_domain, is_localhost, is_ip_address]) - except Exception: - return False - - def _check( value: Any, *, diff --git a/src/apify/_utils.py b/src/apify/_utils.py index ca099f705..8469ae97b 100644 --- a/src/apify/_utils.py +++ b/src/apify/_utils.py @@ -6,7 +6,6 @@ import sys from collections.abc import Callable from contextlib import asynccontextmanager -from enum import Enum from functools import wraps from importlib import metadata from typing import TYPE_CHECKING, Any, Literal, TypeVar, cast @@ -123,13 +122,6 @@ def wrapper(func: T) -> T: return wrapper -def maybe_extract_enum_member_value(maybe_enum_member: Any) -> Any: - """Extract the value of an enumeration member if it is an Enum, otherwise return the original value.""" - if isinstance(maybe_enum_member, Enum): - return maybe_enum_member.value - return maybe_enum_member - - class ReentrantLock: """A reentrant lock implementation for asyncio using asyncio.Lock.""" diff --git a/src/apify/events/py.typed b/src/apify/events/py.typed deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/apify/request_loaders/py.typed b/src/apify/request_loaders/py.typed deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/apify/scrapy/_logging_config.py b/src/apify/scrapy/_logging_config.py index 2f51074a9..19d28c3df 100644 --- a/src/apify/scrapy/_logging_config.py +++ b/src/apify/scrapy/_logging_config.py @@ -10,7 +10,7 @@ # Define logger names. _PRIMARY_LOGGERS = ['apify', 'apify_client', 'scrapy'] -_SUPPLEMENTAL_LOGGERS = ['filelock', 'hpack', 'httpcore', 'protego', 'twisted'] +_SUPPLEMENTAL_LOGGERS = ['filelock', 'protego', 'twisted'] _ALL_LOGGERS = _PRIMARY_LOGGERS + _SUPPLEMENTAL_LOGGERS # Mutable state shared with the Scrapy monkey-patch below. `initialize_logging` refreshes diff --git a/src/apify/scrapy/middlewares/py.typed b/src/apify/scrapy/middlewares/py.typed deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/apify/scrapy/pipelines/py.typed b/src/apify/scrapy/pipelines/py.typed deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/apify/scrapy/py.typed b/src/apify/scrapy/py.typed deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/apify/storage_clients/_apify/_alias_resolving.py b/src/apify/storage_clients/_apify/_alias_resolving.py index fd4c23fe3..df5c60953 100644 --- a/src/apify/storage_clients/_apify/_alias_resolving.py +++ b/src/apify/storage_clients/_apify/_alias_resolving.py @@ -8,7 +8,7 @@ from apify_client import ApifyClientAsync -from ._utils import hash_api_base_url_and_token +from ._utils import hash_api_public_base_url_and_token if TYPE_CHECKING: from collections.abc import Callable @@ -135,6 +135,10 @@ class AliasResolver: _alias_init_lock: Lock | None = None """Lock for creating alias storages. Only one alias storage can be created at the time. Global for all instances.""" + _api_clients: ClassVar[dict[tuple[str | None, str | None], ApifyClientAsync]] = {} + """Cache of Apify API clients keyed by `(token, api_url)`. Reused across instances so that repeated alias + resolution does not create (and leak) a fresh unclosed `ApifyClientAsync` on every call.""" + default_storage_key: ClassVar[str] = '__default__' def __init__( @@ -177,7 +181,7 @@ async def _get_alias_init_lock(cls) -> Lock: async def _get_alias_map(cls, configuration: Configuration) -> dict[str, str]: """Get the aliases and storage ids mapping from the default kvs. - Mapping is loaded from kvs only once and is shared for all instances of the _AliasResolver class. + Mapping is loaded from kvs only once and is shared for all instances of the AliasResolver class. Args: configuration: Configuration object to use for accessing the default KVS. @@ -222,12 +226,11 @@ async def store_mapping(self, storage_id: str) -> None: if not self._configuration.is_at_home: logging.getLogger(__name__).debug( - '_AliasResolver storage limited retention is only supported on Apify platform. Storage is not exported.' + 'AliasResolver storage limited retention is only supported on Apify platform. Storage is not exported.' ) return default_kvs_client = await self._get_default_kvs_client(self._configuration) - await default_kvs_client.get() try: record = await default_kvs_client.get_record(self._ALIAS_MAPPING_KEY) @@ -246,20 +249,28 @@ def _storage_key(self) -> str: [ self._storage_type, self._alias, - hash_api_base_url_and_token(self._configuration), + hash_api_public_base_url_and_token(self._configuration), ] ) - @staticmethod - async def _get_default_kvs_client(configuration: Configuration) -> KeyValueStoreClientAsync: - """Get a client for the default key-value store.""" - apify_client_async = ApifyClientAsync( - token=configuration.token, - api_url=configuration.api_base_url, - max_retries=8, - ) + @classmethod + async def _get_default_kvs_client(cls, configuration: Configuration) -> KeyValueStoreClientAsync: + """Get a client for the default key-value store. + The underlying `ApifyClientAsync` is cached per `(token, api_url)` and reused across calls, so repeated + alias resolution does not create (and leak) a fresh unclosed client every time. + """ if not configuration.default_key_value_store_id: raise ValueError("'Configuration.default_key_value_store_id' must be set.") + cache_key = (configuration.token, configuration.api_base_url) + apify_client_async = cls._api_clients.get(cache_key) + if apify_client_async is None: + apify_client_async = ApifyClientAsync( + token=configuration.token, + api_url=configuration.api_base_url, + max_retries=8, + ) + cls._api_clients[cache_key] = apify_client_async + return apify_client_async.key_value_store(key_value_store_id=configuration.default_key_value_store_id) diff --git a/src/apify/storage_clients/_apify/_dataset_client.py b/src/apify/storage_clients/_apify/_dataset_client.py index 1d3351d8e..de9634fdc 100644 --- a/src/apify/storage_clients/_apify/_dataset_client.py +++ b/src/apify/storage_clients/_apify/_dataset_client.py @@ -144,6 +144,8 @@ async def payloads_generator(items: Sequence[Mapping[str, JsonSerializable]]) -> async with self._charge_lock(), self._lock: items = data if self._is_sequence_of_items(data) else [data] + if not items: + return limit = self._compute_limit_for_push(len(items)) items = items[:limit] diff --git a/src/apify/storage_clients/_apify/_request_queue_shared_client.py b/src/apify/storage_clients/_apify/_request_queue_shared_client.py index 496ec4f19..8b55cebb3 100644 --- a/src/apify/storage_clients/_apify/_request_queue_shared_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_shared_client.py @@ -198,15 +198,7 @@ async def fetch_next_request(self) -> Request | None: ) return None - # Use get request to ensure we have the full request object. - request = await self._get_request_by_id(next_request_id) - if request is None: - logger.debug( - 'Request fetched from the beginning of queue was not found in the RQ', - extra={'next_request_id': next_request_id}, - ) - return None - + # `_get_or_hydrate_request` already returns the fully hydrated request, so no extra fetch is needed. return request async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None: diff --git a/src/apify/storage_clients/_apify/_storage_client.py b/src/apify/storage_clients/_apify/_storage_client.py index 02c06258e..b0c569b59 100644 --- a/src/apify/storage_clients/_apify/_storage_client.py +++ b/src/apify/storage_clients/_apify/_storage_client.py @@ -9,7 +9,7 @@ from ._dataset_client import ApifyDatasetClient from ._key_value_store_client import ApifyKeyValueStoreClient from ._request_queue_client import ApifyRequestQueueClient -from ._utils import hash_api_base_url_and_token +from ._utils import hash_api_public_base_url_and_token from apify._configuration import Configuration as ApifyConfiguration from apify._utils import docs_group @@ -126,7 +126,9 @@ def get_storage_client_cache_key(self, configuration: CrawleeConfiguration) -> H if isinstance(configuration, ApifyConfiguration): # It is not supported to open exactly same queue with 'single' and 'shared' client at the same time. # Whichever client variation gets used first, wins. - return super().get_storage_client_cache_key(configuration), hash_api_base_url_and_token(configuration) + return super().get_storage_client_cache_key(configuration), hash_api_public_base_url_and_token( + configuration + ) config_class = type(configuration) raise TypeError(self._LSP_ERROR_MSG.format(f'{config_class.__module__}.{config_class.__name__}')) diff --git a/src/apify/storage_clients/_apify/_utils.py b/src/apify/storage_clients/_apify/_utils.py index c9bcd54d7..92127549b 100644 --- a/src/apify/storage_clients/_apify/_utils.py +++ b/src/apify/storage_clients/_apify/_utils.py @@ -39,7 +39,7 @@ def unique_key_to_request_id(unique_key: str, *, request_id_length: int = 15) -> return url_safe_key[:request_id_length] -def hash_api_base_url_and_token(configuration: Configuration) -> str: +def hash_api_public_base_url_and_token(configuration: Configuration) -> str: """Hash configuration.api_public_base_url and configuration.token in deterministic way.""" if configuration.api_public_base_url is None or configuration.token is None: raise ValueError("'Configuration.api_public_base_url' and 'Configuration.token' must be set.") diff --git a/src/apify/storage_clients/_apify/py.typed b/src/apify/storage_clients/_apify/py.typed deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/apify/storage_clients/py.typed b/src/apify/storage_clients/py.typed deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/apify/storages/py.typed b/src/apify/storages/py.typed deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index c289a3dd9..75f1d5152 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -78,6 +78,7 @@ def _prepare_test_env() -> None: AliasResolver._alias_map = {} AliasResolver._alias_map_loaded = False AliasResolver._alias_init_lock = None + AliasResolver._api_clients = {} # Verify that the test environment was set up correctly. assert os.environ.get(ApifyEnvVars.LOCAL_STORAGE_DIR) == str(tmp_path) diff --git a/tests/unit/storage_clients/test_alias_resolver.py b/tests/unit/storage_clients/test_alias_resolver.py index 707b6a564..2dc3e8ba7 100644 --- a/tests/unit/storage_clients/test_alias_resolver.py +++ b/tests/unit/storage_clients/test_alias_resolver.py @@ -105,7 +105,6 @@ async def test_store_mapping_uses_injected_configuration_is_at_home() -> None: fake_kvs_client = AsyncMock() fake_kvs_client.get_record = AsyncMock(return_value=None) fake_kvs_client.set_record = AsyncMock(return_value=None) - fake_kvs_client.get = AsyncMock(return_value={'id': 'default-kvs-id'}) with patch.object(AliasResolver, '_get_default_kvs_client', return_value=fake_kvs_client): await resolver.store_mapping(storage_id='new-id-789') diff --git a/tests/unit/test_proxy_configuration.py b/tests/unit/test_proxy_configuration.py index 29ac2876c..2d6ed74ce 100644 --- a/tests/unit/test_proxy_configuration.py +++ b/tests/unit/test_proxy_configuration.py @@ -13,7 +13,7 @@ from apify_client import ApifyClientAsync from apify._consts import ApifyEnvVars -from apify._proxy_configuration import ProxyConfiguration, is_url +from apify._proxy_configuration import ProxyConfiguration if TYPE_CHECKING: from pytest_httpserver import HTTPServer @@ -588,37 +588,6 @@ def request_handler(request: Request, response: Response) -> Response: assert call_mock.call_count == 0 -def test_is_url_validation() -> None: - assert is_url('http://dummy-proxy.com:8000') is True - assert is_url('https://example.com') is True - assert is_url('http://localhost') is True - assert is_url('https://12.34.56.78') is True - assert is_url('http://12.34.56.78:9012') is True - assert is_url('http://::1') is True - assert is_url('https://2f45:4da6:8f56:af8c:5dce:c1de:14d2:8661') is True - - assert is_url('dummy-proxy.com:8000') is False - assert is_url('gyfwgfhkjhljkfhdsf') is False - assert is_url('http://') is False - assert is_url('http://example') is False - assert is_url('http:/example.com') is False - assert is_url('12.34.56.78') is False - assert is_url('::1') is False - assert is_url('https://4da6:8f56:af8c:5dce:c1de:14d2:8661') is False - - -@pytest.mark.parametrize( - 'value', - [ - pytest.param('', id='empty_string'), - pytest.param(None, id='none'), - ], -) -def test_is_url_with_completely_unparsable_input(value: str | None) -> None: - """Test is_url with input that causes urlparse to fail.""" - assert is_url(value) is False # ty: ignore[invalid-argument-type] - - def test_check_min_length_raises() -> None: """Test that _check raises ValueError for values shorter than min_length.""" from apify._proxy_configuration import _check diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 526fd5d28..fcc5a4f4d 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -1,12 +1,11 @@ from __future__ import annotations -from enum import Enum from typing import TYPE_CHECKING from unittest.mock import patch import pytest -from apify._utils import docs_group, docs_name, get_system_info, is_running_in_ipython, maybe_extract_enum_member_value +from apify._utils import docs_group, docs_name, get_system_info, is_running_in_ipython def test_ipython_detection_when_active() -> None: @@ -39,24 +38,6 @@ def test_get_system_info_ipython_flag(*, ipython_running: bool) -> None: assert info['is_running_in_ipython'] is ipython_running -def test_maybe_extract_enum_member_value_with_enum() -> None: - """Test that enum members are unwrapped to their values.""" - - class Color(Enum): - RED = 'red' - BLUE = 'blue' - - assert maybe_extract_enum_member_value(Color.RED) == 'red' - assert maybe_extract_enum_member_value(Color.BLUE) == 'blue' - - -def test_maybe_extract_enum_member_value_with_non_enum() -> None: - """Test that non-enum values are returned unchanged.""" - assert maybe_extract_enum_member_value('hello') == 'hello' - assert maybe_extract_enum_member_value(42) == 42 - assert maybe_extract_enum_member_value(None) is None - - if TYPE_CHECKING: # Regression guard: if `docs_group`/`docs_name` stop being identity-typed (`Callable[[T], T]`), # the decorated classes degrade to `Unknown`, the accesses below stop erroring, and the diff --git a/uv.lock b/uv.lock index 08991e4ed..8816cc39b 100644 --- a/uv.lock +++ b/uv.lock @@ -7,10 +7,9 @@ exclude-newer = "0001-01-01T00:00:00Z" # This has no effect and is included for exclude-newer-span = "PT24H" [options.exclude-newer-package] -apify-fingerprint-datapoints = false -crawlee = false -apify-shared = false apify-client = false +crawlee = false +apify-fingerprint-datapoints = false [[package]] name = "annotated-types" From 752b8e5f62a01b4bcfa6bcbf3cae6ce4e674c59a Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Fri, 12 Jun 2026 18:44:20 +0200 Subject: [PATCH 2/6] refactor: Keep nested py.typed markers --- src/apify/events/py.typed | 0 src/apify/request_loaders/py.typed | 0 src/apify/scrapy/middlewares/py.typed | 0 src/apify/scrapy/pipelines/py.typed | 0 src/apify/scrapy/py.typed | 0 src/apify/storage_clients/_apify/py.typed | 0 src/apify/storage_clients/py.typed | 0 src/apify/storages/py.typed | 0 8 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 src/apify/events/py.typed create mode 100644 src/apify/request_loaders/py.typed create mode 100644 src/apify/scrapy/middlewares/py.typed create mode 100644 src/apify/scrapy/pipelines/py.typed create mode 100644 src/apify/scrapy/py.typed create mode 100644 src/apify/storage_clients/_apify/py.typed create mode 100644 src/apify/storage_clients/py.typed create mode 100644 src/apify/storages/py.typed diff --git a/src/apify/events/py.typed b/src/apify/events/py.typed new file mode 100644 index 000000000..e69de29bb diff --git a/src/apify/request_loaders/py.typed b/src/apify/request_loaders/py.typed new file mode 100644 index 000000000..e69de29bb diff --git a/src/apify/scrapy/middlewares/py.typed b/src/apify/scrapy/middlewares/py.typed new file mode 100644 index 000000000..e69de29bb diff --git a/src/apify/scrapy/pipelines/py.typed b/src/apify/scrapy/pipelines/py.typed new file mode 100644 index 000000000..e69de29bb diff --git a/src/apify/scrapy/py.typed b/src/apify/scrapy/py.typed new file mode 100644 index 000000000..e69de29bb diff --git a/src/apify/storage_clients/_apify/py.typed b/src/apify/storage_clients/_apify/py.typed new file mode 100644 index 000000000..e69de29bb diff --git a/src/apify/storage_clients/py.typed b/src/apify/storage_clients/py.typed new file mode 100644 index 000000000..e69de29bb diff --git a/src/apify/storages/py.typed b/src/apify/storages/py.typed new file mode 100644 index 000000000..e69de29bb From 631867ce34ff41f12ee5c7e9cf44dd011cc1ef07 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Fri, 12 Jun 2026 18:56:22 +0200 Subject: [PATCH 3/6] fix: Restore full-request re-fetch in shared request-queue fetch_next_request --- .../_apify/_request_queue_shared_client.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/apify/storage_clients/_apify/_request_queue_shared_client.py b/src/apify/storage_clients/_apify/_request_queue_shared_client.py index 8b55cebb3..ea4965c41 100644 --- a/src/apify/storage_clients/_apify/_request_queue_shared_client.py +++ b/src/apify/storage_clients/_apify/_request_queue_shared_client.py @@ -198,7 +198,17 @@ async def fetch_next_request(self) -> Request | None: ) return None - # `_get_or_hydrate_request` already returns the fully hydrated request, so no extra fetch is needed. + # `_get_or_hydrate_request` may return a request from the queue-head cache, which is populated by + # `list_and_lock_head` and only holds a partial request (no user data, no headers). Re-fetch it by id to + # guarantee the caller gets the full request object. + request = await self._get_request_by_id(next_request_id) + if request is None: + logger.debug( + 'Request fetched from the beginning of queue was not found in the RQ', + extra={'next_request_id': next_request_id}, + ) + return None + return request async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None: From e8e185b18a21caf22930fbb3b772d5bb258be60d Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Fri, 12 Jun 2026 19:05:28 +0200 Subject: [PATCH 4/6] refactor: Type AliasResolver API-client cache key and reset it across test conftests --- .../storage_clients/_apify/_alias_resolving.py | 13 ++++++++++--- tests/e2e/conftest.py | 1 + tests/integration/conftest.py | 1 + 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/apify/storage_clients/_apify/_alias_resolving.py b/src/apify/storage_clients/_apify/_alias_resolving.py index df5c60953..f4c8006dd 100644 --- a/src/apify/storage_clients/_apify/_alias_resolving.py +++ b/src/apify/storage_clients/_apify/_alias_resolving.py @@ -4,7 +4,7 @@ from asyncio import Lock from functools import cached_property from logging import getLogger -from typing import TYPE_CHECKING, ClassVar, Literal, overload +from typing import TYPE_CHECKING, ClassVar, Literal, NamedTuple, overload from apify_client import ApifyClientAsync @@ -111,6 +111,13 @@ async def open_by_alias( return get_resource_client_by_id(raw_metadata.id) +class _ApiClientCacheKey(NamedTuple): + """Cache key for `AliasResolver._api_clients` — identifies an `ApifyClientAsync` by its credentials.""" + + token: str | None + api_url: str | None + + class AliasResolver: """Class for handling aliases. @@ -135,7 +142,7 @@ class AliasResolver: _alias_init_lock: Lock | None = None """Lock for creating alias storages. Only one alias storage can be created at the time. Global for all instances.""" - _api_clients: ClassVar[dict[tuple[str | None, str | None], ApifyClientAsync]] = {} + _api_clients: ClassVar[dict[_ApiClientCacheKey, ApifyClientAsync]] = {} """Cache of Apify API clients keyed by `(token, api_url)`. Reused across instances so that repeated alias resolution does not create (and leak) a fresh unclosed `ApifyClientAsync` on every call.""" @@ -263,7 +270,7 @@ async def _get_default_kvs_client(cls, configuration: Configuration) -> KeyValue if not configuration.default_key_value_store_id: raise ValueError("'Configuration.default_key_value_store_id' must be set.") - cache_key = (configuration.token, configuration.api_base_url) + cache_key = _ApiClientCacheKey(configuration.token, configuration.api_base_url) apify_client_async = cls._api_clients.get(cache_key) if apify_client_async is None: apify_client_async = ApifyClientAsync( diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 655bc1cfb..8c6baf61b 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -87,6 +87,7 @@ def _prepare_test_env() -> None: # Reset the AliasResolver class state. AliasResolver._alias_map = {} AliasResolver._alias_init_lock = None + AliasResolver._api_clients = {} # Verify that the test environment was set up correctly. assert os.environ.get(ApifyEnvVars.LOCAL_STORAGE_DIR) == str(tmp_path) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index c7bd72905..8d852802d 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -73,6 +73,7 @@ def _prepare_test_env() -> None: # Reset the AliasResolver class state. AliasResolver._alias_map = {} AliasResolver._alias_init_lock = None + AliasResolver._api_clients = {} # Verify that the test environment was set up correctly. assert os.environ.get(ApifyEnvVars.LOCAL_STORAGE_DIR) == str(tmp_path) From 1f750833c809316730be9150d5e5b61e130c7767 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Sat, 13 Jun 2026 10:12:46 +0200 Subject: [PATCH 5/6] refactor: Inject API client into AliasResolver instead of caching globally --- .../_apify/_alias_resolving.py | 73 +++++-------- .../_apify/_api_client_creation.py | 2 + tests/e2e/conftest.py | 1 - tests/integration/conftest.py | 1 - tests/unit/conftest.py | 1 - .../storage_clients/test_alias_resolver.py | 102 +++++++++++++++--- 6 files changed, 119 insertions(+), 61 deletions(-) diff --git a/src/apify/storage_clients/_apify/_alias_resolving.py b/src/apify/storage_clients/_apify/_alias_resolving.py index f4c8006dd..56e13ebbd 100644 --- a/src/apify/storage_clients/_apify/_alias_resolving.py +++ b/src/apify/storage_clients/_apify/_alias_resolving.py @@ -4,9 +4,7 @@ from asyncio import Lock from functools import cached_property from logging import getLogger -from typing import TYPE_CHECKING, ClassVar, Literal, NamedTuple, overload - -from apify_client import ApifyClientAsync +from typing import TYPE_CHECKING, ClassVar, Literal, overload from ._utils import hash_api_public_base_url_and_token @@ -14,6 +12,7 @@ from collections.abc import Callable from types import TracebackType + from apify_client import ApifyClientAsync from apify_client._resource_clients import ( DatasetClientAsync, DatasetCollectionClientAsync, @@ -35,6 +34,7 @@ async def open_by_alias( storage_type: Literal['Dataset'], collection_client: DatasetCollectionClientAsync, get_resource_client_by_id: Callable[[str], DatasetClientAsync], + api_client: ApifyClientAsync, configuration: Configuration, ) -> DatasetClientAsync: ... @@ -46,6 +46,7 @@ async def open_by_alias( storage_type: Literal['KeyValueStore'], collection_client: KeyValueStoreCollectionClientAsync, get_resource_client_by_id: Callable[[str], KeyValueStoreClientAsync], + api_client: ApifyClientAsync, configuration: Configuration, ) -> KeyValueStoreClientAsync: ... @@ -57,6 +58,7 @@ async def open_by_alias( storage_type: Literal['RequestQueue'], collection_client: RequestQueueCollectionClientAsync, get_resource_client_by_id: Callable[[str], RequestQueueClientAsync], + api_client: ApifyClientAsync, configuration: Configuration, ) -> RequestQueueClientAsync: ... @@ -69,6 +71,7 @@ async def open_by_alias( KeyValueStoreCollectionClientAsync | RequestQueueCollectionClientAsync | DatasetCollectionClientAsync ), get_resource_client_by_id: Callable[[str], KeyValueStoreClientAsync | RequestQueueClientAsync | DatasetClientAsync], + api_client: ApifyClientAsync, configuration: Configuration, ) -> KeyValueStoreClientAsync | RequestQueueClientAsync | DatasetClientAsync: """Open storage by alias, creating it if necessary. @@ -81,6 +84,8 @@ async def open_by_alias( storage_type: The type of storage to open. collection_client: The Apify API collection client for the storage type. get_resource_client_by_id: A callable that takes a storage ID and returns the resource client. + api_client: The Apify API client used for the storage operation. Reused to access the default KVS that + holds the alias mapping, so alias resolution does not spin up its own client. configuration: Configuration object containing API credentials and settings. Returns: @@ -94,6 +99,7 @@ async def open_by_alias( storage_type=storage_type, alias=alias, configuration=configuration, + api_client=api_client, ) as alias_resolver: storage_id = await alias_resolver.resolve_id() @@ -111,13 +117,6 @@ async def open_by_alias( return get_resource_client_by_id(raw_metadata.id) -class _ApiClientCacheKey(NamedTuple): - """Cache key for `AliasResolver._api_clients` — identifies an `ApifyClientAsync` by its credentials.""" - - token: str | None - api_url: str | None - - class AliasResolver: """Class for handling aliases. @@ -142,10 +141,6 @@ class AliasResolver: _alias_init_lock: Lock | None = None """Lock for creating alias storages. Only one alias storage can be created at the time. Global for all instances.""" - _api_clients: ClassVar[dict[_ApiClientCacheKey, ApifyClientAsync]] = {} - """Cache of Apify API clients keyed by `(token, api_url)`. Reused across instances so that repeated alias - resolution does not create (and leak) a fresh unclosed `ApifyClientAsync` on every call.""" - default_storage_key: ClassVar[str] = '__default__' def __init__( @@ -153,10 +148,12 @@ def __init__( storage_type: Literal['Dataset', 'KeyValueStore', 'RequestQueue'], alias: str, configuration: Configuration, + api_client: ApifyClientAsync, ) -> None: self._storage_type = storage_type self._alias = alias self._configuration = configuration + self._api_client = api_client async def __aenter__(self) -> AliasResolver: """Context manager to prevent race condition in alias creation.""" @@ -184,26 +181,22 @@ async def _get_alias_init_lock(cls) -> Lock: cls._alias_init_lock = Lock() return cls._alias_init_lock - @classmethod - async def _get_alias_map(cls, configuration: Configuration) -> dict[str, str]: + async def _get_alias_map(self) -> dict[str, str]: """Get the aliases and storage ids mapping from the default kvs. - Mapping is loaded from kvs only once and is shared for all instances of the AliasResolver class. - - Args: - configuration: Configuration object to use for accessing the default KVS. + Mapping is loaded from kvs only once and is shared for all instances of the `AliasResolver` class. Returns: Map of aliases and storage ids. """ - if not cls._alias_map_loaded and configuration.is_at_home: - default_kvs_client = await cls._get_default_kvs_client(configuration) + if not AliasResolver._alias_map_loaded and self._configuration.is_at_home: + default_kvs_client = self._get_default_kvs_client() - record = await default_kvs_client.get_record(cls._ALIAS_MAPPING_KEY) - cls._alias_map = record.get('value', {}) if record else {} - cls._alias_map_loaded = True + record = await default_kvs_client.get_record(self._ALIAS_MAPPING_KEY) + AliasResolver._alias_map = record.get('value', {}) if record else {} + AliasResolver._alias_map_loaded = True - return cls._alias_map + return AliasResolver._alias_map async def resolve_id(self) -> str | None: """Get id of the aliased storage. @@ -223,12 +216,12 @@ async def resolve_id(self) -> str | None: return storage_id # Fallback to the mapping saved in the default KVS - return (await self._get_alias_map(self._configuration)).get(self._storage_key, None) + return (await self._get_alias_map()).get(self._storage_key, None) async def store_mapping(self, storage_id: str) -> None: """Add alias and related storage id to the mapping in default kvs and local in-memory mapping.""" # Update in-memory mapping - alias_map = await self._get_alias_map(self._configuration) + alias_map = await self._get_alias_map() alias_map[self._storage_key] = storage_id if not self._configuration.is_at_home: @@ -237,7 +230,7 @@ async def store_mapping(self, storage_id: str) -> None: ) return - default_kvs_client = await self._get_default_kvs_client(self._configuration) + default_kvs_client = self._get_default_kvs_client() try: record = await default_kvs_client.get_record(self._ALIAS_MAPPING_KEY) @@ -260,24 +253,14 @@ def _storage_key(self) -> str: ] ) - @classmethod - async def _get_default_kvs_client(cls, configuration: Configuration) -> KeyValueStoreClientAsync: + def _get_default_kvs_client(self) -> KeyValueStoreClientAsync: """Get a client for the default key-value store. - The underlying `ApifyClientAsync` is cached per `(token, api_url)` and reused across calls, so repeated - alias resolution does not create (and leak) a fresh unclosed client every time. + Derived from the injected `ApifyClientAsync`, so alias resolution shares the same HTTP client (and its + connection pool and event loop affinity) as the storage operation that triggered it, instead of creating + and leaking its own. """ - if not configuration.default_key_value_store_id: + if not self._configuration.default_key_value_store_id: raise ValueError("'Configuration.default_key_value_store_id' must be set.") - cache_key = _ApiClientCacheKey(configuration.token, configuration.api_base_url) - apify_client_async = cls._api_clients.get(cache_key) - if apify_client_async is None: - apify_client_async = ApifyClientAsync( - token=configuration.token, - api_url=configuration.api_base_url, - max_retries=8, - ) - cls._api_clients[cache_key] = apify_client_async - - return apify_client_async.key_value_store(key_value_store_id=configuration.default_key_value_store_id) + return self._api_client.key_value_store(key_value_store_id=self._configuration.default_key_value_store_id) diff --git a/src/apify/storage_clients/_apify/_api_client_creation.py b/src/apify/storage_clients/_apify/_api_client_creation.py index 56c35bf24..41d091da5 100644 --- a/src/apify/storage_clients/_apify/_api_client_creation.py +++ b/src/apify/storage_clients/_apify/_api_client_creation.py @@ -117,6 +117,7 @@ def get_resource_client(storage_id: str) -> DatasetClientAsync: storage_type=storage_type, collection_client=collection_client, get_resource_client_by_id=get_resource_client, + api_client=apify_client, configuration=configuration, ) # ty:ignore[no-matching-overload] @@ -127,6 +128,7 @@ def get_resource_client(storage_id: str) -> DatasetClientAsync: storage_type=storage_type, collection_client=collection_client, get_resource_client_by_id=get_resource_client, + api_client=apify_client, configuration=configuration, ) # ty:ignore[no-matching-overload] diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 8c6baf61b..655bc1cfb 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -87,7 +87,6 @@ def _prepare_test_env() -> None: # Reset the AliasResolver class state. AliasResolver._alias_map = {} AliasResolver._alias_init_lock = None - AliasResolver._api_clients = {} # Verify that the test environment was set up correctly. assert os.environ.get(ApifyEnvVars.LOCAL_STORAGE_DIR) == str(tmp_path) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 8d852802d..c7bd72905 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -73,7 +73,6 @@ def _prepare_test_env() -> None: # Reset the AliasResolver class state. AliasResolver._alias_map = {} AliasResolver._alias_init_lock = None - AliasResolver._api_clients = {} # Verify that the test environment was set up correctly. assert os.environ.get(ApifyEnvVars.LOCAL_STORAGE_DIR) == str(tmp_path) diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 75f1d5152..c289a3dd9 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -78,7 +78,6 @@ def _prepare_test_env() -> None: AliasResolver._alias_map = {} AliasResolver._alias_map_loaded = False AliasResolver._alias_init_lock = None - AliasResolver._api_clients = {} # Verify that the test environment was set up correctly. assert os.environ.get(ApifyEnvVars.LOCAL_STORAGE_DIR) == str(tmp_path) diff --git a/tests/unit/storage_clients/test_alias_resolver.py b/tests/unit/storage_clients/test_alias_resolver.py index 2dc3e8ba7..a8cf67637 100644 --- a/tests/unit/storage_clients/test_alias_resolver.py +++ b/tests/unit/storage_clients/test_alias_resolver.py @@ -1,15 +1,23 @@ from __future__ import annotations -from unittest.mock import AsyncMock, patch +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +from apify_client import ApifyClientAsync from apify._configuration import Configuration from apify.storage_clients._apify._alias_resolving import AliasResolver +def _api_client() -> ApifyClientAsync: + """Build a throwaway API client for resolver construction in tests that never issue real requests.""" + return ApifyClientAsync(token='test-token') + + def test_storage_key_format() -> None: """Test that _storage_key has the expected format: type,alias,hash.""" config = Configuration(token='test-token', api_base_url='https://api.apify.com') - resolver = AliasResolver(storage_type='Dataset', alias='my-alias', configuration=config) + resolver = AliasResolver(storage_type='Dataset', alias='my-alias', configuration=config, api_client=_api_client()) key = resolver._storage_key parts = key.split(',') assert len(parts) == 3 @@ -22,7 +30,9 @@ async def test_resolve_id_returns_none_for_unknown() -> None: """Test that resolve_id returns None for an alias not in the map.""" AliasResolver._alias_map = {} config = Configuration(token='test-token') - resolver = AliasResolver(storage_type='Dataset', alias='unknown-alias', configuration=config) + resolver = AliasResolver( + storage_type='Dataset', alias='unknown-alias', configuration=config, api_client=_api_client() + ) result = await resolver.resolve_id() assert result is None @@ -30,7 +40,9 @@ async def test_resolve_id_returns_none_for_unknown() -> None: async def test_resolve_id_returns_stored_id() -> None: """Test that resolve_id returns the ID if it was previously stored.""" config = Configuration(token='test-token', api_base_url='https://api.apify.com') - resolver = AliasResolver(storage_type='KeyValueStore', alias='test-alias', configuration=config) + resolver = AliasResolver( + storage_type='KeyValueStore', alias='test-alias', configuration=config, api_client=_api_client() + ) storage_key = resolver._storage_key AliasResolver._alias_map = {storage_key: 'stored-id-123'} @@ -42,7 +54,9 @@ async def test_store_mapping_local_only() -> None: """Test that store_mapping only updates in-memory map when not at home.""" AliasResolver._alias_map = {} config = Configuration(is_at_home=False, token='test-token') - resolver = AliasResolver(storage_type='RequestQueue', alias='test-alias', configuration=config) + resolver = AliasResolver( + storage_type='RequestQueue', alias='test-alias', configuration=config, api_client=_api_client() + ) await resolver.store_mapping(storage_id='new-id-456') @@ -55,7 +69,7 @@ async def test_concurrent_alias_creation_uses_lock() -> None: AliasResolver._alias_init_lock = None AliasResolver._alias_map = {} config = Configuration(token='test-token') - resolver = AliasResolver(storage_type='Dataset', alias='test', configuration=config) + resolver = AliasResolver(storage_type='Dataset', alias='test', configuration=config, api_client=_api_client()) async with resolver: # Lock should be acquired @@ -71,26 +85,28 @@ async def test_get_alias_map_returns_in_memory_map() -> None: """Test that _get_alias_map returns the in-memory map when not at home.""" AliasResolver._alias_map = {'existing_key': 'existing_id'} config = Configuration(is_at_home=False, token='test-token') + resolver = AliasResolver(storage_type='Dataset', alias='test', configuration=config, api_client=_api_client()) - result = await AliasResolver._get_alias_map(config) + result = await resolver._get_alias_map() assert result == {'existing_key': 'existing_id'} # Also verify that an empty map is returned without fetching from KVS when not at home AliasResolver._alias_map = {} - result = await AliasResolver._get_alias_map(config) + result = await resolver._get_alias_map() assert result == {} async def test_get_alias_map_loads_from_kvs_only_once_when_empty() -> None: """An empty KVS response must not trigger repeat fetches on subsequent calls.""" config = Configuration(is_at_home=True, token='test-token', default_key_value_store_id='default-kvs-id') + resolver = AliasResolver(storage_type='Dataset', alias='test', configuration=config, api_client=_api_client()) fake_kvs_client = AsyncMock() fake_kvs_client.get_record = AsyncMock(return_value=None) with patch.object(AliasResolver, '_get_default_kvs_client', return_value=fake_kvs_client): - await AliasResolver._get_alias_map(config) - await AliasResolver._get_alias_map(config) - await AliasResolver._get_alias_map(config) + await resolver._get_alias_map() + await resolver._get_alias_map() + await resolver._get_alias_map() assert fake_kvs_client.get_record.await_count == 1 assert AliasResolver._alias_map == {} @@ -100,7 +116,7 @@ async def test_store_mapping_uses_injected_configuration_is_at_home() -> None: """`store_mapping` gates on the injected configuration's `is_at_home`, not the global one.""" # Global `is_at_home` defaults to False; injected config says True — the KVS write must still happen. config = Configuration(is_at_home=True, token='test-token', default_key_value_store_id='default-kvs-id') - resolver = AliasResolver(storage_type='Dataset', alias='test-alias', configuration=config) + resolver = AliasResolver(storage_type='Dataset', alias='test-alias', configuration=config, api_client=_api_client()) fake_kvs_client = AsyncMock() fake_kvs_client.get_record = AsyncMock(return_value=None) @@ -129,6 +145,66 @@ async def test_configuration_storages_alias_resolving() -> None: # Check that id of each non-default storage saved in the mapping is resolved for storage_type in ('Dataset', 'KeyValueStore', 'RequestQueue'): assert ( - await AliasResolver(storage_type=storage_type, alias='custom', configuration=configuration).resolve_id() + await AliasResolver( + storage_type=storage_type, alias='custom', configuration=configuration, api_client=_api_client() + ).resolve_id() == f'custom_{storage_type}_id' ) + + +def test_default_kvs_client_derives_from_injected_client() -> None: + """The default-KVS client used for alias mapping is derived from the injected client, not a freshly created one.""" + api_client = _api_client() + config = Configuration(token='test-token', default_key_value_store_id='default-kvs-id') + resolver = AliasResolver(storage_type='Dataset', alias='a', configuration=config, api_client=api_client) + + kvs_client = resolver._get_default_kvs_client() + + assert kvs_client.resource_id == 'default-kvs-id' + # Shares the injected client's HTTP client (and its connection pool), proving no separate client is spun up. + assert kvs_client._http_client is api_client.http_client + + +def test_resolvers_use_their_own_injected_client() -> None: + """Each resolver derives its KVS client from its own injected client; there is no shared process-global cache.""" + config = Configuration(token='test-token', default_key_value_store_id='default-kvs-id') + client_a = _api_client() + client_b = _api_client() + resolver_a = AliasResolver(storage_type='Dataset', alias='a', configuration=config, api_client=client_a) + resolver_b = AliasResolver(storage_type='Dataset', alias='b', configuration=config, api_client=client_b) + + assert resolver_a._get_default_kvs_client()._http_client is client_a.http_client + assert resolver_b._get_default_kvs_client()._http_client is client_b.http_client + assert client_a.http_client is not client_b.http_client + + +def test_alias_resolution_runs_across_event_loops_with_shared_client() -> None: + """A single injected client can drive alias resolution from more than one event loop without loop-bound state.""" + config = Configuration(is_at_home=True, token='test-token', default_key_value_store_id='default-kvs-id') + + kvs_client = AsyncMock() + kvs_client.get_record = AsyncMock(return_value={'value': {}}) + kvs_client.set_record = AsyncMock(return_value=None) + api_client = MagicMock() + api_client.key_value_store = MagicMock(return_value=kvs_client) + + async def store_on_current_loop(alias: str, storage_id: str) -> None: + # Each loop starts from clean class state and builds its own lock on the running loop. + AliasResolver._alias_map = {} + AliasResolver._alias_map_loaded = False + AliasResolver._alias_init_lock = None + resolver = AliasResolver(storage_type='Dataset', alias=alias, configuration=config, api_client=api_client) + async with resolver: + await resolver.store_mapping(storage_id=storage_id) + + loop_a = asyncio.new_event_loop() + loop_b = asyncio.new_event_loop() + try: + loop_a.run_until_complete(store_on_current_loop('alias-a', 'id-a')) + loop_b.run_until_complete(store_on_current_loop('alias-b', 'id-b')) + finally: + loop_a.close() + loop_b.close() + + # The same injected client served both event loops. + assert kvs_client.set_record.await_count == 2 From 405898d6107cd487ed8e0b2db8ffc2cda3471bae Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Sat, 13 Jun 2026 10:43:35 +0200 Subject: [PATCH 6/6] refactor: Move AliasResolver API-client injection to a dedicated PR Reverts the api_client injection into AliasResolver/open_by_alias added earlier on this branch so this PR stays a focused dead-code cleanup; the client-lifecycle change is submitted as a separate PR per review feedback. --- .../_apify/_alias_resolving.py | 56 +++++----- .../_apify/_api_client_creation.py | 2 - .../storage_clients/test_alias_resolver.py | 102 +++--------------- 3 files changed, 40 insertions(+), 120 deletions(-) diff --git a/src/apify/storage_clients/_apify/_alias_resolving.py b/src/apify/storage_clients/_apify/_alias_resolving.py index 56e13ebbd..1bfff85e1 100644 --- a/src/apify/storage_clients/_apify/_alias_resolving.py +++ b/src/apify/storage_clients/_apify/_alias_resolving.py @@ -6,13 +6,14 @@ from logging import getLogger from typing import TYPE_CHECKING, ClassVar, Literal, overload +from apify_client import ApifyClientAsync + from ._utils import hash_api_public_base_url_and_token if TYPE_CHECKING: from collections.abc import Callable from types import TracebackType - from apify_client import ApifyClientAsync from apify_client._resource_clients import ( DatasetClientAsync, DatasetCollectionClientAsync, @@ -34,7 +35,6 @@ async def open_by_alias( storage_type: Literal['Dataset'], collection_client: DatasetCollectionClientAsync, get_resource_client_by_id: Callable[[str], DatasetClientAsync], - api_client: ApifyClientAsync, configuration: Configuration, ) -> DatasetClientAsync: ... @@ -46,7 +46,6 @@ async def open_by_alias( storage_type: Literal['KeyValueStore'], collection_client: KeyValueStoreCollectionClientAsync, get_resource_client_by_id: Callable[[str], KeyValueStoreClientAsync], - api_client: ApifyClientAsync, configuration: Configuration, ) -> KeyValueStoreClientAsync: ... @@ -58,7 +57,6 @@ async def open_by_alias( storage_type: Literal['RequestQueue'], collection_client: RequestQueueCollectionClientAsync, get_resource_client_by_id: Callable[[str], RequestQueueClientAsync], - api_client: ApifyClientAsync, configuration: Configuration, ) -> RequestQueueClientAsync: ... @@ -71,7 +69,6 @@ async def open_by_alias( KeyValueStoreCollectionClientAsync | RequestQueueCollectionClientAsync | DatasetCollectionClientAsync ), get_resource_client_by_id: Callable[[str], KeyValueStoreClientAsync | RequestQueueClientAsync | DatasetClientAsync], - api_client: ApifyClientAsync, configuration: Configuration, ) -> KeyValueStoreClientAsync | RequestQueueClientAsync | DatasetClientAsync: """Open storage by alias, creating it if necessary. @@ -84,8 +81,6 @@ async def open_by_alias( storage_type: The type of storage to open. collection_client: The Apify API collection client for the storage type. get_resource_client_by_id: A callable that takes a storage ID and returns the resource client. - api_client: The Apify API client used for the storage operation. Reused to access the default KVS that - holds the alias mapping, so alias resolution does not spin up its own client. configuration: Configuration object containing API credentials and settings. Returns: @@ -99,7 +94,6 @@ async def open_by_alias( storage_type=storage_type, alias=alias, configuration=configuration, - api_client=api_client, ) as alias_resolver: storage_id = await alias_resolver.resolve_id() @@ -148,12 +142,10 @@ def __init__( storage_type: Literal['Dataset', 'KeyValueStore', 'RequestQueue'], alias: str, configuration: Configuration, - api_client: ApifyClientAsync, ) -> None: self._storage_type = storage_type self._alias = alias self._configuration = configuration - self._api_client = api_client async def __aenter__(self) -> AliasResolver: """Context manager to prevent race condition in alias creation.""" @@ -181,22 +173,26 @@ async def _get_alias_init_lock(cls) -> Lock: cls._alias_init_lock = Lock() return cls._alias_init_lock - async def _get_alias_map(self) -> dict[str, str]: + @classmethod + async def _get_alias_map(cls, configuration: Configuration) -> dict[str, str]: """Get the aliases and storage ids mapping from the default kvs. - Mapping is loaded from kvs only once and is shared for all instances of the `AliasResolver` class. + Mapping is loaded from kvs only once and is shared for all instances of the AliasResolver class. + + Args: + configuration: Configuration object to use for accessing the default KVS. Returns: Map of aliases and storage ids. """ - if not AliasResolver._alias_map_loaded and self._configuration.is_at_home: - default_kvs_client = self._get_default_kvs_client() + if not cls._alias_map_loaded and configuration.is_at_home: + default_kvs_client = await cls._get_default_kvs_client(configuration) - record = await default_kvs_client.get_record(self._ALIAS_MAPPING_KEY) - AliasResolver._alias_map = record.get('value', {}) if record else {} - AliasResolver._alias_map_loaded = True + record = await default_kvs_client.get_record(cls._ALIAS_MAPPING_KEY) + cls._alias_map = record.get('value', {}) if record else {} + cls._alias_map_loaded = True - return AliasResolver._alias_map + return cls._alias_map async def resolve_id(self) -> str | None: """Get id of the aliased storage. @@ -216,12 +212,12 @@ async def resolve_id(self) -> str | None: return storage_id # Fallback to the mapping saved in the default KVS - return (await self._get_alias_map()).get(self._storage_key, None) + return (await self._get_alias_map(self._configuration)).get(self._storage_key, None) async def store_mapping(self, storage_id: str) -> None: """Add alias and related storage id to the mapping in default kvs and local in-memory mapping.""" # Update in-memory mapping - alias_map = await self._get_alias_map() + alias_map = await self._get_alias_map(self._configuration) alias_map[self._storage_key] = storage_id if not self._configuration.is_at_home: @@ -230,7 +226,7 @@ async def store_mapping(self, storage_id: str) -> None: ) return - default_kvs_client = self._get_default_kvs_client() + default_kvs_client = await self._get_default_kvs_client(self._configuration) try: record = await default_kvs_client.get_record(self._ALIAS_MAPPING_KEY) @@ -253,14 +249,16 @@ def _storage_key(self) -> str: ] ) - def _get_default_kvs_client(self) -> KeyValueStoreClientAsync: - """Get a client for the default key-value store. + @staticmethod + async def _get_default_kvs_client(configuration: Configuration) -> KeyValueStoreClientAsync: + """Get a client for the default key-value store.""" + apify_client_async = ApifyClientAsync( + token=configuration.token, + api_url=configuration.api_base_url, + max_retries=8, + ) - Derived from the injected `ApifyClientAsync`, so alias resolution shares the same HTTP client (and its - connection pool and event loop affinity) as the storage operation that triggered it, instead of creating - and leaking its own. - """ - if not self._configuration.default_key_value_store_id: + if not configuration.default_key_value_store_id: raise ValueError("'Configuration.default_key_value_store_id' must be set.") - return self._api_client.key_value_store(key_value_store_id=self._configuration.default_key_value_store_id) + return apify_client_async.key_value_store(key_value_store_id=configuration.default_key_value_store_id) diff --git a/src/apify/storage_clients/_apify/_api_client_creation.py b/src/apify/storage_clients/_apify/_api_client_creation.py index 41d091da5..56c35bf24 100644 --- a/src/apify/storage_clients/_apify/_api_client_creation.py +++ b/src/apify/storage_clients/_apify/_api_client_creation.py @@ -117,7 +117,6 @@ def get_resource_client(storage_id: str) -> DatasetClientAsync: storage_type=storage_type, collection_client=collection_client, get_resource_client_by_id=get_resource_client, - api_client=apify_client, configuration=configuration, ) # ty:ignore[no-matching-overload] @@ -128,7 +127,6 @@ def get_resource_client(storage_id: str) -> DatasetClientAsync: storage_type=storage_type, collection_client=collection_client, get_resource_client_by_id=get_resource_client, - api_client=apify_client, configuration=configuration, ) # ty:ignore[no-matching-overload] diff --git a/tests/unit/storage_clients/test_alias_resolver.py b/tests/unit/storage_clients/test_alias_resolver.py index a8cf67637..2dc3e8ba7 100644 --- a/tests/unit/storage_clients/test_alias_resolver.py +++ b/tests/unit/storage_clients/test_alias_resolver.py @@ -1,23 +1,15 @@ from __future__ import annotations -import asyncio -from unittest.mock import AsyncMock, MagicMock, patch - -from apify_client import ApifyClientAsync +from unittest.mock import AsyncMock, patch from apify._configuration import Configuration from apify.storage_clients._apify._alias_resolving import AliasResolver -def _api_client() -> ApifyClientAsync: - """Build a throwaway API client for resolver construction in tests that never issue real requests.""" - return ApifyClientAsync(token='test-token') - - def test_storage_key_format() -> None: """Test that _storage_key has the expected format: type,alias,hash.""" config = Configuration(token='test-token', api_base_url='https://api.apify.com') - resolver = AliasResolver(storage_type='Dataset', alias='my-alias', configuration=config, api_client=_api_client()) + resolver = AliasResolver(storage_type='Dataset', alias='my-alias', configuration=config) key = resolver._storage_key parts = key.split(',') assert len(parts) == 3 @@ -30,9 +22,7 @@ async def test_resolve_id_returns_none_for_unknown() -> None: """Test that resolve_id returns None for an alias not in the map.""" AliasResolver._alias_map = {} config = Configuration(token='test-token') - resolver = AliasResolver( - storage_type='Dataset', alias='unknown-alias', configuration=config, api_client=_api_client() - ) + resolver = AliasResolver(storage_type='Dataset', alias='unknown-alias', configuration=config) result = await resolver.resolve_id() assert result is None @@ -40,9 +30,7 @@ async def test_resolve_id_returns_none_for_unknown() -> None: async def test_resolve_id_returns_stored_id() -> None: """Test that resolve_id returns the ID if it was previously stored.""" config = Configuration(token='test-token', api_base_url='https://api.apify.com') - resolver = AliasResolver( - storage_type='KeyValueStore', alias='test-alias', configuration=config, api_client=_api_client() - ) + resolver = AliasResolver(storage_type='KeyValueStore', alias='test-alias', configuration=config) storage_key = resolver._storage_key AliasResolver._alias_map = {storage_key: 'stored-id-123'} @@ -54,9 +42,7 @@ async def test_store_mapping_local_only() -> None: """Test that store_mapping only updates in-memory map when not at home.""" AliasResolver._alias_map = {} config = Configuration(is_at_home=False, token='test-token') - resolver = AliasResolver( - storage_type='RequestQueue', alias='test-alias', configuration=config, api_client=_api_client() - ) + resolver = AliasResolver(storage_type='RequestQueue', alias='test-alias', configuration=config) await resolver.store_mapping(storage_id='new-id-456') @@ -69,7 +55,7 @@ async def test_concurrent_alias_creation_uses_lock() -> None: AliasResolver._alias_init_lock = None AliasResolver._alias_map = {} config = Configuration(token='test-token') - resolver = AliasResolver(storage_type='Dataset', alias='test', configuration=config, api_client=_api_client()) + resolver = AliasResolver(storage_type='Dataset', alias='test', configuration=config) async with resolver: # Lock should be acquired @@ -85,28 +71,26 @@ async def test_get_alias_map_returns_in_memory_map() -> None: """Test that _get_alias_map returns the in-memory map when not at home.""" AliasResolver._alias_map = {'existing_key': 'existing_id'} config = Configuration(is_at_home=False, token='test-token') - resolver = AliasResolver(storage_type='Dataset', alias='test', configuration=config, api_client=_api_client()) - result = await resolver._get_alias_map() + result = await AliasResolver._get_alias_map(config) assert result == {'existing_key': 'existing_id'} # Also verify that an empty map is returned without fetching from KVS when not at home AliasResolver._alias_map = {} - result = await resolver._get_alias_map() + result = await AliasResolver._get_alias_map(config) assert result == {} async def test_get_alias_map_loads_from_kvs_only_once_when_empty() -> None: """An empty KVS response must not trigger repeat fetches on subsequent calls.""" config = Configuration(is_at_home=True, token='test-token', default_key_value_store_id='default-kvs-id') - resolver = AliasResolver(storage_type='Dataset', alias='test', configuration=config, api_client=_api_client()) fake_kvs_client = AsyncMock() fake_kvs_client.get_record = AsyncMock(return_value=None) with patch.object(AliasResolver, '_get_default_kvs_client', return_value=fake_kvs_client): - await resolver._get_alias_map() - await resolver._get_alias_map() - await resolver._get_alias_map() + await AliasResolver._get_alias_map(config) + await AliasResolver._get_alias_map(config) + await AliasResolver._get_alias_map(config) assert fake_kvs_client.get_record.await_count == 1 assert AliasResolver._alias_map == {} @@ -116,7 +100,7 @@ async def test_store_mapping_uses_injected_configuration_is_at_home() -> None: """`store_mapping` gates on the injected configuration's `is_at_home`, not the global one.""" # Global `is_at_home` defaults to False; injected config says True — the KVS write must still happen. config = Configuration(is_at_home=True, token='test-token', default_key_value_store_id='default-kvs-id') - resolver = AliasResolver(storage_type='Dataset', alias='test-alias', configuration=config, api_client=_api_client()) + resolver = AliasResolver(storage_type='Dataset', alias='test-alias', configuration=config) fake_kvs_client = AsyncMock() fake_kvs_client.get_record = AsyncMock(return_value=None) @@ -145,66 +129,6 @@ async def test_configuration_storages_alias_resolving() -> None: # Check that id of each non-default storage saved in the mapping is resolved for storage_type in ('Dataset', 'KeyValueStore', 'RequestQueue'): assert ( - await AliasResolver( - storage_type=storage_type, alias='custom', configuration=configuration, api_client=_api_client() - ).resolve_id() + await AliasResolver(storage_type=storage_type, alias='custom', configuration=configuration).resolve_id() == f'custom_{storage_type}_id' ) - - -def test_default_kvs_client_derives_from_injected_client() -> None: - """The default-KVS client used for alias mapping is derived from the injected client, not a freshly created one.""" - api_client = _api_client() - config = Configuration(token='test-token', default_key_value_store_id='default-kvs-id') - resolver = AliasResolver(storage_type='Dataset', alias='a', configuration=config, api_client=api_client) - - kvs_client = resolver._get_default_kvs_client() - - assert kvs_client.resource_id == 'default-kvs-id' - # Shares the injected client's HTTP client (and its connection pool), proving no separate client is spun up. - assert kvs_client._http_client is api_client.http_client - - -def test_resolvers_use_their_own_injected_client() -> None: - """Each resolver derives its KVS client from its own injected client; there is no shared process-global cache.""" - config = Configuration(token='test-token', default_key_value_store_id='default-kvs-id') - client_a = _api_client() - client_b = _api_client() - resolver_a = AliasResolver(storage_type='Dataset', alias='a', configuration=config, api_client=client_a) - resolver_b = AliasResolver(storage_type='Dataset', alias='b', configuration=config, api_client=client_b) - - assert resolver_a._get_default_kvs_client()._http_client is client_a.http_client - assert resolver_b._get_default_kvs_client()._http_client is client_b.http_client - assert client_a.http_client is not client_b.http_client - - -def test_alias_resolution_runs_across_event_loops_with_shared_client() -> None: - """A single injected client can drive alias resolution from more than one event loop without loop-bound state.""" - config = Configuration(is_at_home=True, token='test-token', default_key_value_store_id='default-kvs-id') - - kvs_client = AsyncMock() - kvs_client.get_record = AsyncMock(return_value={'value': {}}) - kvs_client.set_record = AsyncMock(return_value=None) - api_client = MagicMock() - api_client.key_value_store = MagicMock(return_value=kvs_client) - - async def store_on_current_loop(alias: str, storage_id: str) -> None: - # Each loop starts from clean class state and builds its own lock on the running loop. - AliasResolver._alias_map = {} - AliasResolver._alias_map_loaded = False - AliasResolver._alias_init_lock = None - resolver = AliasResolver(storage_type='Dataset', alias=alias, configuration=config, api_client=api_client) - async with resolver: - await resolver.store_mapping(storage_id=storage_id) - - loop_a = asyncio.new_event_loop() - loop_b = asyncio.new_event_loop() - try: - loop_a.run_until_complete(store_on_current_loop('alias-a', 'id-a')) - loop_b.run_until_complete(store_on_current_loop('alias-b', 'id-b')) - finally: - loop_a.close() - loop_b.close() - - # The same injected client served both event loops. - assert kvs_client.set_record.await_count == 2