Skip to content

Commit f5494d6

Browse files
committed
feat: use eodag with database
1 parent 5a6dc68 commit f5494d6

5 files changed

Lines changed: 71 additions & 185 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ license = { file = "LICENSE" }
88
requires-python = ">= 3.9"
99
dependencies = [
1010
"attr",
11-
"eodag[all-providers] @ git+https://github.com/CS-SI/eodag.git@develop",
11+
"eodag[all-providers] @ git+https://github.com/CS-SI/eodag.git@db-for-collections-storage",
1212
"fastapi",
1313
"geojson",
1414
"geojson-pydantic",

stac_fastapi/eodag/app.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@
114114

115115
# collection_search extensions
116116
cs_extensions_map = {
117-
"query": QueryExtension(conformance_classes=[QueryConformanceClasses.COLLECTIONS]),
118117
"offset-pagination": OffsetPaginationExtension(),
119118
"collection-search": CollectionSearchExtension(),
120119
"free-text": FreeTextExtension(conformance_classes=[FreeTextConformanceClasses.COLLECTIONS]),

stac_fastapi/eodag/core.py

Lines changed: 52 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import asyncio
2323
import logging
2424
import re
25-
from typing import TYPE_CHECKING, Any, cast
25+
from typing import TYPE_CHECKING, Any, Optional, cast
2626
from urllib.parse import unquote_plus
2727

2828
import attr
@@ -36,7 +36,6 @@
3636
from stac_fastapi.api.models import create_post_request_model
3737
from stac_fastapi.types.errors import NotFoundError
3838
from stac_fastapi.types.requests import get_base_url
39-
from stac_fastapi.types.rfc3339 import str_to_interval
4039
from stac_fastapi.types.search import BaseSearchPostRequest
4140
from stac_fastapi.types.stac import Collection, Collections, Item, ItemCollection
4241
from stac_pydantic.links import Relations
@@ -46,8 +45,6 @@
4645
from eodag.api.collection import Collection as EodagCollection
4746
from eodag.api.collection import CollectionsList
4847
from eodag.plugins.search.build_search_result import ECMWFSearch
49-
from eodag.utils import deepcopy, get_geometry_from_various
50-
from eodag.utils.exceptions import NoMatchingCollection as EodagNoMatchingCollection
5148
from stac_fastapi.eodag.client import CustomCoreClient
5249
from stac_fastapi.eodag.config import get_settings
5350
from stac_fastapi.eodag.constants import DEFAULT_LIMIT
@@ -62,15 +59,13 @@
6259
)
6360
from stac_fastapi.eodag.models.stac_metadata import CommonStacMetadata
6461
from stac_fastapi.eodag.utils import (
65-
check_poly_is_point,
66-
dt_range_to_eodag,
6762
format_datetime_range,
6863
is_dict_str_any,
6964
str2json,
7065
)
7166

7267
if TYPE_CHECKING:
73-
from typing import Optional, Union
68+
from typing import Union
7469

7570
from fastapi import Request
7671
from pydantic import BaseModel
@@ -94,52 +89,11 @@ def _get_collection(
9489
self, collection: EodagCollection, request: Request, collections_providers: dict[str, set]
9590
) -> Collection:
9691
"""Convert a EODAG produt type to a STAC collection."""
97-
# extend collection with external stac collection if any
98-
extended_collection = Collection(deepcopy(request.app.state.ext_stac_collections.get(collection.id, {})))
99-
extended_collection["type"] = "Collection"
100-
101-
platform_value = [p for p in (collection.platform or "").split(",") if p]
102-
constellation = [c for c in (collection.constellation or "").split(",") if c]
103-
processing_level = [pl for pl in (collection.processing_level or "").split(",") if pl]
104-
instruments = collection.instruments or []
105-
federation_backends = collections_providers.get(collection._id, set())
106-
107-
summaries: dict[str, Any] = {
108-
"platform": platform_value,
109-
"constellation": constellation,
110-
"processing:level": processing_level,
111-
"instruments": instruments,
112-
"federation:backends": federation_backends,
113-
}
114-
extended_collection["summaries"] = {
115-
**(getattr(collection, "summaries", {}) or {}),
116-
**{k: v for k, v in summaries.items() if v},
117-
}
118-
119-
extended_collection["extent"] = {
120-
"spatial": extended_collection.get("extent", {}).get("spatial")
121-
or collection.extent.spatial.to_dict()
122-
or {"bbox": [[-180.0, -90.0, 180.0, 90.0]]},
123-
"temporal": extended_collection.get("extent", {}).get("temporal")
124-
or collection.extent.temporal.to_dict()
125-
or {"interval": [[None, None]]},
126-
}
127-
128-
for key in ["license", "description", "title"]:
129-
if key not in extended_collection and (value := getattr(collection, key)):
130-
extended_collection[key] = value
131-
132-
keywords = collection.keywords or []
133-
keywords = keywords.split(",") if isinstance(keywords, str) else keywords
134-
try:
135-
extended_collection["keywords"] = list(set(keywords + extended_collection.get("keywords", [])))
136-
except TypeError as e:
137-
logger.warning("Could not merge keywords from external collection for %s: %s", collection.id, str(e))
138-
139-
extended_collection["id"] = collection.id
14092

14193
# keep only federation backends which allow order mechanism
14294
# to create "retrieve" collection links from them
95+
# TODO: this needs to be changed: we cannot request the search plugins for each collection, it is too costly.
96+
# TODO: We should find a way to know which federation backends support the order mechanism without requesting the plugins manager
14397
def has_ecmwf_search_plugin(federation_backends, request):
14498
for fb in federation_backends:
14599
search_plugins = request.app.state.dag._plugins_manager.get_search_plugins(provider=fb)
@@ -148,22 +102,22 @@ def has_ecmwf_search_plugin(federation_backends, request):
148102
return False
149103

150104
extension_names = [type(ext).__name__ for ext in self.extensions]
105+
106+
federation_backends = set(
107+
request.app.state.dag.db.get_federation_backends(collection=collection._id, enabled=True)
108+
)
151109
if self.extension_is_enabled("CollectionOrderExtension") and not has_ecmwf_search_plugin(
152110
federation_backends, request
153111
):
154112
extension_names.remove("CollectionOrderExtension")
155113

156-
if collection.links:
157-
extra_links = [link.model_dump() for link in collection.links.root]
158-
else:
159-
extra_links = []
160-
extended_coll_links = extended_collection.get("links", [])
161-
extended_collection["links"] = CollectionLinks(
162-
collection_id=extended_collection["id"],
114+
coll_with_links = collection.model_dump(mode="json", exclude={"alias", "eodag_stac_collection"})
115+
coll_with_links["links"] = CollectionLinks(
116+
collection_id=collection.id,
163117
request=request,
164-
).get_links(extensions=extension_names, extra_links=extra_links + extended_coll_links)
118+
).get_links(extensions=extension_names, extra_links=coll_with_links["links"])
165119

166-
return Collection(**extended_collection)
120+
return Collection(**coll_with_links)
167121

168122
async def _search_base(self, search_request: BaseSearchPostRequest, request: Request) -> ItemCollection:
169123
eodag_args = prepare_search_base_args(search_request=search_request, model=self.stac_metadata_model)
@@ -176,7 +130,7 @@ async def _search_base(self, search_request: BaseSearchPostRequest, request: Req
176130

177131
# check if the collection exists
178132
if collection := eodag_args.get("collection"):
179-
all_coll = await asyncio.to_thread(request.app.state.dag.list_collections, fetch_providers=False)
133+
all_coll = await asyncio.to_thread(request.app.state.dag.list_collections)
180134
# only check the first collection (EODAG search only support a single collection)
181135
existing_coll = [coll for coll in all_coll if coll.id == collection]
182136
if not existing_coll:
@@ -237,9 +191,12 @@ async def all_collections(
237191
bbox: Optional[list[NumType]] = None,
238192
datetime: Optional[str] = None,
239193
limit: Optional[int] = 10,
194+
# Extensions
240195
offset: Optional[int] = 0,
241196
q: Optional[list[str]] = None,
242-
query: Optional[str] = None,
197+
sortby: Optional[list[str]] = None,
198+
filter_expr: Optional[str] = None,
199+
filter_lang: Optional[str] = "cql2-text",
243200
) -> Collections:
244201
"""
245202
Get all collections from EODAG.
@@ -250,7 +207,10 @@ async def all_collections(
250207
:param limit: Maximum number of collections to return.
251208
:param offset: Starting position from which to return collections.
252209
:param q: Query string to filter the collections.
253-
:param query: Query string to filter collections.
210+
:param query: Query string to filter the search.
211+
:param sortby: List of fields to sort the results by.
212+
:param filter_expr: CQL filter to apply to the search.
213+
:param filter_lang: Language of the filter.
254214
:returns: All collections.
255215
:raises HTTPException: If the unsupported bbox parameter is provided.
256216
"""
@@ -260,68 +220,28 @@ async def all_collections(
260220
prev_link: Optional[dict[str, Any]] = None
261221
first_link: Optional[dict[str, Any]] = None
262222

263-
# get provider filter
264-
provider = None
265-
if query:
266-
query_attr = orjson.loads(unquote_plus(query))
267-
parsed_query = parse_query(query_attr)
268-
provider = parsed_query.get("federation:backends")
269-
provider = provider[0] if isinstance(provider, list) else provider
223+
cql2_json = None
224+
if filter_expr:
225+
if filter_lang == "cql2-text":
226+
filter_expr = to_cql2(parse_cql2_text(filter_expr))
227+
filter_lang = "cql2-json"
270228

271-
all_colls = await asyncio.to_thread(
272-
request.app.state.dag.list_collections, provider=provider, fetch_providers=False
229+
cql2_json = str2json("filter_expr", filter_expr)
230+
231+
collections = cast(
232+
CollectionsList,
233+
await asyncio.to_thread(
234+
request.app.state.dag.list_collections,
235+
geometry=bbox,
236+
datetime=datetime,
237+
limit=limit,
238+
q=q,
239+
cql2_json=cql2_json,
240+
sortby=sortby
241+
)
273242
)
274243

275-
# datetime & free-text-search filters
276-
if any((q, datetime)):
277-
start, end = dt_range_to_eodag(str_to_interval(datetime))
278-
279-
# q is always a list, per stac-api free_text extension definiton
280-
# Expanding with AND as default.
281-
free_text = " AND ".join(q or [])
282-
283-
try:
284-
guessed_collections = await asyncio.to_thread(
285-
request.app.state.dag.guess_collection,
286-
free_text=free_text,
287-
start_date=start,
288-
end_date=end,
289-
)
290-
guessed_collections_ids = [coll.id for coll in guessed_collections]
291-
except EodagNoMatchingCollection:
292-
collections = CollectionsList([])
293-
else:
294-
collections = CollectionsList([coll for coll in all_colls if coll.id in guessed_collections_ids])
295-
else:
296-
collections = all_colls
297-
298-
providers = request.app.state.dag.providers
299-
collections_providers: dict[str, set] = {}
300-
for p_name, p in providers.items():
301-
if getattr(p.config, "products", None):
302-
for coll in p.config.products:
303-
if coll not in collections_providers:
304-
collections_providers[coll] = set()
305-
collections_providers[coll].add(p_name)
306-
307-
formatted_collections = [self._get_collection(coll, request, collections_providers) for coll in collections]
308-
309-
# bbox filter
310-
if bbox:
311-
bbox_geom = get_geometry_from_various(geometry=bbox)
312-
313-
default_extent = [[-180.0, -90.0, 180.0, 90.0]]
314-
formatted_collections = [
315-
c
316-
for c in formatted_collections
317-
if check_poly_is_point(
318-
get_geometry_from_various( # type: ignore
319-
geometry=c.get("extent", {}).get("spatial", {}).get("bbox", default_extent)[0]
320-
)
321-
).intersection(bbox_geom)
322-
]
323-
324-
total = len(formatted_collections)
244+
number_matched = cast(int, collections.number_matched)
325245

326246
links = [
327247
{
@@ -336,16 +256,21 @@ async def all_collections(
336256
limit = limit if limit is not None else 10
337257
offset = offset if offset is not None else 0
338258

339-
formatted_collections = formatted_collections[offset : offset + limit]
259+
collections = collections[offset : offset + limit]
260+
# info about number matched was lost during the slice, then restore it
261+
# TODO: find a way to not lose it during the slice
262+
collections.number_matched = number_matched
340263

341-
if offset + limit < total:
264+
if offset + limit < collections.number_matched:
342265
next_link = {"body": {"limit": limit, "offset": offset + limit}}
343266

344267
if offset > 0:
345268
prev_link = {"body": {"limit": limit, "offset": max(0, offset - limit)}}
346269

347270
first_link = {"body": {"limit": limit, "offset": 0}}
348271

272+
formatted_collections = [self._get_collection(coll, request) for coll in collections]
273+
349274
extension_names = [type(ext).__name__ for ext in self.extensions]
350275

351276
paging_links = CollectionSearchPagingLinks(
@@ -357,8 +282,8 @@ async def all_collections(
357282
return Collections(
358283
collections=formatted_collections,
359284
links=links,
360-
numberMatched=total,
361-
numberReturned=len(formatted_collections),
285+
numberMatched=collections.number_matched,
286+
numberReturned=len(collections),
362287
)
363288

364289
async def get_collection(self, collection_id: str, request: Request, **kwargs: Any) -> Collection:
@@ -373,11 +298,8 @@ async def get_collection(self, collection_id: str, request: Request, **kwargs: A
373298
:returns: The collection.
374299
:raises NotFoundError: If the collection does not exist.
375300
"""
376-
all_collections = await asyncio.to_thread(request.app.state.dag.list_collections, fetch_providers=False)
377-
collection = next(
378-
(c for c in all_collections if c.id == collection_id),
379-
None,
380-
)
301+
collection = cast(Optional[EodagCollection], await asyncio.to_thread(request.app.state.dag.get_collection, id=collection_id))
302+
381303
if collection is None:
382304
raise NotFoundError(f"Collection {collection_id} does not exist.")
383305

0 commit comments

Comments
 (0)