diff --git a/README.md b/README.md index 9a60b265..15b719dd 100644 --- a/README.md +++ b/README.md @@ -104,6 +104,7 @@ Reach to [stac-fastapi documentation](https://stac-utils.github.io/stac-fastapi/ | name | description | default value | | --- | --- | --- | | `DEBUG` | When set to `True`, set the EODAG logging level to `3`. Otherwise, set EODAG logging level to `2`. | False | +| `DATABASE_TYPE` | The type of database used by EODAG to store data (`sqlite` or `postgresql`). | "sqlite" | | `KEEP_ORIGIN_URL` | Keep origin as alternate URL when data-download extension is enabled. | False | | `ORIGIN_URL_BLACKLIST` | Hide from clients items assets' origin URLs starting with URLs from the list. A string of comma separated values is expected. | "" | | `COUNT` | Whether to run a query with a count request or not. | False | @@ -133,6 +134,20 @@ export APP_PORT=8080 # change app port because otel-collector uses 8000 python stac_fastapi/eodag/app.py ``` +### PostgreSQL database parameters + +When `DATABASE_TYPE` is set to `postgresql`, the connection is configured through the standard libpq environment variables. + +| name | description | default value | +| --- | --- | --- | +| `PGHOST` | Hostname or IP address of the PostgreSQL server. | "" | +| `PGPORT` | Port on which the PostgreSQL server is listening. | "" | +| `PGUSER` | PostgreSQL user name used to authenticate. | "" | +| `PGDATABASE` | Name of the database to connect to. | "" | +| `PGPASSWORD` | Password used to authenticate the PostgreSQL user. | "" | + +Any variable that is left unset is omitted from the connection string, letting libpq apply its own defaults (e.g. `PGHOST` defaults to a local connection on the same machine, `PGPORT` to `5432`, `PGUSER` to the current OS user name, `PGDATABASE` to the value of `PGUSER`). + ### EODAG parameters EODAG configuration parameters are available from [EODAG documentation](https://eodag.readthedocs.io/en/stable/getting_started_guide/configure.html). 
diff --git a/pyproject.toml b/pyproject.toml index cb82a9af..42ec15bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,13 +8,14 @@ license = { file = "LICENSE" } requires-python = ">= 3.9" dependencies = [ "attr", - "eodag[all-providers] @ git+https://github.com/CS-SI/eodag.git@develop", + "eodag[all-providers] @ git+https://github.com/CS-SI/eodag.git@db-for-collections-storage", "fastapi", "geojson", "geojson-pydantic", "orjson", "pydantic", "pydantic_core", + "cql2", "pygeofilter", "stac-fastapi.api >= 4.0", "stac-fastapi.extensions", @@ -32,7 +33,10 @@ Repository = "https://github.com/CS-SI/stac-fastapi-eodag.git" "Bug Tracker" = "https://github.com/CS-SI/stac-fastapi-eodag/issues/" [project.optional-dependencies] -server = ["uvicorn[standard]"] +server = ["uvicorn[standard]", "stac_fastapi.eodag[postgresql]"] +postgresql = [ + "psycopg[binary] >= 3.1.10", +] telemetry = [ "opentelemetry-api", "opentelemetry-sdk", @@ -65,7 +69,13 @@ explicit_package_bases = true exclude = ["tests", ".venv"] [[tool.mypy.overrides]] -module = ["geojson", "pygeofilter", "pygeofilter.*", "stac_fastapi", "stac_fastapi.*"] +module = [ + "geojson", + "pygeofilter", + "pygeofilter.*", + "stac_fastapi", + "stac_fastapi.*", +] ignore_missing_imports = true [tool.pytest.ini_options] diff --git a/stac_fastapi/eodag/app.py b/stac_fastapi/eodag/app.py index c232103a..7327cc6a 100644 --- a/stac_fastapi/eodag/app.py +++ b/stac_fastapi/eodag/app.py @@ -30,7 +30,6 @@ from fastapi.responses import ORJSONResponse from stac_fastapi.api.app import StacApi from stac_fastapi.api.models import ( - EmptyRequest, ItemCollectionUri, create_get_request_model, create_post_request_model, @@ -38,6 +37,7 @@ ) from stac_fastapi.extensions.core import ( CollectionSearchExtension, + CollectionSearchFilterExtension, FilterExtension, FreeTextExtension, QueryExtension, @@ -114,10 +114,12 @@ # collection_search extensions cs_extensions_map = { - "query": 
QueryExtension(conformance_classes=[QueryConformanceClasses.COLLECTIONS]), - "offset-pagination": OffsetPaginationExtension(), - "collection-search": CollectionSearchExtension(), - "free-text": FreeTextExtension(conformance_classes=[FreeTextConformanceClasses.COLLECTIONS]), + "sort": SortExtension(conformance_classes=[SortConformanceClasses.COLLECTIONS]), + "filter": CollectionSearchFilterExtension(client=FiltersClient()), + "free_text": FreeTextExtension( + conformance_classes=[FreeTextConformanceClasses.COLLECTIONS], + ), + "pagination": OffsetPaginationExtension(), } # item_collection extensions @@ -128,11 +130,14 @@ "filter": FilterExtension(client=FiltersClient(stac_metadata_model=stac_metadata_model)), } +collection_search_extension = CollectionSearchExtension.from_extensions(cs_extensions_map.values()) + all_extensions = { **search_extensions_map, **cs_extensions_map, **itm_col_extensions_map, **{ + "collection-search": collection_search_extension, "data-download": DataDownload(), "collection-order": CollectionOrderExtension( client=BaseCollectionOrderClient(stac_metadata_model=stac_metadata_model) @@ -194,13 +199,6 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: search_post_model = create_post_request_model(search_extensions) search_get_model = create_get_request_model(search_extensions) -collections_model = create_request_model( - "CollectionsRequest", - base_model=EmptyRequest, - extensions=get_enabled_extensions(cs_extensions_map), - request_type="GET", -) - item_collection_model = create_request_model( "ItemsRequest", base_model=ItemCollectionUri, @@ -221,7 +219,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: response_class=ORJSONResponse, search_get_request_model=search_get_model, search_post_request_model=search_post_model, - collections_get_request_model=collections_model, + collections_get_request_model=collection_search_extension.GET, items_get_request_model=item_collection_model, middlewares=[ 
Middleware(BrotliMiddleware), diff --git a/stac_fastapi/eodag/config.py b/stac_fastapi/eodag/config.py index 121b12bf..831a1715 100644 --- a/stac_fastapi/eodag/config.py +++ b/stac_fastapi/eodag/config.py @@ -20,7 +20,7 @@ from __future__ import annotations from functools import lru_cache -from typing import Annotated, Union +from typing import Annotated, Literal, Union from pydantic import Field from pydantic.functional_validators import BeforeValidator @@ -34,6 +34,12 @@ class Settings(ApiSettings): debug: bool = False + database_type: Annotated[Literal["sqlite", "postgresql"], Field] = Field( + default="sqlite", + description="Type of database to use for STAC API backend", + validate_default=False, + ) + keep_origin_url: bool = Field( default=False, description=("Keep origin as alternate URL when data-download extension is enabled."), @@ -71,6 +77,13 @@ class Settings(ApiSettings): alias="validate", ) + provider_online_status_threshold: float = Field( + default=0.8, + description="Threshold for considering a provider as online based " + "on the statuses of all the collections provided", + alias="provider_online_status_threshold", + ) + @lru_cache(maxsize=1) def get_settings() -> Settings: diff --git a/stac_fastapi/eodag/core.py b/stac_fastapi/eodag/core.py index d673ee88..22a1c3aa 100644 --- a/stac_fastapi/eodag/core.py +++ b/stac_fastapi/eodag/core.py @@ -22,21 +22,18 @@ import asyncio import logging import re -from typing import TYPE_CHECKING, Any, cast +from typing import TYPE_CHECKING, Any, Optional, cast from urllib.parse import unquote_plus import attr +import cql2 import orjson from fastapi import HTTPException from pydantic import ValidationError from pydantic_core import InitErrorDetails, PydanticCustomError -from pygeofilter.backends.cql2_json import to_cql2 from pygeofilter.parsers.cql2_json import parse as parse_json -from pygeofilter.parsers.cql2_text import parse as parse_cql2_text -from stac_fastapi.api.models import create_post_request_model from 
stac_fastapi.types.errors import NotFoundError from stac_fastapi.types.requests import get_base_url -from stac_fastapi.types.rfc3339 import str_to_interval from stac_fastapi.types.search import BaseSearchPostRequest from stac_fastapi.types.stac import Collection, Collections, Item, ItemCollection from stac_pydantic.links import Relations @@ -46,8 +43,6 @@ from eodag.api.collection import Collection as EodagCollection from eodag.api.collection import CollectionsList from eodag.plugins.search.build_search_result import ECMWFSearch -from eodag.utils import deepcopy, get_geometry_from_various -from eodag.utils.exceptions import NoMatchingCollection as EodagNoMatchingCollection from stac_fastapi.eodag.client import CustomCoreClient from stac_fastapi.eodag.config import get_settings from stac_fastapi.eodag.constants import DEFAULT_LIMIT @@ -62,15 +57,13 @@ ) from stac_fastapi.eodag.models.stac_metadata import CommonStacMetadata from stac_fastapi.eodag.utils import ( - check_poly_is_point, - dt_range_to_eodag, format_datetime_range, is_dict_str_any, str2json, ) if TYPE_CHECKING: - from typing import Optional, Union + from typing import Union from fastapi import Request from pydantic import BaseModel @@ -90,56 +83,14 @@ class EodagCoreClient(CustomCoreClient): post_request_model: type[BaseModel] = attr.ib(default=BaseSearchPostRequest) stac_metadata_model: type[CommonStacMetadata] = attr.ib(default=CommonStacMetadata) - def _get_collection( - self, collection: EodagCollection, request: Request, collections_providers: dict[str, set] - ) -> Collection: - """Convert a EODAG produt type to a STAC collection.""" - # extend collection with external stac collection if any - extended_collection = Collection(deepcopy(request.app.state.ext_stac_collections.get(collection.id, {}))) - extended_collection["type"] = "Collection" - - platform_value = [p for p in (collection.platform or "").split(",") if p] - constellation = [c for c in (collection.constellation or "").split(",") if c] - 
processing_level = [pl for pl in (collection.processing_level or "").split(",") if pl] - instruments = collection.instruments or [] - federation_backends = collections_providers.get(collection._id, set()) - - summaries: dict[str, Any] = { - "platform": platform_value, - "constellation": constellation, - "processing:level": processing_level, - "instruments": instruments, - "federation:backends": federation_backends, - } - extended_collection["summaries"] = { - **(getattr(collection, "summaries", {}) or {}), - **{k: v for k, v in summaries.items() if v}, - } - - extended_collection["extent"] = { - "spatial": extended_collection.get("extent", {}).get("spatial") - or collection.extent.spatial.to_dict() - or {"bbox": [[-180.0, -90.0, 180.0, 90.0]]}, - "temporal": extended_collection.get("extent", {}).get("temporal") - or collection.extent.temporal.to_dict() - or {"interval": [[None, None]]}, - } - - for key in ["license", "description", "title"]: - if key not in extended_collection and (value := getattr(collection, key)): - extended_collection[key] = value - - keywords = collection.keywords or [] - keywords = keywords.split(",") if isinstance(keywords, str) else keywords - try: - extended_collection["keywords"] = list(set(keywords + extended_collection.get("keywords", []))) - except TypeError as e: - logger.warning("Could not merge keywords from external collection for %s: %s", collection.id, str(e)) - - extended_collection["id"] = collection.id + def _format_collection(self, collection: EodagCollection, request: Request) -> Collection: + """Convert a EODAG STAC collection to a STAC collection for API.""" # keep only federation backends which allow order mechanism # to create "retrieve" collection links from them + # TODO: this needs to be changed: we cannot request the search plugins for each collection, it is too costly. 
+ # TODO: We should find a way to know which federation backends support + # the order mechanism without requesting the plugins manager def has_ecmwf_search_plugin(federation_backends, request): for fb in federation_backends: search_plugins = request.app.state.dag._plugins_manager.get_search_plugins(provider=fb) @@ -148,22 +99,37 @@ def has_ecmwf_search_plugin(federation_backends, request): return False extension_names = [type(ext).__name__ for ext in self.extensions] + + federation_backends = set( + request.app.state.dag.db.get_federation_backends(collection=collection._id, enabled=True) + ) if self.extension_is_enabled("CollectionOrderExtension") and not has_ecmwf_search_plugin( federation_backends, request ): extension_names.remove("CollectionOrderExtension") - if collection.links: - extra_links = [link.model_dump() for link in collection.links.root] - else: - extra_links = [] - extended_coll_links = extended_collection.get("links", []) - extended_collection["links"] = CollectionLinks( - collection_id=extended_collection["id"], + coll_dict = collection.model_dump(mode="json", exclude={"alias", "eodag_stac_collection"}) + for link in coll_dict["links"]: + if link.get("label:assets") is None: + link.pop("label:assets") + assets = coll_dict.get("assets") + if isinstance(assets, dict): + for asset in assets.values(): + if asset.get("description") is None: + asset.pop("description", None) + + # add API-required links + all_coll_links = CollectionLinks( + collection_id=collection.id, request=request, - ).get_links(extensions=extension_names, extra_links=extra_links + extended_coll_links) + ).get_links(extensions=extension_names, extra_links=coll_dict["links"]) - return Collection(**extended_collection) + # move federation:backends to summaries + if not coll_dict.get("summaries"): + coll_dict["summaries"] = {} + coll_dict["summaries"]["federation:backends"] = coll_dict.pop("federation:backends") + coll_dict["links"] = all_coll_links + return Collection(**coll_dict) 
async def _search_base(self, search_request: BaseSearchPostRequest, request: Request) -> ItemCollection: eodag_args = prepare_search_base_args(search_request=search_request, model=self.stac_metadata_model) @@ -176,7 +142,7 @@ async def _search_base(self, search_request: BaseSearchPostRequest, request: Req # check if the collection exists if collection := eodag_args.get("collection"): - all_coll = await asyncio.to_thread(request.app.state.dag.list_collections, fetch_providers=False) + all_coll = await asyncio.to_thread(request.app.state.dag.list_collections) # only check the first collection (EODAG search only support a single collection) existing_coll = [coll for coll in all_coll if coll.id == collection] if not existing_coll: @@ -237,9 +203,13 @@ async def all_collections( bbox: Optional[list[NumType]] = None, datetime: Optional[str] = None, limit: Optional[int] = 10, + # Extensions offset: Optional[int] = 0, q: Optional[list[str]] = None, - query: Optional[str] = None, + sortby: Optional[list[str]] = None, + filter_expr: Optional[str] = None, + filter_lang: Optional[str] = None, + **kwargs: Any, ) -> Collections: """ Get all collections from EODAG. @@ -250,7 +220,9 @@ :param limit: Maximum number of collections to return. :param offset: Starting position from which to return collections. :param q: Query string to filter the collections. - :param query: Query string to filter collections. + :param sortby: List of fields to sort the results by. + :param filter_expr: CQL filter to apply to the search. + :param filter_lang: Language of the filter. :returns: All collections. :raises HTTPException: If the unsupported bbox parameter is provided. 
""" @@ -260,68 +233,32 @@ async def all_collections( prev_link: Optional[dict[str, Any]] = None first_link: Optional[dict[str, Any]] = None - # get provider filter - provider = None - if query: - query_attr = orjson.loads(unquote_plus(query)) - parsed_query = parse_query(query_attr) - provider = parsed_query.get("federation:backends") - provider = provider[0] if isinstance(provider, list) else provider - - all_colls = await asyncio.to_thread( - request.app.state.dag.list_collections, provider=provider, fetch_providers=False - ) - - # datetime & free-text-search filters - if any((q, datetime)): - start, end = dt_range_to_eodag(str_to_interval(datetime)) - - # q is always a list, per stac-api free_text extension definiton - # Expanding with AND as default. - free_text = " AND ".join(q or []) - - try: - guessed_collections = await asyncio.to_thread( - request.app.state.dag.guess_collection, - free_text=free_text, - start_date=start, - end_date=end, - ) - guessed_collections_ids = [coll.id for coll in guessed_collections] - except EodagNoMatchingCollection: - collections = CollectionsList([]) + cql2_json = None + if filter_expr: + if filter_lang == "cql2-text": + cql2_json = cql2.parse_text(filter_expr).to_json() + elif filter_lang == "cql2-json": + cql2_json = str2json("filter_expr", filter_expr) else: - collections = CollectionsList([coll for coll in all_colls if coll.id in guessed_collections_ids]) - else: - collections = all_colls - - providers = request.app.state.dag.providers - collections_providers: dict[str, set] = {} - for p_name, p in providers.items(): - if getattr(p.config, "products", None): - for coll in p.config.products: - if coll not in collections_providers: - collections_providers[coll] = set() - collections_providers[coll].add(p_name) - - formatted_collections = [self._get_collection(coll, request, collections_providers) for coll in collections] - - # bbox filter - if bbox: - bbox_geom = get_geometry_from_various(geometry=bbox) - - default_extent = 
[[-180.0, -90.0, 180.0, 90.0]] - formatted_collections = [ - c - for c in formatted_collections - if check_poly_is_point( - get_geometry_from_various( # type: ignore - geometry=c.get("extent", {}).get("spatial", {}).get("bbox", default_extent)[0] - ) - ).intersection(bbox_geom) - ] + raise HTTPException(status_code=400, detail=f"Unsupported filter_lang {filter_lang}") + + if not self.extension_is_enabled("OffsetPaginationExtension"): + limit = None + + collections = cast( + CollectionsList, + await asyncio.to_thread( + request.app.state.dag.list_collections, + geometry=bbox, + datetime=datetime, + limit=limit, + q=" ".join(q) if q else None, + cql2_json=cql2_json, + sortby=sortby, + ), + ) - total = len(formatted_collections) + number_matched = cast(int, collections.number_matched) links = [ { @@ -336,9 +273,12 @@ async def all_collections( limit = limit if limit is not None else 10 offset = offset if offset is not None else 0 - formatted_collections = formatted_collections[offset : offset + limit] + collections = collections[offset : offset + limit] + # info about number matched was lost during the slice, then restore it + # TODO: find a way to not lose it during the slice + collections.number_matched = number_matched - if offset + limit < total: + if offset + limit < collections.number_matched: next_link = {"body": {"limit": limit, "offset": offset + limit}} if offset > 0: @@ -346,6 +286,9 @@ async def all_collections( first_link = {"body": {"limit": limit, "offset": 0}} + # format collections + formatted_collections = [self._format_collection(coll, request) for coll in collections] + extension_names = [type(ext).__name__ for ext in self.extensions] paging_links = CollectionSearchPagingLinks( @@ -357,8 +300,8 @@ async def all_collections( return Collections( collections=formatted_collections, links=links, - numberMatched=total, - numberReturned=len(formatted_collections), + numberMatched=collections.number_matched, + numberReturned=len(collections), ) async def 
get_collection(self, collection_id: str, request: Request, **kwargs: Any) -> Collection: @@ -373,21 +316,14 @@ async def get_collection(self, collection_id: str, request: Request, **kwargs: A :returns: The collection. :raises NotFoundError: If the collection does not exist. """ - all_collections = await asyncio.to_thread(request.app.state.dag.list_collections, fetch_providers=False) - collection = next( - (c for c in all_collections if c.id == collection_id), - None, + collection = cast( + Optional[EodagCollection], await asyncio.to_thread(request.app.state.dag.get_collection, id=collection_id) ) + if collection is None: raise NotFoundError(f"Collection {collection_id} does not exist.") - providers = request.app.state.dag.providers - collection_providers: dict[str, set] = {collection._id: set()} - for p_name, p in providers.items(): - if getattr(p.config, "products", None) and collection._id in p.config.products: - collection_providers[collection._id].add(p_name) - - return self._get_collection(collection, request, collection_providers) + return self._format_collection(collection, request) async def item_collection( self, @@ -430,10 +366,11 @@ async def item_collection( ) search_request = self.post_request_model.model_validate(clean) - item_collection = await self._search_base(search_request, request) + item_collection = cast(ItemCollection, await self._search_base(search_request, request)) extension_names = [type(ext).__name__ for ext in self.extensions] + extra_links = item_collection.get("links", []) links = ItemCollectionLinks(collection_id=collection_id, request=request).get_links( - extensions=extension_names, extra_links=item_collection["links"] + extensions=extension_names, extra_links=extra_links ) item_collection["links"] = links return item_collection @@ -449,10 +386,6 @@ async def post_search( :param kwargs: Additional keyword arguments. :returns: Found items. 
""" - search_post_model = create_post_request_model(self.extensions) - request_json = await request.json() - search_post_model.model_validate(request_json, extra="forbid") - return await self._search_base(search_request, request) async def get_search( @@ -549,10 +482,11 @@ def _clean_search_args( """Clean up search arguments to match format expected by pgstac""" if filter_expr: if filter_lang == "cql2-text": - filter_expr = to_cql2(parse_cql2_text(filter_expr)) - filter_lang = "cql2-json" - - base_args["filter"] = str2json("filter_expr", filter_expr) + base_args["filter"] = cql2.parse_text(filter_expr).to_json() + elif filter_lang == "cql2-json": + base_args["filter"] = str2json("filter_expr", filter_expr) + else: + raise HTTPException(status_code=400, detail=f"Unsupported filter_lang {filter_lang}") base_args["filter_lang"] = "cql2-json" if datetime: diff --git a/stac_fastapi/eodag/dag.py b/stac_fastapi/eodag/dag.py index 93c41f8f..291b34f0 100644 --- a/stac_fastapi/eodag/dag.py +++ b/stac_fastapi/eodag/dag.py @@ -20,12 +20,12 @@ from __future__ import annotations import logging +import os from typing import TYPE_CHECKING -from stac_pydantic.collection import Extent, SpatialExtent, TimeInterval - from eodag import EODataAccessGateway -from eodag.api.collection import CollectionsList +from eodag.api.collection import CollectionsDict +from eodag.databases.base import Database from eodag.utils.exceptions import ( RequestError, TimeOutError, @@ -34,10 +34,12 @@ from stac_fastapi.eodag.config import get_settings if TYPE_CHECKING: - from typing import Any, Union + from typing import Any, Optional from fastapi import FastAPI + from eodag.api.collection import CollectionsDict, CollectionsList + logger = logging.getLogger(__name__) @@ -70,55 +72,151 @@ def fetch_external_stac_collections( return ext_stac_collections -def init_dag(app: FastAPI) -> None: - """Init EODataAccessGateway server instance, pre-running all time consuming tasks""" - settings = get_settings() +def 
get_providers_status(collections_list: CollectionsList) -> dict[str, dict[str, Any]]: + """Get the status of all providers based on their collections' status. - dag = EODataAccessGateway() - - ext_stac_collections = fetch_external_stac_collections( - dag.list_collections(fetch_providers=settings.fetch_providers) - ) - - app.state.ext_stac_collections = ext_stac_collections - - # update eodag collections config form external stac collections - for c, c_f in dag.collections_config.items(): - for key in (c, getattr(c_f, "alias", None)): - if key is None: - continue - ext_col = ext_stac_collections.get(key) - if not ext_col: - continue - - platform: Union[str, list[str]] = ext_col.get("summaries", {}).get("platform") - constellation: Union[str, list[str]] = ext_col.get("summaries", {}).get("constellation") - instruments: Union[str, list[str]] = ext_col.get("summaries", {}).get("instruments") - processing_level: Union[str, list[str]] = ext_col.get("summaries", {}).get("processing:level") - if isinstance(platform, list): - platform = ",".join(platform) - if isinstance(constellation, list): - constellation = ",".join(constellation) - if isinstance(processing_level, list): - processing_level = ",".join(processing_level) - ext_extent = ext_col["extent"] - temporal_ext = TimeInterval(**ext_extent.get("temporal", [[None, None]])) - spatial_ext = SpatialExtent(**ext_extent.get("spatial", {"bbox": [[-180.0, -90.0, 180.0, 90.0]]})) - - update_fields: dict[str, Any] = { - "title": c_f.title or ext_col.get("title"), - "description": c_f.description or ext_col["description"], - "keywords": ext_col.get("keywords"), - "instruments": c_f.instruments or instruments, - "platform": c_f.platform or platform, - "constellation": c_f.constellation or constellation, - "processing_level": c_f.processing_level or processing_level, - "license": ext_col["license"], - "extent": Extent(temporal=temporal_ext, spatial=spatial_ext), + :param collections_list: list of EODAG collections + :return: 
Dictionary of providers' status indexed by provider ID + """ + settings = get_settings() + online_threshold = settings.provider_online_status_threshold + # the key of providers_status is the provider ID + providers_status: dict[str, dict[str, Any]] = {} + + def _get_last_check(status_1: dict[str, Any], status_2: dict[str, Any], key: str) -> Optional[str]: + """Get the most recent check date between two status dictionaries + + The comparison handles possible `None` values in the dates. + + :param status_1: First status dictionary + :param status_2: Second status dictionary + :param key: Key to check in the status dictionaries (e.g. "last_status_check" or "last_successful_check") + :return: Most recent check date as string, or None if not available in both dictionaries + """ + check_1 = status_1.get(key) + check_2 = status_2.get(key) + if check_1 and check_2: + return max(check_1, check_2) + elif check_1: + return check_1 + elif check_2: + return check_2 + else: + return None + + # count how many collections are available for each provider + for collection in collections_list: + providers = collection.federation_backends + for provider_id in providers: + d = { + "count_online": 0, + "count_offline": 0, + "total_collections": 0, + "last_status_check": None, + "last_successful_check": None, } + ps = providers_status.setdefault(provider_id, d) + providers_status[provider_id]["total_collections"] += 1 + + # count how many online/offline collections for each provider and keep the last check dates + for collection in collections_list: + if not getattr(collection, "federation", None): + continue + for provider_id, status in collection.federation.items(): + ps = providers_status[provider_id] + ps["last_status_check"] = _get_last_check(ps, status, "last_status_check") + ps["last_successful_check"] = _get_last_check(ps, status, 
"last_successful_check") + if status["status"] == "online": + ps["count_online"] += 1 + else: + ps["count_offline"] += 1 + + # determine provider's status based on online/offline collections count and threshold + ret_providers_status = {} + for provider_id, status in providers_status.items(): + count_online = status["count_online"] + count_offline = status["count_offline"] + total = count_online + count_offline + if total == 0: + provider_status = "offline" + elif count_online / total >= online_threshold: + provider_status = "online" + else: + provider_status = "offline" + ret_providers_status[provider_id] = { + "status": provider_status, + "last_status_check": status["last_status_check"], + "last_successful_check": status["last_successful_check"], + "successful_collections": count_online, + "failed_collections": count_offline, + "total_collections": status["total_collections"], + } + + return ret_providers_status + + +def _build_database() -> Database | None: + """Build the EODAG database backend according to settings. + + Returns ``None`` to let ``EODataAccessGateway`` fall back to its default + SQLite backend. For the ``postgresql`` backend, a libpq connection string + is built from the standard ``PG*`` environment variables (``PGHOST``, + ``PGPORT``, ``PGUSER``, ``PGDATABASE``, ``PGPASSWORD``); any variable that + is unset is omitted, letting libpq apply its own defaults. + """ + settings = get_settings() + if settings.database_type != "postgresql": + return None + + try: + from psycopg.conninfo import make_conninfo + + from stac_fastapi.eodag.databases.postgresql import PostgreSQLDatabase + except ImportError as e: + raise ImportError( + "The 'postgresql' extra is required to use the PostgreSQL backend. 
" + "Install it with: pip install stac-fastapi-eodag[postgresql]" + ) from e + + pg_env_to_kwarg = { + "PGHOST": "host", + "PGPORT": "port", + "PGUSER": "user", + "PGDATABASE": "dbname", + "PGPASSWORD": "password", + } + kwargs = {kwarg: os.environ[env] for env, kwarg in pg_env_to_kwarg.items() if os.getenv(env)} + conninfo = make_conninfo(**kwargs) + + return PostgreSQLDatabase(conninfo=conninfo) + + +def init_dag(app: FastAPI) -> None: + """Init EODataAccessGateway server instance, pre-running all time consuming tasks""" + dag = EODataAccessGateway(db=_build_database()) + + ext_stac_collections = fetch_external_stac_collections(dag.list_collections()) + + # update eodag collections config from external stac collections + collections = {} + status = {} + for c in dag.list_collections(): + if ext_coll := ext_stac_collections.get(c.id): + collection = ext_coll + collection["id"] = c._id + collection["alias"] = c.id + if federation := collection.pop("federation", None): + status[c._id] = federation + collections[c._id] = collection + + dag.db.upsert_collections(CollectionsDict.from_configs(collections)) + + # store status in a separate DB column to avoid federation to be overwritten + # by subsequent upsert_collections() calls + dag.db.set_status(status) + + # store providers status in app state + app.state.providers_status = get_providers_status(dag.list_collections()) # pre-build search plugins for provider in dag.providers: diff --git a/stac_fastapi/eodag/databases/postgresql.py b/stac_fastapi/eodag/databases/postgresql.py new file mode 100644 index 00000000..b5f3fe4d --- /dev/null +++ b/stac_fastapi/eodag/databases/postgresql.py @@ -0,0 +1,981 @@ +# -*- coding: utf-8 -*- +# Copyright 2026, CS GROUP - France, https://www.csgroup.eu/ +# +# This file is part of EODAG project +# https://www.github.com/CS-SI/EODAG +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""PostgreSQL backend for EODAG. + +Connection parameters are read from the standard libpq environment variables: +``PGHOST``, ``PGPORT``, ``PGUSER``, ``PGDATABASE``, ``PGPASSWORD``. + +Requirements (server-side): +- PostgreSQL >= 12 (for STORED generated columns) +- ``postgis`` extension (spatial types and indexes) +- ``unaccent`` extension (CQL2 ``accenti`` operator) + +PostgreSQL always uses Write-Ahead Logging (WAL) internally; no client-side +configuration is needed. +""" + +from __future__ import annotations + +import logging +from collections import defaultdict +from typing import TYPE_CHECKING, Any, Optional, Union, cast + +import cql2 +import orjson +import psycopg +from eodag.api.collection import Collection, CollectionsDict +from eodag.api.product.metadata_mapping import NOT_AVAILABLE +from eodag.databases.base import ( + Database, + stac_search_to_where, + stac_sortby_to_order_by, +) +from eodag.utils import ( + GENERIC_COLLECTION, + PLUGINS_TOPIC_KEYS, +) +from eodag.utils.env import is_env_var_true +from psycopg import sql as pg_sql +from psycopg.rows import dict_row +from psycopg.types.json import JsonbDumper, set_json_dumps, set_json_loads + +from stac_fastapi.eodag.databases.postgresql_cql2 import cql2_json_to_sql +from stac_fastapi.eodag.databases.postgresql_fts import stac_q_to_tsquery + +if TYPE_CHECKING: + from collections.abc import Sequence + + from eodag.config import ProviderConfig + from psycopg.rows import DictRow + from shapely.geometry.base import BaseGeometry + +logger = logging.getLogger("eodag.databases.postgresql_database") 
+# Fallback values injected into stored collections when the corresponding key
+# is missing or falsy (applied in ``_collection_to_json`` before storage).
+BASE_COLLECTION = {
+    "description": "description",
+    "keywords": [],
+    "extent": {"spatial": {"bbox": [-180, -90, 180, 90]}, "temporal": {"interval": [[None, None]]}},
+}
+
+
+class PostgreSQLDatabase(Database):
+    """Database backend backed by PostgreSQL (with PostGIS and unaccent).
+
+    The connection is created from the standard libpq environment variables
+    (``PGHOST``, ``PGPORT``, ``PGUSER``, ``PGDATABASE``, ``PGPASSWORD``).
+    """
+
+    _con: psycopg.Connection[Any]
+
+    def __init__(self, conninfo: Optional[str] = None) -> None:
+        """Initialize the database by creating a connection and preparing the schema.
+
+        :param conninfo: Optional libpq connection string. When omitted (the
+            default), connection parameters are read from the ``PG*``
+            environment variables.
+
+        :raises: :class:`~psycopg.Error` if the connection cannot be established
+            or the schema initialisation fails.
+        """
+        # ``conninfo=""`` makes libpq use the standard PG* environment variables.
+        # The connection is opened with dict_row factory so that execute() can
+        # be called directly on the connection object.
+        try:
+            self._con = psycopg.connect(
+                conninfo if conninfo is not None else "",
+                # autocommit=False: every write below requires an explicit commit()
+                autocommit=False,
+                row_factory=dict_row,
+            )
+        except psycopg.Error as e:
+            logger.error(
+                "Failed to connect to PostgreSQL database "
+                "(check PGHOST, PGPORT, PGUSER, PGDATABASE, PGPASSWORD "
+                "environment variables or the supplied conninfo): %s",
+                e,
+            )
+            raise
+
+        _register_json_adapters(self._con)
+
+        try:
+            _ensure_extensions(self._con)
+            create_collections_table(self._con)
+            create_collections_federation_backends_table(self._con)
+            create_federation_backends_table(self._con)
+            self._con.commit()
+        except Exception:
+            if not self._con.closed:
+                self._con.rollback()
+            raise
+
+    def close(self) -> None:
+        """Close the database connection.
+
+        No-op if the connection is already closed.
+        """
+        if self._con and not self._con.closed:
+            self._con.close()
+
+    # ------------------------------------------------------------------ utils
+    def _execute(self, sql: str, parameters: Optional[Sequence[Any]] = None) -> psycopg.Cursor[DictRow]:
+        """Execute a SQL statement, rolling back the connection on failure.
+
+        :param sql: SQL statement to execute.
+        :param parameters: Sequence of parameters to bind to the statement.
+
+        :raises: :class:`~psycopg.Error` if execution fails (the transaction is
+            rolled back before re-raising).
+
+        :returns: The cursor after execution.
+        """
+        try:
+            return self._con.execute(sql, parameters)
+        except psycopg.Error as e:
+            self._con.rollback()
+            raise e
+
+    def _executemany(self, sql: str, parameters: Sequence[Sequence[Any]]) -> psycopg.Cursor[DictRow]:
+        """Execute a SQL statement against many parameter sets.
+
+        :param sql: SQL statement to execute repeatedly.
+        :param parameters: Sequence of parameter sequences, one per execution.
+
+        :raises: :class:`~psycopg.Error` if any execution fails (the transaction
+            is rolled back before re-raising).
+
+        :returns: The cursor after all executions.
+        """
+        try:
+            cur = self._con.cursor()
+            cur.executemany(sql, parameters)
+            return cur
+        except psycopg.Error as e:
+            self._con.rollback()
+            raise e
+
+    # --------------------------------------------------------------- mutators
+    def delete_collections(self, collection_ids: list[str]) -> None:
+        """Remove collections and their federation backend configs from the database.
+
+        Matches against both ``id`` and ``internal_id`` columns to handle aliases.
+
+        :param collection_ids: List of collection IDs (or internal IDs) to delete.
+
+        :raises ValueError: if ``collection_ids`` is empty.
+        :raises: :class:`~psycopg.Error` if the database operation fails.
+ """ + if not collection_ids: + raise ValueError("collection_ids cannot be empty") + + match_clause = "(id = ANY(%s) OR internal_id = ANY(%s))" + # Delete federation backend configs using internal_id lookup + self._execute( + "DELETE FROM collections_federation_backends WHERE collection_id IN " + f"(SELECT internal_id FROM collections WHERE {match_clause})", + (collection_ids, collection_ids), + ) + self._execute( + f"DELETE FROM collections WHERE {match_clause}", + (collection_ids, collection_ids), + ) + self._con.commit() + + def delete_collections_federation_backends(self, collection_ids: list[str]) -> None: + """Remove collection entries from the collections_federation_backends table. + + :param collection_ids: List of collection internal IDs whose federation + backend entries should be removed. No-op if the list is empty. + + :raises: :class:`~psycopg.Error` if the database operation fails. + """ + if not collection_ids: + return + self._execute( + "DELETE FROM collections_federation_backends WHERE collection_id = ANY(%s)", + (collection_ids,), + ) + self._con.commit() + + def upsert_collections(self, collections: CollectionsDict) -> None: + """Add or update collections in the database. + + Collections with ID ``GENERIC_COLLECTION`` or ``GENERIC_PRODUCT_TYPE`` + are silently skipped. + + :param collections: Mapping of collection objects or dicts to upsert. + + :raises: :class:`~psycopg.Error` if the database operation fails. 
+ """ + + def _get_id(c: Any) -> Optional[str]: + if isinstance(c, Collection): + return c.id + return c.get("id") if isinstance(c, dict) else None + + rows = [ + (_collection_to_json(c),) + for c in collections.values() + if _get_id(c) not in (GENERIC_COLLECTION, "GENERIC_PRODUCT_TYPE") + ] + if not rows: + return + + cur = self._executemany( + """ + INSERT INTO collections (content) VALUES (%s) + ON CONFLICT (id) DO UPDATE SET content = EXCLUDED.content; + """, + rows, + ) + upserted = cur.rowcount + self._con.commit() + + if upserted and upserted > 0: + logger.debug( + "%d collection(s) have been updated or added to the database", + upserted, + ) + + def _upsert_federation_backends( + self, + fb_configs: list[tuple[str, dict[str, dict[str, Any]], int, dict[str, Any], bool]], + ) -> None: + """Add or update federation backend configs (providers) in the database. + + :param fb_configs: List of tuples ``(name, plugins_config, priority, + metadata, enabled)`` describing each federation backend. + + :raises: :class:`~psycopg.Error` if the database operation fails. + """ + if not fb_configs: + return + self._executemany( + """ + INSERT INTO federation_backends (name, plugins_config, priority, metadata, enabled) + VALUES (%s, %s, %s, %s, %s) + ON CONFLICT (name) DO UPDATE SET + plugins_config = EXCLUDED.plugins_config, + priority = EXCLUDED.priority, + metadata = EXCLUDED.metadata, + enabled = EXCLUDED.enabled + """, + fb_configs, + ) + logger.debug("Upserted %d federation backend(s)", len(fb_configs)) + + def _upsert_collections_federation_backends(self, coll_fb_configs: list[tuple[str, str, dict[str, Any]]]) -> None: + """Upsert collection-specific federation backend configs. + + :param coll_fb_configs: List of tuples ``(collection_id, + federation_backend_name, plugins_config)`` to upsert. + + :raises: :class:`~psycopg.Error` if the database operation fails. 
+ """ + if not coll_fb_configs: + return + self._executemany( + """ + INSERT INTO collections_federation_backends + (collection_id, federation_backend_name, plugins_config) + VALUES (%s, %s, %s) + ON CONFLICT (collection_id, federation_backend_name) DO UPDATE SET + plugins_config = EXCLUDED.plugins_config + """, + coll_fb_configs, + ) + logger.debug("Upserted %d collection-provider config(s)", len(coll_fb_configs)) + + def _refresh_collections_denorm(self, changed_fbs: list[str]) -> None: + """Refresh the denormalized ``federation_backends`` and ``priority`` columns. + + Re-aggregates the ``federation_backends`` array and ``priority`` value for + all collections linked to the given federation backends. + + :param changed_fbs: Names of federation backends whose collections should + be refreshed. No-op if the list is empty. + + :raises: :class:`~psycopg.Error` if the database operation fails. + """ + if not changed_fbs: + return + + # Build a CTE that aggregates federation backends and max priority per + # affected collection, then update those collections in a single pass. 
+        # NOTE(review): only *enabled* backends are aggregated (agg joins on
+        # fb.enabled = TRUE); a collection whose backends are now all disabled
+        # ends up with federation_backends = '{}' (empty array, not NULL).
+        # collections_search() filters on "IS NOT NULL" only, so such
+        # collections would still be returned — confirm this is intended.
+        self._execute(
+            """
+            WITH affected AS (
+                SELECT DISTINCT cfb.collection_id
+                FROM collections_federation_backends cfb
+                WHERE cfb.federation_backend_name = ANY(%s)
+            ),
+            agg AS (
+                SELECT
+                    cfb.collection_id,
+                    COALESCE(
+                        array_agg(fb.name ORDER BY fb.priority DESC, fb.name ASC),
+                        ARRAY[]::text[]
+                    ) AS federation_backends,
+                    COALESCE(MAX(fb.priority), 0) AS priority
+                FROM affected a
+                JOIN collections_federation_backends cfb
+                    ON cfb.collection_id = a.collection_id
+                JOIN federation_backends fb
+                    ON fb.name = cfb.federation_backend_name AND fb.enabled = TRUE
+                GROUP BY cfb.collection_id
+            )
+            UPDATE collections c
+            SET
+                federation_backends = COALESCE(agg.federation_backends, ARRAY[]::text[]),
+                priority = COALESCE(agg.priority, 0)
+            FROM affected
+            LEFT JOIN agg ON agg.collection_id = affected.collection_id
+            WHERE c.internal_id = affected.collection_id;
+            """,
+            (changed_fbs,),
+        )
+
+    def upsert_fb_configs(self, configs: list[ProviderConfig]) -> None:
+        """Add or update federation backend configs (providers) in the database.
+
+        Processes each provider config to extract plugin settings and per-collection
+        product configurations, then persists them atomically. In permissive mode
+        (``EODAG_STRICT_COLLECTIONS`` not set), unknown collection IDs are created
+        automatically.
+
+        :param configs: List of provider configuration objects to persist.
+
+        :raises: :class:`~psycopg.Error` if the database operation fails (the
+            transaction is rolled back before re-raising).
+        """
+        federation_backend_configs: list[tuple[str, dict[str, dict[str, Any]], int, dict[str, Any], bool]] = []
+        coll_fb_configs: list[tuple[str, str, dict[str, Any]]] = []
+        changed_fbs: set[str] = set()
+        known_collections = {coll["id"] for coll in self.collections_search(with_fbs_only=False)[0]} | {
+            GENERIC_COLLECTION,
+            "GENERIC_PRODUCT_TYPE",
+        }
+        strict_mode = is_env_var_true("EODAG_STRICT_COLLECTIONS")
+        collections_to_add: list[Collection] = []
+
+        def strip_credentials(plugin_conf: dict[str, Any]) -> dict[str, Any]:
+            # Credentials must never be persisted to the database.
+            return {k: v for k, v in plugin_conf.items() if k != "credentials"}
+
+        for config in configs:
+            exclude_keys = set(PLUGINS_TOPIC_KEYS) | {
+                "name",
+                "priority",
+                "enabled",
+                "products",
+            }
+            metadata = {k: v for k, v in config.__dict__.items() if k not in exclude_keys}
+
+            plugins_config: dict[str, dict[str, Any]] = {}
+            for k in PLUGINS_TOPIC_KEYS:
+                if val := getattr(config, k, None):
+                    plugins_config[k] = strip_credentials(val.__dict__)
+
+            federation_backend_configs.append(
+                (
+                    config.name,
+                    plugins_config,
+                    getattr(config, "priority", 0),
+                    metadata,
+                    config.enabled,
+                )
+            )
+
+            topics_cfg: dict[str, dict[str, Any]] = {}
+            products_cfg = getattr(config, "products", {})
+            if getattr(config, "api", None):
+                topics_cfg["api"] = products_cfg
+            else:
+                topics_cfg["search"] = products_cfg
+            if products_download_cfg := getattr(getattr(config, "download", None), "products", None):
+                topics_cfg["download"] = products_download_cfg
+
+            # ``t=topics_cfg`` binds the *current* iteration's topics into the
+            # factory's default argument (default_factory takes no arguments).
+            tmp: dict[str, dict[str, Any]] = defaultdict(lambda t=topics_cfg: {topic: None for topic in t})
+
+            # NOTE: the loop variable deliberately rebinds ``products_cfg``
+            # per topic; the outer value is no longer needed at this point.
+            for topic, products_cfg in topics_cfg.items():
+                for coll_id, cfg in products_cfg.items():
+                    if strict_mode and coll_id not in known_collections:
+                        continue
+                    if coll_id not in known_collections:
+                        collections_to_add.append(Collection(id=coll_id, title=coll_id, description=NOT_AVAILABLE))
+
+                    tmp[coll_id][topic] = cfg
+
+            for coll_id, cfg in tmp.items():
+                coll_fb_configs.append((coll_id, config.name, 
cfg))
+
+            changed_fbs.add(config.name)
+
+        try:
+            if collections_to_add:
+                self.upsert_collections(CollectionsDict(collections_to_add))
+                logger.debug(
+                    "Collections permissive mode, %s added",
+                    ", ".join(c.id for c in collections_to_add),
+                )
+            self._upsert_federation_backends(federation_backend_configs)
+            self._upsert_collections_federation_backends(coll_fb_configs)
+            self._refresh_collections_denorm(sorted(changed_fbs))
+            self._con.commit()
+        except Exception:
+            if not self._con.closed:
+                self._con.rollback()
+            raise
+
+    def set_priority(self, name: str, priority: int) -> None:
+        """Set the priority of a federation backend.
+
+        :param name: Name of the federation backend to update.
+        :param priority: New priority value.
+
+        :raises: :class:`~psycopg.Error` if the database operation fails (the
+            transaction is rolled back before re-raising).
+        """
+        try:
+            self._execute(
+                "UPDATE federation_backends SET priority = %s WHERE name = %s",
+                (priority, name),
+            )
+            self._refresh_collections_denorm([name])
+            self._con.commit()
+        except Exception:
+            if not self._con.closed:
+                self._con.rollback()
+            raise
+
+    def set_status(self, status: dict[str, dict[str, Any]]) -> None:
+        """
+        Set the federation status.
+
+        :param status: Dictionary mapping collection IDs to their backend status.
+
+        :raises: :class:`~psycopg.Error` if the database operation fails (the
+            transaction is rolled back before re-raising).
+        """
+        try:
+            # BUG FIX: one parameter tuple is built per collection, so this
+            # statement must run through executemany(); psycopg's execute()
+            # binds a single parameter sequence and fails on a list of tuples.
+            # NOTE(review): ``federation`` is declared TEXT[] in
+            # create_collections_table while a dict is bound here (adapted to
+            # jsonb by _register_json_adapters) — confirm the intended column
+            # type.
+            self._executemany(
+                "UPDATE collections SET federation = %s WHERE id = %s OR internal_id = %s",
+                [(backend_status, coll_id, coll_id) for coll_id, backend_status in status.items()],
+            )
+            self._con.commit()
+        except Exception:
+            if not self._con.closed:
+                self._con.rollback()
+            raise
+
+    # --------------------------------------------------------------- queries
+    def collections_search(
+        self,
+        geometry: Optional[Union[str, dict[str, float], BaseGeometry]] = None,
+        datetime: Optional[str] = None,
+        limit: Optional[int] = None,
+        q: Optional[str] = None,
+        ids: Optional[list[str]] = None,
+        federation_backends: Optional[list[str]] = None,
+        cql2_text: Optional[str] = None,
+        cql2_json: Optional[dict[str, Any]] = None,
+        sortby: Optional[list[dict[str, str]]] = None,
+        with_fbs_only: bool = True,
+    ) -> tuple[list[dict[str, Any]], int]:
+        """Search collections matching the given parameters.
+
+        :param geometry: Optional spatial filter as a WKT string, a bounding-box
+            dict, or a Shapely geometry.
+        :param datetime: Optional temporal filter as an ISO 8601 datetime or
+            interval (e.g. ``"2020-01-01/2020-12-31"``).
+        :param limit: Maximum number of collections to return.
+        :param q: Free-text search expression (STAC ``q`` syntax).
+        :param ids: List of collection IDs to restrict results to.
+        :param federation_backends: List of federation backend names to filter by.
+        :param cql2_text: CQL2 text filter expression. Mutually exclusive with
+            ``cql2_json``.
+        :param cql2_json: CQL2 JSON filter expression. Mutually exclusive with
+            ``cql2_text``.
+        :param sortby: List of sort specifications, each a dict with ``field``
+            and ``direction`` keys.
+        :param with_fbs_only: If ``True`` (default), only return collections that
+            have at least one federation backend assigned.
+
+        :raises ValueError: if both ``cql2_text`` and ``cql2_json`` are provided.
+        :raises: :class:`~psycopg.Error` if the database query fails.
+
+        :returns: A tuple ``(collections, total)`` where ``collections`` is the
+            list of matching collection dicts (each including a
+            ``federation:backends`` key) and ``total`` is the total number of
+            matching collections ignoring the ``limit``.
+        """
+        if cql2_text and cql2_json:
+            raise ValueError("Cannot provide both cql2_text and cql2_json")
+
+        if cql2_text:
+            cql2_json = cql2.parse_text(cql2_text).to_json()
+
+        where = stac_search_to_where(
+            cql2_json_to_sql,
+            geometry,
+            datetime,
+            ids,
+            federation_backends,
+            cql2_json,
+        )
+
+        from_clause = "FROM collections c"
+        where_parts = [where, "c.federation_backends IS NOT NULL"] if with_fbs_only else [where]
+        params: list[Any] = []
+        order_terms: list[str] = []
+        select_score = ""
+
+        if q:
+            ts_expr = stac_q_to_tsquery(q)
+            if ts_expr:
+                # Match against the precomputed weighted tsvector; results are
+                # ordered by ``ts_rank_cd`` descending so best matches come first.
+                where_parts.append("c.tsv @@ to_tsquery('simple', %s)")
+                params.append(ts_expr)
+
+                select_score = ", ts_rank_cd(c.tsv, to_tsquery('simple', %s)) AS rank_score"
+                # SELECT-clause placeholders precede WHERE-clause ones in the
+                # final statement, hence the insert at index 0.
+                params.insert(0, ts_expr)  # parameter for SELECT clause
+                order_terms = ["rank_score DESC"]
+
+        # An explicit sortby replaces the relevance ranking entirely.
+        if sortby:
+            order_terms = stac_sortby_to_order_by(sortby)
+
+        order_terms.extend(["c.priority DESC NULLS LAST", "c.id ASC"])
+        order_by = " ORDER BY " + ", ".join(order_terms)
+
+        full_where = " AND ".join(where_parts)
+
+        # Count uses only the WHERE-clause parameters, not the SELECT-clause one.
+        count_params = list(params)
+        if select_score:
+            # The first parameter is the SELECT-clause tsquery; drop it for COUNT.
+            count_params = count_params[1:]
+
+        count_row = self._execute(
+            f"SELECT COUNT(*) AS n {from_clause} WHERE {full_where}",
+            count_params or None,
+        ).fetchone()
+        number_matched = cast(int, count_row["n"]) if count_row is not None else 0
+
+        sql = (
+            f"SELECT c.content AS content, "
+            f"c.federation_backends AS federation_backends, "
+            f"c.federation AS federation{select_score} "
+            f"{from_clause} WHERE {full_where}{order_by}"
+        )
+        # ``limit`` is cast to int before interpolation, so it cannot inject SQL.
+        if limit is not None:
+            sql += f" LIMIT {int(limit)}"
+
+        collections_list: list[dict[str, Any]] = []
+        for row in self._execute(sql, params or None).fetchall():
+            coll = _collection_from_json(row["content"])
+            coll["federation:backends"] = row["federation_backends"]
+            coll["federation"] = row["federation"]
+            collections_list.append(coll)
+
+        return collections_list, number_matched
+
+    def get_federation_backends(
+        self,
+        names: Optional[set[str]] = None,
+        enabled: Optional[bool] = None,
+        fetchable: Optional[bool] = None,
+        collection: Optional[str] = None,
+        limit: Optional[int] = None,
+    ) -> dict[str, dict[str, Any]]:
+        """Return federation backends according to filters.
+
+        :param names: Restrict results to backends with these names.
+        :param enabled: If ``True``, return only enabled backends; if ``False``,
+            return only disabled ones; if ``None`` (default), return all.
+        :param fetchable: If ``True``, return only backends whose metadata marks
+            them as fetchable; if ``False``, return only non-fetchable ones; if
+            ``None`` (default), return all.
+        :param collection: If provided, restrict to backends linked to this
+            collection ID.
+        :param limit: Maximum number of backends to return.
+
+        :raises: :class:`~psycopg.Error` if the database query fails.
+
+        :returns: Dict mapping each backend name to a dict with ``priority``,
+            ``enabled``, and ``metadata`` keys.
+ """ + sql = "SELECT fb.name, fb.priority, fb.enabled, fb.metadata FROM federation_backends fb" + where_clauses: list[str] = [] + params: list[Any] = [] + + if collection: + sql += ( + " INNER JOIN collections_federation_backends cfb " + "ON fb.name = cfb.federation_backend_name AND cfb.collection_id = %s" + ) + params.append(collection) + + if enabled is not None: + where_clauses.append(f"{'NOT ' if not enabled else ''}fb.enabled") + + if fetchable is not None: + cmp = "= TRUE" if fetchable else "IS DISTINCT FROM TRUE" + where_clauses.append(f"(fb.metadata->>'fetchable')::boolean {cmp}") + + if names: + where_clauses.append("fb.name = ANY(%s)") + params.append(list(names)) + + if where_clauses: + sql += " WHERE " + " AND ".join(where_clauses) + + sql += " ORDER BY fb.priority DESC, fb.name ASC" + if limit is not None: + sql += f" LIMIT {int(limit)}" + + rows = self._execute(sql, params or None).fetchall() + + return { + row["name"]: { + "priority": row["priority"], + "enabled": bool(row["enabled"]), + "metadata": row["metadata"] or {}, + } + for row in rows + } + + def get_fb_config( + self, + name: str, + collections: set[str] | None = None, + ) -> dict[str, Any]: + """Get the federation backend config for a given provider and optional collection filter. + + :param name: Name of the federation backend to retrieve. + :param collections: Optional set of collection IDs whose per-collection + plugin config should be included in the returned dict. + + :raises KeyError: if the federation backend ``name`` is not found. + :raises: :class:`~psycopg.Error` if the database query fails. + + :returns: Dict with the provider's merged plugin config, metadata, + priority, enabled flag, and per-collection product configs. + """ + collections = collections or set() + + if collections: + cfb_filter_sql = "cfb.collection_id = ANY(%s)" + cfb_params: tuple[Any, ...] 
= (list(collections),) + else: + cfb_filter_sql = "FALSE" + cfb_params = () + + sql = f""" + SELECT + fb.plugins_config AS provider_plugins_config, + fb.priority AS provider_priority, + fb.metadata AS provider_metadata, + fb.enabled AS provider_enabled, + + c.collection_id AS collection_id, + c.plugins_config AS collection_plugins_config + FROM federation_backends fb + LEFT JOIN ( + SELECT + cfb.collection_id, + cfb.plugins_config + FROM collections_federation_backends cfb + WHERE cfb.federation_backend_name = %s + AND {cfb_filter_sql} + ) AS c + ON TRUE + WHERE fb.name = %s + """ + params = (name, *cfb_params, name) + + rows = self._execute(sql, params).fetchall() + if not rows or not rows[0]["provider_plugins_config"]: + raise KeyError(f"Provider '{name}' not found") + + base: dict[str, Any] = ( + (rows[0]["provider_plugins_config"] or {}) + | (rows[0]["provider_metadata"] or {}) + | { + "priority": rows[0]["provider_priority"], + "enabled": bool(rows[0]["provider_enabled"]), + "name": name, + } + ) + base.setdefault("products", {}) + if isinstance(base.get("download"), dict): + base["download"].setdefault("products", {}) + + for r in rows: + cid = r["collection_id"] + if not cid: + continue + blob = r["collection_plugins_config"] or {} + base["products"][cid] = blob.get("search", {}) or blob.get("api", {}) + if isinstance(base.get("download"), dict): + base["download"]["products"][cid] = blob.get("download", {}) + + return base + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _collection_to_json(collection: Any) -> dict[str, Any]: + """Serialize a Collection (or dict) for JSONB storage. + + Drops the ``federation:backends`` and ``federation`` keys from the stored content as + they are materialized in a dedicated ``text[]`` column. 
+ + :param collection: A :class:`~eodag.api.collection.Collection` instance or + a plain dict representing a STAC collection. + + :returns: Dict ready for insertion into the ``content`` JSONB column. + """ + if isinstance(collection, Collection): + data = collection.model_dump(mode="json") + data["_id"] = collection._id + else: + data = dict(collection) + for key in BASE_COLLECTION: + if not data.get(key): + data[key] = BASE_COLLECTION[key] + data.pop("federation:backends", None) + data.pop("federation", None) + return data + + +def _collection_from_json(data: Any) -> dict[str, Any]: + """Deserialize a stored JSONB collection back to a STAC-shaped dict. + + Remaps ``_id`` (internal id) back to ``id`` so that + ``Collection(**data)`` reconstructs correctly. + + :param data: Raw data loaded from the JSONB ``content`` column. + + :returns: STAC-shaped collection dict with ``id`` restored. + """ + if data is None: + return {} + data = dict(data) + if "_id" in data: + data["id"] = data.pop("_id") + return data + + +def _register_json_adapters(con: psycopg.Connection[Any]) -> None: + """Register per-connection JSON adapters. + + - Plain ``dict`` parameters are dumped as ``jsonb`` (no need to wrap them + in ``Jsonb(...)`` at every call site). + - JSON (de)serialization uses ``orjson`` for performance. + + :param con: The psycopg connection on which to register the adapters. + """ + con.adapters.register_dumper(dict, JsonbDumper) + set_json_dumps(orjson.dumps, context=con) + set_json_loads(orjson.loads, context=con) + + +def _ensure_extensions(con: psycopg.Connection[Any]) -> None: + """Ensure required PostgreSQL extensions are installed. + + Extensions ``postgis`` and ``unaccent`` are required by this backend. + Logs a warning and continues if an extension cannot be created (e.g. because + a superuser is required) rather than raising an error. + + :param con: The psycopg connection to use to install the extensions. 
+ """ + cur = con.cursor() + for ext in ("postgis", "unaccent"): + try: + cur.execute(pg_sql.SQL("CREATE EXTENSION IF NOT EXISTS {}").format(pg_sql.Identifier(ext))) + except psycopg.Error: + con.rollback() + logger.warning( + "Could not enable extension '%s'. It may need to be installed by a superuser.", + ext, + ) + + +def create_collections_table(con: psycopg.Connection[Any]) -> None: + """Create the ``collections`` table along with its indexes and triggers. + + The schema relies on: + - PostgreSQL 12+ generated columns for ``id``, ``internal_id``, datetimes + and the FTS ``tsvector``. + - PostGIS ``geometry(Polygon, 4326)`` and a GiST index for spatial filters. + - GIN indexes on ``federation_backends`` (text[]) and ``tsv`` (tsvector). + - A BEFORE INSERT/UPDATE trigger (``collections_set_derived_cols``) maintains + the ``geometry``, ``datetime``, ``end_datetime``, and ``tsv`` columns from + the ``content`` JSONB (generated columns cannot reference JSONB expressions + in all PostgreSQL versions, so a trigger is used instead). + + :param con: Open psycopg connection used to execute the DDL statements. The + caller is responsible for committing. + + :raises: :class:`~psycopg.Error` if any DDL statement fails. + """ + cur = con.cursor() + + cur.execute( + """ + CREATE TABLE IF NOT EXISTS collections ( + key BIGSERIAL PRIMARY KEY, + content JSONB NOT NULL, + id TEXT GENERATED ALWAYS AS (content->>'id') STORED UNIQUE, + internal_id TEXT GENERATED ALWAYS AS (content->>'_id') STORED UNIQUE, + datetime TIMESTAMPTZ, + end_datetime TIMESTAMPTZ, + geometry geometry(Polygon, 4326), + federation_backends TEXT[], + federation TEXT[], + priority INTEGER, + tsv tsvector + ); + """ + ) + + # Trigger to (re)compute derived columns (geometry, datetime, end_datetime, tsv) + # from the JSON content on every INSERT or UPDATE. 
+ cur.execute( + """ + CREATE OR REPLACE FUNCTION collections_set_derived_cols() + RETURNS TRIGGER AS $$ + DECLARE + bbox jsonb; + xmin double precision; + ymin double precision; + xmax double precision; + ymax double precision; + kws text; + dt_start text; + dt_end text; + BEGIN + bbox := NEW.content #> '{extent,spatial,bbox,0}'; + IF bbox IS NULL OR jsonb_typeof(bbox) <> 'array' THEN + NEW.geometry := NULL; + ELSE + xmin := (bbox->>0)::double precision; + ymin := (bbox->>1)::double precision; + xmax := (bbox->>2)::double precision; + ymax := (bbox->>3)::double precision; + NEW.geometry := ST_MakeEnvelope(xmin, ymin, xmax, ymax, 4326); + END IF; + + dt_start := NEW.content #>> '{extent,temporal,interval,0,0}'; + NEW.datetime := CASE + WHEN dt_start IS NULL THEN '-infinity'::timestamptz + ELSE dt_start::timestamptz + END; + + dt_end := NEW.content #>> '{extent,temporal,interval,0,1}'; + NEW.end_datetime := CASE + WHEN dt_end IS NULL THEN 'infinity'::timestamptz + ELSE dt_end::timestamptz + END; + + SELECT string_agg(value, ' ') + INTO kws + FROM jsonb_array_elements_text( + COALESCE(NEW.content->'keywords', '[]'::jsonb) + ) AS value; + + NEW.tsv := + setweight(to_tsvector('simple', + coalesce(NEW.content->>'title', '')), 'A') + || setweight(to_tsvector('simple', + coalesce(NEW.content->>'description', '')), 'B') + || setweight(to_tsvector('simple', + coalesce(kws, '')), 'C'); + + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + """ + ) + cur.execute("DROP TRIGGER IF EXISTS collections_set_geometry_trg ON collections;") + cur.execute("DROP TRIGGER IF EXISTS collections_set_derived_cols_trg ON collections;") + cur.execute( + """ + CREATE TRIGGER collections_set_derived_cols_trg + BEFORE INSERT OR UPDATE OF content ON collections + FOR EACH ROW EXECUTE FUNCTION collections_set_derived_cols(); + """ + ) + + # Indexes + cur.execute("CREATE INDEX IF NOT EXISTS idx_collections_datetime ON collections (datetime);") + cur.execute("CREATE INDEX IF NOT EXISTS 
idx_collections_end_datetime ON collections (end_datetime);") + cur.execute("CREATE INDEX IF NOT EXISTS idx_collections_geometry ON collections USING GIST (geometry);") + cur.execute("CREATE INDEX IF NOT EXISTS idx_collections_fbs ON collections USING GIN (federation_backends);") + cur.execute("CREATE INDEX IF NOT EXISTS idx_collections_tsv ON collections USING GIN (tsv);") + + +def create_federation_backends_table(con: psycopg.Connection[Any]) -> None: + """Create the ``federation_backends`` table. + + :param con: Open psycopg connection used to execute the DDL statement. The + caller is responsible for committing. + + :raises: :class:`~psycopg.Error` if the DDL statement fails. + """ + cur = con.cursor() + cur.execute( + """ + CREATE TABLE IF NOT EXISTS federation_backends ( + key BIGSERIAL PRIMARY KEY, + name TEXT UNIQUE NOT NULL, + plugins_config JSONB NOT NULL, + priority INTEGER NOT NULL, + metadata JSONB, + enabled BOOLEAN NOT NULL + ); + """ + ) + + +def create_collections_federation_backends_table(con: psycopg.Connection[Any]) -> None: + """Create the per-collection federation backend configuration table. + + :param con: Open psycopg connection used to execute the DDL statements. The + caller is responsible for committing. + + :raises: :class:`~psycopg.Error` if any DDL statement fails. 
+ """ + cur = con.cursor() + cur.execute( + """ + CREATE TABLE IF NOT EXISTS collections_federation_backends ( + collection_id TEXT, + federation_backend_name TEXT, + plugins_config JSONB NOT NULL, + PRIMARY KEY (collection_id, federation_backend_name) + ); + """ + ) + cur.execute( + """ + CREATE INDEX IF NOT EXISTS idx_cfb_backend_collection + ON collections_federation_backends (federation_backend_name, collection_id); + """ + ) diff --git a/stac_fastapi/eodag/databases/postgresql_cql2.py b/stac_fastapi/eodag/databases/postgresql_cql2.py new file mode 100644 index 00000000..8f92e25b --- /dev/null +++ b/stac_fastapi/eodag/databases/postgresql_cql2.py @@ -0,0 +1,207 @@ +# -*- coding: utf-8 -*- +# Copyright 2026, CS GROUP - France, https://www.csgroup.eu/ +# +# This file is part of EODAG project +# https://www.github.com/CS-SI/EODAG +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""CQL2 to PostgreSQL SQL translator for EODAG's PostgreSQL backend.""" + +from __future__ import annotations + +import re +from typing import Any + +import cql2 +import orjson +from eodag.databases.base import ( + BASE_COLLECTION_TABLE_COLUMNS, + extract_properties, + validate_supported_ops, +) + +# Properties that map to text columns / values; default for non-mapped properties. +# Numeric-like JSONB values are compared as text by default which still works for +# CQL2's range/equality semantics on STAC fields (most are strings). 
+ +# cql2 emits the Postgres-flavored ARRAY operator ``@@`` for ``a_overlaps``. +# In real Postgres this token is the FTS match operator, so we rewrite it to the +# array overlap operator ``&&``. +_RE_OVERLAP_ARRAY = re.compile(r"@@\s*ARRAY\[") + +# cql2 emits ``strip_accents(...)`` for the CQL2 ``accenti`` operator. PostgreSQL +# provides the ``unaccent()`` function via the ``unaccent`` extension. +_RE_STRIP_ACCENTS = re.compile(r"\bstrip_accents\s*\(", re.IGNORECASE) + +# Scalar normalisation for A_CONTAINS / A_CONTAINEDBY / A_OVERLAPS: +# ``col @> 'value'``, ``col <@ 'value'``, ``col @@ 'value'`` +# → ``col @> ARRAY['value']`` / ``col <@ ARRAY['value']`` / ``col @@ ARRAY['value']``. +# Skips values starting with ``{`` (PG array literal) or ``[`` (JSON array literal). +_RE_CONTAINS_SCALAR = re.compile(r"(@>|<@|@@)\s*'([^{[][^']*?)'") + +# Unified JSONB array-operator rewrite. +# +# cql2 extracts content properties via ``->>`` / ``#>>`` which return ``text``. +# PostgreSQL has no array or JSONB operators for ``text``, so we must switch to +# the jsonb-returning forms (``->`` / ``#>``) and adapt the RHS accordingly. +# +# Handled operators: +# @> (A_CONTAINS) → jsonb @> jsonb +# <@ (A_CONTAINEDBY) → jsonb <@ jsonb +# && (A_OVERLAPS) → EXISTS sub-select (no native jsonb && jsonb) +# +# Handles one or more values in the ARRAY[...] literal. +_RE_JSONB_ARRAY_OP = re.compile( + r"\(content\s*(->>|#>>)\s*'([^']+)'\)\s*(@>|<@|&&|=)\s*(ARRAY\[[^\]]+\])", + re.IGNORECASE, +) + +# a_equals on physical text[] columns: ``col = ARRAY[...]``. +# PostgreSQL ``=`` is order-sensitive; rewrite to order-independent +# bidirectional containment so the semantics match CQL2 a_equals. +# Only applies to bare identifiers (not content->> extractions, which are +# handled by _RE_JSONB_ARRAY_OP above). 
+_RE_TEXT_ARRAY_EQUALS = re.compile(r"(\b(?:c\.)?\w+\b)\s*=\s*(ARRAY\[[^\]]+\])") + + +def _rewrite_jsonb_array_op(m: re.Match) -> str: # type: ignore[type-arg] + """Rewrite a JSONB text-extraction + array-operator expression. + + Switches the LHS from text-extraction (``->>``, ``#>>``) to jsonb-extraction + (``->``, ``#>``) and converts the ``ARRAY[...]`` RHS to a JSONB array literal + for ``@>`` and ``<@``. For ``&&`` (A_OVERLAPS) there is no native + ``jsonb && jsonb`` operator, so an ``EXISTS`` sub-select is generated instead. + + :param m: Regex match object from :data:`_RE_JSONB_ARRAY_OP`. + + :returns: Rewritten SQL fragment using jsonb-typed operators. + """ + extract_op = m.group(1) # '->>' or '#>>' + key = m.group(2) + op = m.group(3) + # Build jsonb-extraction LHS (drop the trailing '>' to get '->' or '#>') + if extract_op == "->>": + lhs = f"(content->'{key}')" + else: # '#>>' + lhs = f"(content #> '{key}')" + # Parse all values from ARRAY['v1', 'v2', ...] + items = re.findall(r"'([^']*)'", m.group(4)) + if op == "&&": + # No jsonb && jsonb operator; expand to EXISTS overlap check. + items_sql = ", ".join(f"'{i}'" for i in items) + return f"EXISTS (SELECT 1 FROM jsonb_array_elements_text({lhs}) _e WHERE _e = ANY(ARRAY[{items_sql}]))" + json_rhs = orjson.dumps(items).decode() + if op == "=": + # a_equals: order-independent — both directions of containment. + return f"({lhs} @> '{json_rhs}'::jsonb AND {lhs} <@ '{json_rhs}'::jsonb)" + return f"{lhs} {op} '{json_rhs}'::jsonb" + + +def _replace_properties(sql: str, properties: set[str]) -> str: + """Rewrite property references in the SQL. + + Properties matching a physical column (see ``BASE_COLLECTION_TABLE_COLUMNS``) + are passed through; all others are translated to ``content->>'prop'`` (text) + JSONB extraction expressions. + + :param sql: Raw SQL string emitted by cql2 with quoted property names. + :param properties: Set of property names extracted from the CQL2 expression. 
+ + :returns: SQL string with all property references replaced by their + column name or JSONB extraction expression. + """ + result = sql + for prop in sorted(properties, key=len, reverse=True): + if value := BASE_COLLECTION_TABLE_COLUMNS.get(prop): + prop_expr = value + quoted = f'"{value}"' + else: + # For dotted property names (e.g. ``summaries.platform``) convert to a + # JSONB path expression so PostgreSQL navigates the nested structure + # rather than looking for a flat top-level key named + # ``"summaries.platform"``. + if "." in prop: + path = ",".join(prop.split(".")) + prop_expr = f"(content #>> '{{{path}}}')" # e.g. (content #>> '{summaries,platform}') + else: + # Use ``->>`` to extract the JSON value as text (compatible with + # most CQL2 comparison operators which produce text-typed RHS + # literals). + prop_expr = f"(content->>'{prop}')" + quoted = f'"{prop}"' + + if quoted in result: + result = result.replace(quoted, prop_expr) + else: + result = re.sub(rf"\b{re.escape(prop)}\b", prop_expr, result) + + return result + + +def _postgres_compat(sql: str) -> str: + """Apply PostgreSQL-specific rewrites to the SQL emitted by cql2. + + Most of cql2's SQL output is already valid PostgreSQL, but a couple of + tokens need to be adjusted for native semantics: + + - ``@@ ARRAY[...]`` is rewritten to ``&& ARRAY[...]`` so it uses the + Postgres array-overlap operator instead of the FTS match operator. + - ``strip_accents(...)`` is rewritten to ``unaccent(...)`` (requires the + ``unaccent`` extension to be installed in the database). + - ``col @> 'scalar'`` / ``col <@ 'scalar'`` / ``col @@ 'scalar'`` are + normalised to the ARRAY form. + - ``(content->>'k') OP ARRAY[...]`` / ``(content #>> 'path') OP ARRAY[...]`` + are rewritten to use jsonb-returning extraction and a JSONB array literal + (or an EXISTS sub-select for ``&&``; bidirectional containment for ``=``). 
+ - ``col = ARRAY[...]`` on physical ``text[]`` columns is rewritten to + order-independent ``@> AND <@`` containment (CQL2 a_equals is unordered). + + :param sql: SQL string with property references already rewritten by + :func:`~stac_fastapi.eodag.databases.postgresql_cql2._replace_properties`. + + :returns: PostgreSQL-compatible SQL string ready for use as a WHERE clause. + """ + result = _RE_OVERLAP_ARRAY.sub("&& ARRAY[", sql) + result = _RE_STRIP_ACCENTS.sub("unaccent(", result) + result = _RE_CONTAINS_SCALAR.sub(r"\1 ARRAY['\2']", result) + result = _RE_JSONB_ARRAY_OP.sub(_rewrite_jsonb_array_op, result) + # a_equals on physical text[] columns: rewrite to order-independent containment. + result = _RE_TEXT_ARRAY_EQUALS.sub( + lambda m: f"({m.group(1)} @> {m.group(2)} AND {m.group(1)} <@ {m.group(2)})", + result, + ) + return result + + +def cql2_json_to_sql(cql2_json: dict[str, Any]) -> str: + """Validate CQL2 JSON and return a PostgreSQL-compatible WHERE SQL fragment. + + :param cql2_json: CQL2 filter expression as a parsed JSON dict. + + :raises ValueError: if the expression uses unsupported CQL2 operators. + + :returns: PostgreSQL-compatible WHERE clause fragment (without the + ``WHERE`` keyword). 
+ """ + + validate_supported_ops(cql2_json) + + expr = cql2.parse_json(orjson.dumps(cql2_json).decode()) + raw_sql = expr.to_sql() + + properties: set[str] = set() + extract_properties(cql2_json, properties) + where_sql = _replace_properties(raw_sql, properties) + where_sql = _postgres_compat(where_sql) + + return where_sql diff --git a/stac_fastapi/eodag/databases/postgresql_fts.py b/stac_fastapi/eodag/databases/postgresql_fts.py new file mode 100644 index 00000000..0cbb5266 --- /dev/null +++ b/stac_fastapi/eodag/databases/postgresql_fts.py @@ -0,0 +1,221 @@ +# -*- coding: utf-8 -*- +# Copyright 2026, CS GROUP - France, https://www.csgroup.eu/ +# +# This file is part of EODAG project +# https://www.github.com/CS-SI/EODAG +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""STAC q expression parser for PostgreSQL ``tsquery``. + +Supported syntax (same as the SQLite/FTS5 variant): +- bare terms (implicit OR between adjacent terms, per STAC default) +- quoted phrases ("exact phrase") -> emitted as PostgreSQL phrase tsquery +- explicit AND / OR operators +- parentheses for grouping + +``+`` and ``-`` prefix operators are not supported and will raise ``ValueError``. + +The output is a string usable with ``to_tsquery('simple', $expr)``. 
+""" + +from __future__ import annotations + +import re +from typing import cast + +_FTS_TOKEN_RE = re.compile( + r'"(?:[^"\\]|\\.)*"|\(|\)|\bAND\b|\bOR\b|[^\s()"]+', + flags=re.IGNORECASE, +) + +_UNSUPPORTED_PREFIX_RE = re.compile(r"^[+-]") + +_PRECEDENCE = {"OR": 1, "AND": 2} +_SYNTAX = frozenset(_PRECEDENCE) | {"(", ")"} + +# tsquery operator mapping +_OP_MAP = {"AND": "&", "OR": "|"} + +# Lexeme split (whitespace and basic punctuation) for phrases. +_LEXEME_SPLIT_RE = re.compile(r"\W+", flags=re.UNICODE) + + +def _escape_lexeme(token: str) -> str: + """Escape a single lexeme for tsquery by wrapping it in single quotes. + + Internal single quotes are doubled to avoid tsquery syntax errors. + + :param token: Raw lexeme string to escape. + + :returns: The lexeme wrapped in single quotes with internal quotes doubled. + """ + return "'" + token.replace("'", "''") + "'" + + +def _term_to_tsquery(token: str) -> str: + """Convert one parsed token (bare word or quoted phrase) to a tsquery fragment. + + Quoted phrases are split into individual lexemes joined with the PostgreSQL + phrase operator ``<->``. + + :param token: A bare-word token or a double-quoted phrase token. + + :raises ValueError: if a quoted phrase contains no lexemes after splitting. + + :returns: A tsquery fragment string (lexeme or phrase expression). + """ + if token.startswith('"') and token.endswith('"') and len(token) >= 2: + # Quoted phrase: split into lexemes joined with the phrase operator <->. + inner = token[1:-1].replace('\\"', '"') + lexemes = [w for w in _LEXEME_SPLIT_RE.split(inner) if w] + if not lexemes: + raise ValueError("Empty phrase in q expression") + if len(lexemes) == 1: + return _escape_lexeme(lexemes[0]) + return "(" + " <-> ".join(_escape_lexeme(w) for w in lexemes) + ")" + return _escape_lexeme(token) + + +def _normalize_tokens(q: str) -> list[str]: + """Tokenize a STAC ``q`` expression and insert implicit OR operators. + + Rejects the ``+`` and ``-`` prefix operators. 
Adjacent operands have an + implicit ``OR`` inserted between them, following the STAC default. + + :param q: Raw STAC ``q`` filter string. + + :raises ValueError: if a token starts with an unsupported ``+`` or ``-`` + prefix operator. + + :returns: List of tokens with implicit ``OR`` operators inserted. + """ + raw = cast(list[str], _FTS_TOKEN_RE.findall(q.strip())) + if not raw: + return [] + + out: list[str] = [] + for tok in raw: + up = tok.upper() + if up in _SYNTAX: + out.append(up) + continue + + if _UNSUPPORTED_PREFIX_RE.match(tok): + msg = f"Unsupported operator '{tok[0]}' in q expression. Use AND / OR / parentheses instead." + raise ValueError(msg) + + out.append(tok) + + # Insert implicit OR (default STAC behavior) between adjacent operands. + with_or: list[str] = [] + for i, tok in enumerate(out): + with_or.append(tok) + if i < len(out) - 1: + nxt = out[i + 1] + left_is_end = tok not in _SYNTAX or tok == ")" + right_is_start = nxt not in _SYNTAX or nxt == "(" + if left_is_end and right_is_start: + with_or.append("OR") + + return with_or + + +def _to_postfix(tokens: list[str]) -> list[str]: + """Convert infix tokens to postfix order using the shunting-yard algorithm. + + Handles left-associative ``AND`` (higher precedence) and ``OR`` operators + and parentheses for grouping. + + :param tokens: List of normalised infix tokens (operators, operands, + parentheses) as produced by :func:`_normalize_tokens`. + + :raises ValueError: if parentheses are unbalanced. + + :returns: List of tokens in postfix (reverse-Polish) order. 
+ """ + out: list[str] = [] + ops: list[str] = [] + + for tok in tokens: + if tok not in _SYNTAX: + out.append(tok) + elif tok == "(": + ops.append(tok) + elif tok == ")": + while ops and ops[-1] != "(": + out.append(ops.pop()) + if not ops: + raise ValueError("Unbalanced parentheses") + ops.pop() + else: # AND / OR + while ops and ops[-1] != "(" and _PRECEDENCE.get(ops[-1], 0) >= _PRECEDENCE[tok]: + out.append(ops.pop()) + ops.append(tok) + + while ops: + if ops[-1] in {"(", ")"}: + raise ValueError("Unbalanced parentheses") + out.append(ops.pop()) + + return out + + +def _postfix_to_tsquery(postfix: list[str]) -> str: + """Build a ``tsquery`` expression string from a list of postfix tokens. + + :param postfix: List of postfix-ordered tokens as produced by + :func:`_to_postfix`. + + :raises ValueError: if an operator is encountered with fewer than two + operands on the stack, or if the stack does not contain exactly one + result after processing. + + :returns: A complete ``tsquery`` expression string. + """ + stack: list[str] = [] + + for tok in postfix: + if tok in _PRECEDENCE: + if len(stack) < 2: + raise ValueError(f"Missing operand for operator {tok}") + rhs = stack.pop() + lhs = stack.pop() + stack.append(f"({lhs}) {_OP_MAP[tok]} ({rhs})") + else: + stack.append(_term_to_tsquery(tok)) + + if len(stack) != 1: + raise ValueError("Invalid expression") + return stack[0] + + +def stac_q_to_tsquery(q: str) -> str: + """Convert a STAC ``q`` expression to a PostgreSQL ``tsquery`` string. + + Output is intended for ``to_tsquery('simple', $expr)``. + + :param q: STAC ``q`` filter string. Supports bare terms, ``"quoted phrases"``, + ``AND`` / ``OR`` operators and parentheses for grouping. Returns an + empty string if ``q`` contains no tokens. + + :raises ValueError: if ``q`` contains unsupported prefix operators (``+``, + ``-``), an unbalanced parenthesis, or an otherwise malformed expression. 
+ + :returns: A ``tsquery``-compatible expression string, or ``""`` if ``q`` + contains no tokens. + """ + tokens = _normalize_tokens(q) + if not tokens: + return "" + postfix = _to_postfix(tokens) + return _postfix_to_tsquery(postfix) diff --git a/stac_fastapi/eodag/errors.py b/stac_fastapi/eodag/errors.py index 15ba0540..45d66fba 100644 --- a/stac_fastapi/eodag/errors.py +++ b/stac_fastapi/eodag/errors.py @@ -35,7 +35,6 @@ MisconfiguredError, NoMatchingCollection, NotAvailableError, - QuotaExceededError, RequestError, TimeOutError, UnsupportedCollection, @@ -61,7 +60,6 @@ UnsupportedProvider: status.HTTP_404_NOT_FOUND, ValidationError: status.HTTP_400_BAD_REQUEST, RequestError: status.HTTP_400_BAD_REQUEST, - QuotaExceededError: status.HTTP_429_TOO_MANY_REQUESTS, } logger = logging.getLogger(__name__) diff --git a/stac_fastapi/eodag/models/item.py b/stac_fastapi/eodag/models/item.py index 1f1eb5c7..07eb0364 100644 --- a/stac_fastapi/eodag/models/item.py +++ b/stac_fastapi/eodag/models/item.py @@ -26,6 +26,7 @@ from stac_fastapi.types.requests import get_base_url from stac_fastapi.types.stac import Item from stac_pydantic.api.version import STAC_API_VERSION +from stac_pydantic.links import Link, Links from stac_pydantic.shared import Asset from eodag.api.product._product import EOProduct @@ -70,16 +71,13 @@ def create_stac_item( settings: Settings = get_settings() - collection_obj = request.app.state.dag.collections_config.get(product.collection) - collection = collection_obj.id if collection_obj else product.collection - feature = Item( type="Feature", assets={}, id=product.properties["id"], geometry=product.geometry.__geo_interface__, bbox=product.geometry.bounds, - collection=collection, + collection=product.collection, stac_version=STAC_API_VERSION, ) @@ -91,7 +89,7 @@ def create_stac_item( quoted_id = quote(feature["id"]) asset_proxy_url = ( - (download_base_url + f"data/{product.provider}/{collection}/{quoted_id}") + (download_base_url + 
f"data/{product.provider}/{product.collection}/{quoted_id}") if extension_is_enabled("DataDownload") else None ) @@ -185,11 +183,12 @@ def create_stac_item( if provider := eodag_args.get("provider", None): retrieve_body["federation:backends"] = [provider] + extra_links = Links(root=[Link(**link) for link in feature.get("links", [])]) feature["links"] = ItemLinks( - collection_id=collection, + collection_id=product.collection, item_id=quoted_id, retrieve_body=retrieve_body, request=request, - ).get_links(extensions=extension_names, extra_links=feature.get("links"), request_json=request_json) + ).get_links(extensions=extension_names, extra_links=extra_links, request_json=request_json) return feature diff --git a/stac_fastapi/eodag/models/stac_metadata.py b/stac_fastapi/eodag/models/stac_metadata.py index 52f5a7ab..8b086e15 100644 --- a/stac_fastapi/eodag/models/stac_metadata.py +++ b/stac_fastapi/eodag/models/stac_metadata.py @@ -36,6 +36,7 @@ from stac_pydantic.shared import Provider from typing_extensions import Self +from eodag.api.provider import Provider as EodagProvider from stac_fastapi.eodag.extensions.stac import ( BaseStacExtension, ) @@ -205,13 +206,20 @@ def get_federation_backend_dict(request: Request, provider_name: str) -> dict[st :param provider_name: provider name :return: Federation backend dictionary """ - provider = next( - p for p in request.app.state.dag.providers.values() if provider_name in [p.name, getattr(p, "group", None)] + provider: EodagProvider = next( + cast(EodagProvider, p) + for p in request.app.state.dag.providers.values() + if provider_name in [p.name, p.metadata.get("group", None)] + ) + provider_status = next( + (status for p_id, status in request.app.state.providers_status.items() if provider.name == p_id), + {}, ) return { - "title": provider.group or provider.name, - "description": provider.title, - "url": provider.url, + "title": provider.metadata.get("group", None) or provider.name, + "description": 
provider.metadata.get("description", None), + "url": provider.metadata.get("url", None), + **provider_status, } diff --git a/tests/conftest.py b/tests/conftest.py index 788eafe6..b51ce03f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -30,9 +30,9 @@ import pytest from eodag import EODataAccessGateway from eodag.api.product.metadata_mapping import OFFLINE_STATUS, ONLINE_STATUS -from eodag.api.provider import Provider, ProviderConfig, ProvidersDict +from eodag.api.provider import Provider, ProvidersDict from eodag.api.search_result import SearchResult -from eodag.config import PluginConfig +from eodag.config import PluginConfig, ProviderConfig from eodag.plugins.authentication.aws_auth import AwsAuth from eodag.plugins.authentication.base import Authentication from eodag.plugins.authentication.openid_connect import OIDCRefreshTokenBase diff --git a/tests/test_dag.py b/tests/test_dag.py index dc70d9ac..e71206b4 100644 --- a/tests/test_dag.py +++ b/tests/test_dag.py @@ -6,7 +6,8 @@ import pytest from eodag import EODataAccessGateway from eodag.api.collection import Collection, CollectionsDict, CollectionsList -from eodag.api.provider import Provider, ProviderConfig, ProvidersDict +from eodag.api.provider import Provider, ProvidersDict +from eodag.config import ProviderConfig from eodag.utils.exceptions import RequestError, TimeOutError from fastapi import FastAPI from pytest_mock import MockerFixture diff --git a/tests/test_search.py b/tests/test_search.py index ca78a020..204c39c0 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -38,33 +38,6 @@ async def test_request_params_invalid(bbox, request_not_valid, defaults): await request_not_valid(f"search?collections={defaults.collection}&bbox={bbox}") -async def test_invalid_post_search_request(request_not_valid): - """ - Test if an invalid POST /search request body returns an error - """ - post_body = { - "collections": ["S2_MSI_L1C"], - "limit": 1, - "sortby": [{"field": "datetime", "direction": 
"asc"}], - "query": {"federation:backends": {"eq": "creodias_s3"}}, - "ecmwf:year": {"eq": "2000"}, # additional parameter outside of query - } - await request_not_valid("search", "POST", post_body) - - -async def test_valid_post_search_request(request_valid): - """ - Test if a valid POST /search request body passes validation - """ - post_body = { - "collections": ["S2_MSI_L1C"], - "limit": 1, - "sortby": [{"field": "datetime", "direction": "asc"}], - "query": {"federation:backends": {"eq": "creodias_s3"}}, - } - await request_valid(url="search", method="POST", post_data=post_body) - - @pytest.mark.parametrize("input_bbox,expected_geom", [(None, None), ("bbox_csv", "bbox_list")]) async def test_request_params_valid(request_valid, defaults, input_bbox, expected_geom): """