Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions datamaxi/_dispatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""Transport-agnostic request helpers shared by the sync and async clients.

Keeping the endpoint resolution and error handling here (rather than in
``API``) lets the ``httpx``-based async client reuse exactly the same
param-splitting and error semantics as the sync ``requests`` client, so the
two can't drift.
"""

import json
from json import JSONDecodeError

from datamaxi.error import ClientError, ServerError
from datamaxi.lib.utils import check_required_parameter
from datamaxi._endpoints import ENDPOINTS


def resolve_endpoint(op_id, **params):
"""Resolve ``op_id`` + caller params into ``(method, url_path, query)``.

Uses ``datamaxi._endpoints.ENDPOINTS`` (generated from the backend
OpenAPI spec) as the single source of truth for path, method, the
path/query split, required params, and defaults.
"""
ep = ENDPOINTS.get(op_id)
if ep is None:
raise ValueError(f"unknown endpoint operation_id: {op_id!r}")

spec_params = ep.get("params", {})

unknown = set(params) - set(spec_params)
if unknown:
raise ValueError(
f"{op_id}: unknown parameter(s) {sorted(unknown)}; "
f"expected one of {sorted(spec_params)}"
)

# Resolve each value: caller-supplied, else the registry default.
values = {}
for name, meta in spec_params.items():
val = params.get(name)
if val is None and "default" in meta:
val = meta["default"]
values[name] = val

# Enforce params the spec marks required.
for name, meta in spec_params.items():
if meta.get("required"):
check_required_parameter(values.get(name), name)

# Split path vs query params; interpolate path params into the URL.
url_path = ep["path"]
query_params = {}
for name, meta in spec_params.items():
if meta.get("in") == "path":
url_path = url_path.replace("{" + name + "}", str(values[name]))
else:
query_params[name] = values[name]

return ep["method"], url_path, query_params


def raise_for_error(status_code, text, headers):
"""Raise ``ClientError`` / ``ServerError`` for a 4xx / 5xx response.

Works on any response given its ``status_code`` / ``text`` / ``headers``,
so it applies identically to ``requests`` and ``httpx`` responses.
"""
if status_code < 400:
return
if 400 <= status_code < 500:
try:
err = json.loads(text)
except JSONDecodeError:
raise ClientError(status_code, text, None, headers)
error_data = None
if "data" in err:
error_data = err["data"]
raise ClientError(status_code, err["error"], headers, error_data)
raise ServerError(status_code, text)


def extract_limit_usage(headers):
"""Pull the ``x-ratelimit-*`` triplet out of the response headers."""
usage = {}
for key in headers.keys():
k = key.lower()
if (
k.startswith("x-ratelimit-limit")
or k.startswith("x-ratelimit-remaining")
or k.startswith("x-ratelimit-reset")
):
usage[k] = headers[key]
return usage
280 changes: 280 additions & 0 deletions datamaxi/aio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
"""Async client (pilot) — ``httpx``-based, mirrors a slice of the sync surface.

Requires the ``async`` extra::

pip install "datamaxi[async]"

Usage::

from datamaxi.aio import AsyncDatamaxi

async with AsyncDatamaxi(api_key="...") as client:
df = await client.cex.candle(exchange="binance", market="spot",
symbol="BTC-USDT")
ticker = await client.cex.ticker.get(exchange="binance", market="spot",
symbol="BTC-USDT")

This is a deliberately small pilot (candle + ticker). It reuses the sync
client's endpoint resolution and error handling (``datamaxi._dispatch``) and
the shared DataFrame / ResponseMeta helpers, so the two clients can't drift on
request building or error semantics.
"""

from __future__ import annotations

import asyncio
import os
from typing import Any, Union, TYPE_CHECKING

from datamaxi.__version__ import __version__
from datamaxi.api import ResponseMeta
from datamaxi._dispatch import resolve_endpoint, raise_for_error, extract_limit_usage
from datamaxi.lib.constants import (
BASE_URL,
SPOT,
FUTURES,
USD,
INTERVAL_1D,
Market,
Interval,
)
from datamaxi.lib.utils import check_required_parameters
from datamaxi.resources.responses import CandleResponse, TickerResponse

if TYPE_CHECKING:
import pandas as pd


def _import_httpx():
try:
import httpx
except ImportError as exc: # pragma: no cover - exercised via extra
raise ImportError(
"The async client requires httpx. Install it with: "
"pip install 'datamaxi[async]'"
) from exc
return httpx


class AsyncAPI:
"""Async transport built on ``httpx.AsyncClient``.

Mirrors the sync ``API``: shared endpoint resolution, bounded retry of
transient gateway 5xx, the same ``ClientError`` / ``ServerError`` contract,
and ``last_response`` metadata.
"""

def __init__(
self,
api_key=None,
base_url=None,
timeout=10,
max_retries=3,
retry_backoff=0.5,
retry_statuses=(502, 503, 504),
transport=None,
):
httpx = _import_httpx()
self.api_key = api_key or os.environ.get("DATAMAXI_API_KEY")
self.base_url = base_url
self.timeout = timeout
self.max_retries = max_retries
self.retry_backoff = retry_backoff
self.retry_statuses = tuple(retry_statuses)
self.last_response = None
self._client = httpx.AsyncClient(
base_url=base_url or "",
timeout=timeout,
transport=transport,
headers={
"Content-Type": "application/json;charset=utf-8",
"User-Agent": "datamaxi/" + __version__,
"X-DTMX-APIKEY": str(self.api_key),
},
)

async def request_endpoint(self, op_id, **params):
method, url_path, query_params = resolve_endpoint(op_id, **params)
return await self.send_request(method, url_path, payload=query_params)

async def send_request(self, method, url_path, payload=None):
# str()-encode scalars so bools match the sync client's urlencode
# output (e.g. include_source -> "True", not httpx's "true").
params = {k: str(v) for k, v in (payload or {}).items() if v is not None}
for attempt in range(self.max_retries + 1):
response = await self._client.request(method, url_path, params=params)
if (
response.status_code in self.retry_statuses
and attempt < self.max_retries
):
await asyncio.sleep(self.retry_backoff * (attempt + 1))
continue
break

raise_for_error(response.status_code, response.text, response.headers)

try:
data = response.json()
except ValueError:
data = response.text

self.last_response = ResponseMeta(
status_code=response.status_code,
headers=response.headers,
limit_usage=extract_limit_usage(response.headers),
data=data,
)
return data

async def aclose(self):
await self._client.aclose()

async def __aenter__(self):
return self

async def __aexit__(self, *exc):
await self.aclose()


class AsyncResource:
"""Base for async resources — composes a shared ``AsyncAPI``."""

def __init__(self, api: "AsyncAPI"):
self._api = api

async def request_endpoint(self, op_id, **params):
return await self._api.request_endpoint(op_id, **params)

@property
def last_response(self):
return self._api.last_response


class AsyncCexCandle(AsyncResource):
async def __call__(
self,
exchange: str,
market: Market,
symbol: str,
currency: str = USD,
interval: Interval = INTERVAL_1D,
from_unix: str = None,
to_unix: str = None,
pandas: bool = True,
) -> Union[pd.DataFrame, CandleResponse]:
"""Fetch candle data (async). See ``datamaxi.Datamaxi.cex.candle``."""
check_required_parameters(
[
[exchange, "exchange"],
[symbol, "symbol"],
[interval, "interval"],
[market, "market"],
[currency, "currency"],
]
)
if market not in [SPOT, FUTURES]:
raise ValueError("market must be either spot or futures")

res = await self.request_endpoint(
"cex_candle",
exchange=exchange,
market=market,
symbol=symbol,
interval=interval,
currency=currency,
**{"from": from_unix, "to": to_unix},
)
if res["data"] is None or len(res["data"]) == 0:
raise ValueError("no data found")

if pandas:
from datamaxi.resources.utils import convert_data_to_data_frame

return convert_data_to_data_frame(res["data"])
return res


class AsyncCexTicker(AsyncResource):
async def get(
self,
exchange: str,
symbol: str,
market: Market,
currency: str = None,
conversion_base: str = None,
include_source: bool = False,
pandas: bool = True,
) -> Union[pd.DataFrame, TickerResponse]:
"""Fetch ticker data (async). See ``datamaxi.Datamaxi.cex.ticker``."""
check_required_parameters(
[
[exchange, "exchange"],
[symbol, "symbol"],
[market, "market"],
]
)
if market not in [SPOT, FUTURES]:
raise ValueError("market must be either spot or futures")

res = await self.request_endpoint(
"ticker",
exchange=exchange,
symbol=symbol,
market=market,
currency=currency,
conversion_base=conversion_base,
include_source=include_source,
)

if pandas:
import pandas as pd

df = pd.DataFrame([res["data"]])
df = df.set_index("d")
return df
return res


class AsyncCex(AsyncResource):
def __init__(self, api: "AsyncAPI"):
super().__init__(api)
self.candle = AsyncCexCandle(api)
self.ticker = AsyncCexTicker(api)


class AsyncDatamaxi:
"""Async entrypoint (pilot). Exposes ``cex.candle`` and ``cex.ticker``.

Use as an async context manager so the underlying ``httpx`` client is
closed, or call :meth:`aclose` explicitly.
"""

def __init__(self, api_key=None, **kwargs: Any):
if "base_url" not in kwargs:
kwargs["base_url"] = BASE_URL
self._api = AsyncAPI(api_key, **kwargs)
self.cex = AsyncCex(self._api)

async def aclose(self):
await self._api.aclose()

async def __aenter__(self):
return self

async def __aexit__(self, *exc):
await self.aclose()

def __repr__(self):
return "AsyncDatamaxi(base_url={!r}, has_key={})".format(
self._api.base_url, bool(self._api.api_key)
)


__all__ = [
"AsyncDatamaxi",
"AsyncAPI",
"AsyncResource",
"AsyncCex",
"AsyncCexCandle",
"AsyncCexTicker",
]
Loading
Loading