Skip to content

feat: streaming per-period Icechunk ingest — full #64 implementation#144

Draft
turban wants to merge 81 commits into
mainfrom
feat/icechunk-ingest
Draft

feat: streaming per-period Icechunk ingest — full #64 implementation#144
turban wants to merge 81 commits into
mainfrom
feat/icechunk-ingest

Conversation

@turban
Copy link
Copy Markdown
Contributor

@turban turban commented May 19, 2026

Summary

Complete implementation of the streaming per-period ingest architecture from #64, building on the Track 1 probe spike (notebook, branch probe-spike).

Track 2 — per-period ingest

climate_api/ingest/protocol.pyGridSpec dataclass and IngestionPlugin Protocol. Plugins implement three sync methods and never touch zarr:

class IngestionPlugin(Protocol):
    max_concurrency: int      # parallel fetch limit
    commit_batch_size: int    # cursor checkpoint interval

    def probe(self, bbox, **params) -> GridSpec: ...
    def periods(self, start, end) -> list[str]: ...
    def fetch_period(self, period_id, bbox, **params) -> xr.Dataset: ...

Methods are sync — the orchestrator runs probe and fetch_period in asyncio.to_thread so I/O-bound plugins run in the thread pool without managing their own executor. periods is called directly (pure computation, no I/O).

enumerate_periods(start, end, period_type, cutoff=None) is a shared utility for standard period enumeration (daily/hourly/monthly/yearly with optional availability cutoff). Built-in plugins call it from periods(); plugin authors can import it alongside GridSpec.

GridSpec gains extra_dims: dict[str, int] for multidimensional stores. rechunk_time and pyramid are optional class attributes read via getattr and excluded from the Protocol — plugins that omit them still pass the isinstance check (Python 3.13 checks non-callable protocol members).

climate_api/ingest/orchestrator.pyrun_ingest() async loop:

  • Creates all fetch tasks upfront; awaits in chronological order so writes are sequential while fetches overlap (bounded by max_concurrency)
  • Opens a fresh Icechunk writable session per period; saves the job cursor every commit_batch_size periods
  • Resume: reads last_committed from job cursor; falls back to reading the store's committed time coordinates
  • Cancellation: checks is_cancel_requested() before each period; raises JobCancelledError
  • After ingest: rechunk pass (if rechunk_time set), pyramid build (if pyramid=True), then repo.expire_snapshots() to prune intermediate snapshots
  • run_ingest_sync() wrapper calls asyncio.run() for use in the threaded job framework

climate_api/ingest/store.pyopen_or_create_repo(), read_committed_period_ids(), rechunk_store(), and build_pyramid_store() helpers.

Track 3 — store-based sync

Sync reads committed time coordinates from the Icechunk store via read_committed_period_ids() and appends only missing periods — replaces the previous changed_files mtime-diff approach. Sync availability is derived entirely from plugin.periods(); the sync.availability.latest_available_function YAML field and climate_api.providers.availability module have been removed.

Track 4 — in-place rechunk

After initial ingest, rechunk_store() rewrites the store with a coarser time chunk (e.g. time: 1time: 30) using Icechunk's MVCC. No separate data copy; the previous snapshot is preserved until GC.

Track 5 — multiscale pyramid

After rechunk, build_pyramid_store() builds a multiscale overview pyramid when the plugin declares pyramid = True. Level count is computed automatically from spatial dimensions (512-pixel tile target, 2048×2048 pixel threshold, max 8 levels). The pyramid is written as a DataTree via topozarr.create_pyramid() and committed as a new Icechunk snapshot. open_icechunk_dataset() detects pyramid stores via "multiscales" in root.attrs and opens group "0" automatically.

Built-in plugins

plugins/era5_land.pyEra5LandPlugin for hourly ERA5-Land from DestinE Earth Data Hub: monthly fetch, daily aggregation, timezone-aware UTC offset. max_concurrency = 4, commit_batch_size = 720.

plugins/chirps3.pyChirps3Plugin for CHIRPS v3 daily precipitation: COG range requests, only the configured bbox window is downloaded. max_concurrency = 4, commit_batch_size = 30, rechunk_time = 30.

plugins/worldpop.pyWorldPopPlugin for WorldPop Global2 yearly population: per-country GeoTIFF download, clipped to bbox. max_concurrency = 1, commit_batch_size = 1, pyramid = True. country_code required in ingestion.params.

Data accessor and HTTP serving

  • open_icechunk_dataset() opens an Icechunk store via a readonly MVCC session; detects pyramid stores and opens group "0" automatically
  • get_data() uses open_icechunk_dataset() directly via get_icechunk_path(dataset) — the legacy flat-zarr and NetCDF fallback paths have been removed
  • /zarr/{dataset_id} and /zarr/{dataset_id}/{path} handle ArtifactFormat.ICECHUNK by serving zarr keys through session.store._get_bytes_sync(key); bare group-path requests (e.g. GET /zarr/dataset/0) return an HTML directory listing so fsspec's _ls_real can traverse pyramid subgroups, enabling xr.open_zarr(group="0") over HTTP
  • STAC pyramid assets: href now points to the pyramid root (/zarr/{dataset_id}) so zarr-layer clients select zoom levels by opening numbered subgroups; the sharding_indexed codec is stripped from pyramid-level metadata for JS client compatibility
  • STAC, resample (/processes/resample), and publications all recognise ICECHUNK alongside ZARR

Dataset registry

ingestion.plugin replaces ingestion.function in dataset templates. The registry validator now requires ingestion.plugin. New built-in templates: chirps3.yaml, worldpop.yaml.

Legacy code removed

This PR removes six categories of code that had accumulated since the pre-Icechunk era. 314 lines deleted across 15 files.

  • ArtifactFormat.NETCDF — enum value removed; /datasets/{id}/download endpoint and the NETCDF download link in _dataset_links removed with it
  • ArtifactRecord.path — single-file legacy field removed; all path-fallback logic (record.path or record.asset_paths[0]) replaced with direct asset_paths[0]
  • prefer_zarr parameter — removed from create_artifact, sync_dataset, _store_artifact_record, _upsert_artifact_record, _find_existing_artifact, _find_existing_artifact_in_records, and both sync/system call sites
  • _upgrade_legacy_record — format-migration function removed; _load_records and _mutate_records now deserialise directly with model_validate
  • list_datasets() / open_dataset() — module-level duplicates in client.py removed (superseded by Client.catalog() and Client.open())
  • _clear_xstac_collection_cache() — dead function removed from stac/services.py; tests updated to clear the cache dict directly
  • climate_api/providers/ packageavailability.py and entire module deleted
  • build_dataset_zarr() and the NetCDF-cache-to-zarr pipeline — removed from data_manager/services/downloader.py
  • NetCDF/flat-zarr fallback in get_data() — all deployed stores are Icechunk
  • _validate_sync_availability() — validated a YAML field referencing the deleted providers module

Examples

examples/stac_discover_and_open.py and examples/zarr_direct_access.py simplified: the _spatial_dims() helper and conditional dim-name logic have been removed since all stores normalise spatial dimensions to x and y.

Tests

  • 14 tests in tests/test_ingest_orchestrator.py — network-free, covering resume, cancellation, cursor, idempotency
  • 41 tests in tests/test_ingest_plugins.py — CHIRPS3 and WorldPop plugins (probe, periods, fetch_period, URL construction, period clamping, nodata masking, time encoding)

Documentation

  • extensibility.md: IngestionPlugin added as a first-class extension point with full protocol reference and GridSpec field table
  • adding_custom_datasets.md: plugin path documented; complete plugin skeleton and YAML template example
  • built_in_datasets.md: ingest method and sync behaviour descriptions updated for all three built-in plugins
  • architecture.md: dual-path data lifecycle diagram; IngestionPlugin in the plugin contract section; updated append execution mode description
  • zarr_and_geozarr.md: Icechunk section added covering MVCC snapshot model, pyramid detection, and HTTP key-serving

Breaking changes

ingestion.function removed — dataset templates that relied on ingestion.function must migrate to ingestion.plugin. Built-in templates are unaffected.

sync.availability.latest_available_function removed — the climate_api.providers.availability module and all availability function dispatch are gone. Sync end date is now derived from plugin.periods().

build_dataset_zarr removed — the legacy NetCDF-cache-to-zarr pipeline no longer exists. All ingestion goes through IngestionPlugin → Icechunk.

prefer_zarr parameter removed — any external callers of create_artifact or sync_dataset that passed prefer_zarr must remove the argument.

ArtifactFormat.NETCDF removed — any code referencing this enum value or the /datasets/{id}/download endpoint must be updated.

Related

@turban turban marked this pull request as draft May 19, 2026 15:21
@turban turban changed the title feat: per-period Icechunk ingest — protocol, orchestrator, ERA5-Land plugin (#64 Track 2) feat: streaming per-period Icechunk ingest — full #64 implementation May 19, 2026
@turban turban requested a review from Copilot May 19, 2026 21:37
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Implements the “streaming per-period ingest” architecture for Icechunk-backed Zarr v3 stores by introducing an IngestionPlugin protocol + orchestrator loop, adding built-in plugins (ERA5-Land, CHIRPS3, WorldPop), and updating the API surface (STAC, /zarr, resample, publications, sync) to recognize ArtifactFormat.ICECHUNK.

Changes:

  • Add new ingest subsystem (climate_api.ingest.*) for per-period streaming writes, resume/cursor support, and optional post-ingest rechunking.
  • Add built-in ingestion plugins for ERA5-Land, CHIRPS3, and WorldPop; update built-in dataset templates to reference ingestion.plugin.
  • Extend serving/discovery/processing paths (STAC, /zarr, resample, publications, sync) to handle ICECHUNK artifacts, and add substantial unit test coverage + documentation updates.

Reviewed changes

Copilot reviewed 39 out of 40 changed files in this pull request and generated 11 comments.

Show a summary per file
File Description
uv.lock Adds Icechunk dependency lock entry.
pyproject.toml Adds icechunk>=2.0,<3 runtime dependency.
climate_api/ingestions/schemas.py Introduces ArtifactFormat.ICECHUNK.
climate_api/ingest/protocol.py Defines GridSpec and IngestionPlugin protocol.
climate_api/ingest/orchestrator.py Implements async per-period ingest loop and sync wrapper.
climate_api/ingest/store.py Adds Icechunk repo open/create, committed-period read, and rechunk helper.
climate_api/ingest/init.py Exposes ingest protocol/orchestrator symbols.
climate_api/ingest/plugins/init.py Declares built-in plugin package.
climate_api/ingest/plugins/chirps3.py Adds CHIRPS3 daily plugin implementation.
climate_api/ingest/plugins/era5_land.py Adds ERA5-Land plugin implementation.
climate_api/ingest/plugins/worldpop.py Adds WorldPop yearly plugin implementation.
climate_api/data/datasets/chirps3.yaml Switches template ingestion to plugin + params.
climate_api/data/datasets/era5_land.yaml Switches templates ingestion to plugin + params.
climate_api/data/datasets/worldpop.yaml Switches template ingestion to plugin + params.
climate_api/data_registry/services/datasets.py Updates dataset template validation for ingestion changes.
climate_api/ingestions/services.py Routes ingestion + /zarr store listing/serving through Icechunk-aware code paths.
climate_api/ingestions/routes.py Removes extent country_code forwarding; uses updated create_artifact signature.
climate_api/ingestions/sync_engine.py Adds current_end override and ICECHUNK append support changes.
climate_api/data_accessor/services/accessor.py Adds open_icechunk_dataset() and dataset-coverage helper for pre-opened datasets.
climate_api/stac/services.py Adds ICECHUNK eligibility and Icechunk dataset opening in collection builds.
climate_api/processing/resample.py Allows ICECHUNK as a resampling source format.
climate_api/publications/services.py Ensures Icechunk artifacts are served via /zarr (not pygeoapi) and supports axis inspection.
climate_api/system/routes.py Updates ingestion route call signature (no country_code).
climate_api/data_manager/services/downloader.py Removes legacy download route/helpers; retains Zarr build/utility functionality.
climate_api/data_manager/routes.py Removes legacy internal download endpoint.
tests/test_ingest_orchestrator.py Adds orchestrator tests (resume, cursor, cancellation, rechunk, plugin loading).
tests/test_ingest_plugins.py Adds unit tests for CHIRPS3 + WorldPop plugin logic and mocked fetch behavior.
tests/test_processing_resample.py Extends resample tests for Icechunk and netcdf rejection messaging.
tests/test_stac.py Adds STAC tests for Icechunk artifact inclusion + dataset opening path.
tests/test_datasets.py Adds dataset link test for published Icechunk STAC link.
tests/test_datasets_sync.py Updates sync tests to use store-authoritative current_end for Icechunk.
tests/test_dataset_registry.py Updates registry tests to use ingestion.plugin.
tests/test_config.py Updates plugin-dir tests to use ingestion.plugin.
tests/test_downloader.py Removes tests tied to removed legacy download behavior.
docs/instance_guide.md Updates instance plugin guidance for ingestion plugins.
docs/extensibility.md Documents IngestionPlugin as the ingestion extension point.
docs/adding_custom_datasets.md Replaces download-function docs with ingestion-plugin contract and examples.
docs/built_in_datasets.md Updates built-in dataset ingest + sync behavior descriptions for streaming ingest.
docs/architecture.md Updates lifecycle + extension-point architecture to reflect plugin-driven ingest.
AGENTS.md Updates contributor-facing concept summary to reference ingestion plugins.

Comment thread climate_api/ingest/orchestrator.py Outdated
Comment thread climate_api/ingest/protocol.py Outdated
Comment thread climate_api/ingest/store.py
Comment thread climate_api/ingest/store.py
Comment thread climate_api/data_accessor/services/accessor.py Outdated
Comment thread climate_api/ingestions/services.py
Comment thread climate_api/ingestions/services.py
Comment thread climate_api/ingestions/services.py Outdated
Comment thread climate_api/data_registry/services/datasets.py
Comment thread climate_api/ingest/plugins/era5_land.py Outdated
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 39 out of 40 changed files in this pull request and generated 6 comments.

Comment thread climate_api/ingest/orchestrator.py Outdated
Comment thread climate_api/ingestions/services.py
Comment thread climate_api/ingestions/services.py Outdated
Comment thread climate_api/ingestions/services.py Outdated
Comment thread climate_api/ingestions/services.py Outdated
Comment thread climate_api/data_registry/services/datasets.py
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 39 out of 40 changed files in this pull request and generated 4 comments.

Comment thread climate_api/ingest/orchestrator.py Outdated
Comment thread climate_api/ingestions/services.py
Comment thread climate_api/ingestions/services.py Outdated
Comment thread climate_api/ingestions/services.py Outdated
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 39 out of 40 changed files in this pull request and generated 4 comments.

Comment thread climate_api/ingest/orchestrator.py Outdated
Comment thread climate_api/ingest/orchestrator.py Outdated
Comment thread climate_api/ingest/protocol.py Outdated
Comment thread climate_api/ingestions/services.py Outdated
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 42 out of 43 changed files in this pull request and generated 11 comments.

Comments suppressed due to low confidence (2)

tests/test_ingest_plugins.py:362

  • Same issue as the WorldPop helper above: .rio accessor is used without importing rioxarray to register it. Add an explicit import rioxarray in this test module (or in this helper) before accessing .rio.
    docs/extensibility.md:96
  • Step 5 says the orchestrator “Commits to the Icechunk store after every commit_batch_size periods”, but the implementation commits every period and only checkpoints the cursor every commit_batch_size. This doc should be corrected so plugin authors don’t design around batched commits that won’t happen.
2. Calls `periods()` once to get the full period list; filters against already-committed time coordinates.
3. Creates all fetch tasks upfront so up to `max_concurrency` fetches are in flight simultaneously.
4. Awaits tasks in chronological order so writes are always sequential.
5. Commits to the Icechunk store after every `commit_batch_size` periods.
6. On restart, resumes from the last committed period — a crash loses at most one uncommitted batch.

Comment thread climate_api/ingest/orchestrator.py Outdated
Comment thread climate_api/ingest/orchestrator.py
Comment thread climate_api/ingestions/sync_engine.py
Comment thread climate_api/ingestions/services.py
Comment thread climate_api/ingest/plugins/worldpop.py Outdated
Comment thread docs/architecture.md Outdated
Comment thread docs/extensibility.md Outdated
Comment thread docs/built_in_datasets.md Outdated
Comment thread docs/built_in_datasets.md Outdated
Comment thread climate_api/ingestions/services.py
turban and others added 13 commits May 20, 2026 12:29
…plugin

Implements the plugin architecture and orchestration loop from issue #64.

### What's new

**climate_api/ingest/** — new package:
- protocol.py: GridSpec dataclass and IngestionPlugin Protocol
- orchestrator.py: async run_ingest() loop with bounded concurrency,
  per-batch commits, cursor-based resume, and cancel checkpoint
- store.py: Icechunk repository open/create helpers and
  read_committed_period_ids() for store-based resume fallback
- plugins/era5_land.py: Era5LandPlugin — streams hourly ERA5-Land data
  from DestinE Earth Data Hub via remote zarr; handles 0-360 longitude,
  per-month parallel fetches, and availability lag

**Dataset registry** — ingestion.plugin is now a valid alternative to
ingestion.function; datasets can carry both keys during migration.

**era5_land.yaml** — registers Era5LandPlugin alongside the existing
function key so both paths are available.

**ingestions/services.py** — branches on ingestion.plugin: routes ingest
requests through run_ingest_sync and registers the Icechunk store as an
ArtifactFormat.ICECHUNK artifact.

**accessor.py** — adds coverage_from_open_dataset() for callers that
already hold a store handle (e.g. an Icechunk session).

**pyproject.toml** — adds icechunk>=2.0,<3 dependency.

14 tests covering: write all periods, fetch-exactly-once, idempotency,
cursor-based resume, progress reporting, batch commit checkpoints, no-op
on empty period list, cancellation, sync wrapper, load_plugin, and
read_committed_period_ids.

Closes: partially addresses #64 (Tracks 2–4 remain)
Two bugs found during Norway instance testing:

1. Icechunk 2.x sessions do not expose uncommitted writes to subsequent
   zarr.open_group calls, so to_zarr(append_dim='time') on period N+1
   saw an empty store.  Fix: open a fresh writable session per period so
   each append reads the prior committed snapshot.  commit_batch_size now
   controls cursor-save frequency rather than commit frequency.

2. Era5LandPlugin._build_periods ignored the hour component of start/end,
   so 'start=2024-01-01T00 end=2024-01-01T00' generated all 24 hours.
   Fix: compare full period-ID strings (lexicographic) after generating.

Add three unit tests for the period-filter fix.
ERA5-Land source data carries zarr v2 Blosc codec metadata. When xarray
loads the dataset it retains the source encoding, which fails with a
BytesBytesCodec type error when writing to a zarr v3/icechunk store.
Clearing encoding on all vars and coords before returning from fetch
lets the orchestrator write with zarr v3-compatible defaults.
…ct timestamps

When encoding is cleared and xarray picks a default unit for the first
write it may choose "days since YYYY-MM-DD". Subsequent append_dim writes
then encode sub-daily offsets as integer days, shifting all non-midnight
hours to the wrong date. Explicitly setting "hours since 1970-01-01" on
every fetched period ensures the first write establishes a unit that all
appends can faithfully represent as integers.
…s source of truth

Three related changes:

1. plan_sync accepts an optional current_end parameter that overrides the
   artifact metadata's coverage.temporal.end. For Icechunk stores the
   artifact record may lag behind what is actually committed on disk, so
   callers pass the store-authoritative value.

2. _supports_append recognises ArtifactFormat.ICECHUNK and always returns
   True. The orchestrator already handles incremental appends via its own
   read_committed_period_ids call; no sync.execution: append YAML flag
   is required for Icechunk datasets.

3. sync_dataset reads read_committed_period_ids directly from the Icechunk
   store before calling run_sync and passes the result as current_end.
   This ensures the sync plan reflects the true on-disk state rather than
   stale metadata.

Additionally, _create_icechunk_artifact now accepts an optional
ingest_start parameter. For append syncs this is set to the delta_start
so the orchestrator scans only the new period range rather than the full
historical range (e.g. from 1950), which avoids enumerating hundreds of
thousands of already-committed periods needlessly.
… time: N)

Adds rechunk_store(store_path, time_chunk=N) in store.py. The function
opens the latest committed snapshot via a readonly session, rewrites all
variables with the new time chunk size via a writable session (using dask
for lazy chunk-by-chunk processing), and commits the result as a new
Icechunk snapshot. MVCC ensures the previous snapshot is preserved if the
rechunk fails.

run_ingest / run_ingest_sync gain a rechunk_time: int | None parameter.
When set, rechunk_store is called after all periods have been committed.

Era5LandPlugin declares rechunk_time = 12 (twelve hourly periods per chunk).
_create_icechunk_artifact reads this attribute and passes it to run_ingest_sync
only on initial ingest (ingest_start is None); sync appends skip rechunking
to avoid rewriting the full store on every small update.
CHIRPS3 fetches daily COG files via HTTP range requests — one period per
day, capped to the last complete published month (lag ~1–2 months). Four
concurrent fetches via thread pool; commits batched at 30 periods (~1/month).

WorldPop downloads per-country GeoTIFFs for global2 (2015–2030, 100 m
constrained) or global1 (2000–2020). Max concurrency 1 — full-country
files can be several hundred MB. Both plugins reuse the probe-estimate
pattern (no network call) to derive GridSpec from known pixel resolution.

Includes dataset YAML registrations and full unit test coverage (41 tests).
open_icechunk_dataset opens an Icechunk store via a readonly MVCC session
and is wired into every place open_zarr_dataset is used: the publications
provider axes, STAC collection builder, and resample source loader.

The zarr-serving HTTP routes handle ArtifactFormat.ICECHUNK by opening the
store via repo.readonly_session("main") and serving all zarr keys through
session.store[key] — Icechunk's virtual zarr v3 interface. Directory-style
paths enumerate group members via zarr.open_group. Metadata keys are served
as JSON; chunk keys as raw bytes.

STAC link generation and resample source validation both recognise ICECHUNK
alongside ZARR. pygeoapi registration is skipped for Icechunk artifacts
(pygeoapi reads plain zarr directories, not Icechunk repos).
GridSpec gains an extra_dims field for plugins that produce multidimensional
stores (e.g. WorldPop age/sex with dims {age_group: 20, sex: 2}). The
orchestrator does not act on extra_dims yet; the field exposes the intent
to protocol users and prepares for future chunk-shape computation.

The orchestrator calls repo.expire_snapshots(older_than=now) after the
ingest loop (and rechunk pass) completes. Each period write creates one
Icechunk snapshot; for a 35-year daily ingest this produces ~12,775
intermediate snapshots. expire_snapshots marks them as expired without
touching chunk data — the "main" branch ref preserves HEAD. Run
garbage_collect separately to reclaim manifest storage.

The repo is reopened after rechunk_store() so expire_snapshots operates
on the post-rechunk HEAD rather than the stale pre-rechunk repo handle.
extensibility.md: add IngestionPlugin as a first-class extension point with
full protocol reference, GridSpec field table, and a comparison table for
choosing between function and plugin approaches.

adding_custom_datasets.md: update overview to mention both ingestion paths;
update ingestion field reference table with ingestion.plugin and
ingestion.params; add IngestionPlugin skeleton and dataset template example
at end of doc.

built_in_datasets.md: replace stale "Sync behaviour" descriptions with
accurate Ingest method + Sync behaviour paragraphs for CHIRPS3 (COG range
requests), ERA5-Land (remote zarr, monthly fetch), and WorldPop (per-country
GeoTIFF download). Add country_code note to WorldPop.

architecture.md: replace single-path data lifecycle diagram with dual-path
diagram (function path vs plugin path); add IngestionPlugin to the plugin
contract section; update the append execution mode section to distinguish
cache-based (function path) from Icechunk-based (plugin path) behaviour.
Drop the legacy NetCDF-file-based ingestion.function download path entirely.
All datasets now ingest through IngestionPlugin directly into Icechunk stores.

- Remove ingestion.function and default_params from YAML dataset configs
- Require ingestion.plugin in dataset validation (was optional fallback)
- Remove download_dataset, _validate_spatial_coverage, and related routes
- Remove country_code param from create_artifact/run_sync (was only used
  by the legacy download path); inject it automatically from extent.country_code
  into any plugin whose constructor declares the parameter (WorldPop only)
- Update docs (architecture, extensibility, adding_custom_datasets, instance_guide)
  and AGENTS.md to reflect plugin-only ingestion

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… datasets

WorldPop GeoTIFFs carry scale_factor/add_offset in variable attributes.
xarray rejects these as conflicting CF encoding keys when appending to a
Zarr store on the second period. Strip all CF encoding keys from attrs
after loading so append_dim writes succeed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…gs84

Datasets like WorldPop are stored in WGS84 (EPSG:4326) while a deployment
may be configured with a projected CRS (e.g. EPSG:32633). Using the deployment
CRS as proj:code caused the map viewer to wrongly treat WGS84 coordinates as
projected, breaking rendering. spatial_wgs84 coverage was also computed by
reprojecting already-WGS84 coordinates as if they were UTM metres.

Fix: read the actual CRS from the dataset's spatial_ref coordinate (written by
rioxarray) and use it for both proj:code in the STAC collection and native_crs
in coverage computation. Falls back to the configured deployment CRS when no
spatial_ref is present.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 53 out of 54 changed files in this pull request and generated 10 comments.

Comment thread climate_api/ingest/orchestrator.py Outdated
Comment thread climate_api/ingest/orchestrator.py
Comment thread climate_api/ingest/protocol.py Outdated
Comment thread climate_api/ingest/protocol.py Outdated
Comment thread climate_api/ingest/store.py
Comment thread climate_api/stac/services.py
Comment thread climate_api/stac/services.py
Comment thread climate_api/ingestions/sync_engine.py
Comment thread climate_api/ingestions/sync_engine.py
Comment thread climate_api/ingestions/routes.py
turban and others added 28 commits May 20, 2026 14:14
…ng paths

- orchestrator.py: update module docstring to say "sync methods" and clarify
  asyncio.to_thread wrapping; protocol.py: same in IngestionPlugin docstring
- GridSpec docstring: clarify shape is logged but not used for chunking;
  the field drives GeoZarr attribute writes, not store encoding
- stac/services.py + sync_engine.py: add Path.exists() check before
  open_or_create_repo() in read-only probe paths — the function creates a
  new empty Icechunk repo when the path is missing, which is an unintended
  side-effect during STAC generation and sync planning

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…arify docs

- Remove load_cursor from run_ingest/run_ingest_sync/create_artifact/
  execute_ingest: resume is always store-based (read_committed_period_ids);
  the parameter accepted a value but never called it
- Collapse already_done/done_offset duplicate in orchestrator.py
- ERA5LandPlugin.probe: compute shape from known 0.1° resolution (no network
  I/O); update _select_bbox to use the constant instead of lazy dask compute.
  Resolves "metadata-only probe" docstring claim and x_dim/y_dim mismatch
- protocol.py: periods() docstring — must be pure computation, no I/O;
  rechunk_time docstring — clarify omitting the attribute is valid
- WorldPopPlugin: document full-rebuild-on-sync and memory bound in docstring
- Add test_run_ingest_preserves_committed_periods_on_fetch_error: verifies
  periods committed before a mid-run fetch exception are retained in the store
- Add asyncio.run() event-loop precondition note to run_ingest_sync docstring

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Icechunk is an internal storage detail; the /zarr/ HTTP endpoint always
serves standard Zarr to clients regardless of backing format.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Both Ingest and Sync now stream SSE progress events via fetch() instead
of blocking until redirect. The manage page shows a progress bar and
period counter (N / total) while the job runs, then navigates on
completion or surfaces errors inline without a page reload.

Backend: on_progress threaded through sync_dataset → run_sync →
create_artifact_fn so both code paths emit progress events.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Without Cache-Control: no-cache and a periodic keepalive comment,
uvicorn/nginx buffer the text/event-stream response so no events
reach the browser until the connection closes.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Opening the DestinE zarr store (auth handshake + metadata fetch) on
every fetch_period call made each of 145+ periods pay the full
connection overhead. Cache the opened and longitude-corrected dataset
on the instance; subsequent calls reuse it. Double-checked lock guards
against concurrent opens when max_concurrency=4.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Required by FastAPI for form parsing in the manage routes
(await request.form()). Was missing from pyproject.toml, causing
install failures in downstream projects.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
xstac crashes with ValueError when the time dimension has only one
element (0-d scalar after sel). Expand to a 1-element array so
min/max concatenation succeeds. Fixes 503 when viewing a dataset
whose first ingest batch has only committed a single period.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
A date-only end like '2026-01-01' was normalizing to T00, causing hourly
ingests to include only a single period instead of all 24 hours of the
day. Now is_end=True maps a date-only input to T23.

Also expand scalar time dimension before xstac call to avoid a crash when
a dataset contains exactly one committed period.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
ERA5-Land stores have a scalar valid_time coordinate alongside the true
time dimension. The old hasattr check matched the scalar first, giving
coverage start == end == last-fetched-hour instead of the full range.
Checking ds.dims instead ensures only actual array dimensions are returned.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Datasets like ERA5-Land have no spatial_ref coordinate but do have
x/y dimensions with units=degrees_east/degrees_north. The old detection
only checked spatial_ref, causing the deployment CRS (EPSG:32633 for
Norway) to be used instead of EPSG:4326 — producing a wrong proj:code
in the STAC collection and a garbage bbox from a false UTM→WGS84
reprojection. Now both _detect_dataset_crs and _read_crs_from_spatial_ref
fall back to dimension attribute inspection.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
_is_pyramid_zarr only checked for a "0/" subdirectory, which exists
in flat zarr pyramid stores but not in Icechunk stores. Icechunk stores
use an opaque internal layout (chunks/, manifests/, etc.) regardless of
whether the data has pyramid sub-groups. Now falls back to opening the
Icechunk store and checking if "0" is a root group member, so that
the STAC zarr asset href correctly points to /zarr/{id}/0 instead of
the root group, allowing the map viewer to render pyramid data.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Sending 4 concurrent requests to data.chc.ucsb.edu triggered a CrowdSec
IP ban (HTTP 403). Serial fetching is sufficient; each COG range request
completes in under a second. Also adds retry logic for 429/503 responses.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Static stores (time_dim=False, e.g. Copernicus DEM) have no time
coordinate. get_time_dim raises ValueError; catch it and return
start=end=None so artifact creation completes normally.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
A static (time_dim=False) store has no time coordinate, so
read_committed_period_ids previously always returned empty, causing
every re-ingest to re-fetch all source tiles. If the store has spatial
data with non-zero dimensions, return {"static"} so the orchestrator
treats it as already complete and skips the download.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
CoverageTemporal.start/end now accept None so that datasets ingested
with time_dim=False (e.g. Copernicus DEM elevation) can be recorded
without a temporal extent.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Two crashes for static stores (time_dim=False):
1. parse_period_string_to_datetime(None) in temporal extent — guard with
   conditional so None start/end pass through as None (valid STAC).
2. get_time_dim(ds) raises ValueError — catch it and set time_dimension=None
   so xstac omits the time cube:dimension for timeless datasets.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…nually

xstac requires a temporal_dimension and raises KeyError when None is
passed and no CF T axis exists. For static (time_dim=False) datasets,
bypass xstac entirely and build only x/y spatial cube:dimensions.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…/end

Static datasets have temporal.start=None and temporal.end=None.
Return early with [[null, null]] STAC interval instead of crashing on
parse_period_string_to_datetime(None).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
zarr-layer reads the multiscales attribute at the root to select the
appropriate overview level based on viewport zoom. Pointing it at /0
(full resolution) bypassed this and caused all-chunks-at-full-res rendering.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Adds a display.palette alternative to display.colormap + range in dataset
YAML. A palette is a dict mapping uint8 pixel value → hex color, which is
emitted as climate_api:palette in the STAC renders object.

The map viewer builds a 256-entry LUT directly (lut[value] = color) and
passes it to zarr-layer with clim=[0,255] so no rescaling occurs — each
class value indexes its own color. Legend range labels are suppressed for
palette mode since the axis has no numeric meaning.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Wrap long Field descriptions in ingestions/schemas.py
- Remove unused artifact_path assignment in stac/services.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- accessor.py: annotate start/end as str | None before try/except
- sync_engine.py: guard against None current_end for temporal/release sync
- stac/services.py: handle None temporal bounds independently when formatting
- test_stac.py: update pyramid href assertions to expect root URL (not /0)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants