Skip to content

Commit bb15d92

Browse files
wilsonfreitasclaude
andcommitted
Phase 6: Async API
- SGS async API: async_get_json() and async_get() using asyncio.gather() for concurrent requests - Currency async API: full async chain (_async_currency_id_list, _async_get_currency_list, _async_get_currency_id, _async_fetch_symbol_response, _async_get_symbol, _async_get_symbol_text, async_get) with concurrent HTTP requests via asyncio.gather() - OData async: ODataQuery.async_text() and ODataQuery.async_collect() using _ASYNC_CLIENT - OData API: EndpointQuery.async_collect(), Endpoint.async_get(), Endpoint.async_query() with same signature as sync versions - All validation passing: 24 tests, ruff format/check, mypy Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
1 parent 1ecc48e commit bb15d92

4 files changed

Lines changed: 513 additions & 3 deletions

File tree

bcb/currency.py

Lines changed: 256 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import asyncio
34
import re
45
import threading
56
from datetime import date, timedelta
@@ -11,7 +12,7 @@
1112
import pandas as pd
1213
from lxml import html
1314

14-
from bcb.http import _CLIENT
15+
from bcb.http import _CLIENT, _ASYNC_CLIENT
1516
from bcb.exceptions import (
1617
BCBAPIError,
1718
BCBAPINotFoundError,
@@ -647,3 +648,257 @@ def get(
647648
raise ValueError("Unknown side value, use: bid, ask, both")
648649
else:
649650
raise CurrencyNotFoundError(f"Currency not found: {symbols}")
651+
652+
653+
async def _async_currency_id_list(
654+
cache: _ThreadSafeCache | None = None,
655+
) -> pd.DataFrame:
656+
"""Async version of _currency_id_list()."""
657+
cache = cache or _DEFAULT_CACHE
658+
cache_key = _CacheKey(type="currency_id_list")
659+
cached = cache.get(cache_key)
660+
if cached is not None:
661+
return cached
662+
663+
url1 = (
664+
"https://ptax.bcb.gov.br/ptax_internet/consultaBoletim.do?"
665+
"method=exibeFormularioConsultaBoletim"
666+
)
667+
res = await _ASYNC_CLIENT.get(url1)
668+
if res.status_code == 429:
669+
raise BCBRateLimitError(
670+
"BCB API rate limit exceeded. Please try again later.",
671+
status_code=429,
672+
)
673+
if res.status_code == 404:
674+
raise BCBAPINotFoundError(
675+
"BCB API endpoint not found (404)",
676+
status_code=404,
677+
)
678+
if res.status_code >= 500:
679+
raise BCBAPIError(
680+
f"BCB API server error (status {res.status_code})",
681+
status_code=res.status_code,
682+
)
683+
if res.status_code != 200:
684+
msg = f"BCB API Request error, status code = {res.status_code}"
685+
raise BCBAPIError(msg, res.status_code)
686+
687+
doc = html.parse(BytesIO(res.content)).getroot()
688+
xpath = "//select[@name='ChkMoeda']/option"
689+
x = [(elm.text, elm.get("value")) for elm in doc.xpath(xpath)]
690+
df = pd.DataFrame(x, columns=["name", "id"])
691+
df["id"] = df["id"].astype("int32")
692+
cache.set(cache_key, df)
693+
return df
694+
695+
696+
async def _async_get_valid_currency_list(
697+
_date: date, n: int = 0, max_rollback: int = 30
698+
) -> "httpx.Response":
699+
"""Async version of _get_valid_currency_list()."""
700+
days_rolled_back = (date.today() - _date).days
701+
if days_rolled_back > max_rollback:
702+
raise BCBAPIError(
703+
f"No currency list available in last {max_rollback} days",
704+
status_code=503,
705+
)
706+
707+
url2 = f"https://www4.bcb.gov.br/Download/fechamento/M{_date:%Y%m%d}.csv"
708+
try:
709+
res = await _ASYNC_CLIENT.get(url2)
710+
except Exception as ex:
711+
if n >= 3:
712+
raise ex
713+
return await _async_get_valid_currency_list(_date, n + 1, max_rollback)
714+
715+
if res.status_code == 200:
716+
return res
717+
else:
718+
return await _async_get_valid_currency_list(
719+
_date - timedelta(1), 0, max_rollback
720+
)
721+
722+
723+
async def _async_get_currency_list(
724+
cache: _ThreadSafeCache | None = None,
725+
) -> pd.DataFrame:
726+
"""Async version of get_currency_list()."""
727+
cache = cache or _DEFAULT_CACHE
728+
cache_key = _CacheKey(type="currency_list")
729+
cached = cache.get(cache_key)
730+
if cached is not None:
731+
return cached
732+
733+
res = await _async_get_valid_currency_list(date.today())
734+
df = pd.read_csv(StringIO(res.text), delimiter=";")
735+
df.columns = [
736+
"code",
737+
"name",
738+
"symbol",
739+
"country_code",
740+
"country_name",
741+
"type",
742+
"exclusion_date",
743+
]
744+
df = df.loc[~df["country_code"].isna()]
745+
df["exclusion_date"] = pd.to_datetime(df["exclusion_date"], dayfirst=True)
746+
df["country_code"] = df["country_code"].astype("int32")
747+
df["code"] = df["code"].astype("int32")
748+
df["symbol"] = df["symbol"].str.strip()
749+
cache.set(cache_key, df)
750+
return df
751+
752+
753+
async def _async_get_currency_id(symbol: str) -> int:
754+
"""Async version of _get_currency_id() with concurrent cache warming."""
755+
id_list, all_currencies = await asyncio.gather(
756+
_async_currency_id_list(),
757+
_async_get_currency_list(),
758+
)
759+
x = pd.merge(id_list, all_currencies, on=["name"])
760+
matches = x.loc[x["symbol"] == symbol, "id"]
761+
if matches.empty:
762+
raise CurrencyNotFoundError(f"Unknown currency symbol: {symbol}")
763+
return int(matches.max())
764+
765+
766+
async def _async_fetch_symbol_response(
767+
symbol: str, start_date: DateInput, end_date: DateInput
768+
) -> "httpx.Response":
769+
"""Async version of _fetch_symbol_response()."""
770+
cid = await _async_get_currency_id(symbol)
771+
url = _currency_url(cid, start_date, end_date)
772+
res = await _ASYNC_CLIENT.get(url)
773+
774+
if res.headers["Content-Type"].startswith("text/html"):
775+
doc = html.parse(BytesIO(res.content)).getroot()
776+
xpath = "//div[@class='msgErro']"
777+
elm = doc.xpath(xpath)[0]
778+
x = elm.text
779+
x = re.sub(r"^\W+", "", x)
780+
x = re.sub(r"\W+$", "", x)
781+
msg = f"BCB API returned error: {x} - {symbol}"
782+
raise BCBAPIError(msg, status_code=400)
783+
784+
if res.status_code == 429:
785+
raise BCBRateLimitError(
786+
"BCB API rate limit exceeded. Please try again later.",
787+
status_code=429,
788+
)
789+
if res.status_code == 404:
790+
raise BCBAPINotFoundError(
791+
f"Currency data not found for {symbol}",
792+
status_code=404,
793+
)
794+
if res.status_code >= 500:
795+
raise BCBAPIError(
796+
f"BCB API server error (status {res.status_code})",
797+
status_code=res.status_code,
798+
)
799+
if res.status_code != 200:
800+
raise BCBAPIError(
801+
f"BCB API request failed with status {res.status_code}",
802+
status_code=res.status_code,
803+
)
804+
805+
return res
806+
807+
808+
async def _async_get_symbol(
809+
symbol: str, start_date: DateInput, end_date: DateInput
810+
) -> pd.DataFrame:
811+
"""Async version of _get_symbol()."""
812+
res = await _async_fetch_symbol_response(symbol, start_date, end_date)
813+
df = _validate_currency_csv(res.text)
814+
df = _parse_currency_dates(df)
815+
df = _parse_currency_types(df)
816+
df1 = df.set_index("Date")
817+
n = ["bid", "ask"]
818+
df1 = df1[n]
819+
tuples = list(zip([symbol] * len(n), n))
820+
df1.columns = pd.MultiIndex.from_tuples(tuples)
821+
return df1
822+
823+
824+
async def _async_get_symbol_text(
825+
symbol: str, start_date: DateInput, end_date: DateInput
826+
) -> str:
827+
"""Async version of _get_symbol_text()."""
828+
res = await _async_fetch_symbol_response(symbol, start_date, end_date)
829+
return res.text
830+
831+
832+
async def async_get(
833+
symbols: Union[str, List[str]],
834+
start: DateInput,
835+
end: DateInput,
836+
side: str = "ask",
837+
groupby: str = "symbol",
838+
output: str = "dataframe",
839+
) -> Union[pd.DataFrame, str, Dict[str, str]]:
840+
"""
841+
Retorna um DataFrame pandas com séries temporais com taxas de câmbio (async version).
842+
843+
Uses :func:`asyncio.gather` to fetch multiple symbols concurrently.
844+
845+
Same signature as :func:`get`, but returns a coroutine.
846+
847+
Parameters
848+
----------
849+
symbols : str, List[str]
850+
Códigos das moedas padrão ISO
851+
start : str, int, date, datetime, Timestamp
852+
Data de início da série
853+
end : string, int, date, datetime, Timestamp
854+
Data final da série
855+
side : str
856+
``'ask'``, ``'bid'`` ou ``'both'``
857+
groupby : str
858+
``'symbol'`` ou ``'side'``
859+
output : str
860+
``'dataframe'`` ou ``'text'``
861+
862+
Returns
863+
-------
864+
Union[pd.DataFrame, str, Dict[str, str]]
865+
Série temporal conforme especificado
866+
"""
867+
if isinstance(symbols, str):
868+
symbols = [symbols]
869+
870+
if output == "text":
871+
results: Dict[str, str] = {}
872+
texts = await asyncio.gather(
873+
*[_async_get_symbol_text(symbol, start, end) for symbol in symbols]
874+
)
875+
for symbol, text in zip(symbols, texts):
876+
if text is not None:
877+
results[symbol] = text
878+
if not results:
879+
raise CurrencyNotFoundError(f"Currency not found: {symbols}")
880+
if len(symbols) == 1:
881+
return results[symbols[0]]
882+
return results
883+
884+
dss = await asyncio.gather(
885+
*[_async_get_symbol(symbol, start, end) for symbol in symbols]
886+
)
887+
dss = [df for df in dss if df is not None]
888+
889+
if len(dss) > 0:
890+
df = pd.concat(dss, axis=1)
891+
if side in ("bid", "ask"):
892+
dx = df.reorder_levels([1, 0], axis=1).sort_index(axis=1)
893+
return dx[side]
894+
elif side == "both":
895+
if groupby == "symbol":
896+
return df
897+
elif groupby == "side":
898+
return df.reorder_levels([1, 0], axis=1).sort_index(axis=1)
899+
else:
900+
raise ValueError("Unknown groupby value, use: symbol, side")
901+
else:
902+
raise ValueError("Unknown side value, use: bid, ask, both")
903+
else:
904+
raise CurrencyNotFoundError(f"Currency not found: {symbols}")

0 commit comments

Comments
 (0)