From 403ae02004f9c4b6d9766730729485d5262c72a2 Mon Sep 17 00:00:00 2001 From: Martin Kersner Date: Fri, 3 Jul 2026 16:55:31 +0900 Subject: [PATCH] feat: async client pilot (httpx) for cex.candle + cex.ticker (#142) Closes #142 Add datamaxi.aio.AsyncDatamaxi (optional 'async' extra, httpx). Pilot scope: cex.candle + cex.ticker, async context manager, bounded retry of transient 5xx, ClientError/ServerError parity, and last_response. To avoid sync/async drift, extract endpoint resolution + error handling into datamaxi._dispatch (resolve_endpoint, raise_for_error, extract_limit_usage) and have the sync API delegate to them (behavior-preserving; full sync suite still green). httpx is an optional dependency (datamaxi[async]); importing the sync client never loads it, and using the async client without it raises a clear install hint. Query params are str()-encoded so bools match the sync urlencode output ('True', not httpx's 'true'). --- datamaxi/_dispatch.py | 93 ++++++++++ datamaxi/aio/__init__.py | 280 +++++++++++++++++++++++++++++ datamaxi/api.py | 66 +------ pyproject.toml | 3 + requirements/requirements-test.txt | 1 + tests/test_async.py | 142 +++++++++++++++ 6 files changed, 526 insertions(+), 59 deletions(-) create mode 100644 datamaxi/_dispatch.py create mode 100644 datamaxi/aio/__init__.py create mode 100644 tests/test_async.py diff --git a/datamaxi/_dispatch.py b/datamaxi/_dispatch.py new file mode 100644 index 0000000..f43c2eb --- /dev/null +++ b/datamaxi/_dispatch.py @@ -0,0 +1,93 @@ +"""Transport-agnostic request helpers shared by the sync and async clients. + +Keeping the endpoint resolution and error handling here (rather than in +``API``) lets the ``httpx``-based async client reuse exactly the same +param-splitting and error semantics as the sync ``requests`` client, so the +two can't drift. +""" + +import json +from json import JSONDecodeError + +from datamaxi.error import ClientError, ServerError +from datamaxi.lib.utils import check_required_parameter +from datamaxi._endpoints import ENDPOINTS + + +def resolve_endpoint(op_id, **params): + """Resolve ``op_id`` + caller params into ``(method, url_path, query)``. + + Uses ``datamaxi._endpoints.ENDPOINTS`` (generated from the backend + OpenAPI spec) as the single source of truth for path, method, the + path/query split, required params, and defaults. + """ + ep = ENDPOINTS.get(op_id) + if ep is None: + raise ValueError(f"unknown endpoint operation_id: {op_id!r}") + + spec_params = ep.get("params", {}) + + unknown = set(params) - set(spec_params) + if unknown: + raise ValueError( + f"{op_id}: unknown parameter(s) {sorted(unknown)}; " + f"expected one of {sorted(spec_params)}" + ) + + # Resolve each value: caller-supplied, else the registry default. + values = {} + for name, meta in spec_params.items(): + val = params.get(name) + if val is None and "default" in meta: + val = meta["default"] + values[name] = val + + # Enforce params the spec marks required. + for name, meta in spec_params.items(): + if meta.get("required"): + check_required_parameter(values.get(name), name) + + # Split path vs query params; interpolate path params into the URL. + url_path = ep["path"] + query_params = {} + for name, meta in spec_params.items(): + if meta.get("in") == "path": + url_path = url_path.replace("{" + name + "}", str(values[name])) + else: + query_params[name] = values[name] + + return ep["method"], url_path, query_params + + +def raise_for_error(status_code, text, headers): + """Raise ``ClientError`` / ``ServerError`` for a 4xx / 5xx response. + + Works on any response given its ``status_code`` / ``text`` / ``headers``, + so it applies identically to ``requests`` and ``httpx`` responses. + """ + if status_code < 400: + return + if 400 <= status_code < 500: + try: + err = json.loads(text) + except JSONDecodeError: + raise ClientError(status_code, text, None, headers) + error_data = None + if "data" in err: + error_data = err["data"] + raise ClientError(status_code, err["error"], headers, error_data) + raise ServerError(status_code, text) + + +def extract_limit_usage(headers): + """Pull the ``x-ratelimit-*`` triplet out of the response headers.""" + usage = {} + for key in headers.keys(): + k = key.lower() + if ( + k.startswith("x-ratelimit-limit") + or k.startswith("x-ratelimit-remaining") + or k.startswith("x-ratelimit-reset") + ): + usage[k] = headers[key] + return usage diff --git a/datamaxi/aio/__init__.py b/datamaxi/aio/__init__.py new file mode 100644 index 0000000..1e6f0fa --- /dev/null +++ b/datamaxi/aio/__init__.py @@ -0,0 +1,280 @@ +"""Async client (pilot) — ``httpx``-based, mirrors a slice of the sync surface. + +Requires the ``async`` extra:: + + pip install "datamaxi[async]" + +Usage:: + + from datamaxi.aio import AsyncDatamaxi + + async with AsyncDatamaxi(api_key="...") as client: + df = await client.cex.candle(exchange="binance", market="spot", + symbol="BTC-USDT") + ticker = await client.cex.ticker.get(exchange="binance", market="spot", + symbol="BTC-USDT") + +This is a deliberately small pilot (candle + ticker). It reuses the sync +client's endpoint resolution and error handling (``datamaxi._dispatch``) and +the shared DataFrame / ResponseMeta helpers, so the two clients can't drift on +request building or error semantics. +""" + +from __future__ import annotations + +import asyncio +import os +from typing import Any, Union, TYPE_CHECKING + +from datamaxi.__version__ import __version__ +from datamaxi.api import ResponseMeta +from datamaxi._dispatch import resolve_endpoint, raise_for_error, extract_limit_usage +from datamaxi.lib.constants import ( + BASE_URL, + SPOT, + FUTURES, + USD, + INTERVAL_1D, + Market, + Interval, +) +from datamaxi.lib.utils import check_required_parameters +from datamaxi.resources.responses import CandleResponse, TickerResponse + +if TYPE_CHECKING: + import pandas as pd + + +def _import_httpx(): + try: + import httpx + except ImportError as exc: # pragma: no cover - exercised via extra + raise ImportError( + "The async client requires httpx. Install it with: " + "pip install 'datamaxi[async]'" + ) from exc + return httpx + + +class AsyncAPI: + """Async transport built on ``httpx.AsyncClient``. + + Mirrors the sync ``API``: shared endpoint resolution, bounded retry of + transient gateway 5xx, the same ``ClientError`` / ``ServerError`` contract, + and ``last_response`` metadata. + """ + + def __init__( + self, + api_key=None, + base_url=None, + timeout=10, + max_retries=3, + retry_backoff=0.5, + retry_statuses=(502, 503, 504), + transport=None, + ): + httpx = _import_httpx() + self.api_key = api_key or os.environ.get("DATAMAXI_API_KEY") + self.base_url = base_url + self.timeout = timeout + self.max_retries = max_retries + self.retry_backoff = retry_backoff + self.retry_statuses = tuple(retry_statuses) + self.last_response = None + self._client = httpx.AsyncClient( + base_url=base_url or "", + timeout=timeout, + transport=transport, + headers={ + "Content-Type": "application/json;charset=utf-8", + "User-Agent": "datamaxi/" + __version__, + "X-DTMX-APIKEY": str(self.api_key), + }, + ) + + async def request_endpoint(self, op_id, **params): + method, url_path, query_params = resolve_endpoint(op_id, **params) + return await self.send_request(method, url_path, payload=query_params) + + async def send_request(self, method, url_path, payload=None): + # str()-encode scalars so bools match the sync client's urlencode + # output (e.g. include_source -> "True", not httpx's "true"). + params = {k: str(v) for k, v in (payload or {}).items() if v is not None} + for attempt in range(self.max_retries + 1): + response = await self._client.request(method, url_path, params=params) + if ( + response.status_code in self.retry_statuses + and attempt < self.max_retries + ): + await asyncio.sleep(self.retry_backoff * (attempt + 1)) + continue + break + + raise_for_error(response.status_code, response.text, response.headers) + + try: + data = response.json() + except ValueError: + data = response.text + + self.last_response = ResponseMeta( + status_code=response.status_code, + headers=response.headers, + limit_usage=extract_limit_usage(response.headers), + data=data, + ) + return data + + async def aclose(self): + await self._client.aclose() + + async def __aenter__(self): + return self + + async def __aexit__(self, *exc): + await self.aclose() + + +class AsyncResource: + """Base for async resources — composes a shared ``AsyncAPI``.""" + + def __init__(self, api: "AsyncAPI"): + self._api = api + + async def request_endpoint(self, op_id, **params): + return await self._api.request_endpoint(op_id, **params) + + @property + def last_response(self): + return self._api.last_response + + +class AsyncCexCandle(AsyncResource): + async def __call__( + self, + exchange: str, + market: Market, + symbol: str, + currency: str = USD, + interval: Interval = INTERVAL_1D, + from_unix: str = None, + to_unix: str = None, + pandas: bool = True, + ) -> Union[pd.DataFrame, CandleResponse]: + """Fetch candle data (async). See ``datamaxi.Datamaxi.cex.candle``.""" + check_required_parameters( + [ + [exchange, "exchange"], + [symbol, "symbol"], + [interval, "interval"], + [market, "market"], + [currency, "currency"], + ] + ) + if market not in [SPOT, FUTURES]: + raise ValueError("market must be either spot or futures") + + res = await self.request_endpoint( + "cex_candle", + exchange=exchange, + market=market, + symbol=symbol, + interval=interval, + currency=currency, + **{"from": from_unix, "to": to_unix}, + ) + if res["data"] is None or len(res["data"]) == 0: + raise ValueError("no data found") + + if pandas: + from datamaxi.resources.utils import convert_data_to_data_frame + + return convert_data_to_data_frame(res["data"]) + return res + + +class AsyncCexTicker(AsyncResource): + async def get( + self, + exchange: str, + symbol: str, + market: Market, + currency: str = None, + conversion_base: str = None, + include_source: bool = False, + pandas: bool = True, + ) -> Union[pd.DataFrame, TickerResponse]: + """Fetch ticker data (async). See ``datamaxi.Datamaxi.cex.ticker``.""" + check_required_parameters( + [ + [exchange, "exchange"], + [symbol, "symbol"], + [market, "market"], + ] + ) + if market not in [SPOT, FUTURES]: + raise ValueError("market must be either spot or futures") + + res = await self.request_endpoint( + "ticker", + exchange=exchange, + symbol=symbol, + market=market, + currency=currency, + conversion_base=conversion_base, + include_source=include_source, + ) + + if pandas: + import pandas as pd + + df = pd.DataFrame([res["data"]]) + df = df.set_index("d") + return df + return res + + +class AsyncCex(AsyncResource): + def __init__(self, api: "AsyncAPI"): + super().__init__(api) + self.candle = AsyncCexCandle(api) + self.ticker = AsyncCexTicker(api) + + +class AsyncDatamaxi: + """Async entrypoint (pilot). Exposes ``cex.candle`` and ``cex.ticker``. + + Use as an async context manager so the underlying ``httpx`` client is + closed, or call :meth:`aclose` explicitly. + """ + + def __init__(self, api_key=None, **kwargs: Any): + if "base_url" not in kwargs: + kwargs["base_url"] = BASE_URL + self._api = AsyncAPI(api_key, **kwargs) + self.cex = AsyncCex(self._api) + + async def aclose(self): + await self._api.aclose() + + async def __aenter__(self): + return self + + async def __aexit__(self, *exc): + await self.aclose() + + def __repr__(self): + return "AsyncDatamaxi(base_url={!r}, has_key={})".format( + self._api.base_url, bool(self._api.api_key) + ) + + +__all__ = [ + "AsyncDatamaxi", + "AsyncAPI", + "AsyncResource", + "AsyncCex", + "AsyncCexCandle", + "AsyncCexTicker", +] diff --git a/datamaxi/api.py b/datamaxi/api.py index 91ea59e..42f5c90 100644 --- a/datamaxi/api.py +++ b/datamaxi/api.py @@ -1,17 +1,13 @@ import os -import json -from json import JSONDecodeError import logging import warnings import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry from .__version__ import __version__ -from datamaxi.error import ClientError, ServerError from datamaxi.lib.utils import cleanNoneValue from datamaxi.lib.utils import encoded_string -from datamaxi.lib.utils import check_required_parameter -from datamaxi._endpoints import ENDPOINTS +from datamaxi._dispatch import resolve_endpoint, raise_for_error class API(object): @@ -130,43 +126,12 @@ def request_endpoint(self, op_id, **params): required params, and default values. Callers pass wire-level parameter names as keyword arguments (e.g. ``**{"from": from_unix}``); semantic validation and response shaping stay in the calling client method. + + The resolution itself lives in ``datamaxi._dispatch.resolve_endpoint`` + so the async client reuses identical param handling. """ - ep = ENDPOINTS.get(op_id) - if ep is None: - raise ValueError(f"unknown endpoint operation_id: {op_id!r}") - - spec_params = ep.get("params", {}) - - unknown = set(params) - set(spec_params) - if unknown: - raise ValueError( - f"{op_id}: unknown parameter(s) {sorted(unknown)}; " - f"expected one of {sorted(spec_params)}" - ) - - # Resolve each value: caller-supplied, else the registry default. - values = {} - for name, meta in spec_params.items(): - val = params.get(name) - if val is None and "default" in meta: - val = meta["default"] - values[name] = val - - # Enforce params the spec marks required. - for name, meta in spec_params.items(): - if meta.get("required"): - check_required_parameter(values.get(name), name) - - # Split path vs query params; interpolate path params into the URL. - url_path = ep["path"] - query_params = {} - for name, meta in spec_params.items(): - if meta.get("in") == "path": - url_path = url_path.replace("{" + name + "}", str(values[name])) - else: - query_params[name] = values[name] - - return self.send_request(ep["method"], url_path, payload=query_params) + method, url_path, query_params = resolve_endpoint(op_id, **params) + return self.send_request(method, url_path, payload=query_params) def send_request(self, http_method, url_path, payload=None): if payload is None: @@ -229,24 +194,7 @@ def _dispatch_request(self, http_method): }.get(http_method, "GET") def _handle_exception(self, response): - status_code = response.status_code - if status_code < 400: - return - if 400 <= status_code < 500: - try: - err = json.loads(response.text) - except JSONDecodeError: - raise ClientError(status_code, response.text, None, response.headers) - error_data = None - if "data" in err: - error_data = err["data"] - raise ClientError( - status_code, - err["error"], - response.headers, - error_data, - ) - raise ServerError(status_code, response.text) + raise_for_error(response.status_code, response.text, response.headers) class ResponseMeta(object): diff --git a/pyproject.toml b/pyproject.toml index 8ce23ea..ab49761 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,9 @@ classifiers = [ ] dynamic = ["dependencies", "version"] +[project.optional-dependencies] +async = ["httpx>=0.27,<1"] + [tool.setuptools.dynamic] dependencies = {file = ["requirements/common.txt"]} version = {attr = "datamaxi.__version__.__version__"} diff --git a/requirements/requirements-test.txt b/requirements/requirements-test.txt index a383377..0a43e44 100644 --- a/requirements/requirements-test.txt +++ b/requirements/requirements-test.txt @@ -2,6 +2,7 @@ pytest-cov>=5 pytest>=8 responses>=0.25 +httpx>=0.27,<1 black==26.3.1 flake8==7.3.0 wheel==0.46.3 diff --git a/tests/test_async.py b/tests/test_async.py new file mode 100644 index 0000000..3f7c7b6 --- /dev/null +++ b/tests/test_async.py @@ -0,0 +1,142 @@ +"""Local tests for the async client pilot (#142). + +Uses httpx.MockTransport (no network, no extra deps) injected into AsyncAPI. +Skipped entirely when the optional ``httpx`` dependency is absent. +""" + +import asyncio + +import pandas as pd +import pytest + +httpx = pytest.importorskip("httpx") + +from datamaxi.aio import AsyncDatamaxi # noqa: E402 +from datamaxi.api import ResponseMeta # noqa: E402 +from datamaxi.error import ClientError # noqa: E402 + +BASE_URL = "https://api.datamaxiplus.com" +_CANDLE = { + "data": [{"d": "1700000000", "o": "1", "h": "2", "l": "1", "c": "2", "v": "9"}] +} +_TICKER = {"data": {"d": "1700000000", "p": "105.5"}} + + +def _run(coro): + return asyncio.run(coro) + + +def _client(handler): + return AsyncDatamaxi( + api_key="k", base_url=BASE_URL, transport=httpx.MockTransport(handler) + ) + + +def test_async_candle_returns_dataframe(): + def handler(request): + assert request.url.path == "/api/v1/cex/candle" + return httpx.Response(200, json=_CANDLE) + + async def run(): + async with _client(handler) as c: + return await c.cex.candle( + exchange="binance", market="spot", symbol="BTC-USDT" + ) + + df = _run(run()) + assert isinstance(df, pd.DataFrame) + assert "c" in df.columns + + +def test_async_candle_pandas_false_returns_envelope(): + async def run(): + async with _client(lambda r: httpx.Response(200, json=_CANDLE)) as c: + return await c.cex.candle( + exchange="binance", market="spot", symbol="BTC-USDT", pandas=False + ) + + res = _run(run()) + assert res == _CANDLE + + +def test_async_ticker_forwards_bool_param_like_sync(): + seen = {} + + def handler(request): + seen.update(dict(request.url.params)) + return httpx.Response(200, json=_TICKER) + + async def run(): + async with _client(handler) as c: + return await c.cex.ticker.get( + exchange="binance", + market="spot", + symbol="BTC-USDT", + include_source=True, + ) + + df = _run(run()) + assert isinstance(df, pd.DataFrame) + # bool encoded as "True" (matches the sync urlencode output), not "true" + assert seen["include_source"] == "True" + + +def test_async_last_response_populated(): + headers = {"x-ratelimit-remaining": "42"} + + async def run(): + async with _client( + lambda r: httpx.Response(200, json=_TICKER, headers=headers) + ) as c: + await c.cex.ticker.get(exchange="binance", market="spot", symbol="BTC-USDT") + return c.cex.ticker.last_response + + lr = _run(run()) + assert isinstance(lr, ResponseMeta) + assert lr.status_code == 200 + assert lr.limit_usage == {"x-ratelimit-remaining": "42"} + + +def test_async_client_error_raises(): + async def run(): + async with _client( + lambda r: httpx.Response(400, json={"error": "bad request"}) + ) as c: + await c.cex.ticker.get(exchange="binance", market="spot", symbol="BTC-USDT") + + with pytest.raises(ClientError): + _run(run()) + + +def test_async_retries_transient_5xx(): + calls = {"n": 0} + + def handler(request): + calls["n"] += 1 + if calls["n"] < 3: + return httpx.Response(503, json={"error": "busy"}) + return httpx.Response(200, json=_TICKER) + + async def run(): + async with AsyncDatamaxi( + api_key="k", + base_url=BASE_URL, + retry_backoff=0.0, + transport=httpx.MockTransport(handler), + ) as c: + return await c.cex.ticker.get( + exchange="binance", market="spot", symbol="BTC-USDT" + ) + + df = _run(run()) + assert isinstance(df, pd.DataFrame) + assert calls["n"] == 3 # 503, 503, 200 + + +def test_async_invalid_market_raises_before_request(): + async def run(): + async with _client(lambda r: httpx.Response(200, json=_TICKER)) as c: + await c.cex.candle(exchange="binance", market="bogus", symbol="BTC-USDT") + + with pytest.raises(ValueError): + _run(run())