Skip to content

Commit 42033c1

Browse files
anesson-csalambarejlahovnik
authored
feat: implement polling mechanism (#5)
implements a polling mechanism for products that have to be ordered: - When a search is done for an orderable product, no `assets` are returned but in the `links` section, a `retrieve` link with a POST body is given. With the link the product can be ordered. - The status of the order can be check using the `self` link. - When the order is ready, the `downloadLink` is available. --------- Co-authored-by: LAMBARE Aubin <aubin.lambare@cs-soprasteria.com> Co-authored-by: LAMBARE Aubin <aubin.lambare@csgroup.eu> Co-authored-by: jlahovnik <julia.lahovnik@csgroup.eu>
1 parent f9b43b9 commit 42033c1

19 files changed

Lines changed: 767 additions & 186 deletions

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
<img src="https://fastapi.tiangolo.com/img/logo-margin/logo-teal.png" alt="FastAPI" height=100 />
66
</p>
77

8-
98
[EODAG](https://github.com/CS-SI/eodag) backend for [stac-fastapi](https://github.com/stac-utils/stac-fastapi), the [FastAPI](https://fastapi.tiangolo.com/) implementation of the [STAC API spec](https://github.com/radiantearth/stac-api-spec)
109

1110
stac-fastapi-eodag combines the capabilities of EODAG and STAC FastAPI to provide a powerful, unified API for accessing Earth observation data from various providers.

stac_fastapi/eodag/constants.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,12 @@
1919

2020
ITEM_PROPERTIES_EXCLUDE = {
2121
"_id": True,
22+
"productType": True,
2223
"downloadLink": True,
2324
"orderLink": True,
25+
"orderStatus": True,
26+
"orderStatusLink": True,
27+
"searchLink": True,
2428
"missionStartDate": True,
2529
"missionEndDate": True,
2630
"keywords": True,

stac_fastapi/eodag/core.py

Lines changed: 42 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,12 @@
4444

4545
from eodag import SearchResult
4646
from eodag.api.core import DEFAULT_ITEMS_PER_PAGE
47+
from eodag.plugins.search.build_search_result import ECMWFSearch
4748
from eodag.utils import deepcopy, get_geometry_from_various
48-
from eodag.utils.exceptions import NoMatchingProductType
49+
from eodag.utils.exceptions import NoMatchingProductType as EodagNoMatchingProductType
4950
from stac_fastapi.eodag.config import get_settings
5051
from stac_fastapi.eodag.cql_evaluate import EodagEvaluator
51-
from stac_fastapi.eodag.errors import ResponseSearchError
52+
from stac_fastapi.eodag.errors import NoMatchingProductType, ResponseSearchError
5253
from stac_fastapi.eodag.landing_page import CustomCoreClient
5354
from stac_fastapi.eodag.models.links import (
5455
CollectionLinks,
@@ -132,10 +133,25 @@ def _get_collection(self, product_type: dict[str, Any], request: Request) -> Col
132133

133134
collection["id"] = product_type["ID"]
134135

136+
# keep only federation backends which allow order mechanism
137+
# to create "retrieve" collection links from them
138+
def has_ecmwf_search_plugin(federation_backends, request):
139+
for fb in federation_backends:
140+
search_plugins = request.app.state.dag._plugins_manager.get_search_plugins(provider=fb)
141+
if any(isinstance(plugin, ECMWFSearch) for plugin in search_plugins):
142+
return True
143+
return False
144+
135145
extension_names = [type(ext).__name__ for ext in self.extensions]
136-
collection["links"] = CollectionLinks(collection_id=collection["id"], request=request).get_links(
137-
extensions=extension_names, extra_links=product_type.get("links", []) + collection.get("links", [])
138-
)
146+
if self.extension_is_enabled("CollectionOrderExtension") and not has_ecmwf_search_plugin(
147+
federation_backends, request
148+
):
149+
extension_names.remove("CollectionOrderExtension")
150+
151+
collection["links"] = CollectionLinks(
152+
collection_id=collection["id"],
153+
request=request,
154+
).get_links(extensions=extension_names, extra_links=product_type.get("links", []) + collection.get("links", []))
139155

140156
collection["providers"] = merge_providers(
141157
collection.get("providers", []) + [get_provider_dict(request, fb) for fb in federation_backends]
@@ -144,40 +160,42 @@ def _get_collection(self, product_type: dict[str, Any], request: Request) -> Col
144160
return collection
145161

146162
def _search_base(self, search_request: BaseSearchPostRequest, request: Request) -> ItemCollection:
163+
eodag_args = prepare_search_base_args(search_request=search_request, model=self.stac_metadata_model)
164+
165+
request.state.eodag_args = eodag_args
166+
147167
# check if the collection exists
148-
if search_request.collections:
168+
if product_type := eodag_args.get("productType"):
149169
all_pt = request.app.state.dag.list_product_types(fetch_providers=False)
150170
# only check the first collection (EODAG search only support a single collection)
151-
existing_pt = [pt for pt in all_pt if pt["ID"] == search_request.collections[0]]
171+
existing_pt = [pt for pt in all_pt if pt["ID"] == product_type]
152172
if not existing_pt:
153-
raise NoMatchingProductType(f"Collection {search_request.collections[0]} does not exist.")
173+
raise NoMatchingProductType(f"Collection {product_type} does not exist.")
154174
else:
155175
raise HTTPException(status_code=400, detail="A collection is required")
156176

157177
# get products by ids
158-
if search_request.ids:
178+
if ids := eodag_args.pop("ids", []):
159179
search_result = SearchResult([])
160-
ids = search_request.ids
161180
for item_id in ids:
162-
search_request.ids = [item_id]
163-
base_args = prepare_search_base_args(search_request=search_request, model=self.stac_metadata_model)
164-
search_result.extend(request.app.state.dag.search(**base_args))
181+
eodag_args["id"] = item_id
182+
search_result.extend(request.app.state.dag.search(**eodag_args))
165183
search_result.number_matched = len(search_result)
166184
else:
167185
# search without ids
168-
base_args = prepare_search_base_args(search_request=search_request, model=self.stac_metadata_model)
169-
search_result = request.app.state.dag.search(**base_args)
186+
search_result = request.app.state.dag.search(**eodag_args)
170187

171188
if search_result.errors and not len(search_result):
172189
raise ResponseSearchError(search_result.errors, self.stac_metadata_model)
173190

174191
request_json = loop.run_until_complete(request.json()) if request.method == "POST" else None
175192

176193
features: list[Item] = []
194+
extension_names = [type(ext).__name__ for ext in self.extensions]
177195

178196
for product in search_result:
179197
feature = create_stac_item(
180-
product, self.stac_metadata_model, self.extension_is_enabled, request, request_json
198+
product, self.stac_metadata_model, self.extension_is_enabled, request, extension_names, request_json
181199
)
182200
features.append(feature)
183201

@@ -198,7 +216,6 @@ def _search_base(self, search_request: BaseSearchPostRequest, request: Request)
198216
):
199217
next_page = search_request.page + 1
200218

201-
extension_names = [type(ext).__name__ for ext in self.extensions]
202219
collection["links"] = PagingLinks(
203220
request=request,
204221
next=next_page,
@@ -246,7 +263,7 @@ async def all_collections(
246263
guessed_product_types = request.app.state.dag.guess_product_type(
247264
free_text=q, missionStartDate=start, missionEndDate=end
248265
)
249-
except NoMatchingProductType:
266+
except EodagNoMatchingProductType:
250267
product_types = []
251268
else:
252269
product_types = [pt for pt in all_pt if pt["ID"] in guessed_product_types]
@@ -416,11 +433,11 @@ def get_search(
416433

417434
if filter_expr:
418435
if filter_lang == "cql2-text":
419-
ast = parse_cql2_text(filter_expr)
420-
base_args["filter_expr"] = str2json("filter_expr", to_cql2(ast)) # type: ignore
421-
base_args["filter-lang"] = "cql2-json"
422-
elif filter_lang == "cql-json":
423-
base_args["filter_expr"] = str2json(filter_expr)
436+
filter_expr = to_cql2(parse_cql2_text(filter_expr))
437+
filter_lang = "cql2-json"
438+
439+
base_args["filter"] = str2json("filter_expr", filter_expr)
440+
base_args["filter_lang"] = "cql2-json"
424441

425442
# Remove None values from dict
426443
clean = {}
@@ -433,7 +450,7 @@ def get_search(
433450
except ValidationError as err:
434451
raise HTTPException(status_code=400, detail=f"Invalid parameters provided {err}") from err
435452

436-
return self.post_search(search_request, request)
453+
return self._search_base(search_request, request)
437454

438455
async def get_item(self, item_id: str, collection_id: str, request: Request, **kwargs: Any) -> Item:
439456
"""
@@ -555,9 +572,8 @@ def prepare_search_base_args(search_request: BaseSearchPostRequest, model: type[
555572
if search_request.collections:
556573
base_args["productType"] = search_request.collections[0]
557574

558-
# handle only one id from here (pre-filtered in _search_base)
559575
if search_request.ids:
560-
base_args["id"] = search_request.ids[0]
576+
base_args["ids"] = search_request.ids
561577

562578
# merge all eodag search arguments
563579
base_args = base_args | sort_by | eodag_filter | eodag_query

stac_fastapi/eodag/eodag_types/queryables.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class QueryablesGetParams(BaseModel):
2828
"""Store GET Queryables query params"""
2929

3030
collection: Optional[str] = Field(default=None, serialization_alias="productType")
31+
provider: Optional[str] = Field(default=None)
3132
start_datetime: Union[list[str], str, None] = Field(default=None)
3233
end_datetime: Union[list[str], str, None] = Field(default=None)
3334
datetime: Union[list[str], str, None] = Field(default=None)

stac_fastapi/eodag/extensions/collection_order.py

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,22 @@
2020
import logging
2121
from typing import (
2222
Annotated,
23+
Optional,
2324
cast,
2425
)
2526

2627
import attr
2728
from eodag.api.core import EODataAccessGateway
2829
from eodag.api.product._product import EOProduct
2930
from eodag.api.product.metadata_mapping import OFFLINE_STATUS
30-
from fastapi import APIRouter, FastAPI, Path, Query, Request
31-
from stac_fastapi.api.routes import create_async_endpoint
32-
from stac_fastapi.types.errors import NotFoundError
31+
from fastapi import APIRouter, Depends, FastAPI, Path, Request
32+
from pydantic import BaseModel, ConfigDict, Field
33+
from stac_fastapi.api.errors import NotFoundError
34+
from stac_fastapi.api.routes import _wrap_response, sync_to_async
3335
from stac_fastapi.types.extension import ApiExtension
3436
from stac_fastapi.types.search import APIRequest
3537
from stac_fastapi.types.stac import Item
3638

37-
from stac_fastapi.eodag.errors import (
38-
MisconfiguredError,
39-
)
4039
from stac_fastapi.eodag.models.stac_metadata import (
4140
CommonStacMetadata,
4241
create_stac_item,
@@ -45,6 +44,20 @@
4544
logger = logging.getLogger(__name__)
4645

4746

47+
class CollectionOrderBody(BaseModel):
48+
"""Collection order request body."""
49+
50+
federation_backends: Optional[list[str]] = Field(
51+
default=None,
52+
description=(
53+
"Federation backends filter. Default is None which means no filter is applied. Only one value is supported"
54+
),
55+
alias="federation:backends",
56+
)
57+
58+
model_config = ConfigDict(extra="allow", json_schema_extra={"examples": [{"date": "string", "variable": "string"}]})
59+
60+
4861
@attr.s
4962
class BaseCollectionOrderClient:
5063
"""Defines a pattern for implementing the collection order extension."""
@@ -58,16 +71,18 @@ def extension_is_enabled(self, extension: str) -> bool:
5871

5972
def order_collection(
6073
self,
61-
federation_backend: str,
6274
collection_id: str,
63-
dc_qs: str,
6475
request: Request,
76+
request_body: CollectionOrderBody,
6577
) -> Item:
6678
"""Order a product with its collection id and a fake id"""
6779

6880
dag = cast(EODataAccessGateway, request.app.state.dag)
6981

70-
search_results = dag.search(id="fake_id", productType=collection_id, provider=federation_backend, _dc_qs=dc_qs)
82+
federation_backend = request_body.federation_backends[0] if request_body.federation_backends else None
83+
84+
request_params = request_body.model_dump(exclude={"federation_backends": True})
85+
search_results = dag.search(productType=collection_id, provider=federation_backend, **request_params)
7186
if len(search_results) > 0:
7287
product = cast(EOProduct, search_results[0])
7388

@@ -76,62 +91,51 @@ def order_collection(
7691
f"Could not find any item in {collection_id} collection for backend {federation_backend}.",
7792
)
7893

79-
if not getattr(product.downloader, "order_download", None):
80-
raise MisconfiguredError("Product downloader must have a the order method")
81-
8294
auth = product.downloader_auth.authenticate() if product.downloader_auth else None
8395

8496
if product.properties.get("orderLink") is None or product.properties.get("storageStatus") != OFFLINE_STATUS:
8597
raise NotFoundError(
8698
"Product is not orderable. Please download it directly.",
8799
)
88100

89-
if product.properties.get("orderStatus"):
90-
raise NotFoundError(
91-
"Product has been ordered previously. Please request the polling endpoint before download it."
92-
)
93-
94101
raise_error = False
95102
if product.downloader is None:
96103
logger.error("No downloader available for %s", product)
97104
raise_error = True
98-
99-
elif not hasattr(product.downloader, "order_download"):
100-
logger.error("No order_download method available for %s of %s", product.downloader, product)
105+
elif not hasattr(product.downloader, "order"):
106+
logger.error("No order() method available for %s of %s", product.downloader, product)
101107
raise_error = True
102108
else:
103109
logger.debug("Order product")
104-
_ = product.downloader.order_download(product=product, auth=auth)
110+
product.downloader.order(product=product, auth=auth, timeout=-1)
105111

106112
if raise_error or product.properties.get("orderId") is None:
107113
raise NotFoundError(
108114
"Download order failed. It can be due to a lack of product found, so you "
109-
f"may change 'dc_qs' argument. The one used for this order was: {dc_qs}"
115+
"may change the body of the request."
110116
)
117+
extension_names = [type(ext).__name__ for ext in self.extensions]
111118

112-
return create_stac_item(product, self.stac_metadata_model, self.extension_is_enabled, request)
119+
return create_stac_item(product, self.stac_metadata_model, self.extension_is_enabled, request, extension_names)
113120

114121

115122
@attr.s
116123
class CollectionOrderUri(APIRequest):
117124
"""Order collection."""
118125

119-
federation_backend: Annotated[str, Path(description="Federation backend name")] = attr.ib()
120126
collection_id: Annotated[str, Path(description="Collection ID")] = attr.ib()
121-
dc_qs: Annotated[str, Query(description="Datacube query string")] = attr.ib()
122127

123128

124129
@attr.s
125130
class CollectionOrderExtension(ApiExtension):
126131
"""Collection Order extension.
127132
128-
The order-collection extension allow to order a collection directly through the EODAG STAC
129-
server.
133+
The order-collection extension allow to order a collection directly through the EODAG STAC server.
130134
131135
Usage:
132136
------
133137
134-
``POST /collections/{collection_id}/{federation_backend}/orders``
138+
``POST /collections/{collection_id}/order``
135139
"""
136140

137141
client: BaseCollectionOrderClient = attr.ib(factory=BaseCollectionOrderClient)
@@ -144,10 +148,20 @@ def register(self, app: FastAPI) -> None:
144148
:param app: Target FastAPI application.
145149
:returns: None
146150
"""
151+
func = sync_to_async(self.client.order_collection)
152+
153+
async def _retrieve_endpoint(
154+
request: Request,
155+
request_data: CollectionOrderBody,
156+
request_path: CollectionOrderUri = Depends(), # noqa: B008
157+
):
158+
"""Retrieve endpoint."""
159+
return _wrap_response(await func(request=request, request_body=request_data, **request_path.kwargs()))
160+
147161
self.router.prefix = app.state.router_prefix
148162
self.router.add_api_route(
149163
name="Order collection",
150-
path="/collections/{collection_id}/{federation_backend}/orders",
164+
path="/collections/{collection_id}/order",
151165
methods=["POST"],
152166
responses={
153167
200: {
@@ -156,6 +170,6 @@ def register(self, app: FastAPI) -> None:
156170
},
157171
}
158172
},
159-
endpoint=create_async_endpoint(self.client.order_collection, CollectionOrderUri),
173+
endpoint=_retrieve_endpoint,
160174
)
161175
app.include_router(self.router, tags=["Collection order"])

0 commit comments

Comments
 (0)